Skip to content

Commit

Permalink
added feature to pull entire remote catalog for use in project
Browse files Browse the repository at this point in the history
  • Loading branch information
bpmeek committed Jun 17, 2024
1 parent d6a074d commit 9719315
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 19 deletions.
25 changes: 24 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ and unaffected.

Kedro-Universal-Catalog lets you define, in a single place, the instructions for loading a particular piece of data. This
ensures that all of your users are using the correct and latest definition. It will also let them easily re-use data
across projects.
across projects. Best of all, it still works with [Kedro-Viz](https://github.com/kedro-org/kedro-viz).


## Getting Started
Expand Down Expand Up @@ -72,6 +72,29 @@ cars:
url: http://localhost:5000/
```
## Advanced usage
### Replace entire catalog
If you have a lot of datasets and don't want to define each one in your Kedro project's catalog, you can
use `RemoteCatalog` instead.

In your Kedro project's `settings.py` add:

```python
from universal_catalog import RemoteCatalog
DATA_CATALOG_CLASS = RemoteCatalog
```

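Then update `local/credentials.yml` to tell `RemoteCatalog` where to get your catalog from. Be sure to name the
entry `remote_catalog` and give it the key `url`. `RemoteCatalog` appends `/catalog/` to this URL itself, so provide
only the server's base URL, without a trailing slash.

```yaml
remote_catalog:
  url: http://127.0.0.1:8000
```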


## What if I don't use Kedro?

[Why you should](https://docs.kedro.org/en/stable/introduction/index.html).
Expand Down
File renamed without changes.
Empty file.
70 changes: 70 additions & 0 deletions tests/core/datasets/test_remote_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pytest

from .test_universal_catalog_dataset import TEST_URL, TEST_METHOD

from kedro.io.data_catalog import DataCatalog
from kedro.io.core import DatasetError
from universal_catalog import RemoteCatalog


@pytest.fixture
def remote_catalog():
    """Provide a ``RemoteCatalog`` constructed with no arguments."""
    catalog = RemoteCatalog()
    return catalog


@pytest.fixture
def credentials():
    """Provide credentials whose ``remote_catalog`` entry points at ``TEST_URL``."""
    return {"remote_catalog": {"url": TEST_URL}}


@pytest.fixture
def bad_credentials():
    """Provide an empty credentials mapping (no ``remote_catalog`` entry)."""
    return dict()


@pytest.fixture
def missing_url():
    """Provide credentials whose ``remote_catalog`` entry lacks the ``url`` key."""
    entry_without_url = dict(bad_url="bad")
    return {"remote_catalog": entry_without_url}


@pytest.fixture
def catalog_json():
    """Provide the JSON text used as the mocked remote catalog response."""
    # Adjacent literals are joined at compile time, so the payload text is unchanged.
    return (
        '{"companies": {"type": "pandas.CSVDataset", '
        '"filepath": "data/01_raw/companies.csv"}}'
    )


@pytest.fixture
def update_catalog_dict():
    """Provide a project-side catalog holding a single ``cars`` entry."""
    cars_entry = {
        "type": "pandas.CSVDataset",
        "filepath": "data/01_raw/companies.csv",
    }
    return {"cars": cars_entry}


def test_remote_catalog(remote_catalog):
    """A bare ``RemoteCatalog`` must still be a kedro ``DataCatalog``."""
    is_data_catalog = isinstance(remote_catalog, DataCatalog)
    assert is_data_catalog


def test_bad_credentials(bad_credentials):
    """Credentials without a ``remote_catalog`` entry raise ``DatasetError``."""
    expected_failure = pytest.raises(DatasetError)
    with expected_failure:
        RemoteCatalog.from_config(catalog=None, credentials=bad_credentials)


def test_missing_url(missing_url):
    """A ``remote_catalog`` entry without ``url`` raises ``DatasetError``."""
    expected_failure = pytest.raises(DatasetError)
    with expected_failure:
        RemoteCatalog.from_config(catalog=None, credentials=missing_url)


def test_load_remote_catalog(requests_mock, catalog_json, credentials):
    """The mocked remote catalog is fetched and its entry ends up in the catalog."""
    catalog_endpoint = TEST_URL + "/catalog/"
    requests_mock.register_uri(TEST_METHOD, catalog_endpoint, text=catalog_json)

    loaded = RemoteCatalog.from_config(catalog=None, credentials=credentials)

    assert isinstance(loaded, DataCatalog)
    assert "companies" in loaded._datasets


def test_update(requests_mock, catalog_json, credentials, update_catalog_dict):
    """Remote entries and project entries are both present after merging."""
    catalog_endpoint = TEST_URL + "/catalog/"
    requests_mock.register_uri(TEST_METHOD, catalog_endpoint, text=catalog_json)

    merged = RemoteCatalog.from_config(
        catalog=update_catalog_dict,
        credentials=credentials,
    )

    assert isinstance(merged, DataCatalog)
    # "companies" comes from the mocked remote response, "cars" from the project catalog.
    for dataset_name in ("companies", "cars"):
        assert dataset_name in merged._datasets
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

from kedro.io.core import DatasetError

from universal_catalog.core.universal_catalog_dataset import UniversalCatalogDataset
from universal_catalog.core.datasets.universal_catalog_dataset import (
UniversalCatalogDataset,
)

TEST_URL = "http://localhost:5000/"
TEST_METHOD = "POST"
Expand Down
7 changes: 7 additions & 0 deletions tests/core/test_universal_catalog.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import yaml

from universal_catalog.core.universal_catalog import load_catalog, UniversalCatalog
from kedro.io.core import DatasetNotFoundError
Expand Down Expand Up @@ -43,3 +44,9 @@ def test_get_entry(tmp_catalog: UniversalCatalog):
def test_bad_entry(tmp_catalog: UniversalCatalog):
    """Requesting the name ``companie`` raises ``DatasetNotFoundError``."""
    unknown_name = "companie"
    expected_failure = pytest.raises(DatasetNotFoundError)
    with expected_failure:
        tmp_catalog.get_entry(unknown_name)


def test_get_catalog(tmp_catalog: UniversalCatalog):
    """``get_catalog`` returns the same mapping as parsing ``CATALOG_CONTEXT``."""
    expected = yaml.safe_load(CATALOG_CONTEXT)
    actual = tmp_catalog.get_catalog()
    assert expected == actual
6 changes: 3 additions & 3 deletions universal_catalog/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .core import UniversalCatalogDataset
from .core import UniversalCatalogDataset, RemoteCatalog

__all__ = ["UniversalCatalogDataset"]
__version__ = "0.1.0"
__all__ = ["UniversalCatalogDataset", "RemoteCatalog"]
__version__ = "0.1.1"
7 changes: 5 additions & 2 deletions universal_catalog/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from .universal_catalog_dataset import UniversalCatalogDataset
from .datasets.universal_catalog_dataset import UniversalCatalogDataset
from .datasets.datasets import Datasets
from .datasets.remote_catalog import RemoteCatalog
from .universal_catalog import UniversalCatalog, load_catalog
from .datasets import Datasets

from .serving import load_server_settings

__all__ = [
"UniversalCatalogDataset",
"UniversalCatalog",
"load_catalog",
"Datasets",
"RemoteCatalog",
"load_server_settings",
]
Empty file.
File renamed without changes.
40 changes: 40 additions & 0 deletions universal_catalog/core/datasets/remote_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

from kedro.io import DataCatalog
from kedro.io.core import DatasetError

from typing import Any

from .utils import _execute_request


class RemoteCatalog(DataCatalog):
    """``RemoteCatalog`` is a ``DataCatalog`` subclass that overrides the
    ``from_config`` class method to fetch the entire remote catalog
    and merge it with the catalog defined in your project.

    If a dataset is present in both the remote catalog and the project
    catalog, the project catalog entry is kept.
    """

    def __init__(self):
        # Kept as a zero-argument constructor so ``RemoteCatalog()`` keeps
        # its existing signature.
        super().__init__()

    @classmethod
    def from_config(
        cls,
        catalog: dict[str, dict[str, Any]] | None,
        credentials: dict[str, dict[str, Any]] | None = None,
        load_versions: dict[str, str] | None = None,
        save_version: str | None = None,
    ) -> DataCatalog:
        """Build a ``DataCatalog`` from the remote catalog plus ``catalog``.

        Args:
            catalog: Project catalog configuration. Its entries override
                remote entries with the same name. ``None`` means no
                project entries.
            credentials: Credentials mapping. It must contain a
                ``remote_catalog`` entry with a ``url`` key. ``/catalog/``
                is appended to that URL verbatim (slashes are not
                normalised).
            load_versions: Forwarded unchanged to
                ``DataCatalog.from_config``.
            save_version: Forwarded unchanged to
                ``DataCatalog.from_config``.

        Returns:
            The ``DataCatalog`` built by ``DataCatalog.from_config`` from
            the merged configuration.

        Raises:
            DatasetError: If ``credentials`` is missing or empty, has no
                ``remote_catalog`` entry, or that entry has no ``url``.
        """
        # ``credentials`` defaults to None, so guard before looking inside
        # it; otherwise the caller gets AttributeError, not DatasetError.
        if not credentials or "remote_catalog" not in credentials:
            raise DatasetError(
                "`remote_catalog` must be provided in credentials.\nAlong with url."
            )

        remote_settings = credentials["remote_catalog"]
        try:
            url = remote_settings["url"]
        except (KeyError, TypeError) as exc:
            # TypeError covers an entry that is not a mapping (e.g. an
            # empty YAML value loaded as None).
            raise DatasetError(
                "`url` must be provided in `remote_catalog` entry."
            ) from exc

        url = url + "/catalog/"
        # The catalog endpoint takes no payload, hence ``None``.
        cfg = _execute_request(url, None).json()
        # Project entries are applied last so they win on name clashes.
        cfg.update(catalog or {})
        return DataCatalog.from_config(cfg, credentials, load_versions, save_version)
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import json

from .utils import _execute_request


class UniversalCatalogDataset(AbstractDataset):
"""``UniversalCatalogDataset`` is a wrapper around the ``AbstractDataset``
Expand Down Expand Up @@ -47,22 +49,11 @@ def __init__(self, url: str, source_name: str):
def _materialize(self):
if not self._dataset:
request_body = dict(name=self._source_name)
_config: dict[str, Any] = self._execute_request(request_body).json()
_config: dict[str, Any] = _execute_request(self._url, request_body).json()
self._dataset = AbstractDataset.from_config(
name=self._source_name, config=_config
)

def _execute_request(self, json_obj: dict[str, str]) -> requests.Response:
try:
response = requests.post(self._url, data=json.dumps(json_obj))
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
raise DatasetError("Failed to fetch data", exc) from exc
except OSError as exc:
raise DatasetError("Failed to connect to the remote server", exc) from exc

return response

def _load(self):
self._materialize()
return self._dataset.load()
Expand Down
18 changes: 18 additions & 0 deletions universal_catalog/core/datasets/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations

import requests
import json

from kedro.io.core import DatasetError


def _execute_request(
    url: str,
    json_obj: dict[str, str] | None,
    *,
    timeout: float | None = 30.0,
) -> requests.Response:
    """POST ``json_obj`` (serialised as JSON) to ``url`` and return the response.

    Args:
        url: Address the request is sent to.
        json_obj: Payload to serialise with ``json.dumps``. ``None`` is
            sent as the JSON literal ``null``.
        timeout: Seconds to wait for the server before giving up. Pass
            ``None`` to wait indefinitely (the previous behaviour).

    Returns:
        The ``requests.Response`` of a successful request.

    Raises:
        DatasetError: If the server answers with an HTTP error status, or
            if the request fails with any other ``OSError`` (connection
            failures and timeouts included).
    """
    try:
        # Without a timeout a stalled server would block the caller forever.
        response = requests.post(url, data=json.dumps(json_obj), timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        # Must be handled before OSError: HTTPError is itself an OSError subclass.
        raise DatasetError("Failed to fetch data", exc) from exc
    except OSError as exc:
        raise DatasetError("Failed to connect to the remote server", exc) from exc

    return response
4 changes: 4 additions & 0 deletions universal_catalog/core/universal_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,7 @@ def get_entry(self, dataset_name: str, suggest: bool = True) -> Dict[str, Any]:
dataset_config = self._datasets.get(dataset_name, None)

return dataset_config

def get_catalog(self) -> dict[str, dict[str, Any]]:
    """Return the mapping of dataset names to their catalog entries."""
    entries = self._datasets
    return entries
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ async def get_record(dataset_name: Datasets):
return dataset


@app.post("/catalog/")
async def get_catalog():
    # Hand back every entry of the loaded catalog in one response.
    return CATALOG.get_catalog()


if __name__ == "__main__":
load_catalog(CONFIG_LOCATION)
server_settings = load_server_settings(CONFIG_LOCATION)
Expand Down

0 comments on commit 9719315

Please sign in to comment.