import os
from pyspark.sql import SparkSession
import pandas as pd
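
# This script walks the CNN_1hour2level/ archive dump, reads every parquet
# file with Spark, extracts each record's capture timestamp, original URL and
# payload, and pickles the result as a pandas DataFrame.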

# Collect the paths of all parquet files in the two-level archive layout
# CNN_1hour2level/<folder>/<file>.parquet, skipping macOS .DS_Store entries.
folderpath = "CNN_1hour2level/"
parquetFilesAddress = []
for folder in os.listdir(folderpath):
    if folder.endswith('.DS_Store'):
        continue
    address = os.path.join(folderpath, folder)
    for f_name in os.listdir(address):
        if f_name.endswith('.parquet'):
            parquetFilesAddress.append(os.path.join(address, f_name))
parquetFilesAddress.sort()
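
# A simpler way to build the same sorted list (a sketch, assuming the same
# two-level CNN_1hour2level/<folder>/<file>.parquet layout as above):
# import glob
# parquetFilesAddress = sorted(glob.glob(os.path.join(folderpath, "*", "*.parquet")))
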
# Build a local Spark session. In local[*] mode everything runs inside the
# driver JVM, so spark.driver.memory is the setting that actually bounds the heap.
spark = SparkSession.builder \
    .master("local[*]")\
    .config("spark.executor.memory", "70g")\
    .config("spark.driver.memory", "50g")\
    .config("spark.memory.offHeap.enabled", "true")\
    .config("spark.memory.offHeap.size", "14g")\
    .config("spark.driver.cores", "4")\
    .appName("sampleCodeForReference")\
    .getOrCreate()
# Disable the vectorized parquet reader, a common workaround when rows carry
# large binary columns such as the payload field read below.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

# SQLContext is deprecated in current Spark versions; the parquet files are
# read directly through spark.read below.
archiveData = []
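
# Note: spark.read.parquet also accepts several paths at once, so if the
# per-file grouping below is not needed, the whole archive could be loaded
# in a single call (a sketch using the list built above):
# allData = spark.read.parquet(*parquetFilesAddress)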

# Read each parquet file and pull out the fields of interest from every row.
for addressPar in parquetFilesAddress:
    zz_new = []
    dateFiles = spark.read.parquet(addressPar)
    print(addressPar)
    # Collect the rows once; the original per-row take(i)[0] calls always
    # returned the first row and launched a fresh Spark job on every iteration.
    for row in dateFiles.collect():
        currentData = {}
        # The capture timestamp is the segment of the filename between the
        # first '-' and the following '-' or '.'.
        currentData['timestamp'] = row['filename'].split('.')[0].split('-')[1]
        currentData['originalUrl'] = row['originalUrl']
        currentData['payload'] = row['payload']
        zz_new.append(currentData)
    print(addressPar + ' Processed')
    archiveData.append(zz_new)
# Flatten the per-file record lists into one DataFrame row per archived capture.
archiveData = pd.DataFrame([record for fileRecords in archiveData for record in fileRecords])
archiveData.to_pickle('cnnNodeData.pkl')
print('Data Processed')
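
# Cleanly shut down the Spark session once the pickle has been written.
spark.stop()

# Downstream use (a sketch): the pickled records can be reloaded with
# archiveData = pd.read_pickle('cnnNodeData.pkl')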