diff --git a/rialto_airflow/dags/harvest.py b/rialto_airflow/dags/harvest.py
index 94db713..829fd7f 100644
--- a/rialto_airflow/dags/harvest.py
+++ b/rialto_airflow/dags/harvest.py
@@ -5,8 +5,7 @@ from airflow.decorators import dag, task

 from rialto_airflow.utils import create_snapshot_dir, rialto_authors_file
-from rialto_airflow.harvest import dimensions, openalex, merge_pubs
-from rialto_airflow.harvest.sul_pub import sul_pub_csv
+from rialto_airflow.harvest import dimensions, openalex, merge_pubs, sul_pub
 from rialto_airflow.harvest.doi_set import create_doi_set
@@ -64,10 +63,10 @@ def sul_pub_harvest(snapshot_dir):
         """
         Harvest data from SUL-Pub.
         """
-        csv_file = Path(snapshot_dir) / "sulpub.csv"
-        sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, limit=dev_limit)
+        jsonl_file = Path(snapshot_dir) / "sulpub.jsonl"
+        sul_pub.publications_jsonl(jsonl_file, sul_pub_host, sul_pub_key, limit=dev_limit)

-        return str(csv_file)
+        return str(jsonl_file)

     @task()
     def doi_set(dimensions, openalex, sul_pub):
@@ -82,18 +81,18 @@ def dimensions_harvest_pubs(dois, snapshot_dir):
         """
         Harvest publication metadata from Dimensions using the dois from doi_set.
         """
-        csv_file = Path(snapshot_dir) / "dimensions-pubs.csv"
-        dimensions.publications_csv(dois, csv_file)
-        return str(csv_file)
+        jsonl_file = Path(snapshot_dir) / "dimensions-pubs.jsonl"
+        dimensions.publications_jsonl(dois, jsonl_file)
+        return str(jsonl_file)

     @task()
     def openalex_harvest_pubs(dois, snapshot_dir):
         """
         Harvest publication metadata from OpenAlex using the dois from doi_set.
         """
-        csv_file = Path(snapshot_dir) / "openalex-pubs.csv"
-        openalex.publications_csv(dois, csv_file)
-        return str(csv_file)
+        jsonl_file = Path(snapshot_dir) / "openalex-pubs.jsonl"
+        openalex.publications_jsonl(dois, jsonl_file)
+        return str(jsonl_file)

     @task()
     def merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir):
@@ -129,19 +128,19 @@ def publish(dataset):

     authors_csv = find_authors_csv()

-    sul_pub = sul_pub_harvest(snapshot_dir)
+    sul_pub_pubs = sul_pub_harvest(snapshot_dir)

     dimensions_dois = dimensions_harvest_dois(authors_csv, snapshot_dir)

     openalex_dois = openalex_harvest_dois(authors_csv, snapshot_dir)

-    dois = doi_set(dimensions_dois, openalex_dois, sul_pub)
+    dois = doi_set(dimensions_dois, openalex_dois, sul_pub_pubs)

     dimensions_pubs = dimensions_harvest_pubs(dois, snapshot_dir)

     openalex_pubs = openalex_harvest_pubs(dois, snapshot_dir)

-    pubs = merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir)
+    pubs = merge_publications(sul_pub_pubs, openalex_pubs, dimensions_pubs, snapshot_dir)

     pubs_authors = join_authors(pubs, authors_csv)
diff --git a/rialto_airflow/harvest/dimensions.py b/rialto_airflow/harvest/dimensions.py
index fe69bab..f019051 100644
--- a/rialto_airflow/harvest/dimensions.py
+++ b/rialto_airflow/harvest/dimensions.py
@@ -1,4 +1,4 @@
-import csv
+import json
 import logging
 import os
 import pickle
@@ -58,13 +58,11 @@ def doi_orcids_pickle(authors_csv, pickle_file, limit=None) -> None:
         pickle.dump(invert_dict(orcid_dois), handle, protocol=pickle.HIGHEST_PROTOCOL)


-def publications_csv(dois, csv_file) -> None:
-    with open(csv_file, "w") as output:
-        writer = csv.DictWriter(output, publication_fields())
-        writer.writeheader()
-        for pub in publications_from_dois(dois):
-            logging.info(f"writing metadata for {pub.get('doi')}")
-            writer.writerow(pub)
+def publications_jsonl(dois, jsonl_file) -> None:
+    with open(jsonl_file, "w") as output:
+        for record in publications_from_dois(dois):
logging.info(f"writing metadata for {record.get('doi')}") + output.write(json.dumps(record, ensure_ascii=False) + "\n") def publications_from_dois(dois: list, batch_size=200): diff --git a/rialto_airflow/harvest/doi_set.py b/rialto_airflow/harvest/doi_set.py index 6bfc199..add6620 100644 --- a/rialto_airflow/harvest/doi_set.py +++ b/rialto_airflow/harvest/doi_set.py @@ -3,11 +3,13 @@ import pickle -def create_doi_set(dimensions: str, openalex: str, sul_pub_csv: str) -> list: +import polars as pl + +def create_doi_set(dimensions: str, openalex: str, sul_pub_jsonl: str) -> list: """Get DOIs from each source and dedupe.""" dimensions_dois = dois_from_pickle(dimensions) openalex_dois = dois_from_pickle(openalex) - sul_pub_dois = get_sul_pub_dois(sul_pub_csv) + sul_pub_dois = get_sul_pub_dois(sul_pub_jsonl) unique_dois = list(set(dimensions_dois + openalex_dois + sul_pub_dois)) logging.info(f"found {len(unique_dois)}") @@ -23,10 +25,7 @@ def dois_from_pickle(pickle_file: str) -> list: return dois -def get_sul_pub_dois(sul_pub_csv: str) -> list: +def get_sul_pub_dois(sul_pub_jsonl: str) -> list: """Extract DOIs from sul_pub CSV and remove empty values.""" - with open(sul_pub_csv, "r") as file: - reader = csv.DictReader(file) - doi_column = [row["doi"] for row in reader if row["doi"]] - - return doi_column + df = pl.read_ndjson(sul_pub_jsonl) + return df['doi'].to_list() diff --git a/rialto_airflow/harvest/merge_pubs.py b/rialto_airflow/harvest/merge_pubs.py index 3477210..d9da8c2 100644 --- a/rialto_airflow/harvest/merge_pubs.py +++ b/rialto_airflow/harvest/merge_pubs.py @@ -44,7 +44,7 @@ def dimensions_pubs_df(dimensions_pubs): # Create a LazyFrame of dimension pubs to avoid loading all data into memory """ # Polars is inferring volume is an integer, but it should be a string e.g. "97-B" - df = pl.scan_csv(dimensions_pubs, schema_overrides={"volume": pl.String}) + df = pl.scan_ndjson(dimensions_pubs) df = df.select( pl.col( "authors", @@ -69,7 +69,7 @@ def openalex_pubs_df(openalex_pubs): """ Create an openalex pubs LazyFrame and rename columns """ - df = pl.scan_csv(openalex_pubs) + df = pl.scan_ndjson(openalex_pubs) df = df.select( pl.col("doi").str.replace("https://doi.org/", ""), pl.col( @@ -84,7 +84,7 @@ def sulpub_df(sul_pub): """ Create a sulpub LazyFrame and rename columns """ - df = pl.scan_csv(sul_pub) + df = pl.scan_ndjson(sul_pub) df = df.drop_nulls("doi") df = df.with_columns(pl.col("doi").str.replace("https://doi.org/", "")) df = df.rename(lambda column_name: "sul_pub_" + column_name) diff --git a/rialto_airflow/harvest/openalex.py b/rialto_airflow/harvest/openalex.py index e1d9eeb..d658e02 100644 --- a/rialto_airflow/harvest/openalex.py +++ b/rialto_airflow/harvest/openalex.py @@ -1,4 +1,5 @@ import csv +import json import logging import os import pickle @@ -67,15 +68,13 @@ def dois_from_orcid(orcid: str, limit=None): yield pub.get("doi").replace("https://doi.org/", "") -def publications_csv(dois: list, csv_file: str) -> None: +def publications_jsonl(dois: list, jsonl_file: str) -> None: """ - Get publication records for a list of DOIs and create a CSV file. + Get publication records for a list of DOIs and create a JSONL file. 
""" - with open(csv_file, "w") as output: - writer = csv.DictWriter(output, fieldnames=FIELDS) - writer.writeheader() - for pub in publications_from_dois(dois): - writer.writerow(pub) + with open(jsonl_file, "w") as output: + for record in publications_from_dois(dois): + output.write(json.dumps(record, ensure_ascii=False) + "\n") def publications_from_dois(dois: list, batch_size=75): diff --git a/rialto_airflow/harvest/sul_pub.py b/rialto_airflow/harvest/sul_pub.py index 9395f1e..7b8463a 100644 --- a/rialto_airflow/harvest/sul_pub.py +++ b/rialto_airflow/harvest/sul_pub.py @@ -1,44 +1,15 @@ import csv +import json import logging import requests -SUL_PUB_FIELDS = [ - "authorship", - "title", - "abstract", - "author", - "year", - "type", - "mesh_headings", - "publisher", - "journal", - "provenance", - "doi", - "issn", - "sulpubid", - "sw_id", - "pmid", - "identifier", - "last_updated", - "pages", - "date", - "country", - "booktitle", - "edition", - "series", - "chapter", - "editor", -] - - -def sul_pub_csv(csv_file, host, key, since=None, limit=None): - with open(csv_file, "w") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=SUL_PUB_FIELDS) - writer.writeheader() - for row in harvest(host, key, since, limit): - writer.writerow(row) +def publications_jsonl(jsonl_file, host, key, since=None, limit=None): + with open(jsonl_file, "w") as output: + for record in harvest(host, key, since, limit): + json.dump(record, output, ensure_ascii=False) + output.write("\n") def harvest(host, key, since, limit): @@ -73,10 +44,9 @@ def harvest(host, key, since, limit): more = False break - pub = {key: record[key] for key in record if key in SUL_PUB_FIELDS} - pub["doi"] = extract_doi(record) + record["doi"] = extract_doi(record) - yield pub + yield record def extract_doi(record): diff --git a/rialto_airflow/utils.py b/rialto_airflow/utils.py index 4a48597..94fe230 100644 --- a/rialto_airflow/utils.py +++ b/rialto_airflow/utils.py @@ -1,4 +1,5 @@ import csv +import json import datetime from pathlib import Path @@ -54,3 +55,12 @@ def invert_dict(dict): inverted_dict[i] = [k for k, v in dict.items() if i in v] return inverted_dict + + +def write_jsonl(filename, records): + """ + Write a list of dictionaries as line-oriented JSON. 
+ """ + with open(filename, 'w') as output: + for record in records: + output.write(json.dumps(record) + "\n") diff --git a/test/harvest/test_dimensions.py b/test/harvest/test_dimensions.py index 9aff083..53731fa 100644 --- a/test/harvest/test_dimensions.py +++ b/test/harvest/test_dimensions.py @@ -43,13 +43,13 @@ def test_publication_fields(): assert "title" in fields -def test_publications_csv(tmpdir): - pubs_csv = tmpdir / "dimensions-pubs.csv" - dimensions.publications_csv( - ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_csv +def test_publications_jsonl(tmpdir): + pubs_jsonl = tmpdir / "dimensions-pubs.jsonl" + dimensions.publications_jsonl( + ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_jsonl ) - df = pandas.read_csv(pubs_csv) + df = pandas.read_json(pubs_jsonl, orient='records', lines=True) assert len(df) == 2 diff --git a/test/harvest/test_doi_set.py b/test/harvest/test_doi_set.py index c3265d4..16b0c74 100644 --- a/test/harvest/test_doi_set.py +++ b/test/harvest/test_doi_set.py @@ -1,9 +1,9 @@ -import csv import pickle import pytest from rialto_airflow.harvest.doi_set import create_doi_set +from rialto_airflow.utils import write_jsonl @pytest.fixture @@ -33,18 +33,20 @@ def openalex_pickle(tmp_path): @pytest.fixture -def sul_pub_csv(tmp_path): - fixture_file = tmp_path / "sul_pub.csv" - with open(fixture_file, "w", newline="") as csvfile: - writer = csv.writer(csvfile) - writer.writerow(["sunetid", "title", "doi"]) - writer.writerow(["author1", "A Publication", "10.0000/aaaa"]) - writer.writerow(["author2", "A Research Article", "10.0000/1234"]) +def sul_pub_jsonl(tmp_path): + fixture_file = tmp_path / "sul_pub.jsonl" + write_jsonl( + fixture_file, + [ + {"sunetid": "author1", "title": "A Publication", "doi": "10.0000/aaaa"}, + {"sunetid": "author2", "title": "A Research Article", "doi": "10.0000/1234"} + ] + ) return fixture_file -def test_doi_set(dimensions_pickle, openalex_pickle, sul_pub_csv): - dois = create_doi_set(dimensions_pickle, openalex_pickle, sul_pub_csv) +def test_doi_set(dimensions_pickle, openalex_pickle, sul_pub_jsonl): + dois = create_doi_set(dimensions_pickle, openalex_pickle, sul_pub_jsonl) assert len(dois) == 4 assert set(dois) == set( ["10.0000/1234", "10.0000/aaaa", "10.0000/cccc", "10.0000/zzzz"] diff --git a/test/harvest/test_merge_pubs.py b/test/harvest/test_merge_pubs.py index b4d62fa..0056f03 100644 --- a/test/harvest/test_merge_pubs.py +++ b/test/harvest/test_merge_pubs.py @@ -1,143 +1,118 @@ -import csv +import json import polars as pl import pytest from rialto_airflow.harvest import merge_pubs - +from rialto_airflow.utils import write_jsonl @pytest.fixture -def dimensions_pubs_csv(tmp_path): - fixture_file = tmp_path / "dimensions-pubs.csv" - with open(fixture_file, "w") as csvfile: - writer = csv.writer(csvfile) - header = [ - "bogus", - "volume", - "authors", - "document_type", - "doi", - "funders", - "funding_section", - "open_access", - "publisher", - "research_orgs", - "researchers", - "title", - "type", - "year", +def dimensions_pubs_jsonl(tmp_path): + fixture_file = tmp_path / "dimensions-pubs.jsonl" + write_jsonl( + fixture_file, + [ + { + "bogus": "a", + "volume": "1", + "authors": [], + "document_type": "ARTICLE", + "doi": "10.0000/aaaa", + "funders": [], + "funding_section": [], + "open_access": "True", + "publisher": "publisher", + "research_orgs": [], + "researchers": [], + "title": "A Publication", + "type": "article", + "year": "2024", + }, + { + "bogus": "b", + "volume": "2", + "authors": [], + 
"document_type": "ARTICLE", + "doi": "10.0000/1234", + "funders": [], + "funding_section": [], + "open_access": "True", + "publisher": "publisher", + "research_orgs": [], + "researchers": [], + "title": "A Research Article", + "type": "article", + "year": "2024", + } ] - writer.writerow(header) - writer.writerow( - [ - "a", - "1", - "[]", - "ARTICLE", - "10.0000/aaaa", - "[]", - "[]", - "True", - "publisher", - "[]", - "[]", - "A Publication", - "article", - "2024", - ] - ) - writer.writerow( - [ - "b", - "2", - "[]", - "ARTICLE", - "10.0000/1234", - "[]", - "[]", - "True", - "publisher", - "[]", - "[]", - "A Research Article", - "article", - "2024", - ] - ) + ) + return fixture_file @pytest.fixture -def openalex_pubs_csv(tmp_path): - fixture_file = tmp_path / "openalex-pubs.csv" - with open(fixture_file, "w") as csvfile: - writer = csv.writer(csvfile) - header = [ - "bogus", - "apc_paid", - "authorships", - "grants", - "publication_year", - "title", - "type", - "doi", +def openalex_pubs_jsonl(tmp_path): + fixture_file = tmp_path / "openalex-pubs.jsonl" + write_jsonl( + fixture_file, + [ + { + "bogus": "blah", + "apc_paid": 10, + "authorships": [], + "grants": [], + "publication_year": "2024", + "title": "A Publication", + "type": "article", + "doi": "https://doi.org/10.0000/cccc" + }, + { + "bogus": "blah", + "apc_paid": 0, + "authorships": [], + "grants": [], + "publication_year": "2024", + "title": "A Research Article", + "type": "article", + "doi": "https://doi.org/10.0000/1234", + } ] - writer.writerow(header) - writer.writerow( - [ - "blah", - 10, - "[]", - "[]", - "2024", - "A Publication", - "article", - "https://doi.org/10.0000/cccc", - ] - ) - writer.writerow( - [ - "blah", - 0, - "[]", - "[]", - "2024", - "A Research Article", - "article", - "https://doi.org/10.0000/1234", - ] - ) + ) + return fixture_file @pytest.fixture -def sul_pubs_csv(tmp_path): - fixture_file = tmp_path / "sulpub.csv" - with open(fixture_file, "w") as csvfile: - writer = csv.writer(csvfile) - header = ["authorship", "title", "year", "doi"] - writer.writerow(header) - writer.writerow(["[]", "A Publication", "2024", "10.0000/cccc"]) - writer.writerow( - [ - "[]", - "A Research Article", - "2024", - ] - ) - writer.writerow( - [ - "[]", - "A Published Research Article", - "2024", - "https://doi.org/10.0000/dddd", - ] - ) +def sul_pubs_jsonl(tmp_path): + fixture_file = tmp_path / "sulpub.jsonl" + write_jsonl( + fixture_file, + [ + { + "authorship": [], + "title": "A Publication", + "year": "2024", + "doi": "10.0000/cccc" + }, + { + "authorship": [], + "title": "A Research Article", + "year": "2024" + }, + { + "authorship": [], + "title": "A Published Research Article", + "year": "2024", + "doi": "https://doi.org/10.0000/dddd", + } + ] + ) + return fixture_file -def test_dimensions_pubs_df(dimensions_pubs_csv): - lazy_df = merge_pubs.dimensions_pubs_df(dimensions_pubs_csv) +def test_dimensions_pubs_df(dimensions_pubs_jsonl): + lazy_df = merge_pubs.dimensions_pubs_df(dimensions_pubs_jsonl) assert type(lazy_df) == pl.lazyframe.frame.LazyFrame df = lazy_df.collect() assert df.shape[0] == 2 @@ -145,8 +120,8 @@ def test_dimensions_pubs_df(dimensions_pubs_csv): assert df["dim_doi"].to_list() == ["10.0000/aaaa", "10.0000/1234"] -def test_openalex_pubs_df(openalex_pubs_csv): - lazy_df = merge_pubs.openalex_pubs_df(openalex_pubs_csv) +def test_openalex_pubs_df(openalex_pubs_jsonl): + lazy_df = merge_pubs.openalex_pubs_df(openalex_pubs_jsonl) assert type(lazy_df) == pl.lazyframe.frame.LazyFrame df = lazy_df.collect() 
     assert df.shape[0] == 2
@@ -154,8 +129,8 @@
     assert df["openalex_doi"].to_list() == ["10.0000/cccc", "10.0000/1234"]


-def test_sulpub_df(sul_pubs_csv):
-    lazy_df = merge_pubs.sulpub_df(sul_pubs_csv)
+def test_sulpub_df(sul_pubs_jsonl):
+    lazy_df = merge_pubs.sulpub_df(sul_pubs_jsonl)
     assert type(lazy_df) == pl.lazyframe.frame.LazyFrame
     df = lazy_df.collect()
     assert df.shape[0] == 2, "Row without a doi has been dropped"
@@ -168,9 +143,9 @@
     assert df["sul_pub_doi"].to_list() == ["10.0000/cccc", "10.0000/dddd"]


-def test_merge(tmp_path, sul_pubs_csv, openalex_pubs_csv, dimensions_pubs_csv):
+def test_merge(tmp_path, sul_pubs_jsonl, openalex_pubs_jsonl, dimensions_pubs_jsonl):
     output = tmp_path / "merged_pubs.parquet"
-    merge_pubs.merge(sul_pubs_csv, openalex_pubs_csv, dimensions_pubs_csv, output)
+    merge_pubs.merge(sul_pubs_jsonl, openalex_pubs_jsonl, dimensions_pubs_jsonl, output)
     assert output.is_file(), "output file has been created"
     df = pl.read_parquet(output)
     assert df.shape[0] == 4
diff --git a/test/harvest/test_openalex.py b/test/harvest/test_openalex.py
index fdd054c..1bea165 100644
--- a/test/harvest/test_openalex.py
+++ b/test/harvest/test_openalex.py
@@ -55,13 +55,13 @@ def test_publications_from_dois():
     assert len(pubs[1].keys()) == 51, "second publication has 51 columns"


-def test_publications_csv(tmp_path):
-    pubs_csv = tmp_path / "openalex-pubs.csv"
-    openalex.publications_csv(
-        ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_csv
+def test_publications_jsonl(tmp_path):
+    pubs_jsonl = tmp_path / "openalex-pubs.jsonl"
+    openalex.publications_jsonl(
+        ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_jsonl
     )

-    df = pandas.read_csv(pubs_csv)
+    df = pandas.read_json(pubs_jsonl, orient="records", lines=True)

     assert len(df) == 2
diff --git a/test/harvest/test_sul_pub.py b/test/harvest/test_sul_pub.py
index 7fbad27..367d776 100644
--- a/test/harvest/test_sul_pub.py
+++ b/test/harvest/test_sul_pub.py
@@ -4,7 +4,7 @@
 import pandas

 import pytest

-from rialto_airflow.harvest.sul_pub import sul_pub_csv
+from rialto_airflow.harvest import sul_pub

 dotenv.load_dotenv()
@@ -15,12 +15,12 @@


 @pytest.mark.skipif(no_auth, reason="no sul_pub key")
-def test_sul_pub_csv(tmpdir):
-    csv_file = tmpdir / "sul_pub.csv"
-    sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, limit=2000)
-    assert csv_file.isfile()
+def test_publications_jsonl(tmpdir):
+    jsonl_file = tmpdir / "sul_pub.jsonl"
+    sul_pub.publications_jsonl(jsonl_file, sul_pub_host, sul_pub_key, limit=2000)
+    assert jsonl_file.isfile()

-    df = pandas.read_csv(csv_file)
+    df = pandas.read_json(jsonl_file, orient="records", lines=True)
     assert len(df) == 2000
     assert "title" in df.columns