-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
ed9d56e
commit b78326b
Showing
8 changed files
with
414 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# CI workflow: run the project's pytest suite with uv on every push and PR.
name: Pytest
on: [push, pull_request]
jobs:
  uv:
    name: python
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      # Install the uv package manager; version pinned for reproducible CI runs.
      - name: Install uv
        uses: astral-sh/setup-uv@v3
        with:
          version: "0.5.3"

      # uv installs the Python interpreter version the project declares.
      - name: Set up Python
        run: uv python install

      # Resolve and install all extras plus dev dependencies into the venv.
      - name: Install the project
        run: uv sync --all-extras --dev

      # Execute the test suite under the project environment.
      - name: Run tests
        run: uv run pytest tests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
from io import StringIO | ||
from ingest_wikimedia.common import load_ids, null_safe, get_list, get_str, get_dict | ||
|
||
|
||
def test_load_ids():
    """load_ids should split a newline-delimited stream into a list of IDs."""
    source = StringIO("id1\nid2\nid3")
    assert load_ids(source) == ["id1", "id2", "id3"]
|
||
|
||
def test_null_safe():
    """null_safe returns the stored value when its type matches the identity
    element, and the identity element otherwise (missing key, missing
    container, or type mismatch)."""
    record = {"key1": "value1", "key2": 2}
    cases = [
        (record, "key1", "", "value1"),          # present, matching str type
        (record, "key2", 0, 2),                  # present, matching int type
        (record, "key3", "default", "default"),  # key absent
        (None, "key1", "default", "default"),    # no container at all
        (record, "key1", 0, 0),                  # type mismatch -> identity element
    ]
    for container, key, identity, expected in cases:
        assert null_safe(container, key, identity) == expected
|
||
|
||
def test_get_list():
    """get_list yields the stored list, or [] for non-lists and absent keys."""
    record = {"key1": [1, 2, 3], "key2": "not a list"}
    assert get_list(record, "key1") == [1, 2, 3]
    for bad_key in ("key2", "key3"):
        assert get_list(record, bad_key) == []
|
||
|
||
def test_get_str():
    """get_str yields the stored string, or "" for non-strings and absent keys."""
    record = {"key1": "value1", "key2": 2}
    assert get_str(record, "key1") == "value1"
    for bad_key in ("key2", "key3"):
        assert get_str(record, bad_key) == ""
|
||
|
||
def test_get_dict():
    """get_dict yields the stored dict, or {} for non-dicts and absent keys."""
    record = {"key1": {"subkey": "subvalue"}, "key2": "not a dict"}
    assert get_dict(record, "key1") == {"subkey": "subvalue"}
    for bad_key in ("key2", "key3"):
        assert get_dict(record, bad_key) == {}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
import os | ||
import hashlib | ||
import pytest | ||
from ingest_wikimedia.local import ( | ||
setup_temp_dir, | ||
cleanup_temp_dir, | ||
get_temp_file, | ||
clean_up_tmp_file, | ||
get_file_hash, | ||
get_bytes_hash, | ||
get_content_type, | ||
) | ||
|
||
|
||
@pytest.fixture(autouse=True)
def setup_and_teardown_temp_dir():
    """Create the module's temp dir before each test and remove it afterwards.

    autouse=True means every test in this module runs inside a fresh temp
    dir without having to request the fixture explicitly.
    """
    setup_temp_dir()
    yield
    # Teardown: remove the temp dir. (A leftover debug print("cleanup")
    # was removed here — fixtures should not write to stdout.)
    cleanup_temp_dir()
|
||
|
||
def test_get_and_cleanup_temp_file():
    """A temp file exists after creation and is gone after clean-up."""
    handle = get_temp_file()
    path = handle.name
    assert os.path.exists(path)
    handle.close()
    clean_up_tmp_file(handle)
    assert not os.path.exists(path)
|
||
|
||
def test_get_file_hash(tmp_path):
    """get_file_hash returns the hex SHA-1 digest of a file's bytes."""
    target = tmp_path / "test_file.txt"
    target.write_text("test content")
    expected = hashlib.sha1(b"test content").hexdigest()
    assert get_file_hash(str(target)) == expected
|
||
|
||
def test_get_bytes_hash():
    """get_bytes_hash returns the hex SHA-1 digest of a string's UTF-8 encoding."""
    payload = "test content"
    expected = hashlib.sha1(payload.encode("utf-8")).hexdigest()
    assert get_bytes_hash(payload) == expected
|
||
|
||
# Minimal valid 1x1 transparent "spacer" GIF (43 bytes).
# The escapes must be single backslashes so this literal holds real GIF
# bytes starting with the GIF89a signature — the doubled backslashes in the
# previous version produced literal "\x01" text, which would never be
# detected as image/gif by test_get_content_type below.
SPACER_GIF = (
    b"GIF89a\x01\x00\x01\x00\x80\x00\x00\xff\xff\xff\xff\xff\xff!\xf9"
    b"\x04\x01\x00\x00\x01\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x00"
    b"\x02\x02L\x01\x00;"
)
|
||
|
||
def test_get_content_type(tmp_path):
    """GIF bytes are reported as image/gif even under a .txt filename,
    i.e. the type is sniffed from content rather than the extension."""
    gif_file = tmp_path / "test_file.txt"
    gif_file.write_bytes(SPACER_GIF)
    assert get_content_type(str(gif_file)) == "image/gif"
|
||
|
||
def test_get_invalid_content_type(tmp_path):
    """Content that maps to no accepted type raises an 'Invalid content-type' error."""
    bogus = tmp_path / "invalid_file.invalid"
    bogus.write_text("invalid content")
    with pytest.raises(Exception, match="Invalid content-type"):
        get_content_type(str(bogus))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
import pytest | ||
|
||
from unittest.mock import patch, MagicMock | ||
from ingest_wikimedia.metadata import ( | ||
check_partner, | ||
get_item_metadata, | ||
is_wiki_eligible, | ||
get_provider_and_data_provider, | ||
get_providers_data, | ||
provider_str, | ||
extract_urls, | ||
iiif_v2_urls, | ||
iiif_v3_urls, | ||
get_iiif_urls, | ||
get_iiif_manifest, | ||
contentdm_iiif_url, | ||
) | ||
|
||
|
||
def test_check_partner():
    """check_partner accepts known partners and raises on unknown ones."""
    # "bpl" is assumed to be on the recognized-partner list.
    check_partner("bpl")
    with pytest.raises(Exception, match="Unrecognized partner."):
        check_partner("invalid_partner")
|
||
|
||
@patch("ingest_wikimedia.metadata.get_http_session") | ||
def test_get_item_metadata(mock_get_http_session): | ||
mock_response = MagicMock() | ||
mock_response.json.return_value = {"docs": [{"id": "test_id"}]} | ||
mock_get_http_session.return_value.get.return_value = mock_response | ||
|
||
result = get_item_metadata("test_id", "test_api_key") | ||
assert result == {"id": "test_id"} | ||
|
||
|
||
def test_is_wiki_eligible():
    """An item with permissive rights, a landing page, media, and
    upload-enabled provider/data provider is eligible for upload."""
    metadata = {
        "rightsCategory": "Unlimited Re-Use",
        "isShownAt": "http://example.com",
        "mediaMaster": ["http://example.com/media"],
    }
    uploading_provider = {"upload": True}
    uploading_data_provider = {"upload": True}
    assert is_wiki_eligible(metadata, uploading_provider, uploading_data_provider)
|
||
|
||
def test_get_provider_and_data_provider():
    """The provider is looked up by name in the registry, and the data
    provider by name within that provider's institutions."""
    metadata = {
        "provider": {"name": "test_provider"},
        "dataProvider": {"name": "test_data_provider"},
    }
    registry = {"test_provider": {"institutions": {"test_data_provider": {}}}}
    provider, data_provider = get_provider_and_data_provider(metadata, registry)
    assert data_provider == {}
    assert provider == {"institutions": {"test_data_provider": {}}}
|
||
|
||
@patch("ingest_wikimedia.metadata.get_http_session") | ||
def test_get_providers_data(mock_get_http_session): | ||
mock_response = MagicMock() | ||
mock_response.json.return_value = {"provider": "data"} | ||
mock_get_http_session.return_value.get.return_value = mock_response | ||
|
||
result = get_providers_data() | ||
assert result == {"provider": "data"} | ||
|
||
|
||
def test_provider_str():
    """provider_str renders the Wikidata QID and the upload flag."""
    record = {"Wikidata": "Q123", "upload": True}
    assert provider_str(record) == "Provider: Q123, True"
|
||
|
||
def test_extract_urls():
    """When mediaMaster is present, its URLs are returned as-is."""
    metadata = {"mediaMaster": ["http://example.com/media"]}
    urls = extract_urls("partner", "dpla_id", metadata)
    assert urls == ["http://example.com/media"]
|
||
|
||
def test_iiif_v2_urls():
    """v2 manifests: the image URL is read from
    sequences -> canvases -> images -> resource["@id"]."""
    image_url = "http://example.com/image"
    canvas = {"images": [{"resource": {"@id": image_url}}]}
    manifest = {"sequences": [{"canvases": [canvas]}]}
    assert iiif_v2_urls(manifest) == [image_url]
|
||
|
||
def test_iiif_v3_urls():
    """v3 manifests: the body id is expanded to a full-size IIIF image request."""
    image_id = "http://example.com/image"
    leaf = {"body": {"id": image_id}}
    manifest = {"items": [{"items": [{"items": [leaf]}]}]}
    assert iiif_v3_urls(manifest) == [image_id + "/full/full/0/default.jpg"]
|
||
|
||
def test_get_iiif_urls():
    """get_iiif_urls dispatches on the manifest @context and raises for
    unrecognized IIIF versions."""
    v2_manifest = {"@context": "http://iiif.io/api/presentation/2/context.json"}
    v3_manifest = {"@context": "http://iiif.io/api/presentation/3/context.json"}
    bogus_manifest = {"@context": "https://realultimatepower.net/"}

    with patch("ingest_wikimedia.metadata.iiif_v2_urls", return_value=["v2_url"]):
        assert get_iiif_urls(v2_manifest) == ["v2_url"]

    with patch("ingest_wikimedia.metadata.iiif_v3_urls", return_value=["v3_url"]):
        assert get_iiif_urls(v3_manifest) == ["v3_url"]

    with pytest.raises(Exception, match="Unimplemented IIIF version"):
        get_iiif_urls(bogus_manifest)
|
||
|
||
@patch("ingest_wikimedia.metadata.get_http_session") | ||
def test_get_iiif_manifest(mock_get_http_session): | ||
mock_response = MagicMock() | ||
mock_response.json.return_value = {"manifest": "data"} | ||
mock_get_http_session.return_value.get.return_value = mock_response | ||
|
||
result = get_iiif_manifest("http://example.com/manifest") | ||
assert result == {"manifest": "data"} | ||
|
||
|
||
def test_contentdm_iiif_url():
    """A ContentDM reference URL is rewritten to its IIIF manifest URL."""
    reference_url = (
        "http://www.ohiomemory.org/cdm/ref/collection/p16007coll33/id/126923"
    )
    manifest_url = (
        "http://www.ohiomemory.org/iiif/info/p16007coll33/126923/manifest.json"
    )
    assert contentdm_iiif_url(reference_url) == manifest_url
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
from unittest.mock import patch, MagicMock | ||
|
||
from botocore.exceptions import ClientError | ||
|
||
from ingest_wikimedia.s3 import ( | ||
get_s3, | ||
get_item_s3_path, | ||
get_media_s3_path, | ||
s3_file_exists, | ||
write_item_metadata, | ||
write_file_list, | ||
write_iiif_manifest, | ||
write_item_file, | ||
) | ||
from ingest_wikimedia.common import CHECKSUM | ||
|
||
|
||
@patch("ingest_wikimedia.s3.boto3.resource") | ||
def test_get_s3(mock_boto3_resource): | ||
mock_s3 = MagicMock() | ||
mock_boto3_resource.return_value = mock_s3 | ||
|
||
s3 = get_s3() | ||
assert s3 == mock_s3 | ||
assert mock_boto3_resource.called | ||
|
||
|
||
def test_get_item_s3_path():
    """Item paths shard the DPLA ID by its first four characters."""
    expected = "partner/images/a/b/c/d/abcd1234/file.txt"
    assert get_item_s3_path("abcd1234", "file.txt", "partner") == expected
|
||
|
||
def test_get_media_s3_path():
    """Media paths use the same sharding plus an ordinal-prefixed filename."""
    expected = "partner/images/a/b/c/d/abcd1234/1_abcd1234"
    assert get_media_s3_path("abcd1234", 1, "partner") == expected
|
||
|
||
@patch("ingest_wikimedia.s3.get_s3") | ||
def test_s3_file_exists(mock_get_s3): | ||
mock_s3 = MagicMock() | ||
mock_get_s3.return_value = mock_s3 | ||
mock_s3.Object.return_value.load.return_value = None | ||
|
||
assert s3_file_exists("path/to/file") | ||
mock_s3.Object.return_value.load.side_effect = ClientError( | ||
{"Error": {"Code": "404"}}, "load" | ||
) | ||
assert not s3_file_exists("path/to/file") | ||
|
||
|
||
@patch("ingest_wikimedia.s3.write_item_file") | ||
def test_write_item_metadata(mock_write_item_file): | ||
write_item_metadata("partner", "abcd1234", "metadata") | ||
mock_write_item_file.assert_called_once_with( | ||
"partner", "abcd1234", "metadata", "dpla-map.json", "text/plain" | ||
) | ||
|
||
|
||
@patch("ingest_wikimedia.s3.write_item_file") | ||
def test_write_file_list(mock_write_item_file): | ||
write_file_list("partner", "abcd1234", ["url1", "url2"]) | ||
mock_write_item_file.assert_called_once_with( | ||
"partner", "abcd1234", "url1\nurl2", "file-list.txt", "text/plain" | ||
) | ||
|
||
|
||
@patch("ingest_wikimedia.s3.write_item_file") | ||
def test_write_iiif_manifest(mock_write_item_file): | ||
write_iiif_manifest("partner", "abcd1234", "manifest") | ||
mock_write_item_file.assert_called_once_with( | ||
"partner", "abcd1234", "manifest", "iiif.json", "application/json" | ||
) | ||
|
||
|
||
@patch("ingest_wikimedia.s3.get_s3") | ||
@patch("ingest_wikimedia.s3.get_bytes_hash") | ||
def test_write_item_file(mock_get_bytes_hash, mock_get_s3): | ||
mock_s3 = MagicMock() | ||
mock_get_s3.return_value = mock_s3 | ||
mock_get_bytes_hash.return_value = "fakehash" | ||
|
||
write_item_file("partner", "abcd1234", "data", "file.txt", "text/plain") | ||
mock_s3.Object.return_value.put.assert_called_once_with( | ||
ContentType="text/plain", Metadata={CHECKSUM: "fakehash"}, Body="data" | ||
) |
Oops, something went wrong.