import os
import shutil
import logging
import tempfile
import csv
import json
from urllib.request import urlopen
from urllib.parse import urlencode

from google.cloud import storage
from google.cloud.storage import Blob
from google.cloud import bigquery
"""
Retrieve and construct a BigQuery confiration file.
Return: A configured LoadJobConfig object for data loading.
"""
def get_bq_load_config():
with open('config/job.json') as config_file:
config = json.load(config_file)
load_config = bigquery.LoadJobConfig()
load_config.source_format = config['source_format']
load_config.write_disposition = config['write_disposition']
load_config.ignore_unknown_values = config['ignore_unknown_values']
load_config.time_partitioning = bigquery.table.TimePartitioning(config['partitioning_type'], config['partitioning_field'])
load_config.skip_leading_rows = config['skip_leading_rows']
load_config.schema = [
bigquery.SchemaField(field['name'], field['type']) for field in config['schema']
]
return load_config
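

# A sketch of the shape config/job.json is expected to have, inferred from the keys
# read in get_bq_load_config() above. The values shown are illustrative assumptions,
# not the project's actual settings:
# {
#     "source_format": "CSV",
#     "write_disposition": "WRITE_TRUNCATE",
#     "ignore_unknown_values": true,
#     "partitioning_type": "DAY",
#     "partitioning_field": "timestamp",
#     "skip_leading_rows": 1,
#     "schema": [
#         {"name": "timestamp", "type": "TIMESTAMP"},
#         {"name": "close", "type": "FLOAT"},
#         {"name": "symbol", "type": "STRING"}
#     ]
# }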


def bqload(gcsfile, symbol):
    """
    Loads the CSV file in GCS into BigQuery, replacing the existing data in that partition
    """
    load_config = get_bq_load_config()
    client = bigquery.Client.from_service_account_json("config/account.json")
    # client = bigquery.Client()
    table_ref = client.dataset(os.environ.get("DATASET")).table('market_data_raw_{}'.format(symbol))
    load_job = client.load_table_from_uri(gcsfile, table_ref, job_config=load_config)
    load_job.result()  # waits for table load to complete
    if load_job.state != 'DONE':
        raise load_job.exception()
    return table_ref, load_job.output_rows
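
# Illustrative example only: with DATASET=market_data and symbol='GOOG' (placeholder
# values, not the project's real names), the load above targets the table
# market_data.market_data_raw_GOOG.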


def upload(csvfile, blobname):
    """
    Uploads the CSV file into the bucket with the given blobname
    """
    bucketname = os.environ.get("BUCKET_NAME")
    client = storage.Client()
    bucket = client.get_bucket(bucketname)
    logging.info(bucket)
    logging.debug('Uploading {} ...'.format(csvfile))
    blob = Blob(blobname, bucket)
    blob.upload_from_filename(csvfile)
    gcslocation = 'gs://{}/{}'.format(bucketname, blobname)
    logging.info('Uploaded {} ...'.format(gcslocation))
    return gcslocation


def get_config(path: str):
    """
    Reads the JSON configuration file at the given path and returns it as a dict.
    """
    with open(path) as config_file:
        config = json.load(config_file)
    return config


def download(symbol: str, destdir: str):
    """
    Downloads the market stock data for the given symbol and returns the local filename
    """
    logging.info('Requesting data for {}-*'.format(symbol))
    config = get_config('config/api.json')
    query_params = {
        "apikey": config['apikey'],
        "datatype": config['datatype'],
        "symbol": symbol
    }
    url = config['url'] + '&' + urlencode(query_params)
    logging.debug("Trying to download {}".format(url))
    input_filename = os.path.join(destdir, "input_{}".format(symbol))
    output_filename = os.path.join(destdir, "{}".format(symbol))
    with open(input_filename, "wb") as fp:
        response = urlopen(url)
        fp.write(response.read())
    # Append a "symbol" column, carrying the ticker, to every row of the downloaded CSV
    with open(input_filename, "r") as input_file, open(output_filename, "w", newline="") as output_file:
        csv_reader = csv.reader(input_file)
        csv_writer = csv.writer(output_file)
        # Write the modified header
        header = next(csv_reader)
        header.append("symbol")
        csv_writer.writerow(header)
        # Write the modified rows
        for row in csv_reader:
            row.append(symbol)
            csv_writer.writerow(row)
    logging.debug("{} saved".format(output_filename))
    return output_filename
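

# A minimal sketch of what config/api.json might contain, inferred from the keys read
# in download() above. The URL and values are hypothetical placeholders, not the
# project's real endpoint or credentials:
# {
#     "url": "https://example.com/query?function=TIME_SERIES_DAILY",
#     "apikey": "YOUR_API_KEY",
#     "datatype": "csv"
# }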


def ingest(symbol):
    """
    Ingest stock market data from the API into Google Cloud Storage and BigQuery.
    Returns (table, numrows) on success.
    """
    tempdir = tempfile.mkdtemp(prefix='ingest_market_data')
    try:
        data_csv = download(symbol, tempdir)
        gcsloc = 'data_market/raw/market_data_{}.csv'.format(symbol)
        gcsloc = upload(data_csv, gcsloc)
        return bqload(gcsloc, symbol)
    finally:
        logging.debug('Cleaning up by removing {}'.format(tempdir))
        shutil.rmtree(tempdir)


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Ingest data from API to Google Cloud Storage')
    parser.add_argument('--symbol', help='Name of market item', required=True)
    parser.add_argument('--debug', dest='debug', action='store_true', help='Specify if you want debug messages')
    try:
        args = parser.parse_args()
        if args.debug:
            logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG)
        else:
            logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
        tableref, numrows = ingest(args.symbol)
        logging.info('Success ... ingested {} rows to {}'.format(numrows, tableref))
    except Exception as e:
        logging.exception("Try again later?")
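
# Example invocation, assuming the BUCKET_NAME and DATASET environment variables are set
# and config/api.json, config/job.json and config/account.json are in place. The bucket,
# dataset and symbol names below are placeholders, not the project's real values:
#   BUCKET_NAME=my-bucket DATASET=my_dataset python ingest.py --symbol GOOG --debug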