-
Notifications
You must be signed in to change notification settings - Fork 915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DataCatalog2.0]: KedroDataCatalog with dict interface #4218
Changes from 209 commits
a8f4fb3
7d56818
787e121
0b80f23
5c727df
05c9171
5c804d6
e9ba5c4
530f7d6
64be83c
c29828a
957403a
14908ff
c5e925b
b9a92b0
2cb794f
2f32593
d1ea64e
fb89fca
4486939
c667645
be8e929
ec7ac39
8e23450
529e61a
e4cb21c
50bc816
6dfbcb0
9346f08
9568e29
86efdfe
5e27660
72b11d0
f0a4090
a4da52a
7d6227f
4092291
63e47f9
85bf720
68f6527
2ac4a2f
cb5879d
9038e96
cc89565
59b6764
4f5a3fb
12ed6f2
a106cec
6df04f7
8566e27
2dcea33
a46597f
3787545
68d612d
af5bee9
acc4d6e
e67ff0f
7b3afa2
658a759
7be2a8e
09f3f26
9e43a9a
b28a9bf
c9f3469
49a3b27
25b6501
3a646de
5e5df4a
aa59a35
c7efa3e
023ffc6
6971779
d57a567
585b44f
2769def
e447078
beb0165
975e968
11d782c
e4abd23
5f105de
f9cb9c6
31a9484
ced1b7a
7f9b576
a3828d9
16610c4
321affe
ff25405
d0000c0
7f5ddec
e030bb6
6433dd8
355576f
39d9ff6
659c9da
840b32a
77f551c
6e079a1
1f7e5f8
80f0e3d
017cda3
cab6f06
ac1ecc0
e955930
a07f3d4
4ecb826
2b9be66
fb3831b
8f604d1
9a4db18
0a6946a
fee7bd6
f5a7992
1c981f3
6128be7
8c91d0e
18d2ba0
d48c6d3
45ce6bc
6ca972f
fdce5ea
3029963
0150a21
0833a84
95ccb3c
07908a8
25a6fcf
07f8c12
3a1a0f2
cf663a0
caa7316
9540a32
0ac154d
0ec1f23
96d4576
4ecd8fd
11b3426
741b682
88ba38b
0020095
78feb51
6bf912c
c7699ec
bcd2d37
eb7e8f5
7348c12
5aee9e9
c9c7c9a
c66df33
2f1dcbd
4b8d90c
8f870a8
70dc177
ae7a271
135cb0e
ca4867c
4745f71
033a0b7
e74ffda
00af3ec
2de7ccb
8affed6
a52672e
84f249c
2548119
ac124e3
f62ed03
b65609f
17199ad
44c576e
6d5f094
e24b2a6
c8ef90f
26f3f99
5bbedfa
5ca6b48
3914cca
0e03aa5
643219d
b4ae279
5bdf16b
78900b7
8ea3667
8fd7043
a50fbc9
5b02d05
d69b9eb
3ddc01b
1223f26
f9912ec
02e2c5e
517d770
aa95229
494b4b9
e31ba9c
35e10f3
f031211
9d52ecf
cb93875
dac141d
6b3eb9e
3edb4fb
78af7eb
435bea1
d5b7099
08fa019
6de5fbf
23f3524
87addaa
c93aabb
6650a83
8df7d91
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ | |
import difflib | ||
import logging | ||
import re | ||
from typing import Any | ||
from typing import Any, Iterable, List # noqa: UP035 | ||
|
||
from kedro.io.catalog_config_resolver import CatalogConfigResolver, Patterns | ||
from kedro.io.core import ( | ||
|
@@ -84,10 +84,12 @@ def __init__( | |
|
||
@property | ||
def datasets(self) -> dict[str, Any]: | ||
# TODO: remove when removing old catalog | ||
return copy.copy(self._datasets) | ||
|
||
@datasets.setter | ||
def datasets(self, value: Any) -> None: | ||
# TODO: remove when removing old catalog | ||
raise AttributeError( | ||
"Operation not allowed. Please use KedroDataCatalog.add() instead." | ||
) | ||
|
@@ -112,6 +114,49 @@ def __eq__(self, other) -> bool: # type: ignore[no-untyped-def] | |
other.config_resolver.list_patterns(), | ||
) | ||
|
||
def keys(self) -> List[str]: # noqa: UP006 | ||
return list(self.__iter__()) | ||
|
||
def values(self) -> List[AbstractDataset]: # noqa: UP006 | ||
return [self._datasets[key] for key in self.__iter__()] | ||
|
||
def items(self) -> List[tuple[str, AbstractDataset]]: # noqa: UP006 | ||
return [(key, self._datasets[key]) for key in self.__iter__()] | ||
|
||
def __iter__(self) -> Iterable[str]: | ||
yield from self._datasets.keys() | ||
|
||
def __getitem__(self, ds_name: str) -> AbstractDataset: | ||
return self.get_dataset(ds_name) | ||
|
||
def __setitem__(self, key: str, value: Any) -> None: | ||
if key in self._datasets: | ||
self._logger.warning("Replacing dataset '%s'", key) | ||
if isinstance(value, AbstractDataset): | ||
self._datasets[key] = value | ||
else: | ||
self._logger.info(f"Adding input data as a MemoryDataset - {key}") | ||
self._datasets[key] = MemoryDataset(data=value) # type: ignore[abstract] | ||
ElenaKhaustova marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def __len__(self) -> int: | ||
return len(self.keys()) | ||
|
||
def get( | ||
self, key: str, default: AbstractDataset | None = None | ||
) -> AbstractDataset | None: | ||
"""Get a dataset by name from an internal collection of datasets.""" | ||
if key not in self._datasets: | ||
ds_config = self._config_resolver.resolve_pattern(key) | ||
if ds_config: | ||
self._add_from_config(key, ds_config) | ||
|
||
dataset = self._datasets.get(key, None) | ||
|
||
return dataset or default | ||
|
||
def _ipython_key_completions_(self) -> list[str]: | ||
return list(self._datasets.keys()) | ||
|
||
@property | ||
def _logger(self) -> logging.Logger: | ||
return logging.getLogger(__name__) | ||
|
@@ -178,6 +223,7 @@ def _add_from_config(self, ds_name: str, ds_config: dict[str, Any]) -> None: | |
def get_dataset( | ||
self, ds_name: str, version: Version | None = None, suggest: bool = True | ||
) -> AbstractDataset: | ||
# TODO: remove when removing old catalog | ||
"""Get a dataset by name from an internal collection of datasets. | ||
|
||
If a dataset is not in the collection but matches any pattern | ||
|
@@ -197,12 +243,7 @@ def get_dataset( | |
DatasetNotFoundError: When a dataset with the given name | ||
is not in the collection and does not match patterns. | ||
""" | ||
if ds_name not in self._datasets: | ||
ds_config = self._config_resolver.resolve_pattern(ds_name) | ||
if ds_config: | ||
self._add_from_config(ds_name, ds_config) | ||
|
||
dataset = self._datasets.get(ds_name, None) | ||
dataset = self.get(ds_name) | ||
|
||
if dataset is None: | ||
error_msg = f"Dataset '{ds_name}' not found in the catalog" | ||
|
@@ -231,40 +272,77 @@ def _get_dataset( | |
def add( | ||
ElenaKhaustova marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self, ds_name: str, dataset: AbstractDataset, replace: bool = False | ||
) -> None: | ||
# TODO: remove when removing old catalog | ||
"""Adds a new ``AbstractDataset`` object to the ``KedroDataCatalog``.""" | ||
if ds_name in self._datasets: | ||
if replace: | ||
self._logger.warning("Replacing dataset '%s'", ds_name) | ||
else: | ||
raise DatasetAlreadyExistsError( | ||
f"Dataset '{ds_name}' has already been registered" | ||
) | ||
self._datasets[ds_name] = dataset | ||
|
||
def list(self, regex_search: str | None = None) -> list[str]: | ||
if ds_name in self._datasets and not replace: | ||
raise DatasetAlreadyExistsError( | ||
f"Dataset '{ds_name}' has already been registered" | ||
) | ||
self.__setitem__(ds_name, dataset) | ||
|
||
def list(self, regex_search: str | None = None) -> List[str]: # noqa: UP006 | ||
# TODO: remove when removing old catalog | ||
""" | ||
List of all dataset names registered in the catalog. | ||
This can be filtered by providing an optional regular expression | ||
which will only return matching keys. | ||
""" | ||
|
||
if regex_search is None: | ||
return list(self._datasets.keys()) | ||
return self.keys() | ||
|
||
if not regex_search.strip(): | ||
if regex_search == "": | ||
self._logger.warning("The empty string will not match any datasets") | ||
return [] | ||
|
||
return self.filter(regex_search) | ||
|
||
def filter( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
While I am usually in favour of simpler names, here [… remainder of comment lost in extraction]
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I quite disagree with this one, because I personally find [… remainder of comment lost in extraction]
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
@Galileo-Galilei we agreed to rename it based on the solution for #3917, so we don't rename it several times. That's why we kept the old name for now. |
||
self, regex_search: str, regex_flags: int | re.RegexFlag = 0 | ||
) -> List[str]: # noqa: UP006 | ||
""" | ||
Filter dataset names registered in the catalog. | ||
""" | ||
if not regex_flags: | ||
regex_flags = re.IGNORECASE | ||
|
||
try: | ||
pattern = re.compile(regex_search, flags=re.IGNORECASE) | ||
pattern = re.compile(regex_search, flags=regex_flags) | ||
except re.error as exc: | ||
raise SyntaxError( | ||
f"Invalid regular expression provided: '{regex_search}'" | ||
) from exc | ||
return [ds_name for ds_name in self._datasets if pattern.search(ds_name)] | ||
return [ds_name for ds_name in self.__iter__() if pattern.search(ds_name)] | ||
|
||
def save(self, name: str, data: Any) -> None: | ||
"""Save data to a registered dataset.""" | ||
# TODO: rename input argument when breaking change: name -> ds_name | ||
ElenaKhaustova marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Save data to a registered dataset. | ||
|
||
Args: | ||
name: A dataset to be saved to. | ||
data: A data object to be saved as configured in the registered | ||
dataset. | ||
|
||
Raises: | ||
DatasetNotFoundError: When a dataset with the given name | ||
has not yet been registered. | ||
|
||
Example: | ||
:: | ||
|
||
>>> import pandas as pd | ||
>>> | ||
>>> from kedro_datasets.pandas import CSVDataset | ||
>>> | ||
>>> cars = CSVDataset(filepath="cars.csv", | ||
>>> load_args=None, | ||
>>> save_args={"index": False}) | ||
>>> catalog = DataCatalog(datasets={'cars': cars}) | ||
>>> | ||
>>> df = pd.DataFrame({'col1': [1, 2], | ||
>>> 'col2': [4, 5], | ||
>>> 'col3': [5, 6]}) | ||
>>> catalog.save("cars", df) | ||
""" | ||
dataset = self.get_dataset(name) | ||
|
||
self._logger.info( | ||
|
@@ -277,7 +355,35 @@ def save(self, name: str, data: Any) -> None: | |
dataset.save(data) | ||
|
||
def load(self, name: str, version: str | None = None) -> Any: | ||
"""Loads a registered dataset.""" | ||
# TODO: rename input argument when breaking change: name -> ds_name | ||
ElenaKhaustova marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# TODO: remove version from input arguments when breaking change | ||
ElenaKhaustova marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Loads a registered dataset. | ||
|
||
Args: | ||
name: A dataset to be loaded. | ||
version: Optional argument for concrete data version to be loaded. | ||
Works only with versioned datasets. | ||
|
||
Returns: | ||
The loaded data as configured. | ||
|
||
Raises: | ||
DatasetNotFoundError: When a dataset with the given name | ||
has not yet been registered. | ||
|
||
Example: | ||
:: | ||
|
||
>>> from kedro.io import DataCatalog | ||
>>> from kedro_datasets.pandas import CSVDataset | ||
>>> | ||
>>> cars = CSVDataset(filepath="cars.csv", | ||
>>> load_args=None, | ||
>>> save_args={"index": False}) | ||
>>> catalog = DataCatalog(datasets={'cars': cars}) | ||
>>> | ||
>>> df = catalog.load("cars") | ||
""" | ||
load_version = Version(version, None) if version else None | ||
dataset = self.get_dataset(name, version=load_version) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made it for iteration cases, like