Commit dfe60618 authored by Ritesh Bansal

practice spark

parent 32cda435
import os

import pandas as pd
from pyspark.sql import SparkSession
folderpath = "CNN_1hour2level/"
# Collect the path of every parquet file sitting one directory level below
# folderpath, skipping macOS .DS_Store entries.
parquetFilePaths = []
for folder in os.listdir(folderpath):
    if folder.endswith('.DS_Store'):
        continue
    address = os.path.join(folderpath, folder)
    for f_name in os.listdir(address):
        if f_name.endswith('.parquet'):
            parquetFilePaths.append(os.path.join(address, f_name))
parquetFilePaths.sort()
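# The same discovery can be written in one line with the standard-library glob
# module (a sketch, assuming `import glob` at the top and the same two-level
# folder/file layout as above):
# parquetFilePaths = sorted(glob.glob(os.path.join(folderpath, "*", "*.parquet")))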
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.executor.memory", "70g") \
    .config("spark.driver.memory", "50g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "14g") \
    .config("spark.driver.cores", "4") \
    .appName("sampleCodeForReference") \
    .getOrCreate()
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
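# Note: under master("local[*]") the driver and executors run in a single JVM,
# so spark.driver.memory is the setting that actually sizes the heap here and
# spark.executor.memory has no separate effect. Disabling the vectorized
# parquet reader is commonly done to reduce memory pressure or to avoid issues
# with complex column types, at some cost in read speed.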
archiveData = []
# Read each parquet file and keep the timestamp, original URL and payload of
# every row. SQLContext is deprecated since Spark 2.0, so the files are read
# directly through the SparkSession.
for addressPar in parquetFilePaths:
    print(addressPar)
    dateFiles = spark.read.parquet(addressPar)
    zz_new = []
    # Collect the rows once and iterate locally; calling rdd.take(i) per row
    # would rescan the file for every record and only ever yield the first row.
    for row in dateFiles.collect():
        currentData = {}
        # filename looks like "<prefix>-<timestamp>.<extension>"
        currentData['timestamp'] = row['filename'].split('.')[0].split('-')[1]
        currentData['originalUrl'] = row['originalUrl']
        currentData['payload'] = row['payload']
        zz_new.append(currentData)
    print(addressPar + ' Processed')
    archiveData.append(zz_new)
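# Alternative: Spark can read every file in one pass and hand back a flat
# pandas DataFrame directly (a sketch; spark.read.parquet accepts multiple
# paths, and toPandas() collects everything to the driver, so it assumes the
# selected columns fit in driver memory):
# flat = spark.read.parquet(*parquetFilePaths) \
#     .select('filename', 'originalUrl', 'payload') \
#     .toPandas()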
# archiveData is a list (one entry per file) of lists of record dicts, so the
# resulting DataFrame has one row per parquet file.
archiveData = pd.DataFrame(archiveData)
archiveData.to_pickle('cnnNodeData.pkl')
print('Data Processed')
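# The pickle can be loaded back later with, e.g.:
# archiveData = pd.read_pickle('cnnNodeData.pkl')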