
Catalog to config #4323

Merged: 88 commits, Nov 29, 2024

Changes from 54 commits

Commits
ae5384a
Captured init arguments
ElenaKhaustova Nov 7, 2024
fcdf357
Implemented unresoloving credentials
ElenaKhaustova Nov 7, 2024
b147374
Added some comments
ElenaKhaustova Nov 7, 2024
0ed0c1e
Put type in first place for dataset config
ElenaKhaustova Nov 7, 2024
3c839a9
Handled version key
ElenaKhaustova Nov 7, 2024
29bb714
Added lazy dataset to_config
ElenaKhaustova Nov 8, 2024
49858b6
Removed data key from MemoryDataset
ElenaKhaustova Nov 8, 2024
0d0ba91
Added TODOs
ElenaKhaustova Nov 8, 2024
8413f58
Saved call args
ElenaKhaustova Nov 11, 2024
a89db7e
Saved only set credentials
ElenaKhaustova Nov 11, 2024
b081c65
Processed CachedDataset case
ElenaKhaustova Nov 11, 2024
18c0ad6
Updated TODOs
ElenaKhaustova Nov 12, 2024
2751ea8
Tested with PartitionedDataset
ElenaKhaustova Nov 12, 2024
fc576ff
Popped metadata
ElenaKhaustova Nov 12, 2024
1d6454c
Fixed versioning when load
ElenaKhaustova Nov 12, 2024
8c31237
Fixed linter
ElenaKhaustova Nov 12, 2024
e035881
Tested datasets factories
ElenaKhaustova Nov 13, 2024
edcdc38
Tested transcoding
ElenaKhaustova Nov 13, 2024
15a1e72
Removed TODOs
ElenaKhaustova Nov 13, 2024
d4e4534
Removed debug output
ElenaKhaustova Nov 13, 2024
e7e8af5
Removed debug output
ElenaKhaustova Nov 13, 2024
1b6be8e
Added logic to set VERSIONED_FLAG_KEY
ElenaKhaustova Nov 14, 2024
db77436
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 14, 2024
c6dc380
Updated version set up
ElenaKhaustova Nov 14, 2024
54b0793
Added TODO for versioning
ElenaKhaustova Nov 14, 2024
f183e60
Added tests for unresolve_config_credentials
ElenaKhaustova Nov 14, 2024
0d9d241
Implemented test_to_config
ElenaKhaustova Nov 14, 2024
763e635
Added test with MemoryDataset
ElenaKhaustova Nov 14, 2024
8795dd6
Extended test examples
ElenaKhaustova Nov 14, 2024
e3289b4
Materialized cached_ds
ElenaKhaustova Nov 14, 2024
5d93a41
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 14, 2024
59b603e
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 15, 2024
ae62886
Exclude parameters
ElenaKhaustova Nov 18, 2024
b2ebfe2
Fixed import
ElenaKhaustova Nov 18, 2024
a07107a
Added test with parameters
ElenaKhaustova Nov 18, 2024
5dc4abf
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 19, 2024
e5adb5d
Moved tests for CatalogConfigResolver to a separate file
ElenaKhaustova Nov 19, 2024
bdf45a3
Made unresolve_config_credentials staticmethod
ElenaKhaustova Nov 19, 2024
33d6791
Updated comment to clarify meaning
ElenaKhaustova Nov 19, 2024
33ff979
Moved to_config anfter from_config
ElenaKhaustova Nov 19, 2024
7546540
Returned is_parameter for catalog and added TODOs
ElenaKhaustova Nov 19, 2024
c37c04d
Renamed catalog config resolver methods
ElenaKhaustova Nov 20, 2024
591f4a0
Implemented _validate_versions method
ElenaKhaustova Nov 21, 2024
5aaebe6
Added _validate_versions calls
ElenaKhaustova Nov 21, 2024
bdb7cf6
Updated error descriptions
ElenaKhaustova Nov 21, 2024
e2ffeaa
Added validation to the old catalog
ElenaKhaustova Nov 22, 2024
6b1e802
Fixed linter
ElenaKhaustova Nov 22, 2024
06e343b
Implemented unit tests for KedroDataCatalog
ElenaKhaustova Nov 22, 2024
5492b9f
Removed odd comments
ElenaKhaustova Nov 22, 2024
c96546c
Implemented tests for DataCatalog
ElenaKhaustova Nov 22, 2024
46f2df6
Added docstrings
ElenaKhaustova Nov 22, 2024
56a067c
Added release notes
ElenaKhaustova Nov 22, 2024
cbd1d4a
Merge branch 'fix/4327-validate-datasets-versions' into feature/3932-…
ElenaKhaustova Nov 22, 2024
e9027b9
Updated version logic
ElenaKhaustova Nov 22, 2024
11b148b
Added CachedDataset case
ElenaKhaustova Nov 22, 2024
163ca17
Merge branch 'main' into fix/4327-validate-datasets-versions
ElenaKhaustova Nov 27, 2024
ca2ac6c
Updated release notes
ElenaKhaustova Nov 27, 2024
615d135
Added tests for CachedDataset use case
ElenaKhaustova Nov 27, 2024
8dd1084
Merge branch 'fix/4327-validate-datasets-versions' into feature/3932-…
ElenaKhaustova Nov 27, 2024
8a01881
Updated unit test after version validation is applied
ElenaKhaustova Nov 27, 2024
eb44a30
Removed MemoryDatasets
ElenaKhaustova Nov 27, 2024
ba3d04e
Removed _is_parameter
ElenaKhaustova Nov 27, 2024
35953a9
Pop metadata from cached dataset configuration
ElenaKhaustova Nov 27, 2024
d56793b
Fixed lint
ElenaKhaustova Nov 27, 2024
ebf1483
Fixed unit test
ElenaKhaustova Nov 27, 2024
f5468c9
Added docstrings for AbstractDataset.to_config()
ElenaKhaustova Nov 27, 2024
edee597
Updated docstrings
ElenaKhaustova Nov 27, 2024
4454970
Fixed typos
ElenaKhaustova Nov 27, 2024
5d6bd3c
Updated TODOs
ElenaKhaustova Nov 27, 2024
f1ace7c
Merge branch 'main' into fix/4327-validate-datasets-versions
ElenaKhaustova Nov 27, 2024
95fc260
Merge branch 'fix/4327-validate-datasets-versions' into feature/3932-…
ElenaKhaustova Nov 27, 2024
86e25e9
Added docstring for KedroDataCatalog.to_config
ElenaKhaustova Nov 27, 2024
c8fd99e
Added docstrinbgs for unresolve_credentials
ElenaKhaustova Nov 27, 2024
5b2f21f
Updated release notes
ElenaKhaustova Nov 27, 2024
35dc102
Fixed indentation
ElenaKhaustova Nov 27, 2024
2853fda
Fixed to_config() example
ElenaKhaustova Nov 27, 2024
8f0fe4f
Fixed indentation
ElenaKhaustova Nov 27, 2024
0db9b46
Fixed indentation
ElenaKhaustova Nov 27, 2024
2f72e23
Added a note about to_config() constraints
ElenaKhaustova Nov 27, 2024
33f29fd
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 28, 2024
a7689b9
Fixed typo
ElenaKhaustova Nov 28, 2024
3c3664e
Replace type string with the constant
ElenaKhaustova Nov 28, 2024
b7183ab
Replace type string with the constant
ElenaKhaustova Nov 28, 2024
171e80f
Moved _is_memory_dataset
ElenaKhaustova Nov 28, 2024
b789018
Simplified nested decorator
ElenaKhaustova Nov 28, 2024
6ba6ee4
Fixed lint
ElenaKhaustova Nov 28, 2024
7008346
Removed _init_args class attribute
ElenaKhaustova Nov 29, 2024
7af15a5
Returned @wraps
ElenaKhaustova Nov 29, 2024
1 change: 1 addition & 0 deletions RELEASE.md
@@ -11,6 +11,7 @@
## Bug fixes and other changes
* Added I/O support for Oracle Cloud Infrastructure (OCI) Object Storage filesystem.
* Fixed `DatasetAlreadyExistsError` for `ThreadRunner` when Kedro project run and using runner separately.
* Added validation to ensure dataset version consistency across the catalog.

## Breaking changes to the API
## Documentation changes
1 change: 1 addition & 0 deletions kedro/framework/cli/catalog.py
@@ -28,6 +28,7 @@ def _create_session(package_name: str, **kwargs: Any) -> KedroSession:


def is_parameter(dataset_name: str) -> bool:
# TODO: when breaking change replace with is_parameter from kedro/io/core.py
"""Check if dataset is a parameter."""
return dataset_name.startswith("params:") or dataset_name == "parameters"

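The helper above is small enough to exercise directly; here is a standalone copy mirroring the diff (not imported from kedro):

```python
# Standalone copy of the is_parameter check shown in the diff above.
def is_parameter(dataset_name: str) -> bool:
    """Check if dataset is a parameter."""
    return dataset_name.startswith("params:") or dataset_name == "parameters"

print(is_parameter("params:learning_rate"))  # True
print(is_parameter("parameters"))            # True
print(is_parameter("cars"))                  # False
```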
34 changes: 29 additions & 5 deletions kedro/io/catalog_config_resolver.py
@@ -30,7 +30,7 @@ def __init__(
self._dataset_patterns, self._default_pattern = self._extract_patterns(
config, credentials
)
self._resolved_configs = self._resolve_config_credentials(config, credentials)
self._resolved_configs = self.resolve_credentials(config, credentials)

@property
def config(self) -> dict[str, dict[str, Any]]:
@@ -237,8 +237,9 @@ def _extract_patterns(

return sorted_patterns, user_default

def _resolve_config_credentials(
self,
@classmethod
def resolve_credentials(
cls,
config: dict[str, dict[str, Any]] | None,
credentials: dict[str, dict[str, Any]] | None,
) -> dict[str, dict[str, Any]]:
@@ -254,13 +255,36 @@ def _resolve_config_credentials(
"\nHint: If this catalog entry is intended for variable interpolation, "
"make sure that the key is preceded by an underscore."
)
if not self.is_pattern(ds_name):
resolved_configs[ds_name] = self._resolve_credentials(
if not cls.is_pattern(ds_name):
resolved_configs[ds_name] = cls._resolve_credentials(
ds_config, credentials
)

return resolved_configs

@staticmethod
def unresolve_credentials(
cred_name: str, ds_config: dict[str, dict[str, Any]] | None
) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
ds_config_copy = copy.deepcopy(ds_config) or {}
credentials: dict[str, Any] = {}
credentials_ref = f"{cred_name}_{CREDENTIALS_KEY}"

def unresolve(config: Any) -> None:
# We don't expect the credentials key to appear more than once within
# the same dataset config, so once we find it we unresolve it and stop iterating.
for key, val in config.items():
if key == CREDENTIALS_KEY and config[key]:
credentials[credentials_ref] = config[key]
config[key] = credentials_ref
return
if isinstance(val, dict):
unresolve(val)

unresolve(ds_config_copy)

return ds_config_copy, credentials

def resolve_pattern(self, ds_name: str) -> dict[str, Any]:
"""Resolve dataset patterns and return resolved configurations based on the existing patterns."""
matched_pattern = self.match_pattern(ds_name)
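The `unresolve_credentials` logic above can be sketched standalone (plain Python, no kedro import; `CREDENTIALS_KEY` is assumed to be `"credentials"`). It swaps an inline credentials dict for a named reference and returns the extracted secrets separately, so they never land in the serialized catalog:

```python
import copy

CREDENTIALS_KEY = "credentials"

def unresolve_credentials(cred_name, ds_config):
    # Mirror of the static method in the diff above.
    ds_config_copy = copy.deepcopy(ds_config) or {}
    credentials = {}
    credentials_ref = f"{cred_name}_{CREDENTIALS_KEY}"

    def unresolve(config):
        # Replace the first (and only expected) credentials entry with a reference.
        for key, val in config.items():
            if key == CREDENTIALS_KEY and config[key]:
                credentials[credentials_ref] = config[key]
                config[key] = credentials_ref
                return
            if isinstance(val, dict):
                unresolve(val)

    unresolve(ds_config_copy)
    return ds_config_copy, credentials

config = {
    "type": "pandas.CSVDataset",
    "filepath": "s3://bucket/cars.csv",
    "credentials": {"key": "SECRET"},
}
clean, creds = unresolve_credentials("cars", config)
print(clean["credentials"])  # cars_credentials
print(creds)                 # {'cars_credentials': {'key': 'SECRET'}}
```

Note the deep copy: the original config keeps its inline credentials untouched.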
130 changes: 124 additions & 6 deletions kedro/io/core.py
@@ -15,6 +15,7 @@
from datetime import datetime, timezone
from functools import partial, wraps
from glob import iglob
from inspect import getcallargs
from operator import attrgetter
from pathlib import Path, PurePath, PurePosixPath
from typing import (
@@ -57,6 +58,7 @@
"s3a",
"s3n",
)
TYPE_KEY = "type"


class DatasetError(Exception):
@@ -71,16 +73,16 @@


class DatasetNotFoundError(DatasetError):
"""``DatasetNotFoundError`` raised by ``DataCatalog`` class in case of
trying to use a non-existing dataset.
"""``DatasetNotFoundError`` raised by ``DataCatalog`` and ``KedroDataCatalog``
classes in case of trying to use a non-existing dataset.
"""

pass


class DatasetAlreadyExistsError(DatasetError):
"""``DatasetAlreadyExistsError`` raised by ``DataCatalog`` class in case
of trying to add a dataset which already exists in the ``DataCatalog``.
"""``DatasetAlreadyExistsError`` raised by ``DataCatalog`` and ``KedroDataCatalog``
classes in case of trying to add a dataset which already exists in the ``DataCatalog``.
"""

pass
@@ -94,6 +96,15 @@ class VersionNotFoundError(DatasetError):
pass


class VersionAlreadyExistsError(DatasetError):
"""``VersionAlreadyExistsError`` raised by ``DataCatalog`` and ``KedroDataCatalog``
classes when attempting to add a dataset to a catalog with a save version
that conflicts with the save version already set for the catalog.
"""

pass


_DI = TypeVar("_DI")
_DO = TypeVar("_DO")

@@ -148,6 +159,11 @@ class AbstractDataset(abc.ABC, Generic[_DI, _DO]):
need to change the `_EPHEMERAL` attribute to 'True'.
"""
_EPHEMERAL = False
_init_args: dict[str, Any] | None = None

def __post_init__(self, call_args: dict[str, Any]) -> None:
self._init_args = call_args
self._init_args.pop("self", None)

@classmethod
def from_config(
@@ -201,6 +217,40 @@ def from_config(
) from err
return dataset

def to_config(self) -> dict[str, Any]:
return_config: dict[str, Any] = {
f"{TYPE_KEY}": f"{type(self).__module__}.{type(self).__name__}"
}

if self._init_args:
return_config.update(self._init_args)

if type(self).__name__ == "CachedDataset":
cached_ds = return_config.pop("dataset")
cached_ds_return_config: dict[str, Any] = {}
if isinstance(cached_ds, dict):
cached_ds_return_config = cached_ds
elif isinstance(cached_ds, AbstractDataset):
cached_ds_return_config = cached_ds.to_config()
if VERSIONED_FLAG_KEY in cached_ds_return_config:
return_config[VERSIONED_FLAG_KEY] = cached_ds_return_config.pop(
VERSIONED_FLAG_KEY
)
return_config["dataset"] = cached_ds_return_config

# Set `versioned` key if version present in the dataset
if return_config.pop(VERSION_KEY, None):
return_config[VERSIONED_FLAG_KEY] = True

# Pop data from configuration
if type(self).__name__ == "MemoryDataset":
return_config.pop("data", None)

# Pop metadata from configuration
return_config.pop("metadata", None)

return return_config

@property
def _logger(self) -> logging.Logger:
return logging.getLogger(__name__)
@@ -286,6 +336,20 @@ def __init_subclass__(cls, **kwargs: Any) -> None:
If `_load` or `_save` are defined, alias them as a prerequisite.

"""

init_func: Callable = cls.__init__

def init_decorator(previous_init: Callable) -> Callable:
def new_init(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
previous_init(self, *args, **kwargs)
if type(self) is cls:
call_args = getcallargs(init_func, self, *args, **kwargs)
self.__post_init__(call_args)

return new_init

cls.__init__ = init_decorator(cls.__init__) # type: ignore[method-assign]

super().__init_subclass__(**kwargs)
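Taken together, the `__init_subclass__` decorator and the captured `_init_args` are what drive `to_config()`. A minimal standalone sketch of the mechanism, using hypothetical `Base` and `CSVLike` classes rather than the real kedro ones:

```python
from inspect import getcallargs

TYPE_KEY = "type"

class Base:
    """Hypothetical stand-in for AbstractDataset's init-capture mechanism."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        previous_init = cls.__init__

        def new_init(self, *args, **kw):
            previous_init(self, *args, **kw)
            # Capture only for the most-derived class, as in the diff above.
            if type(self) is cls:
                call_args = getcallargs(previous_init, self, *args, **kw)
                call_args.pop("self", None)
                self._init_args = call_args

        cls.__init__ = new_init

    def to_config(self):
        # Replay the captured constructor arguments plus the dataset type.
        cfg = {TYPE_KEY: f"{type(self).__module__}.{type(self).__name__}"}
        cfg.update(getattr(self, "_init_args", {}))
        return cfg

class CSVLike(Base):
    def __init__(self, filepath, sep=","):
        self.filepath = filepath
        self.sep = sep

ds = CSVLike("data/cars.csv")
print(ds.to_config())
```

`getcallargs` resolves positional arguments and defaults against the original signature, which is why defaulted parameters like `sep` appear in the config even when the caller never passed them.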

if hasattr(cls, "_load") and not cls._load.__qualname__.startswith("Abstract"):
@@ -484,14 +548,14 @@ def parse_dataset_definition(
config = copy.deepcopy(config)

# TODO: remove when removing old catalog as moved to KedroDataCatalog
if "type" not in config:
if TYPE_KEY not in config:
raise DatasetError(
"'type' is missing from dataset catalog configuration."
"\nHint: If this catalog entry is intended for variable interpolation, "
"make sure that the top level key is preceded by an underscore."
)

dataset_type = config.pop("type")
dataset_type = config.pop(TYPE_KEY)
class_obj = None
if isinstance(dataset_type, str):
if len(dataset_type.strip(".")) != len(dataset_type):
@@ -955,3 +1019,57 @@ def confirm(self, name: str) -> None:
def shallow_copy(self, extra_dataset_patterns: Patterns | None = None) -> _C:
"""Returns a shallow copy of the current object."""
...


def _is_parameter(dataset_name: str) -> bool:
# TODO: when breaking change replace with is_parameter and remove is_parameter from kedro/framework/cli/catalog.py
"""Check if dataset is a parameter."""
return dataset_name.startswith("params:") or dataset_name == "parameters"


def _validate_versions(
datasets: dict[str, AbstractDataset] | None,
load_versions: dict[str, str],
save_version: str | None,
) -> tuple[dict[str, str], str | None]:
"""Validates and synchronizes dataset versions for loading and saving.

Ensures consistency of dataset versions across a catalog, particularly
for versioned datasets. It updates load versions and validates that all
save versions are consistent.

Args:
datasets: A dictionary mapping dataset names to their instances.
If None, no validation occurs.
load_versions: A mapping between dataset names and versions
to load.
save_version: Version string to be used for ``save`` operations
by all datasets with enabled versioning.

Returns:
Updated ``load_versions`` with load versions specified in the ``datasets``
and resolved ``save_version``.

Raises:
VersionAlreadyExistsError: If a dataset's save version conflicts with
the catalog's save version.
"""
if not datasets:
return load_versions, save_version

cur_save_version = save_version
cur_load_versions = load_versions
for ds_name, ds in datasets.items():
if isinstance(ds, AbstractVersionedDataset) and ds._version:
if ds._version.load:
cur_load_versions[ds_name] = ds._version.load
if ds._version.save:
cur_save_version = cur_save_version or ds._version.save
if cur_save_version != ds._version.save:
raise VersionAlreadyExistsError(
f"Cannot add a dataset `{ds_name}` with `{ds._version.save}` save version. "
f"Save version set for the catalog is `{cur_save_version}`. "
f"All datasets in the catalog must have the same save version."
)

return cur_load_versions, cur_save_version
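The rule `_validate_versions` enforces can be sketched with plain tuples standing in for `AbstractVersionedDataset` instances (names illustrative, no kedro import):

```python
from collections import namedtuple

Version = namedtuple("Version", ["load", "save"])

class VersionAlreadyExistsError(Exception):
    pass

def validate_versions(datasets, load_versions, save_version):
    """Sketch: collect per-dataset load versions; require one shared save version."""
    cur_save = save_version
    cur_loads = dict(load_versions)
    for name, version in (datasets or {}).items():
        if version and version.load:
            cur_loads[name] = version.load
        if version and version.save:
            # First save version seen becomes the catalog's; later ones must match.
            cur_save = cur_save or version.save
            if cur_save != version.save:
                raise VersionAlreadyExistsError(
                    f"Dataset '{name}' save version '{version.save}' conflicts "
                    f"with catalog save version '{cur_save}'."
                )
    return cur_loads, cur_save

loads, save = validate_versions(
    {"cars": Version("2024-01-01", "2024-02-01"),
     "boats": Version(None, "2024-02-01")},
    {},
    None,
)
print(loads, save)  # {'cars': '2024-01-01'} 2024-02-01
```

A dataset carrying a different save version than the one already fixed for the catalog raises `VersionAlreadyExistsError`, matching the behavior described in the docstring above.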
12 changes: 8 additions & 4 deletions kedro/io/data_catalog.py
@@ -25,6 +25,7 @@
DatasetError,
DatasetNotFoundError,
Version,
_validate_versions,
generate_timestamp,
)
from kedro.io.memory_dataset import MemoryDataset
@@ -160,20 +161,20 @@ def __init__( # noqa: PLR0913
>>> catalog = DataCatalog(datasets={'cars': cars})
"""
self._config_resolver = config_resolver or CatalogConfigResolver()

# Kept to avoid breaking changes
if not config_resolver:
self._config_resolver._dataset_patterns = dataset_patterns or {}
self._config_resolver._default_pattern = default_pattern or {}

self._load_versions, self._save_version = _validate_versions(
datasets, load_versions or {}, save_version
)

self._datasets: dict[str, AbstractDataset] = {}
self.datasets: _FrozenDatasets | None = None

self.add_all(datasets or {})

self._load_versions = load_versions or {}
self._save_version = save_version

self._use_rich_markup = _has_rich_handler()

if feed_dict:
@@ -506,6 +507,9 @@ def add(
raise DatasetAlreadyExistsError(
f"Dataset '{dataset_name}' has already been registered"
)
self._load_versions, self._save_version = _validate_versions(
{dataset_name: dataset}, self._load_versions, self._save_version
)
self._datasets[dataset_name] = dataset
self.datasets = _FrozenDatasets(self.datasets, {dataset_name: dataset})

47 changes: 44 additions & 3 deletions kedro/io/kedro_data_catalog.py
@@ -25,6 +25,8 @@
DatasetError,
DatasetNotFoundError,
Version,
_is_parameter,
_validate_versions,
generate_timestamp,
)
from kedro.io.memory_dataset import MemoryDataset
@@ -96,10 +98,11 @@ def __init__(
>>> catalog = KedroDataCatalog(datasets={"cars": cars})
"""
self._config_resolver = config_resolver or CatalogConfigResolver()
self._datasets = datasets or {}
self._datasets: dict[str, AbstractDataset] = datasets or {}
self._lazy_datasets: dict[str, _LazyDataset] = {}
self._load_versions = load_versions or {}
self._save_version = save_version
self._load_versions, self._save_version = _validate_versions(
datasets, load_versions or {}, save_version
)

self._use_rich_markup = _has_rich_handler()

@@ -218,6 +221,9 @@ def __setitem__(self, key: str, value: Any) -> None:
if key in self._datasets:
self._logger.warning("Replacing dataset '%s'", key)
if isinstance(value, AbstractDataset):
self._load_versions, self._save_version = _validate_versions(
{key: value}, self._load_versions, self._save_version
)
self._datasets[key] = value
elif isinstance(value, _LazyDataset):
self._lazy_datasets[key] = value
@@ -364,6 +370,41 @@ class to be loaded is specified with the key ``type`` and their
config_resolver=config_resolver,
)

def to_config(
self,
) -> tuple[
dict[str, dict[str, Any]],
dict[str, dict[str, Any]],
dict[str, str | None],
str | None,
]:
catalog: dict[str, dict[str, Any]] = {}
credentials: dict[str, dict[str, Any]] = {}
load_versions: dict[str, str | None] = {}

for ds_name, ds in self._lazy_datasets.items():
if _is_parameter(ds_name):
continue
unresolved_config, unresolved_credentials = (
self._config_resolver.unresolve_credentials(ds_name, ds.config)
)
catalog[ds_name] = unresolved_config
credentials.update(unresolved_credentials)
load_versions[ds_name] = self._load_versions.get(ds_name, None)

for ds_name, ds in self._datasets.items(): # type: ignore[assignment]
if _is_parameter(ds_name):
continue
resolved_config = ds.to_config() # type: ignore[attr-defined]
unresolved_config, unresolved_credentials = (
self._config_resolver.unresolve_credentials(ds_name, resolved_config)
)
catalog[ds_name] = unresolved_config
credentials.update(unresolved_credentials)
load_versions[ds_name] = self._load_versions.get(ds_name, None)

return catalog, credentials, load_versions, self._save_version
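`to_config()` above returns a `(catalog, credentials, load_versions, save_version)` tuple, mirroring the inputs of `from_config()`. A hedged sketch of re-joining catalog and credentials the way a consumer would (illustrative data only, no kedro import):

```python
# Illustrative shapes matching the 4-tuple returned by to_config() above.
catalog = {
    "cars": {
        "type": "pandas.CSVDataset",
        "filepath": "s3://bucket/cars.csv",
        "credentials": "cars_credentials",
    },
}
credentials = {"cars_credentials": {"key": "SECRET"}}
load_versions = {"cars": None}
save_version = None

# Re-resolve the credential references, as from_config() would do internally.
resolved_catalog = {}
for name, cfg in catalog.items():
    ref = cfg.get("credentials")
    resolved_catalog[name] = {**cfg, "credentials": credentials.get(ref, ref)}

print(resolved_catalog["cars"]["credentials"])  # {'key': 'SECRET'}
```

Keeping the two dicts separate is the point of `unresolve_credentials`: the catalog part is safe to write to `catalog.yml`, while the credentials part goes to a secrets store.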

@staticmethod
def _validate_dataset_config(ds_name: str, ds_config: Any) -> None:
if not isinstance(ds_config, dict):