From 168600ff8c8a216bbf556b8afeceb74b7827926b Mon Sep 17 00:00:00 2001 From: Ed Summers Date: Mon, 1 Jul 2024 05:42:58 -0500 Subject: [PATCH] Add new tasks create_doi_sunet and contribs Once the initial DOI collection process is complete we know the population of DOIs we are working with in the dataset. We are also able to join the data to the authors.csv using either the `orcidid` (for Dimensions and OpenAlex) or `cap_profile_id` (for sul_pub). The doi_sunet task will create a mapping of `doi -> [sunetid]` using the pickle files, sul_pub harvest and the authors.csv. Once the publications datasets are merged the `doi_sunet` mapping is used to add the `sunetid` column, and then join with the `authors.csv`. --- rialto_airflow/dags/harvest.py | 50 +++++++------ rialto_airflow/harvest/contribs.py | 20 ++++++ rialto_airflow/harvest/doi_set.py | 32 --------- rialto_airflow/harvest/doi_sunet.py | 108 ++++++++++++++++++++++++++++ test/harvest/test_contribs.py | 76 ++++++++++++++++++++ test/harvest/test_doi_set.py | 51 ------------- test/harvest/test_doi_sunet.py | 107 +++++++++++++++++++++++++++ 7 files changed, 339 insertions(+), 105 deletions(-) create mode 100644 rialto_airflow/harvest/contribs.py delete mode 100644 rialto_airflow/harvest/doi_set.py create mode 100644 rialto_airflow/harvest/doi_sunet.py create mode 100644 test/harvest/test_contribs.py delete mode 100644 test/harvest/test_doi_set.py create mode 100644 test/harvest/test_doi_sunet.py diff --git a/rialto_airflow/dags/harvest.py b/rialto_airflow/dags/harvest.py index 94db713..aa4dd31 100644 --- a/rialto_airflow/dags/harvest.py +++ b/rialto_airflow/dags/harvest.py @@ -1,14 +1,14 @@ import datetime +import pickle from pathlib import Path -from airflow.models import Variable from airflow.decorators import dag, task +from airflow.models import Variable -from rialto_airflow.utils import create_snapshot_dir, rialto_authors_file -from rialto_airflow.harvest import dimensions, openalex, merge_pubs +from rialto_airflow.harvest import dimensions, merge_pubs, openalex +from rialto_airflow.harvest.doi_sunet import create_doi_sunet_pickle from rialto_airflow.harvest.sul_pub import sul_pub_csv -from rialto_airflow.harvest.doi_set import create_doi_set - +from rialto_airflow.utils import create_snapshot_dir, rialto_authors_file data_dir = Variable.get("data_dir") sul_pub_host = Variable.get("sul_pub_host") @@ -70,12 +70,22 @@ def sul_pub_harvest(snapshot_dir): return str(csv_file) @task() - def doi_set(dimensions, openalex, sul_pub): + def create_doi_sunet(dimensions, openalex, sul_pub, authors, snapshot_dir): + """ + Extract a mapping of DOI -> [SUNET] from the dimensions doi-orcid dict, + openalex doi-orcid dict, SUL-Pub publications, and authors data. + """ + pickle_file = Path(snapshot_dir) / "doi-sunet.pickle" + create_doi_sunet_pickle(dimensions, openalex, sul_pub, authors, pickle_file) + + return str(pickle_file) + + @task() + def doi_set(doi_sunet_pickle): """ - Extract a unique list of DOIs from the dimensions doi-orcid dict, - the openalex doi-orcid dict, and the SUL-Pub publications. + Use the DOI -> [SUNET] pickle to return a list of all DOIs. """ - return create_doi_set(dimensions, openalex, sul_pub) + return list(pickle.load(open(doi_sunet_pickle, "rb")).keys()) @task() def dimensions_harvest_pubs(dois, snapshot_dir): @@ -105,18 +115,12 @@ def merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir): return str(output) @task() - def join_authors(pubs, authors_csv): - """ - Add the Stanford organizational data to the publications. - """ - return True - - @task() - def pubs_to_contribs(pubs): + def pubs_to_contribs(pubs, doi_sunet_pickle): """ Get contributions from publications. """ - return True + output = Path(snapshot_dir) / "contributions.parquet" + return str(output) @task() def publish(dataset): @@ -135,7 +139,11 @@ def publish(dataset): openalex_dois = openalex_harvest_dois(authors_csv, snapshot_dir) - dois = doi_set(dimensions_dois, openalex_dois, sul_pub) + doi_sunet = create_doi_sunet( + dimensions_dois, openalex_dois, sul_pub, authors_csv, snapshot_dir + ) + + dois = doi_set(doi_sunet) dimensions_pubs = dimensions_harvest_pubs(dois, snapshot_dir) @@ -143,9 +151,7 @@ def publish(dataset): pubs = merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir) - pubs_authors = join_authors(pubs, authors_csv) - - contribs = pubs_to_contribs(pubs_authors) + contribs = pubs_to_contribs(pubs, doi_sunet) publish(contribs) diff --git a/rialto_airflow/harvest/contribs.py b/rialto_airflow/harvest/contribs.py new file mode 100644 index 0000000..ff3ec49 --- /dev/null +++ b/rialto_airflow/harvest/contribs.py @@ -0,0 +1,20 @@ +import pickle +import polars as pl + + +def make_contribs(pubs_parquet, doi_sunet_pickle, authors, contribs_path): + pubs = pl.read_parquet(pubs_parquet) + authors = pl.read_csv(authors) + + doi_sunet = pickle.load(open(doi_sunet_pickle, "rb")) + doi_sunet = pl.DataFrame({"doi": doi_sunet.keys(), "sunetid": doi_sunet.values()}) + + pubs = pubs.join(doi_sunet, on="doi") + + contribs = pubs.explode("sunetid") + + contribs = contribs.join(authors, on="sunetid") + + contribs.write_parquet(contribs_path) + + return contribs_path diff --git a/rialto_airflow/harvest/doi_set.py b/rialto_airflow/harvest/doi_set.py deleted file mode 100644 index 6bfc199..0000000 --- a/rialto_airflow/harvest/doi_set.py +++ /dev/null @@ -1,32 +0,0 @@ -import csv -import logging -import pickle - - -def create_doi_set(dimensions: str, openalex: str, sul_pub_csv: str) -> list: - """Get DOIs from each source and dedupe.""" - dimensions_dois = dois_from_pickle(dimensions) - openalex_dois = dois_from_pickle(openalex) - sul_pub_dois = get_sul_pub_dois(sul_pub_csv) - unique_dois = list(set(dimensions_dois + openalex_dois + sul_pub_dois)) - logging.info(f"found {len(unique_dois)}") - - return unique_dois - - -def dois_from_pickle(pickle_file: str) -> list: - """Load a pickled dictionary of DOIs and ORCIDs from file.""" - with open(pickle_file, "rb") as handle: - data = pickle.load(handle) - - dois = list(data.keys()) - return dois - - -def get_sul_pub_dois(sul_pub_csv: str) -> list: - """Extract DOIs from sul_pub CSV and remove empty values.""" - with open(sul_pub_csv, "r") as file: - reader = csv.DictReader(file) - doi_column = [row["doi"] for row in reader if row["doi"]] - - return doi_column diff --git a/rialto_airflow/harvest/doi_sunet.py b/rialto_airflow/harvest/doi_sunet.py new file mode 100644 index 0000000..081d7ac --- /dev/null +++ b/rialto_airflow/harvest/doi_sunet.py @@ -0,0 +1,108 @@ +import logging +import pickle +from collections import defaultdict + +import pandas as pd + + +def create_doi_sunet_pickle( + dimensions: str, openalex: str, sul_pub_csv: str, authors_csv: str, output_path +) -> dict: + """ + Get DOIs from each source and determine their SUNETID(s) using the authors + csv file. Write the resulting mapping as a pickle to the output_path. + """ + # use the authors csv to generate two dictionaries for looking up the sunet + # based on an orcid or a cap_profile + orcid_sunet, cap_profile_sunet = get_author_maps(authors_csv) + + # dimensions and openalex pickle files map doi -> [orcid] and use the + # orcid_sunet mapping to turn that into doi -> [sunet] + dimensions_map = doi_sunetids(dimensions, orcid_sunet) + openalex_map = doi_sunetids(openalex, orcid_sunet) + + # sulpub csv has doi and authorship columns the latter of which contains the cap_profile_id so + # the cap_profile_sunet mapping can be used to return a mapping of doi -> [sunet] + sulpub_map = sulpub_doi_sunetids(sul_pub_csv, cap_profile_sunet) + + doi_sunet = combine_maps(dimensions_map, openalex_map, sulpub_map) + + with open(output_path, "wb") as handle: + pickle.dump(doi_sunet, handle, protocol=pickle.HIGHEST_PROTOCOL) + + logging.info(f"Found {len(doi_sunet)} DOIs") + + +def doi_sunetids(pickle_file: str, orcid_sunet: dict) -> dict: + """ + Convert a mapping of doi -> [orcid] to a mapping of doi -> [sunet]. + """ + doi_orcids = pickle.load(open(pickle_file, "rb")) + + mapping = {} + for doi, orcids in doi_orcids.items(): + mapping[doi] = [orcid_sunet[orcid] for orcid in orcids] + + return mapping + + +def sulpub_doi_sunetids(sul_pub_csv, cap_profile_sunet): + # create a dataframe for sul_pubs which has a column for cap_profile_id + # extracted from the authorship column + df = pd.read_csv(sul_pub_csv, usecols=["doi", "authorship"]) + df = df[df["doi"].notna()] + + def extract_cap_ids(authors): + return [a["cap_profile_id"] for a in eval(authors)] + + df["cap_profile_id"] = df["authorship"].apply(extract_cap_ids) + + df = df.explode("cap_profile_id") + + # create a column for sunet using the cap_profile_sunet dictionary + df["sunet"] = df["cap_profile_id"].apply(lambda cap_id: cap_profile_sunet[cap_id]) + + return df.groupby("doi")["sunet"].apply(list).to_dict() + + +def get_author_maps(authors): + """ + Reads the authors csv and returns two dictionary mappings: orcid -> sunet, + cap_profile_id -> sunet. + """ + df = pd.read_csv(authors, usecols=["sunetid", "orcidid", "cap_profile_id"]) + df["orcidid"] = df["orcidid"].apply(orcid_id) + + # orcid -> sunet + orcid = pd.Series(df["sunetid"].values, index=df["orcidid"]).to_dict() + + # cap_profile_id -> sunet + cap_profile_id = pd.Series( + df["sunetid"].values, index=df["cap_profile_id"] + ).to_dict() + + return orcid, cap_profile_id + + +def combine_maps(m1, m2, m3): + m = defaultdict(set) + + # fold values from dictionary d2 into dictionary d1 + def combine(d1, d2): + for doi, sunets in d2.items(): + for sunet in sunets: + d1[doi].add(sunet) + + combine(m, m1) + combine(m, m2) + combine(m, m3) + + # return the mapping with the sets turned into lists + return {k: list(v) for k, v in m.items()} + + +def orcid_id(orcid): + if pd.isna(orcid): + return None + else: + return orcid.replace("https://orcid.org/", "") diff --git a/test/harvest/test_contribs.py b/test/harvest/test_contribs.py new file mode 100644 index 0000000..2c184a5 --- /dev/null +++ b/test/harvest/test_contribs.py @@ -0,0 +1,76 @@ +import pickle +import pytest +import polars as pl + +from rialto_airflow.harvest.contribs import make_contribs + + +@pytest.fixture +def pubs_parquet(tmp_path): + fixture_path = tmp_path / "pubs.parquet" + df = pl.DataFrame( + { + "doi": [ + "0000/abc", + "0000/123", + "0000/999", + ], + "title": ["Exquisite article", "Fantabulous research", "Perfect prose"], + } + ) + df.write_parquet(fixture_path) + return str(fixture_path) + + +@pytest.fixture +def doi_sunet(tmp_path): + fixture_path = tmp_path / "doi-sunet.pickle" + m = {"0000/abc": ["user1"], "0000/123": ["user2"], "0000/999": ["user1", "user2"]} + pickle.dump(m, open(str(fixture_path), "wb")) + return str(fixture_path) + + +@pytest.fixture +def authors(tmp_path): + fixture_path = tmp_path / "users.csv" + df = pl.DataFrame( + {"sunetid": ["user1", "user2"], "first_name": ["Mary", "Frederico"]} + ) + df.write_csv(fixture_path) + return str(fixture_path) + + +def test_make_contribs(pubs_parquet, doi_sunet, authors, tmp_path): + contribs_parquet = tmp_path / "contribs.parquet" + make_contribs(pubs_parquet, doi_sunet, authors, contribs_parquet) + + df = pl.read_parquet(contribs_parquet) + assert set(df.columns) == set( + ["doi", "sunetid", "title", "first_name"] + ), "columns are correct" + + # first publication got joined to authors + assert len(df.filter(pl.col("doi") == "0000/abc")) == 1 + row = df.filter(pl.col("doi") == "0000/abc").row(0, named=True) + assert row["sunetid"] == "user1" + assert row["first_name"] == "Mary" + assert row["title"] == "Exquisite article" + + # second publication got joined to authors + assert len(df.filter(pl.col("doi") == "0000/123")) == 1 + row = df.filter(pl.col("doi") == "0000/123").row(0, named=True) + assert row["sunetid"] == "user2" + assert row["first_name"] == "Frederico" + assert row["title"] == "Fantabulous research" + + # third publication was broken out into two rows since the doi_sunet pickle + # file indicates it was authored by two people. + rows = df.filter(pl.col("doi") == "0000/999").sort("sunetid") + assert len(rows) == 2 + assert rows["sunetid"][0] == "user1" + assert rows["first_name"][0] == "Mary" + assert rows["title"][0] == "Perfect prose" + + assert rows["sunetid"][1] == "user2" + assert rows["first_name"][1] == "Frederico" + assert rows["title"][1] == "Perfect prose" diff --git a/test/harvest/test_doi_set.py b/test/harvest/test_doi_set.py deleted file mode 100644 index c3265d4..0000000 --- a/test/harvest/test_doi_set.py +++ /dev/null @@ -1,51 +0,0 @@ -import csv -import pickle - -import pytest - -from rialto_airflow.harvest.doi_set import create_doi_set - - -@pytest.fixture -def dimensions_pickle(tmp_path): - data = { - "10.0000/1234": ["https://orcid.org/0000-0000-0000-0001"], - "10.0000/cccc": ["https://orcid.org/0000-0000-0000-0002"], - } - pickle_file = tmp_path / "dimensions.pickle" - with open(pickle_file, "wb") as handle: - pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL) - - return pickle_file - - -@pytest.fixture -def openalex_pickle(tmp_path): - data = { - "10.0000/cccc": ["https://orcid.org/0000-0000-0000-0001"], - "10.0000/zzzz": ["https://orcid.org/0000-0000-0000-0002"], - } - pickle_file = tmp_path / "openalex.pickle" - with open(pickle_file, "wb") as handle: - pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL) - - return pickle_file - - -@pytest.fixture -def sul_pub_csv(tmp_path): - fixture_file = tmp_path / "sul_pub.csv" - with open(fixture_file, "w", newline="") as csvfile: - writer = csv.writer(csvfile) - writer.writerow(["sunetid", "title", "doi"]) - writer.writerow(["author1", "A Publication", "10.0000/aaaa"]) - writer.writerow(["author2", "A Research Article", "10.0000/1234"]) - return fixture_file - - -def test_doi_set(dimensions_pickle, openalex_pickle, sul_pub_csv): - dois = create_doi_set(dimensions_pickle, openalex_pickle, sul_pub_csv) - assert len(dois) == 4 - assert set(dois) == set( - ["10.0000/1234", "10.0000/aaaa", "10.0000/cccc", "10.0000/zzzz"] - ) diff --git a/test/harvest/test_doi_sunet.py b/test/harvest/test_doi_sunet.py new file mode 100644 index 0000000..479a006 --- /dev/null +++ b/test/harvest/test_doi_sunet.py @@ -0,0 +1,107 @@ +import csv +import pickle + +import pytest + +from rialto_airflow.harvest.doi_sunet import create_doi_sunet_pickle, combine_maps + +# +# create fixture data for dimensions, openalex, sulpub and authors +# such that there are four publications that are authored by two authors +# +# 10.0000/1234: +# - sunet1 (via dimensions) +# - sunet2 (via sul_pub) +# +# 10.0000/aaaa: +# - sunet1 (via dimensions) +# +# 10.0000/cccc: +# - sunet1 (via openalex) +# - sunet2 (via dimensions) +# +# 10.0000/zzzz: +# - sunet2 (via openalex) +# + + +@pytest.fixture +def dimensions_pickle(tmp_path): + data = { + "10.0000/1234": ["0000-0000-0000-0001"], + "10.0000/cccc": ["0000-0000-0000-0002"], + } + pickle_file = tmp_path / "dimensions.pickle" + with open(pickle_file, "wb") as handle: + pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL) + + return pickle_file + + +@pytest.fixture +def openalex_pickle(tmp_path): + data = { + "10.0000/cccc": ["0000-0000-0000-0001"], + "10.0000/zzzz": ["0000-0000-0000-0002"], + } + pickle_file = tmp_path / "openalex.pickle" + with open(pickle_file, "wb") as handle: + pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL) + + return pickle_file + + +@pytest.fixture +def sul_pub_csv(tmp_path): + fixture_file = tmp_path / "sul_pub.csv" + with open(fixture_file, "w", newline="") as csvfile: + writer = csv.writer(csvfile) + writer.writerow(["authorship", "title", "doi"]) + writer.writerow([authorship("cap-01"), "A Publication", "10.0000/aaaa"]) + writer.writerow([authorship("cap-02"), "A Research Article", "10.0000/1234"]) + return fixture_file + + +@pytest.fixture +def authors_csv(tmp_path): + fixture_file = tmp_path / "authors.csv" + with open(fixture_file, "w", newline="") as csvfile: + writer = csv.writer(csvfile) + writer.writerow(["sunetid", "orcidid", "cap_profile_id"]) + writer.writerow(["sunet1", "https://orcid.org/0000-0000-0000-0001", "cap-01"]) + writer.writerow(["sunet2", "https://orcid.org/0000-0000-0000-0002", "cap-02"]) + return fixture_file + + +def authorship(cap_id): + return str([{"cap_profile_id": cap_id}]) + + +def test_doi_sunet( + dimensions_pickle, openalex_pickle, sul_pub_csv, authors_csv, tmp_path +): + output_file = tmp_path / "doi_sunet.pickle" + + create_doi_sunet_pickle( + dimensions_pickle, openalex_pickle, sul_pub_csv, authors_csv, output_file + ) + + doi_sunet = pickle.load(open(output_file, "rb")) + + assert len(doi_sunet) == 4 + assert set(doi_sunet["10.0000/1234"]) == set(["sunet1", "sunet2"]) + assert doi_sunet["10.0000/aaaa"] == ["sunet1"] + assert set(doi_sunet["10.0000/cccc"]) == set(["sunet1", "sunet2"]) + assert doi_sunet["10.0000/zzzz"] == ["sunet2"] + + +def test_combine_maps(): + m1 = {"a": [1], "b": [2]} + m2 = {"a": [1, 2], "b": [3]} + m3 = {"c": [4]} + + m4 = combine_maps(m1, m2, m3) + assert len(m4) == 3 + assert set(m4["a"]) == set([1, 2]) + assert set(m4["b"]) == set([2, 3]) + assert set(m4["c"]) == set([4])