Commit

JHU query Guppy (#135)
paulineribeyre authored Nov 3, 2020
1 parent e7a3dd3 commit e4c4195
Showing 13 changed files with 78 additions and 93 deletions.
2 changes: 1 addition & 1 deletion covid19-etl/etl/ds4c.py
@@ -10,7 +10,7 @@
 
 
 def harmonize_gender(gender):
-    {"male": "Male", "female": "Female", "": "Not reported"}[gender]
+    return {"male": "Male", "female": "Female", "": "Not reported"}[gender]
 
 
 class DS4C(base.BaseETL):
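Note on the fix above: the old body built the mapping dict and indexed into it, but never returned the result, so every call produced None and the harmonized value was silently dropped. A minimal sketch of the behavior change (the assertions are illustrative, not from the repo); the same fix is applied to dsci.py below:

def harmonize_gender_old(gender):
    # pre-fix body: the lookup result is computed, then discarded
    {"male": "Male", "female": "Female", "": "Not reported"}[gender]

def harmonize_gender_new(gender):
    return {"male": "Male", "female": "Female", "": "Not reported"}[gender]

assert harmonize_gender_old("male") is None  # bug: always None
assert harmonize_gender_new("male") == "Male"
assert harmonize_gender_new("") == "Not reported"
# unknown inputs (e.g. "m") raise KeyError in both versions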
2 changes: 1 addition & 1 deletion covid19-etl/etl/dsci.py
@@ -11,7 +11,7 @@
 
 
 def harmonize_gender(gender):
-    {"male": "Male", "female": "Female", "": "Not reported"}[gender]
+    return {"male": "Male", "female": "Female", "": "Not reported"}[gender]
 
 
 def format_date(date):
1 change: 0 additions & 1 deletion covid19-etl/etl/idph.py
@@ -1,6 +1,5 @@
 import datetime
 import os
-import re
 from contextlib import closing
 
 import requests
1 change: 0 additions & 1 deletion covid19-etl/etl/idph_zipcode.py
@@ -1,6 +1,5 @@
 from contextlib import closing
 import datetime
-import re
 import requests
 
 from etl import base
23 changes: 12 additions & 11 deletions covid19-etl/etl/jhu.py
@@ -42,7 +42,7 @@ def format_summary_clinical_submitter_id(location_submitter_id, date):
     return "{}_{}".format(sub_id, date)
 
 
-def format_time_series_date(date):
+def time_series_date_to_string(date):
     return datetime.strptime(date, "%Y-%m-%d").isoformat("T")
 
 
@@ -128,7 +128,10 @@ def __init__(self, base_url, access_token, s3_bucket):
                 },
             },
         }
-        self.existing_data = self.metadata_helper.get_existing_data_jhu()
+        (
+            self.existing_summary_locations,
+            self.last_date,
+        ) = self.metadata_helper.get_existing_data_jhu()
 
     def files_to_submissions(self):
         """
@@ -198,17 +201,16 @@ def parse_file(self, file_type, data_type, url):
             if (
                 location_submitter_id not in self.location_data
                 # do not re-submit location data that already exist
-                and location_submitter_id not in self.existing_data
+                and location_submitter_id not in self.existing_summary_locations
             ):
                 self.location_data[location_submitter_id] = location
 
             for date, value in date_to_value.items():
-                date_submitter_id = format_summary_clinical_submitter_id(
-                    location_submitter_id, date
-                )
-                # do not re-submit time_series data that already exist
-                if date_submitter_id not in self.existing_data.get(
-                    location_submitter_id, []
+                # do not re-submit summary_clinical data that
+                # already exist. Assume anything older than the last
+                # submitted date has already been submitted
+                if time_series_date_to_string(date) > time_series_date_to_string(
+                    self.last_date
                 ):
                     self.time_series_data[location_submitter_id][date][
                         data_type
@@ -304,7 +306,6 @@ def submit_metadata(self):
         `self.location_data already contains Sheepdog records. Batch submits
         all records in `self.location_data` and `self.time_series_data`
         """
-
         print("Submitting summary_location data")
         for location in self.location_data.values():
             record = {"type": "summary_location"}
@@ -322,7 +323,7 @@
                 "type": "summary_clinical",
                 "submitter_id": submitter_id,
                 "summary_locations": [{"submitter_id": location_submitter_id}],
-                "date": format_time_series_date(date),
+                "date": time_series_date_to_string(date),
             }
             for data_type, value in data.items():
                 record[data_type] = value
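Two things worth noting in this file. First, the rename makes the helper's contract explicit: time_series_date_to_string("2020-11-02") returns "2020-11-02T00:00:00". Second, parse_file now decides what to submit by comparing these ISO strings against self.last_date; because the format is fixed-width, lexicographic order coincides with chronological order, so the > comparison is a valid date comparison. A quick illustrative check (the dates are examples):

from datetime import datetime

def time_series_date_to_string(date):
    return datetime.strptime(date, "%Y-%m-%d").isoformat("T")

assert time_series_date_to_string("2020-11-02") == "2020-11-02T00:00:00"
# fixed-width ISO strings sort chronologically
assert time_series_date_to_string("2020-11-02") > time_series_date_to_string("2020-10-31")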
3 changes: 0 additions & 3 deletions covid19-etl/etl/jhu_country_codes.py
@@ -1,9 +1,6 @@
 # This only needs to be run once, when new locations are submitted.
 # The country data was obtained from https://datahub.io/core/country-codes.
 
-import json
-import requests
-
 from etl import base
 from utils.metadata_helper import MetadataHelper
 from utils.country_codes_utils import get_codes_dictionary, get_codes_for_country_name
1 change: 0 additions & 1 deletion covid19-etl/etl/jhu_to_s3.py
@@ -7,7 +7,6 @@
 import json
 import os
 import pathlib
-import re
 import requests
 import time
 
2 changes: 0 additions & 2 deletions covid19-etl/etl/vac_tracker.py
@@ -1,5 +1,3 @@
-import csv
-import re
 from contextlib import closing
 
 import requests
1 change: 0 additions & 1 deletion covid19-etl/utils/async_file_helper.py
@@ -70,7 +70,6 @@ async def _async_upload_file(path, url):
             async with ClientSession() as session:
                 async with session.put(url, data=data) as r:
                     return r.status
-            return None
 
         basename = path.name
         presigned_url, guid = await self.async_get_presigned_url(basename)
3 changes: 2 additions & 1 deletion covid19-etl/utils/format_helper.py
@@ -16,7 +16,8 @@ def format_submitter_id(node, args):
     """
     submitter_id = node
     for v in args.values():
-        submitter_id += "_{}".format(v)
+        if v:
+            submitter_id += "_{}".format(v)
 
     submitter_id = submitter_id.lower()
     submitter_id = re.sub("[^a-z0-9-_]+", "-", submitter_id)
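The new `if v:` guard keeps empty argument values from contributing an empty "_" segment to the submitter_id. Assuming the function returns submitter_id after the lines shown, the effect looks like this (the inputs are hypothetical):

import re

def format_submitter_id(node, args):
    submitter_id = node
    for v in args.values():
        if v:
            submitter_id += "_{}".format(v)
    submitter_id = submitter_id.lower()
    submitter_id = re.sub("[^a-z0-9-_]+", "-", submitter_id)
    return submitter_id

# a missing value no longer leaves a trailing underscore
print(format_submitter_id("summary_location", {"country": "US", "state": ""}))
# before the fix: "summary_location_us_"; after: "summary_location_us"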
123 changes: 62 additions & 61 deletions covid19-etl/utils/metadata_helper.py
@@ -26,77 +26,54 @@ def __init__(self, base_url, program_name, project_code, access_token):
 
     def get_existing_data_jhu(self):
         """
-        Queries Peregrine for the existing `location` and `time_series` data. Returns a dict in format { "location1": [ "date1", "date2" ] }
-        """
-        print("Getting existing data from Peregrine...")
-        print(" summary_location data...")
-        query_string = (
-            '{ summary_location (first: 0, project_id: "'
-            + self.project_id
-            + '") { submitter_id } }'
-        )
-        query_res = self.query_peregrine(query_string)
-        json_res = {
-            location["submitter_id"]: []
-            for location in query_res["data"]["summary_location"]
-        }
-
-        print(" summary_clinical data...")
-
-        summary_clinicals = []
-        data = None
-        offset = 0
-        first = 50000
-        max_retries = 3
-        while data != []:  # don't change, it explicitly checks for empty list
-            tries = 0
-            while tries < max_retries:
-                print(
-                    "  Getting first {} records with offset: {}".format(first, offset)
-                )
-                query_string = (
-                    "{ summary_clinical (first: "
-                    + str(first)
-                    + ", offset: "
-                    + str(offset)
-                    + ', project_id: "'
-                    + self.project_id
-                    + '") { submitter_id } }'
-                )
-
-                query_res = None
-                try:
-                    query_res = self.query_peregrine(query_string)
-                except:
-                    print(f"Peregrine did not return JSON: {response.text}")
-
-                if query_res:
-                    data = query_res["data"]["summary_clinical"]
-                    summary_clinicals.extend(data)
-                    offset += first
-                    break
-                else:
-                    tries += 1
-                    print("  Trying again (#{})".format(tries))
-                    sleep(2)  # wait 2 seconds - can change to exponential backoff later
-            assert (
-                tries < max_retries
-            ), "  Unable to query Peregrine for existing 'summary_clinical' data"
-
-        for sc in summary_clinicals:
-            sc_id = sc["submitter_id"]
-            location_id = sc_id.replace("summary_clinical", "summary_location")
-            location_id = "_".join(location_id.split("_")[:-1])  # remove the date
-            json_res[location_id].append(sc_id)
-
-        return json_res
+        Queries Guppy for the existing `summary_location` and
+        `summary_clinical` data. Returns the list of all the existing
+        summary_location submitter_ids, and the latest submitted date
+        as a string:
+        (
+            [
+                "summary_location_submitter_id1",
+                "summary_location_submitter_id2",
+                ...
+            ],
+            "2020-11-02"
+        )
+        """
+        _filter = {"=": {"project_id": self.project_id}}
+
+        print("Getting existing summary_location submitter_ids from Guppy...")
+        res = self.download_from_guppy("location", ["submitter_id"], _filter)
+        summary_locations = [r["submitter_id"] for r in res]
+
+        print("Getting the latest summary_clinical date from Guppy...")
+        query_string = """query ($filter: JSON) {
+            location (
+                filter: $filter,
+                sort: [{date: "desc"}],
+                first: 1,
+                accessibility: accessible
+            ) {
+                date
+            }
+        }"""
+        variables = {"filter": _filter}
+        query_res = self.query_guppy(query_string, variables)
+        if not query_res["data"]["location"]:
+            raise Exception(
+                "Did not receive any data from Guppy. Is the token expired?"
+            )
+        loc = query_res["data"]["location"][0]
+        # from format: %Y-%m-%dT00:00:00
+        latest_submitted_date = loc["date"].split("T")[0]
+
+        return summary_locations, latest_submitted_date
 
     def get_latest_submitted_date_idph(self):
         """
         Queries Guppy for the existing `location` data.
         Returns the latest submitted date as Python "datetime.date"
         """
-        print("Getting the latest date from Guppy...")
+        print("Getting the latest summary_clinical date from Guppy...")
         query_string = """query ($filter: JSON) {
             location (
                 filter: $filter,
@@ -224,3 +201,27 @@ def query_guppy(self, query_string, variables=None):
         except:
             print(f"Guppy did not return JSON: {response.text}")
             raise
+
+    def download_from_guppy(self, _type, fields=None, filter=None):
+        body = {"type": _type, "accessibility": "accessible"}
+        if fields:
+            body["fields"] = fields
+        if filter:
+            body["filter"] = filter
+
+        url = f"{self.base_url}/guppy/download"
+        response = requests.post(
+            url,
+            json=body,
+            headers=self.headers,
+        )
+        try:
+            response.raise_for_status()
+        except Exception:
+            print(f"Unable to download from Guppy.\nBody: {body}")
+            raise
+        try:
+            return response.json()
+        except:
+            print(f"Guppy did not return JSON: {response.text}")
+            raise
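For context, a hedged usage sketch of the two new Guppy code paths. The POST /guppy/download endpoint, the fields list, and the {"=": {...}} filter shape all come from this diff; the commons URL, program/project names, and token are placeholders:

# placeholders for illustration
helper = MetadataHelper(
    base_url="https://example-commons.org",
    program_name="open",
    project_code="JHU",
    access_token="<access token>",
)

# bulk export of one field for every accessible "location" record
locations = helper.download_from_guppy(
    "location",
    fields=["submitter_id"],
    filter={"=": {"project_id": "open-JHU"}},
)

# the rewritten method now returns a (list, str) tuple
summary_locations, last_date = helper.get_existing_data_jhu()
print(last_date)  # e.g. "2020-11-02"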
6 changes: 0 additions & 6 deletions covid19-notebooks/dashboard_charts/generate_plots.py
@@ -1,11 +1,6 @@
 import numpy as np
 import pandas as pd
-import matplotlib.pyplot as plt
-import seaborn as sns
-import gen3
-from gen3.auth import Gen3Auth
-from gen3.submission import Gen3Submission
 import itertools
 from itertools import islice
 from collections import deque
 import plotly.graph_objects as go
@@ -17,7 +12,6 @@
 data_day = (
     raw_data_confirmed.groupby(["Country/Region"]).sum().drop(["Lat", "Long"], axis=1)
 )
-df = data_day.transpose()
 data = data_day.reset_index().melt(id_vars="Country/Region", var_name="date")
 
 # Pivot data to wide & index by date
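The dropped df = data_day.transpose() was a dead assignment: data_day is melted directly on the next line. For reference, the wide-to-long reshape both plotting scripts rely on, run on a toy frame (not the JHU data):

import pandas as pd

raw = pd.DataFrame({
    "Country/Region": ["US", "US"],
    "Lat": [40.0, 35.0],
    "Long": [-75.0, -80.0],
    "1/22/20": [1, 2],
    "1/23/20": [3, 4],
})
data_day = (
    raw.groupby(["Country/Region"]).sum().drop(["Lat", "Long"], axis=1)
)
data = data_day.reset_index().melt(id_vars="Country/Region", var_name="date")
print(data)
#   Country/Region     date  value
# 0             US  1/22/20      3
# 1             US  1/23/20      7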
3 changes: 0 additions & 3 deletions covid19-notebooks/generate_top10_plots.py
@@ -1,6 +1,4 @@
-import numpy as np
 import pandas as pd
-import itertools
 from itertools import islice
 from collections import deque
 
@@ -10,7 +8,6 @@
 data_day = (
     raw_data_confirmed.groupby(["Country/Region"]).sum().drop(["Lat", "Long"], axis=1)
 )
-df = data_day.transpose()
 data = data_day.reset_index().melt(id_vars="Country/Region", var_name="date")
 
 # Pivot data to wide & index by date
