diff --git a/covid19-etl/etl/ds4c.py b/covid19-etl/etl/ds4c.py
index 7bc82c88..7398edfc 100644
--- a/covid19-etl/etl/ds4c.py
+++ b/covid19-etl/etl/ds4c.py
@@ -10,7 +10,7 @@
 
 
 def harmonize_gender(gender):
-    {"male": "Male", "female": "Female", "": "Not reported"}[gender]
+    return {"male": "Male", "female": "Female", "": "Not reported"}[gender]
 
 
 class DS4C(base.BaseETL):
diff --git a/covid19-etl/etl/dsci.py b/covid19-etl/etl/dsci.py
index cf251dff..722bad98 100644
--- a/covid19-etl/etl/dsci.py
+++ b/covid19-etl/etl/dsci.py
@@ -11,7 +11,7 @@
 
 
 def harmonize_gender(gender):
-    {"male": "Male", "female": "Female", "": "Not reported"}[gender]
+    return {"male": "Male", "female": "Female", "": "Not reported"}[gender]
 
 
 def format_date(date):
diff --git a/covid19-etl/etl/idph.py b/covid19-etl/etl/idph.py
index a0e31035..df625b06 100644
--- a/covid19-etl/etl/idph.py
+++ b/covid19-etl/etl/idph.py
@@ -1,6 +1,5 @@
 import datetime
 import os
-import re
 from contextlib import closing
 
 import requests
diff --git a/covid19-etl/etl/idph_zipcode.py b/covid19-etl/etl/idph_zipcode.py
index 8b2b9a09..04b0dff7 100644
--- a/covid19-etl/etl/idph_zipcode.py
+++ b/covid19-etl/etl/idph_zipcode.py
@@ -1,6 +1,5 @@
 from contextlib import closing
 import datetime
-import re
 
 import requests
 from etl import base
diff --git a/covid19-etl/etl/jhu.py b/covid19-etl/etl/jhu.py
index ddd4577f..c12c9e15 100644
--- a/covid19-etl/etl/jhu.py
+++ b/covid19-etl/etl/jhu.py
@@ -42,7 +42,7 @@ def format_summary_clinical_submitter_id(location_submitter_id, date):
     return "{}_{}".format(sub_id, date)
 
 
-def format_time_series_date(date):
+def time_series_date_to_string(date):
     return datetime.strptime(date, "%Y-%m-%d").isoformat("T")
 
 
@@ -128,7 +128,10 @@ def __init__(self, base_url, access_token, s3_bucket):
                 },
             },
         }
-        self.existing_data = self.metadata_helper.get_existing_data_jhu()
+        (
+            self.existing_summary_locations,
+            self.last_date,
+        ) = self.metadata_helper.get_existing_data_jhu()
 
     def files_to_submissions(self):
         """
@@ -198,17 +201,16 @@ def parse_file(self, file_type, data_type, url):
             if (
                 location_submitter_id not in self.location_data
                 # do not re-submit location data that already exist
-                and location_submitter_id not in self.existing_data
+                and location_submitter_id not in self.existing_summary_locations
             ):
                 self.location_data[location_submitter_id] = location
 
             for date, value in date_to_value.items():
-                date_submitter_id = format_summary_clinical_submitter_id(
-                    location_submitter_id, date
-                )
-                # do not re-submit time_series data that already exist
-                if date_submitter_id not in self.existing_data.get(
-                    location_submitter_id, []
+                # do not re-submit summary_clinical data that
+                # already exist. Assume anything older than the last
+                # submitted date has already been submitted
+                if time_series_date_to_string(date) > time_series_date_to_string(
+                    self.last_date
                 ):
                     self.time_series_data[location_submitter_id][date][
                         data_type
@@ -304,7 +306,6 @@ def submit_metadata(self):
         `self.location_data` already contains Sheepdog records.
         Batch submits all records in `self.location_data` and `self.time_series_data`
         """
-        print("Submitting summary_location data")
 
         for location in self.location_data.values():
             record = {"type": "summary_location"}
@@ -322,7 +323,7 @@
                 "type": "summary_clinical",
                 "submitter_id": submitter_id,
                 "summary_locations": [{"submitter_id": location_submitter_id}],
-                "date": format_time_series_date(date),
+                "date": time_series_date_to_string(date),
             }
             for data_type, value in data.items():
                 record[data_type] = value
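Note on the jhu.py hunks above: the new de-duplication check in parse_file compares ISO-8601 strings, which sort lexicographically in chronological order, so everything at or before self.last_date is skipped without tracking every summary_clinical submitter_id. A minimal standalone sketch of why the string comparison is safe (the helper is copied from the hunk above; the sample dates are invented):

    from datetime import datetime

    def time_series_date_to_string(date):
        # Same helper as in jhu.py: parse "%Y-%m-%d" and emit a padded
        # ISO timestamp, e.g. "2020-11-03T00:00:00". Normalizing both
        # sides zero-pads the fields, so plain string ">" is reliable.
        return datetime.strptime(date, "%Y-%m-%d").isoformat("T")

    assert time_series_date_to_string("2020-11-03") > time_series_date_to_string("2020-11-02")
    assert not time_series_date_to_string("2020-11-01") > time_series_date_to_string("2020-11-02")
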
diff --git a/covid19-etl/etl/jhu_country_codes.py b/covid19-etl/etl/jhu_country_codes.py
index f7a2e6e5..71f743f3 100644
--- a/covid19-etl/etl/jhu_country_codes.py
+++ b/covid19-etl/etl/jhu_country_codes.py
@@ -1,9 +1,6 @@
 # This only needs to be run once, when new locations are submitted.
 # The country data was obtained from https://datahub.io/core/country-codes.
 
-import json
-import requests
-
 from etl import base
 from utils.metadata_helper import MetadataHelper
 from utils.country_codes_utils import get_codes_dictionary, get_codes_for_country_name
diff --git a/covid19-etl/etl/jhu_to_s3.py b/covid19-etl/etl/jhu_to_s3.py
index 68bfadec..91a13aca 100644
--- a/covid19-etl/etl/jhu_to_s3.py
+++ b/covid19-etl/etl/jhu_to_s3.py
@@ -7,7 +7,6 @@
 import json
 import os
 import pathlib
-import re
 
 import requests
 import time
diff --git a/covid19-etl/etl/vac_tracker.py b/covid19-etl/etl/vac_tracker.py
index 3929f0c3..16376b5d 100644
--- a/covid19-etl/etl/vac_tracker.py
+++ b/covid19-etl/etl/vac_tracker.py
@@ -1,5 +1,3 @@
-import csv
-import re
 from contextlib import closing
 
 import requests
diff --git a/covid19-etl/utils/async_file_helper.py b/covid19-etl/utils/async_file_helper.py
index 0f3ac075..3a1ba625 100644
--- a/covid19-etl/utils/async_file_helper.py
+++ b/covid19-etl/utils/async_file_helper.py
@@ -70,7 +70,6 @@ async def _async_upload_file(path, url):
             async with ClientSession() as session:
                 async with session.put(url, data=data) as r:
                     return r.status
-            return None
 
         basename = path.name
         presigned_url, guid = await self.async_get_presigned_url(basename)
diff --git a/covid19-etl/utils/format_helper.py b/covid19-etl/utils/format_helper.py
index b8d318a9..640bafcb 100644
--- a/covid19-etl/utils/format_helper.py
+++ b/covid19-etl/utils/format_helper.py
@@ -16,7 +16,8 @@ def format_submitter_id(node, args):
     """
     submitter_id = node
     for v in args.values():
-        submitter_id += "_{}".format(v)
+        if v:
+            submitter_id += "_{}".format(v)
     submitter_id = submitter_id.lower()
     submitter_id = re.sub("[^a-z0-9-_]+", "-", submitter_id)
 
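The format_helper.py fix above skips falsy values so that empty fields no longer leave doubled "_" separators in submitter IDs. A quick sketch of the effect; the hunk ends before the function returns, so the return statement here is assumed, and the example input is invented:

    import re

    def format_submitter_id(node, args):
        submitter_id = node
        for v in args.values():
            if v:  # the new guard: skip empty values
                submitter_id += "_{}".format(v)
        submitter_id = submitter_id.lower()
        # assumed tail of the function (not shown in the hunk)
        return re.sub("[^a-z0-9-_]+", "-", submitter_id)

    # before the guard: "summary_location_il__chicago" (doubled "_")
    # with the guard:   "summary_location_il_chicago"
    print(format_submitter_id("summary_location", {"state": "IL", "county": "", "city": "Chicago"}))
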
diff --git a/covid19-etl/utils/metadata_helper.py b/covid19-etl/utils/metadata_helper.py
index 5ce2cbbf..005f4f08 100644
--- a/covid19-etl/utils/metadata_helper.py
+++ b/covid19-etl/utils/metadata_helper.py
@@ -26,77 +26,54 @@ def __init__(self, base_url, program_name, project_code, access_token):
     def get_existing_data_jhu(self):
         """
-        Queries Peregrine for the existing `location` and `time_series` data.
-        Returns a dict in format { "location1": [ "date1", "date2" ] }
-        """
-        print("Getting existing data from Peregrine...")
-        print("  summary_location data...")
-        query_string = (
-            '{ summary_location (first: 0, project_id: "'
-            + self.project_id
-            + '") { submitter_id } }'
+        Queries Guppy for the existing `summary_location` and
+        `summary_clinical` data. Returns the list of all the existing
+        summary_location submitter_ids, and the latest submitted date
+        as a string:
+        (
+            [
+                "summary_location_submitter_id1",
+                "summary_location_submitter_id2",
+                ...
+            ],
+            "2020-11-02"
         )
-        query_res = self.query_peregrine(query_string)
-        json_res = {
-            location["submitter_id"]: []
-            for location in query_res["data"]["summary_location"]
-        }
-
-        print("  summary_clinical data...")
-
-        summary_clinicals = []
-        data = None
-        offset = 0
-        first = 50000
-        max_retries = 3
-        while data != []:  # don't change, it explicitly checks for empty list
-            tries = 0
-            while tries < max_retries:
-                print(
-                    "  Getting first {} records with offset: {}".format(first, offset)
-                )
-                query_string = (
-                    "{ summary_clinical (first: "
-                    + str(first)
-                    + ", offset: "
-                    + str(offset)
-                    + ', project_id: "'
-                    + self.project_id
-                    + '") { submitter_id } }'
-                )
-
-                query_res = None
-                try:
-                    query_res = self.query_peregrine(query_string)
-                except:
-                    print(f"Peregrine did not return JSON: {response.text}")
+        """
+        _filter = {"=": {"project_id": self.project_id}}
 
-                if query_res:
-                    data = query_res["data"]["summary_clinical"]
-                    summary_clinicals.extend(data)
-                    offset += first
-                    break
-                else:
-                    tries += 1
-                    print("  Trying again (#{})".format(tries))
-                    sleep(2)  # wait 2 seconds - can change to exponential backoff later
-            assert (
-                tries < max_retries
-            ), "  Unable to query Peregrine for existing 'summary_clinical' data"
+        print("Getting existing summary_location submitter_ids from Guppy...")
+        res = self.download_from_guppy("location", ["submitter_id"], _filter)
+        summary_locations = [r["submitter_id"] for r in res]
 
-        for sc in summary_clinicals:
-            sc_id = sc["submitter_id"]
-            location_id = sc_id.replace("summary_clinical", "summary_location")
-            location_id = "_".join(location_id.split("_")[:-1])  # remove the date
-            json_res[location_id].append(sc_id)
+        print("Getting the latest summary_clinical date from Guppy...")
+        query_string = """query ($filter: JSON) {
+            location (
+                filter: $filter,
+                sort: [{date: "desc"}],
+                first: 1,
+                accessibility: accessible
+            ) {
+                date
+            }
+        }"""
+        variables = {"filter": _filter}
+        query_res = self.query_guppy(query_string, variables)
+        if not query_res["data"]["location"]:
+            raise Exception(
+                "Did not receive any data from Guppy. Is the token expired?"
+            )
+        loc = query_res["data"]["location"][0]
+        # from format: %Y-%m-%dT00:00:00
+        latest_submitted_date = loc["date"].split("T")[0]
 
-        return json_res
+        return summary_locations, latest_submitted_date
 
     def get_latest_submitted_date_idph(self):
         """
         Queries Guppy for the existing `location` data.
         Returns the latest submitted date as Python "datetime.date"
         """
-        print("Getting the latest date from Guppy...")
+        print("Getting the latest summary_clinical date from Guppy...")
         query_string = """query ($filter: JSON) {
             location (
                 filter: $filter,
@@ -224,3 +201,27 @@ def query_guppy(self, query_string, variables=None):
         except:
             print(f"Guppy did not return JSON: {response.text}")
             raise
+
+    def download_from_guppy(self, _type, fields=None, filter=None):
+        body = {"type": _type, "accessibility": "accessible"}
+        if fields:
+            body["fields"] = fields
+        if filter:
+            body["filter"] = filter
+
+        url = f"{self.base_url}/guppy/download"
+        response = requests.post(
+            url,
+            json=body,
+            headers=self.headers,
+        )
+        try:
+            response.raise_for_status()
+        except Exception:
+            print(f"Unable to download from Guppy.\nBody: {body}")
+            raise
+        try:
+            return response.json()
+        except:
+            print(f"Guppy did not return JSON: {response.text}")
+            raise
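For context, here is how the reworked helper might be exercised. The constructor parameters match the hunk header above; the base_url, token, and project_id values are placeholders, and the "program-project" project_id format is assumed:

    # hypothetical usage; URL and token are placeholders
    helper = MetadataHelper(
        base_url="https://example-commons.org",
        program_name="open",
        project_code="JHU",
        access_token="<access_token>",
    )

    # now returns a (list, str) tuple instead of a per-location dict
    summary_locations, last_date = helper.get_existing_data_jhu()

    # the new generic helper posts to <base_url>/guppy/download
    rows = helper.download_from_guppy(
        "location",
        fields=["submitter_id"],
        filter={"=": {"project_id": "open-JHU"}},
    )
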
diff --git a/covid19-notebooks/dashboard_charts/generate_plots.py b/covid19-notebooks/dashboard_charts/generate_plots.py
index abe8b635..a3169690 100644
--- a/covid19-notebooks/dashboard_charts/generate_plots.py
+++ b/covid19-notebooks/dashboard_charts/generate_plots.py
@@ -1,11 +1,6 @@
-import numpy as np
 import pandas as pd
-import matplotlib.pyplot as plt
-import seaborn as sns
-import gen3
 from gen3.auth import Gen3Auth
 from gen3.submission import Gen3Submission
-import itertools
 from itertools import islice
 from collections import deque
 import plotly.graph_objects as go
@@ -17,7 +12,6 @@
 data_day = (
     raw_data_confirmed.groupby(["Country/Region"]).sum().drop(["Lat", "Long"], axis=1)
 )
-df = data_day.transpose()
 data = data_day.reset_index().melt(id_vars="Country/Region", var_name="date")
 
 # Pivot data to wide & index by date
diff --git a/covid19-notebooks/generate_top10_plots.py b/covid19-notebooks/generate_top10_plots.py
index dc502a64..6bc50743 100644
--- a/covid19-notebooks/generate_top10_plots.py
+++ b/covid19-notebooks/generate_top10_plots.py
@@ -1,6 +1,4 @@
-import numpy as np
 import pandas as pd
-import itertools
 from itertools import islice
 from collections import deque
 
@@ -10,7 +8,6 @@
 data_day = (
     raw_data_confirmed.groupby(["Country/Region"]).sum().drop(["Lat", "Long"], axis=1)
 )
-df = data_day.transpose()
 data = data_day.reset_index().melt(id_vars="Country/Region", var_name="date")
 
 # Pivot data to wide & index by date
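The df = data_day.transpose() lines removed from both notebook scripts appear to be unused: the plots are built from the melted long-format frame, not the transposed one. A toy version of the surviving pipeline; the frame mimics the JHU wide CSV layout, and the values are made up:

    import pandas as pd

    raw_data_confirmed = pd.DataFrame(
        {
            "Country/Region": ["US", "US"],
            "Lat": [40.0, 30.0],
            "Long": [-74.0, -90.0],
            "1/22/20": [1, 0],
            "1/23/20": [2, 1],
        }
    )

    # sum per country, then drop the coordinate columns
    data_day = (
        raw_data_confirmed.groupby(["Country/Region"]).sum().drop(["Lat", "Long"], axis=1)
    )
    # melt to long format: one row per (Country/Region, date, value)
    data = data_day.reset_index().melt(id_vars="Country/Region", var_name="date")
    print(data)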