import os

import pandas as pd
from pyspark.sql import SparkSession

# Collect the paths of all parquet files under the top-level folder.
folderpath = "CNN_1hour2level/"
parquetFilesAddress = []
for folder in os.listdir(folderpath):
    if folder == '.DS_Store':
        continue
    address = os.path.join(folderpath, folder)
    for f_name in os.listdir(address):
        if f_name.endswith('.parquet'):
            parquetFilesAddress.append(os.path.join(address, f_name))
parquetFilesAddress.sort()

# Local Spark session; memory settings are sized for large parquet payloads.
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.executor.memory", "70g") \
    .config("spark.driver.memory", "50g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "14g") \
    .config("spark.driver.cores", "4") \
    .appName("sampleCodeForReference") \
    .getOrCreate()
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

archiveData = []

# Read each parquet file and keep only the fields we need.
for addressPar in parquetFilesAddress:
    print(addressPar)
    dateFiles = spark.read.parquet(addressPar)
    zz_new = []
    # Collect the rows once instead of calling take(i) inside the loop,
    # which re-scans the file on every iteration and always yields the first row.
    for row in dateFiles.collect():
        currentData = {}
        # filename looks like "<prefix>-<timestamp>.<ext>"; keep the timestamp part.
        currentData['timestamp'] = row['filename'].split('.')[0].split('-')[1]
        currentData['originalUrl'] = row['originalUrl']
        currentData['payload'] = row['payload']
        zz_new.append(currentData)
    print(addressPar + ' Processed')
    archiveData.append(zz_new)

# One row per parquet file; each cell holds one record dict from that file.
archiveData = pd.DataFrame(archiveData)
archiveData.to_pickle('cnnNodeData.pkl')
print('Data Processed')
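
# A minimal sketch (assumed usage, not part of the original pipeline) showing how
# the pickled DataFrame written above could be loaded back for a quick sanity check.
# Only the file name 'cnnNodeData.pkl' comes from the script; the rest is illustrative.
#
# import pandas as pd
#
# archive = pd.read_pickle('cnnNodeData.pkl')
# print(archive.shape)  # one row per parquet file; cells are record dicts (or None padding)
# sample = archive.iloc[0, 0]
# if sample is not None:
#     print(sample['timestamp'], sample['originalUrl'])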