fix(ctp): fix ctp (#125)
* fix(ctp): fix ctp

* fix(minor): fix minor

* chore(doc): update readme
giangbui authored Oct 7, 2020
1 parent c37a14f · commit f2bab8b
Showing 2 changed files with 123 additions and 143 deletions.
README.md — 12 changes: 10 additions & 2 deletions

````diff
@@ -13,13 +13,16 @@
 | [COV-126][cov-126] | DSCI | [Kaggle][dsci] | One-time |
 | [COV-172][cov-172] | DSFSI | [here][dsfsi] | One-time |
 | [COV-170][cov-170] | CCMap | [here][ccmap] | One-time |
-| [COV-192][cov-192] | OWID | [here][owid] | Scheduled |
+| [COV-192][cov-192] | OWID2 | [here][owid] | Scheduled |
 | [COV-237][cov-237] | Chicago Neighborhoods Data | [here][chi-nbhd] ([JSON][chi-nbhd-json]) | Scheduled |
 | [COV-361][cov-361] | NPI-PRO | [here][npi-pro] | One-time |
 | [COV-220][cov-220] | COXRAY | [Kaggle][coxray] | One-time |
 | [COV-422][cov-422] | SSR | Controlled data | One-time (for now) |
+| [COV-434][cov-434] | STOPLIGHT | [here][stoplight] | Scheduled |
+| [COV-450][cov-450] | VAC-TRACKER | [here][vac-tracker] | Scheduled |
+| [COV-453][cov-453] | CHESTX-RAY8 | [here][chestxray8] | One-time |
 
 
 ## Deployment
 
 To deploy the daily/weekly ETLs, use the following setup in `crontab` on the adminVM (each entry runs one ETL job and writes its output to a per-job log):
@@ -41,7 +44,7 @@ S3_BUCKET=<name of bucket to upload data to>
 20 20 * * * (if [ -f $HOME/cloud-automation/files/scripts/covid19-etl-job.sh ]; then JOB_NAME=idph_facility bash $HOME/cloud-automation/files/scripts/covid19-etl-job.sh; else echo "no covid19-etl-job.sh"; fi) > $HOME/covid19-etl-$JOB_NAME-cronjob.log 2>&1
 30 20 * * * (if [ -f $HOME/cloud-automation/files/scripts/covid19-etl-job.sh ]; then JOB_NAME=idph_hospital bash $HOME/cloud-automation/files/scripts/covid19-etl-job.sh; else echo "no covid19-etl-job.sh"; fi) > $HOME/logs/covid19-etl-$JOB_NAME-cronjob.log 2>&1
 40 20 * * * (if [ -f $HOME/cloud-automation/files/scripts/covid19-etl-job.sh ]; then JOB_NAME=ctp bash $HOME/cloud-automation/files/scripts/covid19-etl-job.sh; else echo "no covid19-etl-job.sh"; fi) > $HOME/covid19-etl-$JOB_NAME-cronjob.log 2>&1
-50 20 * * * (if [ -f $HOME/cloud-automation/files/scripts/covid19-etl-job.sh ]; then JOB_NAME=owid bash $HOME/cloud-automation/files/scripts/covid19-etl-job.sh; else echo "no covid19-etl-job.sh"; fi) > $HOME/covid19-etl-$JOB_NAME-cronjob.log 2>&1
+50 20 * * * (if [ -f $HOME/cloud-automation/files/scripts/covid19-etl-job.sh ]; then JOB_NAME=owid2 bash $HOME/cloud-automation/files/scripts/covid19-etl-job.sh; else echo "no covid19-etl-job.sh"; fi) > $HOME/covid19-etl-$JOB_NAME-cronjob.log 2>&1
 0 21 * * * (if [ -f $HOME/cloud-automation/files/scripts/covid19-etl-job.sh ]; then JOB_NAME=chi_nbhd bash $HOME/cloud-automation/files/scripts/covid19-etl-job.sh; else echo "no covid19-etl-job.sh"; fi) > $HOME/covid19-etl-$JOB_NAME-cronjob.log 2>&1
 0 */3 * * * (if [ -f $HOME/cloud-automation/files/scripts/etl-cronjob.sh ]; then bash $HOME/cloud-automation/files/scripts/etl-cronjob.sh; else echo "no etl-cronjob.sh"; fi) > $HOME/etl-cronjob.log 2>&1
 ```
@@ -111,8 +114,11 @@ covid19-tools
+[chestxray8]: https://github.com/muhammedtalo/COVID-19
 [ccmap]: https://github.com/covidcaremap/covid19-healthsystemcapacity/tree/master/data/published
 [ctp]: https://covidtracking.com/data
 [race]: https://docs.google.com/spreadsheets/d/e/2PACX-1vR_xmYt4ACPDZCDJcY12kCiMiH0ODyx3E1ZvgOHB8ae1tRcjXbs_yWBOA4j4uoCEADVfC1PS2jYO68B/pub?gid=43720681&single=true&output=csv
 [npi-pro]: https://www.arcgis.com/home/item.html?id=7e80baf1773e4fd9b44fe9fb054677db
 [ncov2019]: https://www.kaggle.com/sudalairajkumar/novel-corona-virus-2019-dataset?select=COVID19_line_list_data.csv
+[vac-tracker]: https://biorender.com/page-data/covid-vaccine-tracker/page-data.json
+[stoplight]: https://covidstoplight.org/api/v0/location/US
 [cov-12]: https://occ-data.atlassian.net/browse/COV-12
 [cov-18]: https://occ-data.atlassian.net/browse/COV-18
 [cov-24]: https://occ-data.atlassian.net/browse/COV-24
@@ -129,4 +135,6 @@ covid19-tools
 [cov-345]: https://occ-data.atlassian.net/browse/COV-345
 [cov-361]: https://occ-data.atlassian.net/browse/COV-361
 [cov-422]: https://occ-data.atlassian.net/browse/COV-422
 [cov-434]: https://occ-data.atlassian.net/browse/COV-434
+[cov-450]: https://occ-data.atlassian.net/browse/COV-450
+[cov-453]: https://occ-data.atlassian.net/browse/COV-453
````
covid19-etl/etl/ctp.py — 254 changes: 113 additions & 141 deletions
````diff
@@ -33,6 +33,7 @@ def __init__(self, base_url, access_token, s3_bucket):
         super().__init__(base_url, access_token, s3_bucket)
         self.summary_locations = []
         self.summary_clinicals = []
+        self.header_to_column = {}
 
         self.program_name = "open"
         self.project_code = "CTP"
@@ -43,101 +44,97 @@ def __init__(self, base_url, access_token, s3_bucket):
             access_token=access_token,
         )
 
-        self.expected_file_headers = [
-            "date",
-            "state",
-            "positive",
-            "negative",
-            "pending",
-            "totalTestResults",
-            "hospitalizedCurrently",
-            "hospitalizedCumulative",
-            "inIcuCurrently",
-            "inIcuCumulative",
-            "onVentilatorCurrently",
-            "onVentilatorCumulative",
-            "recovered",
-            "dataQualityGrade",
-            "lastUpdateEt",
-            "dateModified",
-            "checkTimeEt",
-            "death",
-            "hospitalized",
-            "dateChecked",
-            "totalTestsViral",
-            "positiveTestsViral",
-            "negativeTestsViral",
-            "positiveCasesViral",
-            "deathConfirmed",
-            "deathProbable",
-            "totalTestEncountersViral",
-            "totalTestsPeopleViral",
-            "totalTestsAntibody",
-            "positiveTestsAntibody",
-            "negativeTestsAntibody",
-            "totalTestsPeopleAntibody",
-            "positiveTestsPeopleAntibody",
-            "negativeTestsPeopleAntibody",
-            "totalTestsPeopleAntigen",
-            "positiveTestsPeopleAntigen",
-            "totalTestsAntigen",
-            "positiveTestsAntigen",
-            "fips",
-            "positiveIncrease",
-            "negativeIncrease",
-            "total",
-            "totalTestResultsSource",
-            "totalTestResultsIncrease",
-            "posNeg",
-            "deathIncrease",
-            "hospitalizedIncrease",
-            "hash",
-            "commercialScore",
-            "negativeRegularScore",
-            "negativeScore",
-            "positiveScore",
-            "score",
-            "grade",
-        ]
-
-        self.expected_race_headers = [
-            "Date",
-            "State",
-            "Cases_Total",
-            "Cases_White",
-            "Cases_Black",
-            "Cases_LatinX",
-            "Cases_Asian",
-            "Cases_AIAN",
-            "Cases_NHPI",
-            "Cases_Multiracial",
-            "Cases_Other",
-            "Cases_Unknown",
-            "Cases_Ethnicity_Hispanic",
-            "Cases_Ethnicity_NonHispanic",
-            "Cases_Ethnicity_Unknown",
-            "Deaths_Total",
-            "Deaths_White",
-            "Deaths_Black",
-            "Deaths_LatinX",
-            "Deaths_Asian",
-            "Deaths_AIAN",
-            "Deaths_NHPI",
-            "Deaths_Multiracial",
-            "Deaths_Other",
-            "Deaths_Unknown",
-            "Deaths_Ethnicity_Hispanic",
-            "Deaths_Ethnicity_NonHispanic",
-            "Deaths_Ethnicity_Unknown",
-        ]
-
-        self.expected_csv_headers = (
-            self.expected_file_headers + self.expected_race_headers[3:]
-        )
-        self.header_to_column = {
-            k: self.expected_csv_headers.index(k) for k in self.expected_csv_headers
-        }
-        print("The length of the expected headers: ", len(self.header_to_column))
+        self.expected_file_headers = set(
+            [
+                "date",
+                "state",
+                "positive",
+                "negative",
+                "pending",
+                "totalTestResults",
+                "hospitalizedCurrently",
+                "hospitalizedCumulative",
+                "inIcuCurrently",
+                "inIcuCumulative",
+                "onVentilatorCurrently",
+                "onVentilatorCumulative",
+                "recovered",
+                "dataQualityGrade",
+                "lastUpdateEt",
+                "dateModified",
+                "checkTimeEt",
+                "death",
+                "hospitalized",
+                "dateChecked",
+                "totalTestsViral",
+                "positiveTestsViral",
+                "negativeTestsViral",
+                "positiveCasesViral",
+                "deathConfirmed",
+                "deathProbable",
+                "totalTestEncountersViral",
+                "totalTestsPeopleViral",
+                "totalTestsAntibody",
+                "positiveTestsAntibody",
+                "negativeTestsAntibody",
+                "totalTestsPeopleAntibody",
+                "positiveTestsPeopleAntibody",
+                "negativeTestsPeopleAntibody",
+                "totalTestsPeopleAntigen",
+                "positiveTestsPeopleAntigen",
+                "totalTestsAntigen",
+                "positiveTestsAntigen",
+                "fips",
+                "positiveIncrease",
+                "negativeIncrease",
+                "total",
+                "totalTestResultsSource",
+                "totalTestResultsIncrease",
+                "posNeg",
+                "deathIncrease",
+                "hospitalizedIncrease",
+                "hash",
+                "commercialScore",
+                "negativeRegularScore",
+                "negativeScore",
+                "positiveScore",
+                "score",
+                "grade",
+            ]
+        )
+
+        self.expected_race_headers = set(
+            [
+                "Date",
+                "State",
+                "Cases_Total",
+                "Cases_White",
+                "Cases_Black",
+                "Cases_LatinX",
+                "Cases_Asian",
+                "Cases_AIAN",
+                "Cases_NHPI",
+                "Cases_Multiracial",
+                "Cases_Other",
+                "Cases_Unknown",
+                "Cases_Ethnicity_Hispanic",
+                "Cases_Ethnicity_NonHispanic",
+                "Cases_Ethnicity_Unknown",
+                "Deaths_Total",
+                "Deaths_White",
+                "Deaths_Black",
+                "Deaths_LatinX",
+                "Deaths_Asian",
+                "Deaths_AIAN",
+                "Deaths_NHPI",
+                "Deaths_Multiracial",
+                "Deaths_Other",
+                "Deaths_Unknown",
+                "Deaths_Ethnicity_Hispanic",
+                "Deaths_Ethnicity_NonHispanic",
+                "Deaths_Ethnicity_Unknown",
+            ]
+        )
 
     def files_to_submissions(self):
         """
````
````diff
@@ -162,19 +159,23 @@ def extract_races(self):
             print(" Unable to get file contents, received {}.".format(headers))
             return
 
-        expected_h = self.expected_race_headers
-        obtained_h = headers[: len(expected_h)]
-        assert (
-            obtained_h == expected_h
-        ), "CSV headers have changed (expected {}, got {}). We may need to update the ETL code".format(
-            expected_h, obtained_h
+        assert (headers[0], headers[1], headers[2]) == (
+            "Date",
+            "State",
+            "Cases_Total",
+        ), "The first 3 column names of the race data must be Date, State, Cases_Total"
+
+        assert self.expected_race_headers.issubset(
+            set(headers)
+        ), "CSV headers have changed (expected {} is a subset of {}). We may need to update the ETL code".format(
+            self.expected_race_headers, headers
         )
         for row in reader:
             try:
                 races[(row[0], row[1], row[2])] = row[3:]
             except Exception as e:
                 print(f"Error. Detail {e}")
-        return races
+        return races, headers
 
     def parse_file(self, url):
         """
@@ -186,7 +187,7 @@ def parse_file(self, url):
         Args:
             url (str): URL at which the CSV file is available
         """
-        races = self.extract_races()
+        races, race_headers = self.extract_races()
         print("Getting data from {}".format(url))
         with closing(requests.get(url, stream=True)) as r:
             f = (line.decode("utf-8") for line in r.iter_lines())
@@ -198,14 +199,17 @@ def parse_file(self, url):
             print(" Unable to get file contents, received {}.".format(headers))
             return
 
-        expected_h = self.expected_file_headers
-        obtained_h = headers[: len(expected_h)]
-        assert (
-            obtained_h == expected_h
-        ), "CSV headers have changed (expected {}, got {}). We may need to update the ETL code".format(
-            expected_h, obtained_h
+        assert self.expected_file_headers.issubset(
+            set(headers)
+        ), "CSV headers have changed (expected {} is a subset of {}). We may need to update the ETL code".format(
+            self.expected_file_headers, headers
         )
 
+        headers = headers + race_headers[3:]
+
+        for i in range(0, len(headers)):
+            self.header_to_column[headers[i]] = i
+
         summary_location_list = []
 
         for row in reader:
````
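With `extract_races` now returning the race file's actual headers, `parse_file` can splice the two sources together: the race columns after the three key columns (`Date`, `State`, `Cases_Total`) are appended to the daily-data headers, and `header_to_column` is rebuilt over the combined list, so downstream code can index a merged row by column name. A short sketch of that splice, with invented, simplified values:

```python
# Sketch of the header splice above (invented columns and values).
file_headers = ["date", "state", "positive"]  # daily export
race_headers = ["Date", "State", "Cases_Total", "Cases_White", "Cases_Black"]

# Race rows are keyed on their first three columns, so only the rest is merged.
headers = file_headers + race_headers[3:]
header_to_column = {name: i for i, name in enumerate(headers)}

# A merged row: daily values followed by the matching race values.
row = ["2020-10-07", "IL", "312000", "151000", "48000"]
print(row[header_to_column["Cases_White"]])  # -> 151000
```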
````diff
@@ -228,7 +232,6 @@ def parse_row(self, row):
         Converts a row of a CSV file to data we can submit via Sheepdog
         Args:
-            headers (list(str)): CSV file headers (first row of the file)
             row (list(str)): row of data
         Returns:
@@ -274,47 +277,16 @@ def parse_row(self, row):
             "inIcuCurrently": "inIcuCurrently",
             "inIcuCumulative": "inIcuCumulative",
             "onVentilatorCurrently": "onVentilatorCurrently",
-            # "": "onVentilatorCumulative",
             "recovered": "recovered",
-            # "": "dataQualityGrade",
-            # "": "lastUpdateEt",
-            # "": "dateModified",
-            # "": "checkTimeEt",
-            # "": "death",
-            # "": "hospitalized",
-            # "": "dateChecked",
             "totalTestsViral": "totalTestsViral",
             "positiveTestsViral": "positiveTestsViral",
             "negativeTestsViral": "negativeTestsViral",
             "positiveCasesViral": "positiveCasesViral",
-            # "deaths": "deathConfirmed",
-            # "": "deathProbable",
-            # "": "totalTestEncountersViral",
-            # "": "totalTestsPeopleViral",
-            # "": "totalTestsAntibody",
-            # "": "positiveTestsAntibody",
-            # "": "negativeTestsAntibody",
-            # "": "totalTestsPeopleAntibody",
-            # "": "positiveTestsPeopleAntibody",
-            # "": "negativeTestsPeopleAntibody",
-            # "": "totalTestsPeopleAntigen",
-            # "": "positiveTestsPeopleAntigen",
-            # "": "totalTestsAntigen",
-            # "": "positiveTestsAntigen",
-            # "": "fips",
             "positiveIncrease": "positiveIncrease",
             "negativeIncrease": "negativeIncrease",
-            # "": "total",
-            # "": "totalTestResults",
             "totalTestResultsIncrease": "totalTestResultsIncrease",
-            # "": "posNeg",
             "deathIncrease": "deathIncrease",
             "hospitalizedIncrease": "hospitalizedIncrease",
-            # "": "hash",
-            # "": "commercialScore",
-            # "": "negativeRegularScore",
-            # "": "negativeScore",
-            # "": "positiveScore",
             "race_white_count": "Cases_White",
             "race_black_count": "Cases_Black",
             "race_hispanic_count": "Cases_LatinX",
@@ -373,7 +345,7 @@ def submit_metadata(self):
             self.metadata_helper.add_record_to_submit(loc_record)
         self.metadata_helper.batch_submit_records()
 
-        # print("Submitting summary_clinical data")
+        print("Submitting summary_clinical data")
         for sc in self.summary_clinicals:
             sc_record = {"type": "summary_clinical"}
             sc_record.update(sc)
````
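The submission flow visible in the last hunk is an accumulate-then-flush pattern: records are queued on the metadata helper one at a time, then pushed in batches. A generic sketch of that pattern (`BatchSubmitter`, its batch size, and the `print` stand-in are hypothetical, not the real `MetadataHelper` API):

```python
# Generic accumulate-then-flush sketch; BatchSubmitter is hypothetical,
# not the real MetadataHelper.
class BatchSubmitter:
    def __init__(self, batch_size=100):
        self.batch_size = batch_size
        self.records = []

    def add_record_to_submit(self, record):
        # Queue a record instead of submitting it immediately.
        self.records.append(record)

    def batch_submit_records(self):
        # Flush the queue in fixed-size chunks (stand-in for real API calls).
        for i in range(0, len(self.records), self.batch_size):
            chunk = self.records[i : i + self.batch_size]
            print("submitting {} record(s)".format(len(chunk)))
        self.records = []


submitter = BatchSubmitter(batch_size=2)
for state in ("IL", "NY", "CA"):
    submitter.add_record_to_submit({"type": "summary_clinical", "state": state})
submitter.batch_submit_records()  # prints two batches: 2 records, then 1
```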
