From e67163f2faa55fdd358723abf84f7d55ee3dcd16 Mon Sep 17 00:00:00 2001
From: giangbui
Date: Wed, 16 Dec 2020 14:51:30 -0600
Subject: [PATCH] Feat/mobility (#160)

* feat(mobility): parse csv file

* chore(code): refactor

* chore(code): remove unused dependencies

* feat(last_submit): only submit the new data

* fix(typos): fix typos

* chore(retries): use limited retry numbers

* chore(README): update doc
---
 README.md                            |   2 +
 covid19-etl/etl/com_mobility.py      | 271 ++++++++++++++++++++++++++++
 covid19-etl/etl/ncbi.py              |  18 ++-
 covid19-etl/utils/metadata_helper.py |  37 ++++
 4 files changed, 322 insertions(+), 6 deletions(-)
 create mode 100644 covid19-etl/etl/com_mobility.py

diff --git a/README.md b/README.md
index 94dfe73d..612c3e00 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,7 @@
 | [COV-521][cov-521] | ATLAS | [here][atlas] | One-time |
 | [COV-465][cov-465] | NCBI-METADATA | [bucket](https://github.com/uc-cdis/covid19-tools#ncbi) | scheduled|
 | [COV-482][cov-482] | NCBI-MANIFEST | [bucket](https://github.com/uc-cdis/covid19-tools#ncbi) | scheduled|
+| [COV-532][cov-532] | COM-MOBILITY | [bucket](https://github.com/uc-cdis/covid19-tools#com_mobility) | scheduled|
 
 ## Deployment
 
@@ -181,3 +182,4 @@ covid19-tools
 [cov-465]: https://occ-data.atlassian.net/browse/COV-465
 [cov-482]: https://occ-data.atlassian.net/browse/COV-482
 [cov-454]: https://occ-data.atlassian.net/browse/COV-454
+[cov-532]: https://occ-data.atlassian.net/browse/COV-532
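The new COM-MOBILITY ETL below streams Google's Global Mobility Report CSV rather than downloading the whole file at once. As a quick orientation, the core read pattern looks like this (an illustrative snippet, not part of the patch):

    import csv
    from contextlib import closing

    import requests

    URL = "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv"

    # Stream the CSV instead of loading the large file into memory:
    # iter_lines() yields raw bytes that are decoded before csv.reader sees them.
    with closing(requests.get(URL, stream=True)) as r:
        lines = (line.decode("utf-8") for line in r.iter_lines())
        reader = csv.reader(lines, delimiter=",", quotechar='"')
        headers = next(reader)
        for row in reader:
            if not row:
                continue  # the file contains occasional blank lines
            row_dict = dict(zip(headers, row))
            if row_dict["country_region_code"] == "US":
                print(row_dict["sub_region_1"], row_dict["date"])
                break  # stop after the first US row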
"residential_percent_change_from_baseline": ( + "residential_percent_change_from_baseline", + convert_to_int, + ), +} + + +def format_submitter_id(node_name, *argv): + """Format submitter id""" + submitter_id = node_name + for v in argv: + submitter_id = submitter_id + f"_{v}" + submitter_id = submitter_id.lower().replace(", ", "_") + submitter_id = re.sub("[^a-z0-9-_]+", "-", submitter_id) + return submitter_id.strip("-") + + +class COM_MOBILITY(base.BaseETL): + def __init__(self, base_url, access_token, s3_bucket): + super().__init__(base_url, access_token, s3_bucket) + + self.program_name = "open" + self.project_code = "Com-Mobility" + + self.metadata_helper = MetadataHelper( + base_url=self.base_url, + program_name=self.program_name, + project_code=self.project_code, + access_token=access_token, + ) + + self.expected_file_headers = [ + "country_region_code", + "country_region", + "sub_region_1", + "sub_region_2", + "metro_area", + "iso_3166_2_code", + "census_fips_code", + "date", + "retail_and_recreation_percent_change_from_baseline", + "grocery_and_pharmacy_percent_change_from_baseline", + "parks_percent_change_from_baseline", + "transit_stations_percent_change_from_baseline", + "workplaces_percent_change_from_baseline", + "residential_percent_change_from_baseline", + ] + + self.summary_locations = [] + self.summary_socio_demographics = [] + + def files_to_submissions(self): + """ + Reads CSV files and converts the data to Sheepdog records + """ + url = "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv" + self.parse_file(url) + + def parse_file(self, url): + """ + Converts a CSV file to data we can submit via Sheepdog. Stores the + records to submit in `self.location_data` and `self.time_series_data`. + Ignores any records that are already in Sheepdog (relies on unique + `submitter_id` to check) + + Args: + url (str): URL at which the CSV file is available + """ + + self.last_submission_date_time = self.metadata_helper.get_last_submission() + the_lattest_data_datetime = None + + print("Getting data from {}".format(url)) + + with closing(requests.get(url, stream=True)) as r: + f = (line.decode("utf-8") for line in r.iter_lines()) + reader = csv.reader(f, delimiter=",", quotechar='"') + + headers = next(reader) + + assert ( + headers[0] != "404: Not Found" + ), "Unable to get file contents, received {}.".format(headers) + + assert set(self.expected_file_headers).issubset( + set(headers) + ), "CSV headers have changed (expected {} is a subset of {}). 
+
+
+class COM_MOBILITY(base.BaseETL):
+    def __init__(self, base_url, access_token, s3_bucket):
+        super().__init__(base_url, access_token, s3_bucket)
+
+        self.program_name = "open"
+        self.project_code = "Com-Mobility"
+
+        self.metadata_helper = MetadataHelper(
+            base_url=self.base_url,
+            program_name=self.program_name,
+            project_code=self.project_code,
+            access_token=access_token,
+        )
+
+        self.expected_file_headers = [
+            "country_region_code",
+            "country_region",
+            "sub_region_1",
+            "sub_region_2",
+            "metro_area",
+            "iso_3166_2_code",
+            "census_fips_code",
+            "date",
+            "retail_and_recreation_percent_change_from_baseline",
+            "grocery_and_pharmacy_percent_change_from_baseline",
+            "parks_percent_change_from_baseline",
+            "transit_stations_percent_change_from_baseline",
+            "workplaces_percent_change_from_baseline",
+            "residential_percent_change_from_baseline",
+        ]
+
+        self.summary_locations = []
+        self.summary_socio_demographics = []
+
+    def files_to_submissions(self):
+        """
+        Reads the CSV file and converts the data to Sheepdog records
+        """
+        url = "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv"
+        self.parse_file(url)
+
+    def parse_file(self, url):
+        """
+        Converts a CSV file to data we can submit via Sheepdog. Stores the
+        records to submit in `self.summary_locations` and
+        `self.summary_socio_demographics`. Ignores any rows that are not
+        newer than the date of the last submission.
+
+        Args:
+            url (str): URL at which the CSV file is available
+        """
+
+        self.last_submission_date_time = self.metadata_helper.get_last_submission()
+        the_latest_data_datetime = None
+
+        print("Getting data from {}".format(url))
+
+        with closing(requests.get(url, stream=True)) as r:
+            f = (line.decode("utf-8") for line in r.iter_lines())
+            reader = csv.reader(f, delimiter=",", quotechar='"')
+
+            headers = next(reader)
+
+            assert (
+                headers[0] != "404: Not Found"
+            ), "Unable to get file contents, received {}.".format(headers)
+
+            assert set(self.expected_file_headers).issubset(
+                set(headers)
+            ), "CSV headers have changed (expected {} to be a subset of {}). We may need to update the ETL code.".format(
+                self.expected_file_headers, headers
+            )
+
+            for row in reader:
+                # ignore any empty row
+                if not row:
+                    continue
+
+                row_dict = dict(zip(headers, row))
+                if row_dict["country_region_code"] != "US":
+                    continue
+
+                if (
+                    not self.last_submission_date_time
+                    or parse(row_dict["date"]) > self.last_submission_date_time
+                ):
+                    if (
+                        the_latest_data_datetime is None
+                        or the_latest_data_datetime < parse(row_dict["date"])
+                    ):
+                        the_latest_data_datetime = parse(row_dict["date"])
+
+                    summary_location = {}
+                    summary_socio_demographic = {}
+
+                    summary_location_submitter_id = format_submitter_id(
+                        "summary_location",
+                        row_dict["country_region_code"],
+                        process_county_function(row_dict["sub_region_1"]),
+                        process_county_function(row_dict["sub_region_2"]),
+                        row_dict["metro_area"],
+                        row_dict["date"],
+                    )
+
+                    summary_socio_demographic_submitter_id = format_submitter_id(
+                        "summary_socio_demographic",
+                        row_dict["country_region_code"],
+                        row_dict["sub_region_1"],
+                        row_dict["sub_region_2"],
+                        row_dict["metro_area"],
+                        row_dict["date"],
+                    )
+
+                    summary_location = {
+                        "submitter_id": summary_location_submitter_id,
+                        "projects": [{"code": self.project_code}],
+                    }
+
+                    summary_socio_demographic = {
+                        "submitter_id": summary_socio_demographic_submitter_id,
+                        "summary_locations": [
+                            {"submitter_id": summary_location_submitter_id}
+                        ],
+                    }
+
+                    for field in [
+                        "country_region_code",
+                        "country_region",
+                        "sub_region_1",
+                        "sub_region_2",
+                        "metro_area",
+                        "iso_3166_2_code",
+                        "census_fips_code",
+                    ]:
+                        gen3_field, func = SPECIAL_MAP_FIELDS[field]
+                        summary_location[gen3_field] = func(row_dict[field])
+
+                    for field in [
+                        "retail_and_recreation_percent_change_from_baseline",
+                        "grocery_and_pharmacy_percent_change_from_baseline",
+                        "parks_percent_change_from_baseline",
+                        "transit_stations_percent_change_from_baseline",
+                        "workplaces_percent_change_from_baseline",
+                        "residential_percent_change_from_baseline",
+                        "date",
+                    ]:
+                        gen3_field, func = SPECIAL_MAP_FIELDS[field]
+                        summary_socio_demographic[gen3_field] = func(row_dict[field])
+
+                    self.summary_locations.append(summary_location)
+                    self.summary_socio_demographics.append(summary_socio_demographic)
+        self.last_submission_date_time = the_latest_data_datetime
+
+    def submit_metadata(self):
+        """
+        Batch submits the `summary_location` and `summary_socio_demographic`
+        records stored by `parse_file`, then updates the project's
+        `last_submission_identifier` to the date of the newest record.
+        """
+
+        # Submit the new summary_location records first so that the
+        # summary_socio_demographic records can link to them
+        print("Submitting summary_location data")
+        for loc in self.summary_locations:
+            loc_record = {"type": "summary_location"}
+            loc_record.update(loc)
+            self.metadata_helper.add_record_to_submit(loc_record)
+        self.metadata_helper.batch_submit_records()
+
+        print("Submitting summary_socio_demographic data")
+        for sc in self.summary_socio_demographics:
+            sc_record = {"type": "summary_socio_demographic"}
+            sc_record.update(sc)
+            self.metadata_helper.add_record_to_submit(sc_record)
+        self.metadata_helper.batch_submit_records()
+
+        if self.last_submission_date_time:
+            self.metadata_helper.update_last_submission(
+                self.last_submission_date_time.strftime("%Y-%m-%d")
+            )
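To make the node relationship concrete, one US row produces a pair of records shaped roughly like this (all values are illustrative, not taken from the real feed):

    summary_location = {
        "submitter_id": "summary_location_us_illinois_cook__2020-12-15",
        "projects": [{"code": "Com-Mobility"}],
        "iso2": "US",
        "country_region": "United States",
        "province_state": "Illinois",
        "county": "Cook",
        "FIPS": 17031,
    }

    summary_socio_demographic = {
        "submitter_id": "summary_socio_demographic_us_illinois_cook__2020-12-15",
        # the child node points back to its parent summary_location
        "summary_locations": [
            {"submitter_id": "summary_location_us_illinois_cook__2020-12-15"}
        ],
        "report_date": "2020-12-15",
        "retail_and_recreation_percent_change_from_baseline": -31,
        "workplaces_percent_change_from_baseline": -38,
    }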
diff --git a/covid19-etl/etl/ncbi.py b/covid19-etl/etl/ncbi.py
index dd5094e7..1bf54fe7 100644
--- a/covid19-etl/etl/ncbi.py
+++ b/covid19-etl/etl/ncbi.py
@@ -24,6 +24,8 @@
 
 FILE_EXTENSION_MAPPING = {"fq": "fastq", "fa": "fasta"}
 
+MAX_RETRIES = 3
+
 
 def get_file_extension(filename):
     """get file extension from the filename"""
@@ -303,12 +305,14 @@ async def files_to_virus_sequence_run_taxonomy_submission(
         assert (
             did
         ), f"file {filename} does not exist in the index, rerun NCBI_FILE ETL"
-        trying = True
-        while trying:
+
+        tries = 0
+        while tries < MAX_RETRIES:
             try:
                 await self.file_helper.async_update_authz(did=did, rev=rev)
-                trying = False
+                break
             except Exception as e:
+                tries += 1
                 print(f"Cannot update indexd for {did}. Detail: {e}. Retrying...")
 
         submitted_json["file_size"] = filesize
@@ -436,12 +440,14 @@ async def files_to_node_submissions(self, node_name):
         assert (
             did
         ), f"file {filename} does not exist in the index, rerun NCBI_FILE ETL"
-        retrying = True
-        while retrying:
+
+        tries = 0
+        while tries < MAX_RETRIES:
             try:
                 await self.file_helper.async_update_authz(did=did, rev=rev)
-                retrying = False
+                break
             except Exception as e:
+                tries += 1
                 print(
                     f"ERROR: Failed to update indexd for {filename}. Detail: {e}. Retrying..."
                 )
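The two retry loops are now bounded instead of spinning forever on a persistent indexd failure (and they break out once the update succeeds). If more call sites appear, the pattern could be factored into a small helper along these lines (a sketch only, not part of the patch; unlike the loops above, it re-raises after the final attempt instead of continuing):

    import asyncio

    async def with_retries(coro_fn, *args, max_retries=3, **kwargs):
        """Call `coro_fn`, retrying with simple backoff on failure."""
        for attempt in range(1, max_retries + 1):
            try:
                return await coro_fn(*args, **kwargs)
            except Exception as e:
                if attempt == max_retries:
                    raise
                print(f"Attempt {attempt}/{max_retries} failed: {e}. Retrying...")
                await asyncio.sleep(2 ** attempt)

    # usage: await with_retries(self.file_helper.async_update_authz, did=did, rev=rev)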
Detail {ex}" + ) + raise + + def update_last_submission(self, last_submission_date_time): + headers = {"content-type": "application/json"} + headers["Authorization"] = self.headers["Authorization"] + record = { + "code": self.project_code, + "dbgap_accession_number": self.project_code, + "last_submission_identifier": last_submission_date_time, + } + try: + res = requests.put( + "{}/api/v0/submission/{}".format(self.base_url, self.program_name), + headers=headers, + data=json.dumps(record), + ) + except Exception as ex: + print(f"Unable to update last_submission_identifier. Detail {ex}") + raise