Add new tasks create_doi_sunet and contribs
Once the initial DOI collection process is complete, we know the
population of DOIs we are working with in the dataset. We are also able
to join the data to `authors.csv` using either the `orcidid` (for
Dimensions and OpenAlex) or the `cap_profile_id` (for sul_pub).

The `doi_sunet` task creates a mapping of `doi -> [sunetid]` using the
Dimensions and OpenAlex pickle files, the sul_pub harvest, and `authors.csv`.

Once the publication datasets are merged, the `doi_sunet` mapping is
used to add a `sunetid` column, which is then joined with `authors.csv`.
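
As a rough sketch of the hand-off between the new tasks (the pickle file name
comes from the DAG; the example DOI and sunetid values are illustrative, not
taken from the real dataset):

import pickle

# doi-sunet.pickle maps each DOI to the sunetids matched to it via an
# orcidid (Dimensions, OpenAlex) or a cap_profile_id (sul_pub), e.g.
# {"10.0000/abc": ["user1"], "10.0000/999": ["user1", "user2"]}
doi_sunet = pickle.load(open("doi-sunet.pickle", "rb"))

# the list of DOIs to harvest full publication records for is just the keys
dois = list(doi_sunet.keys())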
edsu committed Jul 1, 2024
1 parent a6d32db commit 168600f
Showing 7 changed files with 339 additions and 105 deletions.
50 changes: 28 additions & 22 deletions rialto_airflow/dags/harvest.py
@@ -1,14 +1,14 @@
import datetime
import pickle
from pathlib import Path

from airflow.models import Variable
from airflow.decorators import dag, task
from airflow.models import Variable

from rialto_airflow.utils import create_snapshot_dir, rialto_authors_file
from rialto_airflow.harvest import dimensions, openalex, merge_pubs
from rialto_airflow.harvest import dimensions, merge_pubs, openalex
from rialto_airflow.harvest.doi_sunet import create_doi_sunet_pickle
from rialto_airflow.harvest.sul_pub import sul_pub_csv
from rialto_airflow.harvest.doi_set import create_doi_set

from rialto_airflow.utils import create_snapshot_dir, rialto_authors_file

data_dir = Variable.get("data_dir")
sul_pub_host = Variable.get("sul_pub_host")
@@ -70,12 +70,22 @@ def sul_pub_harvest(snapshot_dir):
return str(csv_file)

@task()
def doi_set(dimensions, openalex, sul_pub):
def create_doi_sunet(dimensions, openalex, sul_pub, authors, snapshot_dir):
"""
Extract a mapping of DOI -> [SUNET] from the dimensions doi-orcid dict,
openalex doi-orcid dict, SUL-Pub publications, and authors data.
"""
pickle_file = Path(snapshot_dir) / "doi-sunet.pickle"
create_doi_sunet_pickle(dimensions, openalex, sul_pub, authors, pickle_file)

return str(pickle_file)

@task()
def doi_set(doi_sunet_pickle):
"""
Extract a unique list of DOIs from the dimensions doi-orcid dict,
the openalex doi-orcid dict, and the SUL-Pub publications.
Use the DOI -> [SUNET] pickle to return a list of all DOIs.
"""
return create_doi_set(dimensions, openalex, sul_pub)
return list(pickle.load(open(doi_sunet_pickle, "rb")).keys())

@task()
def dimensions_harvest_pubs(dois, snapshot_dir):
@@ -105,18 +115,12 @@ def merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir):
return str(output)

@task()
def join_authors(pubs, authors_csv):
"""
Add the Stanford organizational data to the publications.
"""
return True

@task()
def pubs_to_contribs(pubs):
def pubs_to_contribs(pubs, doi_sunet_pickle):
"""
Get contributions from publications.
"""
return True
# the doi_sunet pickle lives in the snapshot directory, so use its parent
# directory as the output location
output = Path(doi_sunet_pickle).parent / "contributions.parquet"
return str(output)

@task()
def publish(dataset):
@@ -135,17 +139,19 @@ def publish(dataset):

openalex_dois = openalex_harvest_dois(authors_csv, snapshot_dir)

dois = doi_set(dimensions_dois, openalex_dois, sul_pub)
doi_sunet = create_doi_sunet(
dimensions_dois, openalex_dois, sul_pub, authors_csv, snapshot_dir
)

dois = doi_set(doi_sunet)

dimensions_pubs = dimensions_harvest_pubs(dois, snapshot_dir)

openalex_pubs = openalex_harvest_pubs(dois, snapshot_dir)

pubs = merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir)

pubs_authors = join_authors(pubs, authors_csv)

contribs = pubs_to_contribs(pubs_authors)
contribs = pubs_to_contribs(pubs, doi_sunet)

publish(contribs)

20 changes: 20 additions & 0 deletions rialto_airflow/harvest/contribs.py
@@ -0,0 +1,20 @@
import pickle
import polars as pl


def make_contribs(pubs_parquet, doi_sunet_pickle, authors, contribs_path):
    """
    Combine the merged publications parquet file, the doi -> [sunetid] pickle,
    and the authors csv into a contributions parquet file with one row per
    (doi, sunetid) pair, written to contribs_path.
    """
    pubs = pl.read_parquet(pubs_parquet)
    authors = pl.read_csv(authors)

    doi_sunet = pickle.load(open(doi_sunet_pickle, "rb"))
    doi_sunet = pl.DataFrame({"doi": doi_sunet.keys(), "sunetid": doi_sunet.values()})

    pubs = pubs.join(doi_sunet, on="doi")

    contribs = pubs.explode("sunetid")

    contribs = contribs.join(authors, on="sunetid")

    contribs.write_parquet(contribs_path)

    return contribs_path
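
The new make_contribs helper does not appear to be wired into the DAG yet in
this commit; a hypothetical invocation (the paths below are illustrative)
might look like:

from rialto_airflow.harvest.contribs import make_contribs

# all paths below are made up for illustration
make_contribs(
    "snapshot/publications.parquet",   # merged publications with a "doi" column
    "snapshot/doi-sunet.pickle",       # doi -> [sunetid] mapping
    "snapshot/authors.csv",            # must include a "sunetid" column
    "snapshot/contributions.parquet",  # output: one row per (doi, sunetid) pair
)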
32 changes: 0 additions & 32 deletions rialto_airflow/harvest/doi_set.py

This file was deleted.

108 changes: 108 additions & 0 deletions rialto_airflow/harvest/doi_sunet.py
@@ -0,0 +1,108 @@
import logging
import pickle
from collections import defaultdict

import pandas as pd


def create_doi_sunet_pickle(
    dimensions: str, openalex: str, sul_pub_csv: str, authors_csv: str, output_path
) -> None:
    """
    Get DOIs from each source and determine their SUNETID(s) using the authors
    csv file. Write the resulting mapping as a pickle to the output_path.
    """
    # use the authors csv to generate two dictionaries for looking up the sunet
    # based on an orcid or a cap_profile_id
    orcid_sunet, cap_profile_sunet = get_author_maps(authors_csv)

    # the dimensions and openalex pickle files map doi -> [orcid]; use the
    # orcid_sunet mapping to turn that into doi -> [sunet]
    dimensions_map = doi_sunetids(dimensions, orcid_sunet)
    openalex_map = doi_sunetids(openalex, orcid_sunet)

    # the sul_pub csv has doi and authorship columns, the latter of which contains
    # the cap_profile_id, so the cap_profile_sunet mapping can be used to produce a
    # mapping of doi -> [sunet]
    sulpub_map = sulpub_doi_sunetids(sul_pub_csv, cap_profile_sunet)

    doi_sunet = combine_maps(dimensions_map, openalex_map, sulpub_map)

    with open(output_path, "wb") as handle:
        pickle.dump(doi_sunet, handle, protocol=pickle.HIGHEST_PROTOCOL)

    logging.info(f"Found {len(doi_sunet)} DOIs")


def doi_sunetids(pickle_file: str, orcid_sunet: dict) -> dict:
    """
    Convert a mapping of doi -> [orcid] to a mapping of doi -> [sunet].
    """
    doi_orcids = pickle.load(open(pickle_file, "rb"))

    mapping = {}
    for doi, orcids in doi_orcids.items():
        mapping[doi] = [orcid_sunet[orcid] for orcid in orcids]

    return mapping


def sulpub_doi_sunetids(sul_pub_csv, cap_profile_sunet):
    # create a dataframe for sul_pubs which has a column for cap_profile_id
    # extracted from the authorship column
    df = pd.read_csv(sul_pub_csv, usecols=["doi", "authorship"])
    df = df[df["doi"].notna()]

    def extract_cap_ids(authors):
        return [a["cap_profile_id"] for a in eval(authors)]

    df["cap_profile_id"] = df["authorship"].apply(extract_cap_ids)

    df = df.explode("cap_profile_id")

    # create a column for sunet using the cap_profile_sunet dictionary
    df["sunet"] = df["cap_profile_id"].apply(lambda cap_id: cap_profile_sunet[cap_id])

    return df.groupby("doi")["sunet"].apply(list).to_dict()


def get_author_maps(authors):
    """
    Reads the authors csv and returns two dictionary mappings: orcid -> sunet,
    cap_profile_id -> sunet.
    """
    df = pd.read_csv(authors, usecols=["sunetid", "orcidid", "cap_profile_id"])
    df["orcidid"] = df["orcidid"].apply(orcid_id)

    # orcid -> sunet
    orcid = pd.Series(df["sunetid"].values, index=df["orcidid"]).to_dict()

    # cap_profile_id -> sunet
    cap_profile_id = pd.Series(
        df["sunetid"].values, index=df["cap_profile_id"]
    ).to_dict()

    return orcid, cap_profile_id


def combine_maps(m1, m2, m3):
    m = defaultdict(set)

    # fold values from dictionary d2 into dictionary d1
    def combine(d1, d2):
        for doi, sunets in d2.items():
            for sunet in sunets:
                d1[doi].add(sunet)

    combine(m, m1)
    combine(m, m2)
    combine(m, m3)

    # return the mapping with the sets turned into lists
    return {k: list(v) for k, v in m.items()}


def orcid_id(orcid):
    if pd.isna(orcid):
        return None
    else:
        return orcid.replace("https://orcid.org/", "")
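
For example, combine_maps folds the three per-source mappings together and
de-duplicates the sunetids (the DOIs and sunetids here are made up):

from rialto_airflow.harvest.doi_sunet import combine_maps

dimensions_map = {"10.0000/abc": ["user1"]}
openalex_map = {"10.0000/abc": ["user1", "user2"]}
sulpub_map = {"10.0000/xyz": ["user3"]}

combined = combine_maps(dimensions_map, openalex_map, sulpub_map)
# {"10.0000/abc": ["user1", "user2"], "10.0000/xyz": ["user3"]}
# (the order within each list is not guaranteed, since sets are used internally)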
76 changes: 76 additions & 0 deletions test/harvest/test_contribs.py
@@ -0,0 +1,76 @@
import pickle
import pytest
import polars as pl

from rialto_airflow.harvest.contribs import make_contribs


@pytest.fixture
def pubs_parquet(tmp_path):
    fixture_path = tmp_path / "pubs.parquet"
    df = pl.DataFrame(
        {
            "doi": [
                "0000/abc",
                "0000/123",
                "0000/999",
            ],
            "title": ["Exquisite article", "Fantabulous research", "Perfect prose"],
        }
    )
    df.write_parquet(fixture_path)
    return str(fixture_path)


@pytest.fixture
def doi_sunet(tmp_path):
    fixture_path = tmp_path / "doi-sunet.pickle"
    m = {"0000/abc": ["user1"], "0000/123": ["user2"], "0000/999": ["user1", "user2"]}
    pickle.dump(m, open(str(fixture_path), "wb"))
    return str(fixture_path)


@pytest.fixture
def authors(tmp_path):
    fixture_path = tmp_path / "users.csv"
    df = pl.DataFrame(
        {"sunetid": ["user1", "user2"], "first_name": ["Mary", "Frederico"]}
    )
    df.write_csv(fixture_path)
    return str(fixture_path)


def test_make_contribs(pubs_parquet, doi_sunet, authors, tmp_path):
    contribs_parquet = tmp_path / "contribs.parquet"
    make_contribs(pubs_parquet, doi_sunet, authors, contribs_parquet)

    df = pl.read_parquet(contribs_parquet)
    assert set(df.columns) == set(
        ["doi", "sunetid", "title", "first_name"]
    ), "columns are correct"

    # first publication got joined to authors
    assert len(df.filter(pl.col("doi") == "0000/abc")) == 1
    row = df.filter(pl.col("doi") == "0000/abc").row(0, named=True)
    assert row["sunetid"] == "user1"
    assert row["first_name"] == "Mary"
    assert row["title"] == "Exquisite article"

    # second publication got joined to authors
    assert len(df.filter(pl.col("doi") == "0000/123")) == 1
    row = df.filter(pl.col("doi") == "0000/123").row(0, named=True)
    assert row["sunetid"] == "user2"
    assert row["first_name"] == "Frederico"
    assert row["title"] == "Fantabulous research"

    # third publication was broken out into two rows since the doi_sunet pickle
    # file indicates it was authored by two people.
    rows = df.filter(pl.col("doi") == "0000/999").sort("sunetid")
    assert len(rows) == 2
    assert rows["sunetid"][0] == "user1"
    assert rows["first_name"][0] == "Mary"
    assert rows["title"][0] == "Perfect prose"

    assert rows["sunetid"][1] == "user2"
    assert rows["first_name"][1] == "Frederico"
    assert rows["title"][1] == "Perfect prose"
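
Since the fixtures write all of their inputs under pytest's `tmp_path`, the
test can be run on its own with a plain `pytest test/harvest/test_contribs.py`
invocation.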
51 changes: 0 additions & 51 deletions test/harvest/test_doi_set.py

This file was deleted.
