Feat/mobility (#160)
* feat(mobility): parse csv file

* chore(code): refactor

* chore(code): remove unused dependencies

* feat(last_submit): only submit the new data

* fix(typos): fix typos

* chore(retries): use limited retry numbers

* chore(README): update doc
giangbui authored Dec 16, 2020
1 parent ddceafd commit e67163f
Showing 4 changed files with 318 additions and 6 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -24,6 +24,7 @@
 | [COV-521][cov-521] | ATLAS | [here][atlas] | One-time |
 | [COV-465][cov-465] | NCBI-METADATA | [bucket](https://github.com/uc-cdis/covid19-tools#ncbi) | scheduled|
 | [COV-482][cov-482] | NCBI-MANIFEST | [bucket](https://github.com/uc-cdis/covid19-tools#ncbi) | scheduled|
+| [COV-532][cov-532] | COM-MOBILITY | [bucket](https://github.com/uc-cdis/covid19-tools#com_mobility) | scheduled|


 ## Deployment
@@ -181,3 +182,4 @@ covid19-tools
 [cov-465]: https://occ-data.atlassian.net/browse/COV-465
 [cov-482]: https://occ-data.atlassian.net/browse/COV-482
 [cov-454]: https://occ-data.atlassian.net/browse/COV-454
+[cov-532]: https://occ-data.atlassian.net/browse/COV-532
270 changes: 270 additions & 0 deletions covid19-etl/etl/com_mobility.py
@@ -0,0 +1,270 @@
import requests
import csv
import re
from contextlib import closing
from datetime import datetime
from dateutil.parser import parse

from etl import base
from utils.metadata_helper import MetadataHelper


def convert_to_int(s):
    try:
        return int(s)
    except Exception:
        return None


def convert_to_list(s):
    if isinstance(s, list):
        return s
    return [s]


def get_enum_value(l, default, s):
    if s in l:
        return s
    return default


def convert_datetime_to_str(dt):
    if not isinstance(dt, datetime):
        return None
    return dt.strftime("%Y-%m-%d")


def process_county_function(county):
    return county.replace(" County", "").replace(" county", "").strip()


def identity_function(s):
    return s


# Maps each CSV column to a (Gen3 field name, conversion function) pair
# so that the file contents are compatible with the Gen3 fields
SPECIAL_MAP_FIELDS = {
    "country_region_code": ("iso2", identity_function),
    "country_region": ("country_region", identity_function),
    "metro_area": ("metro_area", identity_function),
    "iso_3166_2_code": ("iso_3166_2", identity_function),
    "sub_region_1": ("province_state", process_county_function),
    "sub_region_2": ("county", process_county_function),
    "census_fips_code": ("FIPS", convert_to_int),
    "date": ("report_date", identity_function),
    "retail_and_recreation_percent_change_from_baseline": (
        "retail_and_recreation_percent_change_from_baseline",
        convert_to_int,
    ),
    "grocery_and_pharmacy_percent_change_from_baseline": (
        "grocery_and_pharmacy_percent_change_from_baseline",
        convert_to_int,
    ),
    "parks_percent_change_from_baseline": (
        "parks_percent_change_from_baseline",
        convert_to_int,
    ),
    "transit_stations_percent_change_from_baseline": (
        "transit_stations_percent_change_from_baseline",
        convert_to_int,
    ),
    "workplaces_percent_change_from_baseline": (
        "workplaces_percent_change_from_baseline",
        convert_to_int,
    ),
    "residential_percent_change_from_baseline": (
        "residential_percent_change_from_baseline",
        convert_to_int,
    ),
}


def format_submitter_id(node_name, *argv):
    """Build a submitter_id from a node name and a list of values"""
    submitter_id = node_name
    for v in argv:
        submitter_id = submitter_id + f"_{v}"
    submitter_id = submitter_id.lower().replace(", ", "_")
    submitter_id = re.sub("[^a-z0-9-_]+", "-", submitter_id)
    return submitter_id.strip("-")


class COM_MOBILITY(base.BaseETL):
    def __init__(self, base_url, access_token, s3_bucket):
        super().__init__(base_url, access_token, s3_bucket)

        self.program_name = "open"
        self.project_code = "Com-Mobility"

        self.metadata_helper = MetadataHelper(
            base_url=self.base_url,
            program_name=self.program_name,
            project_code=self.project_code,
            access_token=access_token,
        )

        self.expected_file_headers = [
            "country_region_code",
            "country_region",
            "sub_region_1",
            "sub_region_2",
            "metro_area",
            "iso_3166_2_code",
            "census_fips_code",
            "date",
            "retail_and_recreation_percent_change_from_baseline",
            "grocery_and_pharmacy_percent_change_from_baseline",
            "parks_percent_change_from_baseline",
            "transit_stations_percent_change_from_baseline",
            "workplaces_percent_change_from_baseline",
            "residential_percent_change_from_baseline",
        ]

        self.summary_locations = []
        self.summary_socio_demographics = []

    def files_to_submissions(self):
        """
        Reads the CSV file and converts the data to Sheepdog records
        """
        url = "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv"
        self.parse_file(url)

    def parse_file(self, url):
        """
        Converts a CSV file to data we can submit via Sheepdog. Stores the
        records to submit in `self.summary_locations` and
        `self.summary_socio_demographics`. Ignores any rows dated on or
        before the last successful submission (relies on the
        `last_submission_identifier` stored on the project to check).

        Args:
            url (str): URL at which the CSV file is available
        """
        self.last_submission_date_time = self.metadata_helper.get_last_submission()
        the_latest_data_datetime = None

        print("Getting data from {}".format(url))

        with closing(requests.get(url, stream=True)) as r:
            f = (line.decode("utf-8") for line in r.iter_lines())
            reader = csv.reader(f, delimiter=",", quotechar='"')

            headers = next(reader)

            assert (
                headers[0] != "404: Not Found"
            ), "Unable to get file contents, received {}.".format(headers)

            assert set(self.expected_file_headers).issubset(
                set(headers)
            ), "CSV headers have changed (expected {} to be a subset of {}). We may need to update the ETL code".format(
                self.expected_file_headers, headers
            )

            for row in reader:
                # ignore any empty row
                if not row:
                    continue

                row_dict = dict(zip(headers, row))

                # only ingest US data
                if row_dict["country_region_code"] != "US":
                    continue

                # only process rows newer than the last submission
                if (
                    not self.last_submission_date_time
                    or parse(row_dict["date"]) > self.last_submission_date_time
                ):
                    # track the most recent report date seen in this run
                    if (
                        the_latest_data_datetime is None
                        or the_latest_data_datetime < parse(row_dict["date"])
                    ):
                        the_latest_data_datetime = parse(row_dict["date"])

                    summary_location_submitter_id = format_submitter_id(
                        "summary_location",
                        row_dict["country_region_code"],
                        row_dict["sub_region_1"].replace("county", "").strip(),
                        row_dict["sub_region_2"].replace("county", "").strip(),
                        row_dict["metro_area"],
                        row_dict["date"],
                    )

                    summary_socio_demographic_submitter_id = format_submitter_id(
                        "summary_socio_demographic",
                        row_dict["country_region_code"],
                        row_dict["sub_region_1"],
                        row_dict["sub_region_2"],
                        row_dict["metro_area"],
                        row_dict["date"],
                    )

                    summary_location = {
                        "submitter_id": summary_location_submitter_id,
                        "projects": [{"code": self.project_code}],
                    }

                    summary_socio_demographic = {
                        "submitter_id": summary_socio_demographic_submitter_id,
                        "summary_locations": [
                            {"submitter_id": summary_location_submitter_id}
                        ],
                    }

                    for field in [
                        "country_region_code",
                        "country_region",
                        "sub_region_1",
                        "sub_region_2",
                        "metro_area",
                        "iso_3166_2_code",
                        "census_fips_code",
                    ]:
                        gen3_field, func = SPECIAL_MAP_FIELDS[field]
                        summary_location[gen3_field] = func(row_dict[field])

                    for field in [
                        "retail_and_recreation_percent_change_from_baseline",
                        "grocery_and_pharmacy_percent_change_from_baseline",
                        "parks_percent_change_from_baseline",
                        "transit_stations_percent_change_from_baseline",
                        "workplaces_percent_change_from_baseline",
                        "residential_percent_change_from_baseline",
                        "date",
                    ]:
                        gen3_field, func = SPECIAL_MAP_FIELDS[field]
                        summary_socio_demographic[gen3_field] = func(row_dict[field])

                    self.summary_locations.append(summary_location)
                    self.summary_socio_demographics.append(summary_socio_demographic)

        # only move the checkpoint forward if this run saw new data
        if the_latest_data_datetime:
            self.last_submission_date_time = the_latest_data_datetime

    def submit_metadata(self):
        """
        Batch-submits the records stored in `self.summary_locations` and
        `self.summary_socio_demographics` via Sheepdog, then records the
        date of the most recent data as the new last submission.
        """
        print("Submitting summary_location data")
        for loc in self.summary_locations:
            loc_record = {"type": "summary_location"}
            loc_record.update(loc)
            self.metadata_helper.add_record_to_submit(loc_record)
        self.metadata_helper.batch_submit_records()

        print("Submitting summary_socio_demographic data")
        for sc in self.summary_socio_demographics:
            sc_record = {"type": "summary_socio_demographic"}
            sc_record.update(sc)
            self.metadata_helper.add_record_to_submit(sc_record)
        self.metadata_helper.batch_submit_records()

        if self.last_submission_date_time:
            self.metadata_helper.update_last_submission(
                self.last_submission_date_time.strftime("%Y-%m-%d")
            )
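
For a concrete sense of how the helpers above behave, here is a small hedged example; the row is made up for illustration and is not taken from the actual mobility report:

    # A made-up row, for illustration only.
    row = {
        "country_region_code": "US",
        "sub_region_1": "Illinois",
        "sub_region_2": "Cook County",
        "metro_area": "",
        "date": "2020-12-01",
        "census_fips_code": "17031",
    }

    submitter_id = format_submitter_id(
        "summary_location",
        row["country_region_code"],
        row["sub_region_1"],
        row["sub_region_2"],
        row["metro_area"],
        row["date"],
    )
    # lowercased, with runs of characters outside [a-z0-9-_] collapsed to "-":
    # "summary_location_us_illinois_cook-county__2020-12-01"

    gen3_field, func = SPECIAL_MAP_FIELDS["census_fips_code"]
    print(gen3_field, func(row["census_fips_code"]))  # FIPS 17031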
16 changes: 10 additions & 6 deletions covid19-etl/etl/ncbi.py
@@ -24,6 +24,8 @@

 FILE_EXTENSION_MAPPING = {"fq": "fastq", "fa": "fasta"}

+MAX_RETRIES = 3
+

 def get_file_extension(filename):
     """get file extension from the filename"""
@@ -303,12 +305,13 @@ async def files_to_virus_sequence_run_taxonomy_submission(
         assert (
             did
         ), f"file {filename} does not exist in the index, rerun NCBI_FILE ETL"
-        trying = True
-        while trying:
+
+        tries = 0
+        while tries < MAX_RETRIES:
             try:
                 await self.file_helper.async_update_authz(did=did, rev=rev)
-                trying = False
             except Exception as e:
+                tries += 1
                 print(f"Can not update indexd for {did}. Detail {e}. Retrying...")

         submitted_json["file_size"] = filesize
@@ -436,12 +439,13 @@ async def files_to_node_submissions(self, node_name):
         assert (
             did
         ), f"file {filename} does not exist in the index, rerun NCBI_FILE ETL"
-        retrying = True
-        while retrying:
+
+        tries = 0
+        while tries < MAX_RETRIES:
             try:
                 await self.file_helper.async_update_authz(did=did, rev=rev)
-                retrying = False
             except Exception as e:
+                tries += 1
                 print(
                     f"ERROR: Fail to update indexd for {filename}. Detail {e}. Retrying ..."
                 )
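The change above swaps an unbounded retry loop for one capped at MAX_RETRIES. Read as a standalone pattern, it looks like the following minimal sketch; `update_with_retries` and the `flaky` callable are hypothetical, and the explicit `return True` on success is this sketch's way of ending the loop:

    import asyncio

    MAX_RETRIES = 3

    async def update_with_retries(update, *args, **kwargs):
        """Retry an async call up to MAX_RETRIES times; report success."""
        tries = 0
        while tries < MAX_RETRIES:
            try:
                await update(*args, **kwargs)
                return True
            except Exception as e:
                tries += 1
                print(f"Update failed: {e}. Retrying ({tries}/{MAX_RETRIES})...")
        return False

    async def main():
        async def flaky():
            raise RuntimeError("indexd unavailable")

        ok = await update_with_retries(flaky)
        print("succeeded" if ok else f"gave up after {MAX_RETRIES} tries")

    if __name__ == "__main__":
        asyncio.run(main())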
36 changes: 36 additions & 0 deletions covid19-etl/utils/metadata_helper.py
@@ -3,6 +3,7 @@
 import json
 from math import ceil
 from time import sleep
+from dateutil.parser import parse

 import requests

@@ -223,3 +224,38 @@ def download_from_guppy(self, _type, fields=None, filter=None):
         except:
             print(f"Guppy did not return JSON: {response.text}")
             raise
+
+    def get_last_submission(self):
+        query_string = (
+            '{ project (first: 0, dbgap_accession_number: "'
+            + self.project_code
+            + '") { last_submission_identifier } }'
+        )
+        try:
+            response = self.query_peregrine(query_string)
+            if response["data"]["project"][0]["last_submission_identifier"] is None:
+                return None
+            return parse(response["data"]["project"][0]["last_submission_identifier"])
+        except Exception as ex:
+            print(
+                f"Unable to query peregrine for last_submission_identifier. Detail {ex}"
+            )
+            raise
+
+    def update_last_submission(self, last_submission_date_time):
+        headers = {"content-type": "application/json"}
+        headers["Authorization"] = self.headers["Authorization"]
+        record = {
+            "code": self.project_code,
+            "dbgap_accession_number": self.project_code,
+            "last_submission_identifier": last_submission_date_time,
+        }
+        try:
+            res = requests.put(
+                "{}/api/v0/submission/{}".format(self.base_url, self.program_name),
+                headers=headers,
+                data=json.dumps(record),
+            )
+        except Exception as ex:
+            print(f"Unable to update last_submission_identifier. Detail {ex}")
+            raise
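
Together, get_last_submission and update_last_submission give an ETL a simple incremental-load checkpoint: read the last submitted date, submit only newer rows, then advance the checkpoint. A hedged usage sketch, where `helper` is a hypothetical MetadataHelper instance and `rows` a hypothetical list of dicts with a "date" key (this mirrors how COM_MOBILITY uses the pair; it is not an API of the helper itself):

    from dateutil.parser import parse

    last = helper.get_last_submission()  # datetime, or None on the first run
    new_rows = [r for r in rows if last is None or parse(r["date"]) > last]

    # ... convert new_rows to records and batch-submit them ...

    if new_rows:
        latest = max(parse(r["date"]) for r in new_rows)
        helper.update_last_submission(latest.strftime("%Y-%m-%d"))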
