Skip to content

Commit

Permalink
Use JSON Lines
Browse files Browse the repository at this point in the history
This commit moves our sul_pub, dimensions and openalex harvesters over
to writing records as JSON Lines (jsonl). This means we will preserve
the dictionary and list data structures we received from the APIs and
will be able to use them for querying later, e.g. ORCID IDs embedded in
author objects.

JSON Lines files can be read with pandas like this:

```python
pandas.read_json('data.jsonl', orient='records', lines=True)
```

and Polars:

```python
polars.read_ndjson('data.jsonl')
```

or lazily:

```python
polars.scan_ndjson('data.jsonl')
```
  • Loading branch information
edsu committed Jun 28, 2024
1 parent a6d32db commit 4db75b6
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 231 deletions.
31 changes: 17 additions & 14 deletions rialto_airflow/dags/harvest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from airflow.decorators import dag, task

from rialto_airflow.utils import create_snapshot_dir, rialto_authors_file
from rialto_airflow.harvest import dimensions, openalex, merge_pubs
from rialto_airflow.harvest.sul_pub import sul_pub_csv
from rialto_airflow.harvest import dimensions, openalex, merge_pubs, sul_pub
from rialto_airflow.harvest.doi_set import create_doi_set


Expand Down Expand Up @@ -64,10 +63,12 @@ def sul_pub_harvest(snapshot_dir):
"""
Harvest data from SUL-Pub.
"""
csv_file = Path(snapshot_dir) / "sulpub.csv"
sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, limit=dev_limit)
jsonl_file = Path(snapshot_dir) / "sulpub.jsonl"
sul_pub.publications_jsonl(
jsonl_file, sul_pub_host, sul_pub_key, limit=dev_limit
)

return str(csv_file)
return str(jsonl_file)

@task()
def doi_set(dimensions, openalex, sul_pub):
Expand All @@ -82,18 +83,18 @@ def dimensions_harvest_pubs(dois, snapshot_dir):
"""
Harvest publication metadata from Dimensions using the dois from doi_set.
"""
csv_file = Path(snapshot_dir) / "dimensions-pubs.csv"
dimensions.publications_csv(dois, csv_file)
return str(csv_file)
jsonl_file = Path(snapshot_dir) / "dimensions-pubs.jsonl"
dimensions.publications_jsonl(dois, jsonl_file)
return str(jsonl_file)

@task()
def openalex_harvest_pubs(dois, snapshot_dir):
"""
Harvest publication metadata from OpenAlex using the dois from doi_set.
"""
csv_file = Path(snapshot_dir) / "openalex-pubs.csv"
openalex.publications_csv(dois, csv_file)
return str(csv_file)
jsonl_file = Path(snapshot_dir) / "openalex-pubs.jsonl"
openalex.publications_jsonl(dois, jsonl_file)
return str(jsonl_file)

@task()
def merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir):
Expand Down Expand Up @@ -129,19 +130,21 @@ def publish(dataset):

authors_csv = find_authors_csv()

sul_pub = sul_pub_harvest(snapshot_dir)
sul_pub_pubs = sul_pub_harvest(snapshot_dir)

dimensions_dois = dimensions_harvest_dois(authors_csv, snapshot_dir)

openalex_dois = openalex_harvest_dois(authors_csv, snapshot_dir)

dois = doi_set(dimensions_dois, openalex_dois, sul_pub)
dois = doi_set(dimensions_dois, openalex_dois, sul_pub_pubs)

dimensions_pubs = dimensions_harvest_pubs(dois, snapshot_dir)

openalex_pubs = openalex_harvest_pubs(dois, snapshot_dir)

pubs = merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir)
pubs = merge_publications(
sul_pub_pubs, openalex_pubs, dimensions_pubs, snapshot_dir
)

pubs_authors = join_authors(pubs, authors_csv)

Expand Down
14 changes: 6 additions & 8 deletions rialto_airflow/harvest/dimensions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import csv
import json
import logging
import os
import pickle
Expand Down Expand Up @@ -58,13 +58,11 @@ def doi_orcids_pickle(authors_csv, pickle_file, limit=None) -> None:
pickle.dump(invert_dict(orcid_dois), handle, protocol=pickle.HIGHEST_PROTOCOL)


def publications_csv(dois, csv_file) -> None:
with open(csv_file, "w") as output:
writer = csv.DictWriter(output, publication_fields())
writer.writeheader()
for pub in publications_from_dois(dois):
logging.info(f"writing metadata for {pub.get('doi')}")
writer.writerow(pub)
def publications_jsonl(dois, jsonl_file) -> None:
    """
    Write the publication record for each DOI to a JSON Lines file.

    Each record yielded by publications_from_dois(dois) is serialized as one
    JSON object per line, so nested lists and dictionaries are preserved.

    dois: the DOIs to look up, passed unchanged to publications_from_dois.
    jsonl_file: path of the file to create; an existing file is overwritten.
    """
    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # is pinned to UTF-8 instead of being left to the platform default (which
    # could raise UnicodeEncodeError or produce a file that is not valid UTF-8).
    with open(jsonl_file, "w", encoding="utf-8") as output:
        for record in publications_from_dois(dois):
            logging.info(f"writing metadata for {record.get('doi')}")
            output.write(json.dumps(record, ensure_ascii=False) + "\n")


def publications_from_dois(dois: list, batch_size=200):
Expand Down
16 changes: 8 additions & 8 deletions rialto_airflow/harvest/doi_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import pickle


def create_doi_set(dimensions: str, openalex: str, sul_pub_csv: str) -> list:
import polars as pl


def create_doi_set(dimensions: str, openalex: str, sul_pub_jsonl: str) -> list:
"""Get DOIs from each source and dedupe."""
dimensions_dois = dois_from_pickle(dimensions)
openalex_dois = dois_from_pickle(openalex)
sul_pub_dois = get_sul_pub_dois(sul_pub_csv)
sul_pub_dois = get_sul_pub_dois(sul_pub_jsonl)
unique_dois = list(set(dimensions_dois + openalex_dois + sul_pub_dois))
logging.info(f"found {len(unique_dois)}")

Expand All @@ -23,10 +26,7 @@ def dois_from_pickle(pickle_file: str) -> list:
return dois


def get_sul_pub_dois(sul_pub_jsonl: str) -> list:
    """Extract DOIs from the sul_pub JSONL file and remove empty values."""
    df = pl.read_ndjson(sul_pub_jsonl)
    # Records without a DOI surface as None in the column (merge_pubs drops
    # nulls on the same field). Filter them, and empty strings, as the CSV
    # implementation did, so they never end up in the combined DOI set.
    return [doi for doi in df["doi"].to_list() if doi]
6 changes: 3 additions & 3 deletions rialto_airflow/harvest/merge_pubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def dimensions_pubs_df(dimensions_pubs):
# Create a LazyFrame of dimension pubs to avoid loading all data into memory
"""
# Polars is inferring volume is an integer, but it should be a string e.g. "97-B"
df = pl.scan_csv(dimensions_pubs, schema_overrides={"volume": pl.String})
df = pl.scan_ndjson(dimensions_pubs)
df = df.select(
pl.col(
"authors",
Expand All @@ -69,7 +69,7 @@ def openalex_pubs_df(openalex_pubs):
"""
Create an openalex pubs LazyFrame and rename columns
"""
df = pl.scan_csv(openalex_pubs)
df = pl.scan_ndjson(openalex_pubs)
df = df.select(
pl.col("doi").str.replace("https://doi.org/", ""),
pl.col(
Expand All @@ -84,7 +84,7 @@ def sulpub_df(sul_pub):
"""
Create a sulpub LazyFrame and rename columns
"""
df = pl.scan_csv(sul_pub)
df = pl.scan_ndjson(sul_pub)
df = df.drop_nulls("doi")
df = df.with_columns(pl.col("doi").str.replace("https://doi.org/", ""))
df = df.rename(lambda column_name: "sul_pub_" + column_name)
Expand Down
14 changes: 7 additions & 7 deletions rialto_airflow/harvest/openalex.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import json
import logging
import os
import pickle
Expand Down Expand Up @@ -67,15 +68,14 @@ def dois_from_orcid(orcid: str, limit=None):
yield pub.get("doi").replace("https://doi.org/", "")


def publications_jsonl(dois: list, jsonl_file: str) -> None:
    """
    Get publication records for a list of DOIs and create a JSONL file.

    Each record yielded by publications_from_dois(dois) is written as one
    JSON object per line; an existing file at jsonl_file is overwritten.
    """
    # ensure_ascii=False writes raw non-ASCII characters, so the encoding is
    # fixed to UTF-8 rather than depending on the platform default.
    with open(jsonl_file, "w", encoding="utf-8") as output:
        for record in publications_from_dois(dois):
            logging.info(f"writing metadata for {record.get('doi')}")
            output.write(json.dumps(record, ensure_ascii=False) + "\n")


def publications_from_dois(dois: list, batch_size=75):
Expand Down
46 changes: 8 additions & 38 deletions rialto_airflow/harvest/sul_pub.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,15 @@
import csv
import json
import logging

import requests


SUL_PUB_FIELDS = [
"authorship",
"title",
"abstract",
"author",
"year",
"type",
"mesh_headings",
"publisher",
"journal",
"provenance",
"doi",
"issn",
"sulpubid",
"sw_id",
"pmid",
"identifier",
"last_updated",
"pages",
"date",
"country",
"booktitle",
"edition",
"series",
"chapter",
"editor",
]


def sul_pub_csv(csv_file, host, key, since=None, limit=None):
with open(csv_file, "w") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=SUL_PUB_FIELDS)
writer.writeheader()
for row in harvest(host, key, since, limit):
writer.writerow(row)
def publications_jsonl(jsonl_file, host, key, since=None, limit=None):
    """
    Harvest publication records and write them to a JSON Lines file.

    jsonl_file: path of the file to create; an existing file is overwritten.
    host, key, since, limit: forwarded unchanged to harvest().
    """
    # The records are dumped with ensure_ascii=False, so the file must be
    # opened as UTF-8 explicitly instead of using the platform default.
    with open(jsonl_file, "w", encoding="utf-8") as output:
        for record in harvest(host, key, since, limit):
            json.dump(record, output, ensure_ascii=False)
            output.write("\n")


def harvest(host, key, since, limit):
Expand Down Expand Up @@ -73,10 +44,9 @@ def harvest(host, key, since, limit):
more = False
break

pub = {key: record[key] for key in record if key in SUL_PUB_FIELDS}
pub["doi"] = extract_doi(record)
record["doi"] = extract_doi(record)

yield pub
yield record


def extract_doi(record):
Expand Down
10 changes: 10 additions & 0 deletions rialto_airflow/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import json
import datetime
from pathlib import Path

Expand Down Expand Up @@ -54,3 +55,12 @@ def invert_dict(dict):
inverted_dict[i] = [k for k, v in dict.items() if i in v]

return inverted_dict


def write_jsonl(filename, records):
    """
    Write an iterable of dictionaries as line-oriented JSON.

    filename: path of the file to create; an existing file is overwritten.
    records: any iterable of JSON-serializable objects, one per output line.
    """
    # Non-ASCII text is written as-is (ensure_ascii=False), the same way the
    # harvest modules serialize records, so the encoding is pinned to UTF-8.
    with open(filename, "w", encoding="utf-8") as output:
        for record in records:
            output.write(json.dumps(record, ensure_ascii=False) + "\n")
10 changes: 5 additions & 5 deletions test/harvest/test_dimensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ def test_publication_fields():
assert "title" in fields


def test_publications_csv(tmpdir):
pubs_csv = tmpdir / "dimensions-pubs.csv"
dimensions.publications_csv(
["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_csv
def test_publications_jsonl(tmpdir):
pubs_jsonl = tmpdir / "dimensions-pubs.jsonl"
dimensions.publications_jsonl(
["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_jsonl
)

df = pandas.read_csv(pubs_csv)
df = pandas.read_json(pubs_jsonl, orient="records", lines=True)

assert len(df) == 2

Expand Down
26 changes: 16 additions & 10 deletions test/harvest/test_doi_set.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import csv
import pickle

import pytest

from rialto_airflow.harvest.doi_set import create_doi_set
from rialto_airflow.utils import write_jsonl


@pytest.fixture
Expand Down Expand Up @@ -33,18 +33,24 @@ def openalex_pickle(tmp_path):


@pytest.fixture
def sul_pub_csv(tmp_path):
fixture_file = tmp_path / "sul_pub.csv"
with open(fixture_file, "w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["sunetid", "title", "doi"])
writer.writerow(["author1", "A Publication", "10.0000/aaaa"])
writer.writerow(["author2", "A Research Article", "10.0000/1234"])
def sul_pub_jsonl(tmp_path):
fixture_file = tmp_path / "sul_pub.jsonl"
write_jsonl(
fixture_file,
[
{"sunetid": "author1", "title": "A Publication", "doi": "10.0000/aaaa"},
{
"sunetid": "author2",
"title": "A Research Article",
"doi": "10.0000/1234",
},
],
)
return fixture_file


def test_doi_set(dimensions_pickle, openalex_pickle, sul_pub_csv):
dois = create_doi_set(dimensions_pickle, openalex_pickle, sul_pub_csv)
def test_doi_set(dimensions_pickle, openalex_pickle, sul_pub_jsonl):
dois = create_doi_set(dimensions_pickle, openalex_pickle, sul_pub_jsonl)
assert len(dois) == 4
assert set(dois) == set(
["10.0000/1234", "10.0000/aaaa", "10.0000/cccc", "10.0000/zzzz"]
Expand Down
Loading

0 comments on commit 4db75b6

Please sign in to comment.