From 47954ec8d611bfa190b239d793b178b019070d8f Mon Sep 17 00:00:00 2001
From: Ed Summers
Date: Wed, 19 Jun 2024 11:43:29 -0400
Subject: [PATCH] Create snapshots directory

The snapshot directories will now go in data/snapshots/. This commit also
removes the incremental harvesting logic. If you want to artificially limit
the sul_pub harvesting you can set this in your `.env` file:

```
AIRFLOW_VAR_DEV_LIMIT=25000
```

Closes #26
---
 docker-compose.yaml                          |  1 +
 .../dags/{update_data.py => harvest.py}      | 27 ++++++++++---------
 rialto_airflow/utils.py                      | 16 +++++++++++
 rialto_airflow/utils/__init__.py             | 15 -----------
 test/harvest/test_sul_pub.py                 | 11 --------
 test/test_utils.py                           |  7 +++++
 6 files changed, 39 insertions(+), 38 deletions(-)
 rename rialto_airflow/dags/{update_data.py => harvest.py} (80%)
 create mode 100644 rialto_airflow/utils.py
 delete mode 100644 rialto_airflow/utils/__init__.py
 create mode 100644 test/test_utils.py

diff --git a/docker-compose.yaml b/docker-compose.yaml
index a7f8f4c..5de9bc0 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -78,6 +78,7 @@ x-airflow-common:
     AIRFLOW_VAR_DIMENSIONS_API_PASS: ${AIRFLOW_VAR_DIMENSIONS_API_PASS}
     AIRFLOW_VAR_SUL_PUB_HOST: ${AIRFLOW_VAR_SUL_PUB_HOST}
     AIRFLOW_VAR_SUL_PUB_KEY: ${AIRFLOW_VAR_SUL_PUB_KEY}
+    AIRFLOW_VAR_DEV_LIMIT: ${AIRFLOW_VAR_DEV_LIMIT}
     AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
   volumes:
     - ${AIRFLOW_PROJ_DIR:-.}/rialto_airflow:/opt/airflow/rialto_airflow
diff --git a/rialto_airflow/dags/update_data.py b/rialto_airflow/dags/harvest.py
similarity index 80%
rename from rialto_airflow/dags/update_data.py
rename to rialto_airflow/dags/harvest.py
index 9d3cfa0..2f0e541 100644
--- a/rialto_airflow/dags/update_data.py
+++ b/rialto_airflow/dags/harvest.py
@@ -4,37 +4,40 @@
 from airflow.models import Variable
 from airflow.decorators import dag, task
 
-from rialto_airflow.utils import last_harvest, create_snapshot_dir
+from rialto_airflow.utils import create_snapshot_dir
 from rialto_airflow.harvest.sul_pub import sul_pub_csv
 
 data_dir = Variable.get("data_dir")
 sul_pub_host = Variable.get("sul_pub_host")
 sul_pub_key = Variable.get("sul_pub_key")
 
+# to artificially limit the API activity in development
+dev_limit = Variable.get("dev_limit")
+if dev_limit is not None:
+    dev_limit = int(dev_limit)
 
 @dag(
     schedule=None,
     start_date=datetime.datetime(2024, 1, 1),
     catchup=False,
 )
-def update_data():
-    @task(multiple_outputs=True)
+def harvest():
+
+    @task()
     def setup():
         """
         Setup the data directory to write to and determine the last harvest.
         """
-        return {
-            "last_harvest": last_harvest(),
-            "snapshot_dir": create_snapshot_dir(data_dir),
-        }
+        snapshot_dir = create_snapshot_dir(data_dir)
+        return snapshot_dir
 
     @task()
-    def fetch_sul_pub(last_harvest, snapshot_dir):
+    def fetch_sul_pub(snapshot_dir):
         """
         Harvest data from sul_pub using the last harvest date.
         """
         csv_file = pathlib.Path(snapshot_dir) / "sulpub.csv"
-        sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, since=last_harvest)
+        sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, limit=dev_limit)
 
         return str(csv_file)
 
@@ -87,8 +90,8 @@ def publish(dataset):
         """
         return True
 
-    config = setup()
-    sul_pub = fetch_sul_pub(config["last_harvest"], config["snapshot_dir"])
+    snapshot_dir = setup()
+    sul_pub = fetch_sul_pub(snapshot_dir)
     dois = extract_doi(sul_pub)
     openalex = fetch_openalex(dois)
     dimensions = fetch_dimensions(dois)
@@ -98,4 +101,4 @@ def publish(dataset):
     publish(dataset)
 
 
-update_data()
+harvest()
diff --git a/rialto_airflow/utils.py b/rialto_airflow/utils.py
new file mode 100644
index 0000000..ade64d2
--- /dev/null
+++ b/rialto_airflow/utils.py
@@ -0,0 +1,16 @@
+import datetime
+
+from pathlib import Path
+
+
+def create_snapshot_dir(data_dir):
+    snapshots_dir = Path(data_dir) / "snapshots"
+
+    if not snapshots_dir.is_dir():
+        snapshots_dir.mkdir()
+
+    now = datetime.datetime.now()
+    snapshot_dir = snapshots_dir / now.strftime("%Y%m%d%H%M%S")
+    snapshot_dir.mkdir()
+
+    return str(snapshot_dir)
diff --git a/rialto_airflow/utils/__init__.py b/rialto_airflow/utils/__init__.py
deleted file mode 100644
index 3da8f9a..0000000
--- a/rialto_airflow/utils/__init__.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import os
-import datetime
-
-
-def last_harvest():
-    # TODO: look in the data_dir to determine the last harvest
-    return datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
-
-
-def create_snapshot_dir(data_dir):
-    now = datetime.datetime.now()
-    snapshot_dir = os.path.join(data_dir, now.strftime("%Y%m%d%H%M%S"))
-    os.mkdir(snapshot_dir)
-
-    return snapshot_dir
diff --git a/test/harvest/test_sul_pub.py b/test/harvest/test_sul_pub.py
index f439c9e..1ada300 100644
--- a/test/harvest/test_sul_pub.py
+++ b/test/harvest/test_sul_pub.py
@@ -24,14 +24,3 @@ def test_sul_pub_csv(tmpdir):
     df = pandas.read_csv(csv_file)
     assert len(df) == 2000
     assert "title" in df.columns
-
-
-@pytest.mark.skip(reason="sul_pub changeSince broken")
-@pytest.mark.skipif(no_auth, reason="no sul_pub key")
-def test_sul_pub_csv_since(tmpdir):
-    csv_file = tmpdir / "sul_pub.csv"
-    since = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
-    sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, since=since, limit=100)
-
-    df = pandas.read_csv(csv_file, parse_dates=["last_updated"])
-    assert len(df[df["last_updated"] < since]) == 0
diff --git a/test/test_utils.py b/test/test_utils.py
new file mode 100644
index 0000000..a63cae6
--- /dev/null
+++ b/test/test_utils.py
@@ -0,0 +1,7 @@
+from pathlib import Path
+
+from rialto_airflow.utils import create_snapshot_dir
+
+def test_create_snapshot_dir(tmpdir):
+    snap_dir = Path(create_snapshot_dir(tmpdir))
+    assert snap_dir.is_dir()