From dfe60618a0c9dd3b7be1e41cfd04c1a6eafbf108 Mon Sep 17 00:00:00 2001
From: Ritesh Bansal <riteshobansal@gmail.com>
Date: Fri, 6 Dec 2019 12:42:52 -0500
Subject: [PATCH] practice spark

---
 Practice3.py | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 54 insertions(+)
 create mode 100644 Practice3.py

diff --git a/Practice3.py b/Practice3.py
new file mode 100644
index 0000000..a7be56f
--- /dev/null
+++ b/Practice3.py
@@ -0,0 +1,54 @@
+import os
+from pyspark.sql import SparkSession
+from pyspark.sql import SQLContext
+import pandas as pd
+
+folderpath = "CNN_1hour2level/"
+listOfFolder = os.listdir(folderpath)
+parquetFilesAddress = []
+for folder in listOfFolder:
+    if not folder.endswith('.DS_Store'):
+        address = folderpath + folder + "/"
+        listOfFiles = os.listdir(address)
+        for f_name in listOfFiles:
+            if f_name.endswith('.parquet'):
+                addressPar = address + f_name
+                parquetFilesAddress.append(addressPar)
+parquetFilesAddress.sort()
+spark = SparkSession.builder \
+    .master("local[*]") \
+    .config("spark.executor.memory", "70g") \
+    .config("spark.driver.memory", "50g") \
+    .config("spark.memory.offHeap.enabled", "true") \
+    .config("spark.memory.offHeap.size", "14g") \
+    .config("spark.driver.cores", "4") \
+    .appName("sampleCodeForReference") \
+    .getOrCreate()
+spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
+
+sc = spark.sparkContext
+
+# using SQLContext to read the parquet files
+sqlContext = SQLContext(sc)
+archiveData = []
+
+# read each parquet file and keep one list of records per file
+for addressPar in parquetFilesAddress:
+    zz_new = []
+    dateFiles = sqlContext.read.parquet(addressPar)
+    print(addressPar)
+    # collect() pulls all rows of this file to the driver in a single job;
+    # the previous per-row dateFiles.rdd.take(i)[0] started one Spark job per
+    # row and always returned the first row of the file
+    for row in dateFiles.collect():
+        currentData = {}
+        # timestamp is the second '-'-separated field of the filename stem
+        currentData['timestamp'] = row['filename'].split('.')[0].split('-')[1]
+        currentData['originalUrl'] = row['originalUrl']
+        currentData['payload'] = row['payload']
+        zz_new.append(currentData)
+    print(addressPar + ' Processed')
+    archiveData.append(zz_new)
+archiveData = pd.DataFrame(archiveData)
+archiveData.to_pickle('cnnNodeData.pkl')
+print('Data Processed')
-- 
GitLab
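
Note (not part of the diff above): since the per-file loop exists only to turn the parquet rows into a pandas pickle, a single multi-path read followed by toPandas() could likely produce the same data with less driver-side looping. The sketch below assumes the CNN_1hour2level/<folder>/*.parquet layout and the filename/originalUrl/payload columns used in Practice3.py; the flat one-row-per-record table and the cnnNodeDataFlat.pkl output name are illustrative assumptions, not what the patch itself writes.

# Sketch only: read all parquet captures in one pass and convert to pandas.
# Assumes the CNN_1hour2level/<folder>/*.parquet layout and the columns
# filename, originalUrl, payload from Practice3.py; the flat output differs
# from the nested per-file lists that the patch pickles.
import glob

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").appName("parquetToPandasSketch").getOrCreate()

paths = sorted(glob.glob("CNN_1hour2level/*/*.parquet"))

# one read over every file; Spark handles the union of the per-file data
df = spark.read.parquet(*paths).select(
    # timestamp = second '-'-separated field of the filename stem
    F.split(F.split(F.col("filename"), r"\.").getItem(0), "-").getItem(1).alias("timestamp"),
    "originalUrl",
    "payload",
)

archive = df.toPandas()  # single driver-side conversion instead of per-row take()
archive.to_pickle("cnnNodeDataFlat.pkl")  # hypothetical output name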