Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metadata attribute to kedro.io datasets #2537

Merged
merged 15 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kedro/io/cached_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(
dataset: Union[AbstractDataSet, Dict],
version: Version = None,
copy_mode: str = None,
metadata: Dict[str, Any] = None,
):
"""Creates a new instance of ``CachedDataSet`` pointing to the
provided Python object.
Expand All @@ -50,6 +51,8 @@ def __init__(
copy_mode: The copy mode used to copy the data. Possible
values are: "deepcopy", "copy" and "assign". If not
provided, it is inferred based on the data type.
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:
ValueError: If the provided dataset is not a valid dict/YAML
Expand All @@ -65,6 +68,7 @@ def __init__(
"representation of the dataset, or the actual dataset object."
)
self._cache = MemoryDataSet(copy_mode=copy_mode)
self.metadata = metadata

def _release(self) -> None:
self._cache.release()
Expand Down
5 changes: 5 additions & 0 deletions kedro/io/lambda_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ def _release(self) -> None:
else:
self.__release()

# pylint: disable=too-many-arguments
def __init__(
self,
load: Optional[Callable[[], Any]],
save: Optional[Callable[[Any], None]],
exists: Callable[[], bool] = None,
release: Callable[[], None] = None,
metadata: Dict[str, Any] = None,
):
"""Creates a new instance of ``LambdaDataSet`` with references to the
required input/output data set methods.
Expand All @@ -89,6 +91,8 @@ def __init__(
save: Method to save data to a data set.
exists: Method to check whether output data already exists.
release: Method to release any cached information.
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:
DataSetError: If a method is specified, but is not a Callable.
Expand All @@ -111,3 +115,4 @@ def __init__(
self.__save = save
self.__exists = exists
self.__release = release
self.metadata = metadata
merelcht marked this conversation as resolved.
Show resolved Hide resolved
7 changes: 6 additions & 1 deletion kedro/io/memory_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class MemoryDataSet(AbstractDataSet):

"""

def __init__(self, data: Any = _EMPTY, copy_mode: str = None):
def __init__(
self, data: Any = _EMPTY, copy_mode: str = None, metadata: Dict[str, Any] = None
):
"""Creates a new instance of ``MemoryDataSet`` pointing to the
provided Python object.

Expand All @@ -42,9 +44,12 @@ def __init__(self, data: Any = _EMPTY, copy_mode: str = None):
copy_mode: The copy mode used to copy the data. Possible
values are: "deepcopy", "copy" and "assign". If not
provided, it is inferred based on the data type.
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
"""
self._data = _EMPTY
self._copy_mode = copy_mode
self.metadata = metadata
if data is not _EMPTY:
self._save(data)

Expand Down
10 changes: 9 additions & 1 deletion kedro/io/partitioned_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def __init__( # pylint: disable=too-many-arguments
load_args: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
overwrite: bool = False,
metadata: Dict[str, Any] = None,
):
"""Creates a new instance of ``PartitionedDataSet``.

Expand Down Expand Up @@ -177,6 +178,8 @@ def __init__( # pylint: disable=too-many-arguments
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``)
overwrite: If True, any existing partitions will be removed.
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:
DataSetError: If versioning is enabled for the underlying dataset.
Expand All @@ -190,7 +193,8 @@ def __init__( # pylint: disable=too-many-arguments
self._filename_suffix = filename_suffix
self._overwrite = overwrite
self._protocol = infer_storage_options(self._path)["protocol"]
self._partition_cache: Cache = Cache(maxsize=1)
self._partition_cache = Cache(maxsize=1)
self.metadata = metadata

dataset = dataset if isinstance(dataset, dict) else {"type": dataset}
self._dataset_type, self._dataset_config = parse_dataset_definition(dataset)
Expand Down Expand Up @@ -381,6 +385,7 @@ def __init__(
credentials: Dict[str, Any] = None,
load_args: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
metadata: Dict[str, Any] = None,
):

"""Creates a new instance of ``IncrementalDataSet``.
Expand Down Expand Up @@ -427,6 +432,8 @@ def __init__(
the filesystem implementation.
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:
DataSetError: If versioning is enabled for the underlying dataset.
Expand All @@ -444,6 +451,7 @@ def __init__(

self._checkpoint_config = self._parse_checkpoint_config(checkpoint)
self._force_checkpoint = self._checkpoint_config.pop("force_checkpoint", None)
self.metadata = metadata

comparison_func = self._checkpoint_config.pop("comparison_func", operator.gt)
if isinstance(comparison_func, str):
Expand Down