# DDProcess.py
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np


# Test4-2: merge all the batches into a single parquet file per partition
def merge_parquets(parameters):
    fs = pa.hdfs.connect()
    batches, pids, hdfs_path = parameters
    for pid in pids:
        parquets = []
        # collect this partition's file from every batch directory
        for batch in range(batches):
            path = hdfs_path + str(batch) + '/partition_' + str(pid) + '.parquet'
            try:
                par = pq.read_table(path)
                parquets.append(par)
            except Exception:
                # this partition was not written in this batch; skip it
                continue
        if not parquets:
            # nothing was dumped for this partition in any batch
            continue
        merged_parquet = pa.concat_tables(parquets)
        merge_path = hdfs_path + 'merged/partition_' + str(pid) + '.parquet'
        fw = fs.open(merge_path, 'wb')
        pq.write_table(merged_parquet, fw)
        fw.close()
    print('exit merge process')
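
# Usage sketch (not part of the original file): merge_parquets takes a single
# tuple (number_of_batches, iterable_of_partition_ids, hdfs_base_path); the
# values below are illustrative placeholders, and the base path must end with
# '/'. A fuller driver example is sketched after dump_data below.
#
# merge_parquets((4, range(8), 'hdfs://localhost:9000/data/'))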


# Test4-1: write each batch to its own directory, then finally merge them
def dump_data(parameters):
    batch, pid_data_dict, column_names, hdfs_path = parameters
    fs = pa.hdfs.connect()
    for pid in list(pid_data_dict.keys()):
        path = hdfs_path + str(batch) + '/partition_' + str(pid) + '.parquet'
        pdf = pd.DataFrame(pid_data_dict[pid], columns=column_names)
        adf = pa.Table.from_pandas(pdf)
        # overwrite this partition's file for the current batch
        fw = fs.open(path, 'wb')
        pq.write_table(adf, fw)
        fw.close()
    print('exit dumping process')
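

# Driver sketch (not part of the original file): one way the Test4 workflow
# could be run, i.e. dump each batch into its own directory with dump_data in
# a separate process, then combine the batches with merge_parquets. All names
# and values here (batch data, column names, partition ids, HDFS path) are
# illustrative placeholders and assume a reachable HDFS instance.
if __name__ == '__main__':
    from multiprocessing import Process

    column_names = ['key', 'value']                # assumed schema
    hdfs_path = 'hdfs://localhost:9000/data/'      # assumed base path, ends with '/'
    partition_ids = [0, 1]                         # assumed partition ids
    # two toy batches, each mapping partition id -> list of rows
    batch_dicts = [
        {0: [(1, 'a')], 1: [(2, 'b')]},
        {0: [(3, 'c')], 1: [(4, 'd')]},
    ]

    # one dumping process per batch
    dumpers = [Process(target=dump_data,
                       args=((batch, pid_data, column_names, hdfs_path),))
               for batch, pid_data in enumerate(batch_dicts)]
    for p in dumpers:
        p.start()
    for p in dumpers:
        p.join()

    # merge the per-batch files into one parquet file per partition
    merge_parquets((len(batch_dicts), partition_ids, hdfs_path))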


# Test3: use pyarrow to append a new dataframe to an existing parquet file;
# append mode is not implemented in pyarrow's HDFS filesystem
# def dump_data(parameters):
#     pid_data_dict, column_names, hdfs_path = parameters
#     fs = pa.hdfs.connect('localhost', 9000)
#     for pid in list(pid_data_dict.keys()):
#         path = hdfs_path + 'partition_' + str(pid) + '.parquet'
#         pdf = pd.DataFrame(pid_data_dict[pid], columns=column_names)
#         adf = pa.Table.from_pandas(pdf)
#         fw = None
#         try:
#             fw = fs.open(path, 'ab')  # append mode is not implemented
#         except:
#             fw = fs.open(path, 'wb')
#         pq.write_table(adf, fw)
#         fw.close()
#     print('exit dumping process')

# Test2: create a Spark context inside the worker process; multiple Spark
# contexts cannot run at once
# import findspark
# findspark.init()  # this must be executed before the pyspark imports below
# from pyspark import SparkContext, SparkConf
# from pyspark.sql import SQLContext
# from pyspark.sql import SparkSession
# from pyspark import SparkFiles

# Test1: pass the Spark context to the worker process, which is not allowed
# def dump_data(parameters):
#     spark, pid_data_dict, column_names, hdfs_path = parameters
#     # fs = pa.hdfs.connect('localhost', 9000)
#     for pid in list(pid_data_dict.keys()):
#         path = hdfs_path + 'partition_' + str(pid) + '.parquet'
#         pdf = pd.DataFrame(pid_data_dict[pid], columns=column_names)
#         df = spark.createDataFrame(pdf)
#         df.write.mode('append').parquet(path)
#     print('exit dumping process')