Skip to content

Commit

Permalink
Create snapshots directory
Browse files Browse the repository at this point in the history
The snapshot directories will now go in data/snapshots/. This commit
also removes the incremental harvesting logic.

If you want to artificially limit the sul_pub harvesting you can set
this in your `.env` file:

```
AIRFLOW_VAR_DEV_LIMIT=25000
```

Closes #26
  • Loading branch information
edsu committed Jun 19, 2024
1 parent 5ff1640 commit ca815c0
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 38 deletions.
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ x-airflow-common:
AIRFLOW_VAR_DIMENSIONS_API_PASS: ${AIRFLOW_VAR_DIMENSIONS_API_PASS}
AIRFLOW_VAR_SUL_PUB_HOST: ${AIRFLOW_VAR_SUL_PUB_HOST}
AIRFLOW_VAR_SUL_PUB_KEY: ${AIRFLOW_VAR_SUL_PUB_KEY}
AIRFLOW_VAR_DEV_LIMIT: ${AIRFLOW_VAR_DEV_LIMIT}
AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/rialto_airflow:/opt/airflow/rialto_airflow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,37 @@
from airflow.models import Variable
from airflow.decorators import dag, task

from rialto_airflow.utils import last_harvest, create_snapshot_dir
from rialto_airflow.utils import create_snapshot_dir
from rialto_airflow.harvest.sul_pub import sul_pub_csv

data_dir = Variable.get("data_dir")
sul_pub_host = Variable.get("sul_pub_host")
sul_pub_key = Variable.get("sul_pub_key")

# to artificially limit the API activity in development
dev_limit = Variable.get("dev_limit")
if dev_limit is not None:
dev_limit = int(dev_limit)

@dag(
schedule=None,
start_date=datetime.datetime(2024, 1, 1),
catchup=False,
)
def update_data():
@task(multiple_outputs=True)
def harvest():
def setup():
"""
Setup the data directory to write to and determine the last harvest.
"""
return {
"last_harvest": last_harvest(),
"snapshot_dir": create_snapshot_dir(data_dir),
}
return create_snapshot_dir(data_dir)

@task()
def fetch_sul_pub(last_harvest, snapshot_dir):
def fetch_sul_pub(snapshot_dir):
"""
Harvest data from sul_pub using the last harvest date.
"""
csv_file = pathlib.Path(snapshot_dir) / "sulpub.csv"
sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, since=last_harvest)
sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, limit=dev_limit)

return str(csv_file)

Expand Down Expand Up @@ -87,8 +87,8 @@ def publish(dataset):
"""
return True

config = setup()
sul_pub = fetch_sul_pub(config["last_harvest"], config["snapshot_dir"])
snapshot_dir = setup()
sul_pub = fetch_sul_pub(snapshot_dir)
dois = extract_doi(sul_pub)
openalex = fetch_openalex(dois)
dimensions = fetch_dimensions(dois)
Expand All @@ -98,4 +98,4 @@ def publish(dataset):
publish(dataset)


update_data()
harvest()
16 changes: 16 additions & 0 deletions rialto_airflow/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import datetime

from pathlib import Path


def create_snapshot_dir(data_dir):
snapshots_dir = Path(data_dir) / "snapshots"

if not snapshots_dir.is_dir():
snapshots_dir.mkdir()

now = datetime.datetime.now()
snapshot_dir = snapshots_dir / now.strftime("%Y%m%d%H%M%S")
snapshot_dir.mkdir()

return str(snapshot_dir)
15 changes: 0 additions & 15 deletions rialto_airflow/utils/__init__.py

This file was deleted.

11 changes: 0 additions & 11 deletions test/harvest/test_sul_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,3 @@ def test_sul_pub_csv(tmpdir):
df = pandas.read_csv(csv_file)
assert len(df) == 2000
assert "title" in df.columns


@pytest.mark.skip(reason="sul_pub changeSince broken")
@pytest.mark.skipif(no_auth, reason="no sul_pub key")
def test_sul_pub_csv_since(tmpdir):
csv_file = tmpdir / "sul_pub.csv"
since = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, since=since, limit=100)

df = pandas.read_csv(csv_file, parse_dates=["last_updated"])
assert len(df[df["last_updated"] < since]) == 0
7 changes: 7 additions & 0 deletions test/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pathlib import Path

from rialto_airflow.utils import create_snapshot_dir

def test_create_snapshot_dir(tmpdir):
snap_dir = Path(create_snapshot_dir(tmpdir))
assert snap_dir.is_dir()

0 comments on commit ca815c0

Please sign in to comment.