Skip to content

Commit

Permalink
working
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobthill committed Jun 20, 2024
1 parent e614313 commit c834a67
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions rialto_airflow/dags/harvest.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import datetime
import dimcli
import pathlib

from airflow.models import Variable
from airflow.decorators import dag, task

from rialto_airflow.utils import create_snapshot_dir
from rialto_airflow.harvest.sul_pub import sul_pub_csv
from rialto_airflow.harvest.dimensions import dimensions_doi_orcids_dict

# Airflow Variables looked up at module level, before the DAG is declared.
# NOTE(review): Variable.get is called here without a default; per the Airflow
# documentation a missing variable raises KeyError rather than returning None —
# confirm whether the `is not None` check below can ever see None.
data_dir = Variable.get("data_dir")
sul_pub_host = Variable.get("sul_pub_host")
sul_pub_key = Variable.get("sul_pub_key")

# to artificially limit the API activity in development
dev_limit = Variable.get("dev_limit")
if dev_limit is not None:
dev_limit = int(dev_limit)
# dev_limit = Variable.get("dev_limit")
# if dev_limit is not None:
# dev_limit = int(dev_limit)


@dag(
Expand All @@ -32,16 +34,20 @@ def setup():
return snapshot_dir

@task()
def dimensions_harvest_orcid(orcids):
def dimensions_harvest_dois(orcids):
"""
Fetch the data by ORCID from Dimensions.
Fetch the DOIs from Dimensions by querying the ORCIDs.
"""
return True
# NOTE(review): the parameter is named `orcids`, but the lines below read
# `snapshot_dir`, which is not a parameter of this function. The call site
# passes `snapshot_dir` positionally as `orcids` — confirm which name is meant.
author_file = pathlib.Path(snapshot_dir) / "authors.csv"
pickle_file = pathlib.Path(snapshot_dir) / "dimensions_doi_orcid_dict.pickle"

# NOTE(review): `pickle_file` is passed as both the first and the second
# positional argument and `author_file` is never used — presumably the first
# argument should be `author_file`; verify against dimensions_doi_orcids_dict.
doi_orcid_dict = dimensions_doi_orcids_dict(pickle_file, pickle_file, limit=None)
return doi_orcid_dict

@task()
def openalex_harvest_orcid(orcids):
def openalex_harvest_dois(orcids):
"""
Fetch the data by ORCID from OpenAlex.
Fetch the DOIs from OpenAlex by querying the ORCIDs.
"""
# Placeholder: `orcids` is not used and no query is made; the task only
# returns True.
return True

Expand All @@ -64,14 +70,14 @@ def doi_set(dimensions, openalex, sul_pub):
return True

@task()
def dimensions_harvest_doi(dois):
def dimensions_harvest_pubs(dois):
"""
Harvest publication metadata from Dimensions using the dois from doi_set.
"""
# Placeholder: `dois` is not used; the task only returns True.
return True

@task()
def openalex_harvest_doi(dois):
def openalex_harvest_pubs(dois):
"""
Harvest publication metadata from OpenAlex using the dois from doi_set.
"""
Expand Down Expand Up @@ -107,13 +113,14 @@ def publish(dataset):

# Task wiring: each call's result is handed to the task(s) that follow it.
snapshot_dir = setup()
sul_pub = sul_pub_harvest(snapshot_dir)
dimensions_orcid = dimensions_harvest_orcid(snapshot_dir)
openalex_orcid = openalex_harvest_orcid(snapshot_dir)
# NOTE(review): both *_harvest_dois tasks declare a parameter named `orcids`
# but are given `snapshot_dir` here — confirm the intended input.
dimensions_orcid = dimensions_harvest_dois(snapshot_dir)
openalex_orcid = openalex_harvest_dois(snapshot_dir)
# NOTE(review): the hunk header earlier on this page shows the signature
# `doi_set(dimensions, openalex, sul_pub)`, while this call passes `sul_pub`
# first — confirm the intended positional order.
dois = doi_set(sul_pub, dimensions_orcid, openalex_orcid)
dimensions_doi = dimensions_harvest_doi(dois)
openalex_doi = openalex_harvest_doi(dois)
dimensions_doi = dimensions_harvest_pubs(dois)
openalex_doi = openalex_harvest_pubs(dois)
pubs = merge_publications(sul_pub, dimensions_doi, openalex_doi)
contribs = pubs_to_contribs(pubs)
pubs_org = join_org_data(pubs)
contribs = pubs_to_contribs(pubs_org)
publish(contribs)


Expand Down

0 comments on commit c834a67

Please sign in to comment.