From aa6bf97ed01b2d34d279aa840430da6c20835321 Mon Sep 17 00:00:00 2001 From: Laura Wrubel Date: Thu, 27 Jun 2024 13:12:22 -0400 Subject: [PATCH] Cleanup on merge_pubs --- rialto_airflow/dags/harvest.py | 4 +--- rialto_airflow/harvest/merge_pubs.py | 1 + test/harvest/test_merge_pubs.py | 17 +++++++++++++---- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/rialto_airflow/dags/harvest.py b/rialto_airflow/dags/harvest.py index 5b634fe..3b2481e 100644 --- a/rialto_airflow/dags/harvest.py +++ b/rialto_airflow/dags/harvest.py @@ -100,9 +100,7 @@ def merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir): """ Merge the OpenAlex, Dimensions and sul_pub data. """ - output = ( - Path(snapshot_dir) / "publications.parquet" - ) # TODO: update file extension to actual format used + output = Path(snapshot_dir) / "publications.parquet" merge_pubs.merge(sul_pub, openalex_pubs, dimensions_pubs, output) return str(output) diff --git a/rialto_airflow/harvest/merge_pubs.py b/rialto_airflow/harvest/merge_pubs.py index c18e933..3477210 100644 --- a/rialto_airflow/harvest/merge_pubs.py +++ b/rialto_airflow/harvest/merge_pubs.py @@ -86,5 +86,6 @@ def sulpub_df(sul_pub): """ df = pl.scan_csv(sul_pub) df = df.drop_nulls("doi") + df = df.with_columns(pl.col("doi").str.replace("https://doi.org/", "")) df = df.rename(lambda column_name: "sul_pub_" + column_name) return df diff --git a/test/harvest/test_merge_pubs.py b/test/harvest/test_merge_pubs.py index 99ab2b7..b4d62fa 100644 --- a/test/harvest/test_merge_pubs.py +++ b/test/harvest/test_merge_pubs.py @@ -125,6 +125,14 @@ def sul_pubs_csv(tmp_path): "2024", ] ) + writer.writerow( + [ + "[]", + "A Published Research Article", + "2024", + "https://doi.org/10.0000/dddd", + ] + ) return fixture_file @@ -150,14 +158,14 @@ def test_sulpub_df(sul_pubs_csv): lazy_df = merge_pubs.sulpub_df(sul_pubs_csv) assert type(lazy_df) == pl.lazyframe.frame.LazyFrame df = lazy_df.collect() - assert df.shape[0] == 1, "Row without a doi has been dropped" + assert df.shape[0] == 2, "Row without a doi has been dropped" assert df.columns == [ "sul_pub_authorship", "sul_pub_title", "sul_pub_year", "sul_pub_doi", ] - assert df["sul_pub_doi"].to_list() == ["10.0000/cccc"] + assert df["sul_pub_doi"].to_list() == ["10.0000/cccc", "10.0000/dddd"] def test_merge(tmp_path, sul_pubs_csv, openalex_pubs_csv, dimensions_pubs_csv): @@ -165,7 +173,8 @@ def test_merge(tmp_path, sul_pubs_csv, openalex_pubs_csv, dimensions_pubs_csv): merge_pubs.merge(sul_pubs_csv, openalex_pubs_csv, dimensions_pubs_csv, output) assert output.is_file(), "output file has been created" df = pl.read_parquet(output) - assert df.shape[0] == 3 + assert df.shape[0] == 4 + assert df.shape[1] == 25 assert set(df["doi"].to_list()) == set( - ["10.0000/aaaa", "10.0000/1234", "10.0000/cccc"] + ["10.0000/aaaa", "10.0000/1234", "10.0000/cccc", "10.0000/dddd"] )