Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create snapshots directory #27

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
name: Test
on:
- push
- pull_request
jobs:
build:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ x-airflow-common:
AIRFLOW_VAR_DIMENSIONS_API_PASS: ${AIRFLOW_VAR_DIMENSIONS_API_PASS}
AIRFLOW_VAR_SUL_PUB_HOST: ${AIRFLOW_VAR_SUL_PUB_HOST}
AIRFLOW_VAR_SUL_PUB_KEY: ${AIRFLOW_VAR_SUL_PUB_KEY}
AIRFLOW_VAR_DEV_LIMIT: ${AIRFLOW_VAR_DEV_LIMIT}
AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/rialto_airflow:/opt/airflow/rialto_airflow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,40 @@
from airflow.models import Variable
from airflow.decorators import dag, task

from rialto_airflow.utils import last_harvest, create_snapshot_dir
from rialto_airflow.utils import create_snapshot_dir
from rialto_airflow.harvest.sul_pub import sul_pub_csv

data_dir = Variable.get("data_dir")
sul_pub_host = Variable.get("sul_pub_host")
sul_pub_key = Variable.get("sul_pub_key")

# to artificially limit the API activity in development
dev_limit = Variable.get("dev_limit")
if dev_limit is not None:
dev_limit = int(dev_limit)


@dag(
schedule=None,
start_date=datetime.datetime(2024, 1, 1),
catchup=False,
)
def update_data():
@task(multiple_outputs=True)
def harvest():
@task()
def setup():
"""
Setup the data directory to write to and determine the last harvest.
"""
return {
"last_harvest": last_harvest(),
"snapshot_dir": create_snapshot_dir(data_dir),
}
snapshot_dir = create_snapshot_dir(data_dir)
return snapshot_dir

@task()
def fetch_sul_pub(last_harvest, snapshot_dir):
def fetch_sul_pub(snapshot_dir):
"""
Harvest data from sul_pub using the last harvest date.
"""
csv_file = pathlib.Path(snapshot_dir) / "sulpub.csv"
sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, since=last_harvest)
sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, limit=dev_limit)

return str(csv_file)

Expand Down Expand Up @@ -87,8 +90,8 @@ def publish(dataset):
"""
return True

config = setup()
sul_pub = fetch_sul_pub(config["last_harvest"], config["snapshot_dir"])
snapshot_dir = setup()
sul_pub = fetch_sul_pub(snapshot_dir)
dois = extract_doi(sul_pub)
openalex = fetch_openalex(dois)
dimensions = fetch_dimensions(dois)
Expand All @@ -98,4 +101,4 @@ def publish(dataset):
publish(dataset)


update_data()
harvest()
16 changes: 16 additions & 0 deletions rialto_airflow/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import datetime

from pathlib import Path


def create_snapshot_dir(data_dir):
snapshots_dir = Path(data_dir) / "snapshots"

if not snapshots_dir.is_dir():
snapshots_dir.mkdir()

now = datetime.datetime.now()
snapshot_dir = snapshots_dir / now.strftime("%Y%m%d%H%M%S")
snapshot_dir.mkdir()

return str(snapshot_dir)
15 changes: 0 additions & 15 deletions rialto_airflow/utils/__init__.py

This file was deleted.

12 changes: 0 additions & 12 deletions test/harvest/test_sul_pub.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import datetime

import dotenv
import pandas
Expand All @@ -24,14 +23,3 @@ def test_sul_pub_csv(tmpdir):
df = pandas.read_csv(csv_file)
assert len(df) == 2000
assert "title" in df.columns


@pytest.mark.skip(reason="sul_pub changeSince broken")
@pytest.mark.skipif(no_auth, reason="no sul_pub key")
def test_sul_pub_csv_since(tmpdir):
csv_file = tmpdir / "sul_pub.csv"
since = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, since=since, limit=100)

df = pandas.read_csv(csv_file, parse_dates=["last_updated"])
assert len(df[df["last_updated"] < since]) == 0
8 changes: 8 additions & 0 deletions test/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from pathlib import Path

from rialto_airflow.utils import create_snapshot_dir


def test_create_snapshot_dir(tmpdir):
snap_dir = Path(create_snapshot_dir(tmpdir))
assert snap_dir.is_dir()
Loading