From e3168db60f16c752ae005ffa5af72d68f5de6034 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Tue, 6 Aug 2024 11:35:01 +0100
Subject: [PATCH 01/14] Refactored catalog resolve

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py    | 58 +++++++++++++++++++++++++++++++
 kedro/io/data_catalog_redesign.py | 14 +++++---
 2 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index a0f6b413bd..e8dc160e80 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -17,6 +17,7 @@
 from kedro.framework.startup import ProjectMetadata
 from kedro.io import AbstractDataset
 from kedro.io.data_catalog import DataCatalog
+from kedro.io.data_catalog_redesign import KedroDataCatalog


 def _create_session(package_name: str, **kwargs: Any) -> KedroSession:
@@ -227,6 +228,63 @@ def rank_catalog_factories(metadata: ProjectMetadata, env: str) -> None:
         click.echo("There are no dataset factories in the catalog.")


+@catalog.command("resolve_new")
+@env_option
+@click.pass_obj
+def resolve_patterns_new(metadata: ProjectMetadata, env: str) -> None:
+    """Resolve catalog factories against pipeline datasets. Note that this command is runner
+    agnostic and thus won't take into account any default dataset creation defined in the runner."""
+
+    session = _create_session(metadata.package_name, env=env)
+    context = session.load_context()
+
+    catalog_config = context.config_loader["catalog"]
+    credentials_config = context.config_loader.get("credentials", None)
+
+    data_catalog = KedroDataCatalog(
+        config=catalog_config, credentials=credentials_config
+    )
+
+    target_pipelines = pipelines.keys()
+    datasets = set()
+    for pipe in target_pipelines:
+        pl_obj = pipelines.get(pipe)
+        if pl_obj:
+            datasets.update(pl_obj.datasets())
+
+    datasets_lst = [
+        ds_name
+        for ds_name in datasets
+        if not (ds_name.startswith("params:") or ds_name == "parameters")
+        and (
+            ds_name in data_catalog.resolved_ds_configs
+            or data_catalog.match_pattern(ds_name)
+        )
+    ]
+
+    # Add datasets from catalog that are not used in target pipelines
+    for ds_name in data_catalog.resolved_ds_configs:
+        if not (ds_name in datasets or data_catalog.match_pattern(ds_name)):
+            datasets_lst.append(ds_name)
+
+    resolved_configs = data_catalog.resolve_patterns(datasets_lst)
+
+    # print("-" * 50)
+    # print(datasets_lst)
+    # print("-" * 50)
+    # print(resolved_configs)
+    # print("-" * 50)
+
+    secho(
+        yaml.dump(
+            {
+                ds_name: ds_config
+                for ds_name, ds_config in zip(datasets_lst, resolved_configs)
+            }
+        )
+    )
+
+
 @catalog.command("resolve")
 @env_option
 @click.pass_obj
diff --git a/kedro/io/data_catalog_redesign.py b/kedro/io/data_catalog_redesign.py
index 346e422822..37150d3b1c 100644
--- a/kedro/io/data_catalog_redesign.py
+++ b/kedro/io/data_catalog_redesign.py
@@ -158,12 +158,11 @@ def _is_pattern(pattern: str) -> bool:
         """Check if a given string is a pattern. Assume that any name with '{' is a pattern."""
         return "{" in pattern

-    @staticmethod
-    def _match_pattern(dataset_patterns: Patterns, dataset_name: str) -> str | None:
+    def match_pattern(self, dataset_name: str) -> str | None:
         """Match a dataset name against patterns in a dictionary."""
         matches = (
             pattern
-            for pattern in dataset_patterns.keys()
+            for pattern in self._dataset_patterns.keys()
             if parse(pattern, dataset_name)
         )
         return next(matches, None)
@@ -269,7 +268,7 @@ def resolve_patterns(
         resolved_configs = []

         for ds_name in datasets_lst:
-            matched_pattern = self._match_pattern(self._dataset_patterns, ds_name)
+            matched_pattern = self.match_pattern(ds_name)
             if matched_pattern and ds_name not in self.datasets:
                 # If the dataset is a patterned dataset, materialise it and add it to
                 # the catalog
@@ -345,12 +344,17 @@ def __init__(  # noqa: PLR0913
         super().__init__(datasets, config, credentials)

         # print(self.datasets)
+        # print("-")
         # print(self.resolved_ds_configs)
+        # print("-")
+        # print(self._dataset_patterns)
+        # print("-")
+        # print(self._default_pattern)

         missing_keys = [
             key
             for key in self._load_versions.keys()
-            if not (key in config or self._match_pattern(self._dataset_patterns, key))
+            if not (key in config or self.match_pattern(key))
         ]
         if missing_keys:
             raise DatasetNotFoundError(

From 54c6e9ab80fc118af118dd0b0a765730b003eb8f Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Tue, 6 Aug 2024 11:53:07 +0100
Subject: [PATCH 02/14] Removed debug output

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index e8dc160e80..2486e03fba 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -255,7 +255,9 @@ def resolve_patterns_new(metadata: ProjectMetadata, env: str) -> None:
     datasets_lst = [
         ds_name
         for ds_name in datasets
+        # Excluding parameters
         if not (ds_name.startswith("params:") or ds_name == "parameters")
+        # Excluding free outputs
         and (
             ds_name in data_catalog.resolved_ds_configs
             or data_catalog.match_pattern(ds_name)
         )
@@ -269,12 +271,6 @@ def resolve_patterns_new(metadata: ProjectMetadata, env: str) -> None:

     resolved_configs = data_catalog.resolve_patterns(datasets_lst)

-    # print("-" * 50)
-    # print(datasets_lst)
-    # print("-" * 50)
-    # print(resolved_configs)
-    # print("-" * 50)
-
     secho(
         yaml.dump(
             {
                 ds_name: ds_config

From eb242849695763a7a9e6e6a1b98bf3409faf14a4 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Tue, 6 Aug 2024 15:15:40 +0100
Subject: [PATCH 03/14] Reimplemented catalog rank

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py      | 19 ++++++++++++
 kedro/framework/context/context.py  | 47 +++++++++++++++++++++++++++++
 kedro/framework/hooks/specs.py      | 28 ++++++++++++++++-
 kedro/framework/project/__init__.py |  5 +++
 kedro/io/__init__.py                |  3 ++
 5 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 2486e03fba..27f03853c3 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -210,6 +210,25 @@ def _add_missing_datasets_to_catalog(missing_ds: list[str], catalog_path: Path)
         yaml.safe_dump(catalog_config, catalog_file, default_flow_style=False)


+@catalog.command("rank_new")
+@env_option
+@click.pass_obj
+def rank_catalog_factories_new(metadata: ProjectMetadata, env: str) -> None:
+    """List all dataset factories in the catalog, ranked by the priority with which they are matched."""
+    session = _create_session(metadata.package_name, env=env)
+    context = session.load_context()
+    data_catalog = context.catalog_new
+
+    catalog_factories = {
+        **data_catalog._dataset_patterns,
+        **data_catalog._default_pattern,
+    }
+    if catalog_factories:
+        click.echo(yaml.dump(list(catalog_factories.keys())))
+    else:
+        click.echo("There are no dataset factories in the catalog.")
+
+
 @catalog.command("rank")
 @env_option
 @click.pass_obj
diff --git a/kedro/framework/context/context.py b/kedro/framework/context/context.py
index ddc197b65b..51202b6998 100644
--- a/kedro/framework/context/context.py
+++ b/kedro/framework/context/context.py
@@ -15,6 +15,7 @@
 from kedro.config import AbstractConfigLoader, MissingConfigException
 from kedro.framework.project import settings
 from kedro.io import DataCatalog
+from kedro.io.data_catalog_redesign import AbstractDataCatalog, KedroDataCatalog
 from kedro.pipeline.transcoding import _transcode_split


@@ -137,6 +138,12 @@ def _validate_transcoded_datasets(catalog: DataCatalog) -> None:
         _transcode_split(dataset_name)


+def _validate_transcoded_datasets_new(catalog: AbstractDataCatalog) -> None:
+    """Validates transcoded datasets are correctly named"""
+    for dataset_name in catalog.datasets.keys():
+        _transcode_split(dataset_name)
+
+
 def _expand_full_path(project_path: str | Path) -> Path:
     return Path(project_path).expanduser().resolve()

@@ -186,6 +193,10 @@ def catalog(self) -> DataCatalog:
         """
         return self._get_catalog()

+    @property
+    def catalog_new(self) -> AbstractDataCatalog:
+        return self._get_catalog_new()
+
     @property
     def params(self) -> dict[str, Any]:
         """Read-only property referring to Kedro's parameters for this context.
@@ -206,6 +217,42 @@ def params(self) -> dict[str, Any]:
         return OmegaConf.to_container(params) if OmegaConf.is_config(params) else params  # type: ignore[no-any-return]

+    def _get_catalog_new(
+        self,
+        save_version: str | None = None,
+        load_versions: dict[str, str] | None = None,
+    ) -> AbstractDataCatalog:
+        conf_catalog = self.config_loader["catalog"]
+        conf_catalog = _convert_paths_to_absolute_posix(
+            project_path=self.project_path, conf_dictionary=conf_catalog
+        )
+        conf_creds = self._get_config_credentials()
+
+        if issubclass(settings.DATA_CATALOG_CLASS_NEW, KedroDataCatalog):
+            catalog = settings.DATA_CATALOG_CLASS_NEW(
+                config=conf_catalog,
+                credentials=conf_creds,
+                load_versions=load_versions,
+                save_version=save_version,
+            )
+        else:
+            catalog = settings.DATA_CATALOG_CLASS_NEW(
+                config=conf_catalog,
+                credentials=conf_creds,
+            )
+
+        feed_dict = self._get_feed_dict()
+        catalog.add_from_dict(feed_dict)
+        _validate_transcoded_datasets_new(catalog)
+        self._hook_manager.hook.after_catalog_created_new(
+            catalog=catalog,
+            conf_catalog=conf_catalog,
+            conf_creds=conf_creds,
+            feed_dict=feed_dict,
+            save_version=save_version,
+            load_versions=load_versions,
+        )
+        return catalog
+
     def _get_catalog(
         self,
         save_version: str | None = None,
diff --git a/kedro/framework/hooks/specs.py b/kedro/framework/hooks/specs.py
index 14431f0362..68ce2eb7c7 100644
--- a/kedro/framework/hooks/specs.py
+++ b/kedro/framework/hooks/specs.py
@@ -7,7 +7,7 @@
 from typing import Any

 from kedro.framework.context import KedroContext
-from kedro.io import DataCatalog
+from kedro.io import AbstractDataCatalog, DataCatalog
 from kedro.pipeline import Pipeline
 from kedro.pipeline.node import Node

@@ -43,6 +43,32 @@ def after_catalog_created(  # noqa: PLR0913
         """
         pass

+    @hook_spec
+    def after_catalog_created_new(  # noqa: PLR0913
+        self,
+        catalog: AbstractDataCatalog,
+        conf_catalog: dict[str, Any],
+        conf_creds: dict[str, Any],
+        feed_dict: dict[str, Any],
+        save_version: str,
+        load_versions: dict[str, str],
+    ) -> None:
+        """Hooks to be invoked after a data catalog is created.
+        It receives the ``catalog`` as well as
+        all the arguments for ``KedroContext._get_catalog_new``.
+
+        Args:
+            catalog: The catalog that was created.
+            conf_catalog: The config from which the catalog was created.
+            conf_creds: The credentials conf from which the catalog was created.
+            feed_dict: The feed_dict that was added to the catalog after creation.
+            save_version: The save_version used in ``save`` operations
+                for all datasets in the catalog.
+            load_versions: The load_versions used in ``load`` operations
+                for each dataset in the catalog.
+        """
+        pass
+

 class NodeSpecs:
     """Namespace that defines all specifications for a node's lifecycle hooks."""
diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py
index f24b48d17c..b2497deffc 100644
--- a/kedro/framework/project/__init__.py
+++ b/kedro/framework/project/__init__.py
@@ -115,6 +115,10 @@ class _ProjectSettings(LazySettings):
     _DATA_CATALOG_CLASS = _IsSubclassValidator(
         "DATA_CATALOG_CLASS", default=_get_default_class("kedro.io.DataCatalog")
     )
+    _DATA_CATALOG_CLASS_NEW = _IsSubclassValidator(
+        "DATA_CATALOG_CLASS_NEW",
+        default=_get_default_class("kedro.io.AbstractDataCatalog"),
+    )

     def __init__(self, *args: Any, **kwargs: Any):
         kwargs.update(
@@ -128,6 +132,7 @@ def __init__(self, *args: Any, **kwargs: Any):
                 self._CONFIG_LOADER_CLASS,
                 self._CONFIG_LOADER_ARGS,
                 self._DATA_CATALOG_CLASS,
+                self._DATA_CATALOG_CLASS_NEW,
             ]
         )
         super().__init__(*args, **kwargs)
diff --git a/kedro/io/__init__.py b/kedro/io/__init__.py
index 7902f866bd..c907a66136 100644
--- a/kedro/io/__init__.py
+++ b/kedro/io/__init__.py
@@ -13,6 +13,7 @@
     Version,
 )
 from .data_catalog import DataCatalog
+from .data_catalog_redesign import AbstractDataCatalog, KedroDataCatalog
 from .lambda_dataset import LambdaDataset
 from .memory_dataset import MemoryDataset
 from .shared_memory_dataset import SharedMemoryDataset
@@ -29,4 +30,6 @@
     "MemoryDataset",
     "SharedMemoryDataset",
     "Version",
+    "AbstractDataCatalog",
+    "KedroDataCatalog",
 ]

From 45c7c5e131e7e071abcbf855f367314f0da7d78b Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Tue, 6 Aug 2024 16:10:41 +0100
Subject: [PATCH 04/14] Reimplemented catalog list

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py | 90 ++++++++++++++++++++++++++++++++++
 1 file changed, 90 insertions(+)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 27f03853c3..06b044b8c2 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -35,6 +35,80 @@ def catalog() -> None:
     """Commands for working with catalog."""


+@catalog.command("list_new")
+@env_option
+@click.option(
+    "--pipeline",
+    "-p",
+    type=str,
+    default="",
+    help="Name of the modular pipeline to list datasets for. If not set, "
+    "all registered pipelines are considered by default.",
+    callback=split_string,
+)
+@click.pass_obj
+def list_datasets_new(metadata: ProjectMetadata, pipeline: str, env: str) -> None:
+    """Show datasets per type."""
+    title = "Datasets in '{}' pipeline"
+    not_mentioned = "Datasets not mentioned in pipeline"
+    mentioned = "Datasets mentioned in pipeline"
+    factories = "Datasets generated from factories"
+
+    session = _create_session(metadata.package_name, env=env)
+    context = session.load_context()
+
+    try:
+        data_catalog = context.catalog_new
+        datasets_meta = data_catalog.datasets
+        catalog_ds = set(data_catalog.list())
+    except Exception as exc:
+        raise KedroCliError(
+            f"Unable to instantiate Kedro Catalog.\nError: {exc}"
+        ) from exc
+
+    target_pipelines = pipeline or pipelines.keys()
+
+    result = {}
+    for pipe in target_pipelines:
+        pl_obj = pipelines.get(pipe)
+        if pl_obj:
+            pipeline_ds = pl_obj.datasets()
+        else:
+            existing_pls = ", ".join(sorted(pipelines.keys()))
+            raise KedroCliError(
+                f"'{pipe}' pipeline not found! Existing pipelines: {existing_pls}"
+            )
+
+        unused_ds = catalog_ds - pipeline_ds
+        default_ds = pipeline_ds - catalog_ds
+        used_ds = catalog_ds - unused_ds
+
+        # resolve any factory datasets in the pipeline
+        factory_ds_by_type = defaultdict(list)
+        resolved_configs = data_catalog.resolve_patterns(default_ds)
+        for ds_name, ds_config in zip(default_ds, resolved_configs):
+            if data_catalog.match_pattern(ds_name):
+                factory_ds_by_type[ds_config.get("type", "DefaultDataset")].append(
+                    ds_name
+                )
+
+        default_ds = default_ds - set(chain.from_iterable(factory_ds_by_type.values()))
+
+        unused_by_type = _map_type_to_datasets_new(unused_ds, datasets_meta)
+        used_by_type = _map_type_to_datasets_new(used_ds, datasets_meta)
+
+        if default_ds:
+            used_by_type["DefaultDataset"].extend(default_ds)
+
+        data = (
+            (mentioned, dict(used_by_type)),
+            (factories, dict(factory_ds_by_type)),
+            (not_mentioned, dict(unused_by_type)),
+        )
+        result[title.format(pipe)] = {key: value for key, value in data if value}
+    secho(yaml.dump(result))
+
+
 @catalog.command("list")
 @env_option
 @click.option(
@@ -134,6 +208,22 @@ def _map_type_to_datasets(
     return mapping


+def _map_type_to_datasets_new(
+    datasets: set[str], datasets_meta: dict[str, Any]
+) -> dict:
+    """Build dictionary with a dataset type as a key and list of
+    datasets of the specific type as a value.
+    """
+    mapping = defaultdict(list)  # type: ignore[var-annotated]
+    for dataset in datasets:
+        is_param = dataset.startswith("params:") or dataset == "parameters"
+        if not is_param:
+            ds_type = datasets_meta[dataset].__class__.__name__
+            if dataset not in mapping[ds_type]:
+                mapping[ds_type].append(dataset)
+    return mapping
+
+
 @catalog.command("create")
 @env_option(help="Environment to create Data Catalog YAML file in. Defaults to `base`.")
 @click.option(

From 9248205ad1824f7d6e9db6d554d9ba79813ee1c5 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Tue, 6 Aug 2024 17:41:50 +0100
Subject: [PATCH 05/14] Reimplemented catalog create

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py | 59 ++++++++++++++++++++++++++++++++++
 1 file changed, 59 insertions(+)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 06b044b8c2..6283cdf64b 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -224,6 +224,65 @@ def _map_type_to_datasets_new(
     return mapping


+@catalog.command("create_new")
+@env_option(help="Environment to create Data Catalog YAML file in. Defaults to `base`.")
+@click.option(
+    "--pipeline",
+    "-p",
+    "pipeline_name",
+    type=str,
+    required=True,
+    help="Name of a pipeline.",
+)
+@click.pass_obj
+def create_catalog_new(metadata: ProjectMetadata, pipeline_name: str, env: str) -> None:
+    """Create Data Catalog YAML configuration with missing datasets.
+
+    Add ``MemoryDataset`` datasets to Data Catalog YAML configuration
+    file for each dataset in a registered pipeline if it is missing from
+    the ``DataCatalog``.
+
+    The catalog configuration will be saved to
+    `<conf_source>/<env>/catalog_<pipeline_name>.yml` file.
+    """
+    env = env or "base"
+    session = _create_session(metadata.package_name, env=env)
+    context = session.load_context()
+
+    pipeline = pipelines.get(pipeline_name)
+    if not pipeline:
+        existing_pipelines = ", ".join(sorted(pipelines.keys()))
+        raise KedroCliError(
+            f"'{pipeline_name}' pipeline not found! Existing pipelines: {existing_pipelines}"
+        )
+
+    pipeline_datasets = {
+        ds_name
+        for ds_name in pipeline.datasets()
+        if not ds_name.startswith("params:") and ds_name != "parameters"
+    }
+
+    data_catalog = context.catalog_new
+    catalog_datasets = {
+        ds_name
+        for ds_name in data_catalog.datasets.keys()
+        if not ds_name.startswith("params:") and ds_name != "parameters"
+    }
+
+    # Datasets that are missing in Data Catalog
+    missing_ds = sorted(pipeline_datasets - catalog_datasets)
+    if missing_ds:
+        catalog_path = (
+            context.project_path
+            / settings.CONF_SOURCE
+            / env
+            / f"catalog_{pipeline_name}.yml"
+        )
+        _add_missing_datasets_to_catalog(missing_ds, catalog_path)
+        click.echo(f"Data Catalog YAML configuration was created: {catalog_path}")
+    else:
+        click.echo("All datasets are already configured.")
+
+
 @catalog.command("create")
 @env_option(help="Environment to create Data Catalog YAML file in. Defaults to `base`.")
 @click.option(

From 29412f2b297cf6fc03a7ce562f78228231532882 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 7 Aug 2024 11:42:27 +0100
Subject: [PATCH 06/14] Added datasets and resolved_ds_configs properties

Signed-off-by: Elena Khaustova
---
 kedro/io/data_catalog_redesign.py | 68 ++++++++++++++++++++-----------
 1 file changed, 44 insertions(+), 24 deletions(-)

diff --git a/kedro/io/data_catalog_redesign.py b/kedro/io/data_catalog_redesign.py
index 7c79ef7b69..a9eef7c4bb 100644
--- a/kedro/io/data_catalog_redesign.py
+++ b/kedro/io/data_catalog_redesign.py
@@ -117,15 +117,15 @@ def __init__(
         config: dict[str, dict[str, Any]] | None = None,
         credentials: dict[str, dict[str, Any]] | None = None,
     ) -> None:
-        self.config = config or {}
-        self.resolved_ds_configs = {}
-        self.datasets = datasets or {}
+        self._config = config or {}
+        self._resolved_ds_configs = {}
+        self._datasets = datasets or {}
         self._dataset_patterns = {}
         self._default_pattern = {}

         if datasets:
             for ds_name in datasets:
-                self.resolved_ds_configs[ds_name] = {}
+                self._resolved_ds_configs[ds_name] = {}

         if config:
             self._dataset_patterns, self._default_pattern = self._get_patterns(
@@ -134,8 +134,28 @@ def __init__(
             self._update_ds_configs(config, credentials)
         self._init_datasets(config, credentials)

+    @property
+    def datasets(self):
+        return copy.deepcopy(self._datasets)
+
+    @datasets.setter
+    def datasets(self, value: Any):
+        msg = "Operation not allowed! "
+        # TODO: specify message based on the case
+        raise AttributeError(msg)
+
+    @property
+    def resolved_ds_configs(self):
+        return copy.deepcopy(self._resolved_ds_configs)
+
+    @resolved_ds_configs.setter
+    def resolved_ds_configs(self, value: Any):
" + # TODO: specify message based on the case + raise AttributeError(msg) + def __iter__(self): - yield from self.datasets.values() + yield from self._datasets.values() def _update_ds_configs( self, @@ -146,11 +166,11 @@ def _update_ds_configs( credentials = copy.deepcopy(credentials) or {} for ds_name, ds_config in config.items(): if ds_name in self._dataset_patterns: - self.resolved_ds_configs[ds_name] = _resolve_config( + self._resolved_ds_configs[ds_name] = _resolve_config( ds_name, ds_name, self._dataset_patterns[ds_name] ) else: - self.resolved_ds_configs[ds_name] = _resolve_config( + self._resolved_ds_configs[ds_name] = _resolve_config( ds_name, ds_name, _resolve_credentials(ds_config, credentials) ) @@ -262,7 +282,7 @@ def resolve_patterns( for ds_name in datasets_lst: matched_pattern = self.match_pattern(ds_name) - if matched_pattern and ds_name not in self.datasets: + if matched_pattern and ds_name not in self._datasets: # If the dataset is a patterned dataset, materialise it and add it to # the catalog config_copy = copy.deepcopy( @@ -283,8 +303,8 @@ def resolve_patterns( ds_name, ) resolved_configs.append(ds_config) - elif ds_name in self.datasets: - resolved_configs.append(self.resolved_ds_configs.get(ds_name, {})) + elif ds_name in self._datasets: + resolved_configs.append(self._resolved_ds_configs.get(ds_name, {})) else: resolved_configs.append(None) @@ -302,16 +322,16 @@ def get_dataset(self, ds_name: str, suggest: bool = True) -> Any: # Flag to turn on/off fuzzy-matching which can be time consuming and # slow down plugins like `kedro-viz` if suggest: - matches = difflib.get_close_matches(ds_name, self.datasets.keys()) + matches = difflib.get_close_matches(ds_name, self._datasets.keys()) if matches: suggestions = ", ".join(matches) error_msg += f" - did you mean one of these instead: {suggestions}" raise DatasetNotFoundError(error_msg) - elif ds_name not in self.datasets: + elif ds_name not in self._datasets: self._init_dataset(ds_name, ds_config) - self.resolved_ds_configs[ds_name] = ds_config + self._resolved_ds_configs[ds_name] = ds_config - return self.datasets[ds_name] + return self._datasets[ds_name] @abc.abstractmethod def add_from_dict(self, datasets: dict[str, Any], **kwargs) -> None: @@ -322,12 +342,12 @@ def add_from_dict(self, datasets: dict[str, Any], **kwargs) -> None: def add(self, dataset_name: str, dataset: Any, **kwargs) -> None: """Adds a new ``AbstractDataset`` object to the ``DataCatalog``.""" - if dataset_name in self.datasets: + if dataset_name in self._datasets: raise DatasetAlreadyExistsError( f"Dataset '{dataset_name}' has already been registered" ) - self.datasets[dataset_name] = dataset - self.resolved_ds_configs[dataset_name] = {} + self._datasets[dataset_name] = dataset + self._resolved_ds_configs[dataset_name] = {} @property def _logger(self) -> logging.Logger: @@ -341,7 +361,7 @@ def list(self, regex_search: str | None = None) -> list[str]: """ if regex_search is None: - return list(self.datasets.keys()) + return list(self._datasets.keys()) if not regex_search.strip(): self._logger.warning("The empty string will not match any data sets") @@ -354,7 +374,7 @@ def list(self, regex_search: str | None = None) -> list[str]: raise SyntaxError( f"Invalid regular expression provided: '{regex_search}'" ) from exc - return [ds_name for ds_name in self.datasets if pattern.search(ds_name)] + return [ds_name for ds_name in self._datasets if pattern.search(ds_name)] class KedroDataCatalog(AbstractDataCatalog): @@ -377,7 +397,7 @@ def 
         missing_keys = [
             key
             for key in self._load_versions.keys()
-            if not (key in self.config or self.match_pattern(key))
+            if not (key in self._config or self.match_pattern(key))
         ]
         if missing_keys:
             raise DatasetNotFoundError(
@@ -386,7 +406,7 @@ def _validate_missing_keys(self) -> None:
             )

     def _init_dataset(self, ds_name: str, config: dict[str, Any]):
-        self.datasets[ds_name] = AbstractDataset.from_config(
+        self._datasets[ds_name] = AbstractDataset.from_config(
             ds_name,
             config,
             self._load_versions.get(ds_name),
@@ -409,15 +429,15 @@ def add(
         self, dataset_name: str, dataset: AbstractDataset, replace: bool = False
     ) -> None:
         """Adds a new ``AbstractDataset`` object to the ``DataCatalog``."""
-        if dataset_name in self.datasets:
+        if dataset_name in self._datasets:
             if replace:
                 self._logger.warning("Replacing dataset '%s'", dataset_name)
             else:
                 raise DatasetAlreadyExistsError(
                     f"Dataset '{dataset_name}' has already been registered"
                 )
-        self.datasets[dataset_name] = dataset
-        self.resolved_ds_configs[dataset_name] = {}
+        self._datasets[dataset_name] = dataset
+        self._resolved_ds_configs[dataset_name] = {}

     def add_from_dict(self, datasets: dict[str, Any], replace: bool = False) -> None:
         for ds_name in datasets:

From a527442c0924dfd81727763401cca2ea23b90c04 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 7 Aug 2024 12:25:04 +0100
Subject: [PATCH 07/14] Removed TODOs

Signed-off-by: Elena Khaustova
---
 kedro/io/data_catalog_redesign.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/kedro/io/data_catalog_redesign.py b/kedro/io/data_catalog_redesign.py
index a9eef7c4bb..5d2f629cc2 100644
--- a/kedro/io/data_catalog_redesign.py
+++ b/kedro/io/data_catalog_redesign.py
@@ -140,8 +140,7 @@ def datasets(self):

     @datasets.setter
     def datasets(self, value: Any):
-        msg = "Operation not allowed! "
-        # TODO: specify message based on the case
+        msg = "Operation not allowed! Please change datasets through configuration."
         raise AttributeError(msg)

     @property
@@ -150,13 +149,18 @@ def resolved_ds_configs(self):

     @resolved_ds_configs.setter
     def resolved_ds_configs(self, value: Any):
-        msg = "Operation not allowed! "
-        # TODO: specify message based on the case
+        msg = "Operation not allowed! Please change datasets through configuration."
         raise AttributeError(msg)

     def __iter__(self):
         yield from self._datasets.values()

+    def __getitem__(self, ds_name: str) -> Any:
+        return self.get_dataset(ds_name)
+
+    def _ipython_key_completions_(self) -> list[str]:
+        return list(self._datasets.keys())
+
     def _update_ds_configs(
         self,

From 5581956c34eb3e366ac03f717741264a01cf4a4f Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 7 Aug 2024 13:41:00 +0100
Subject: [PATCH 08/14] Updated catalog resolve

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 6283cdf64b..1157cb8646 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -420,20 +420,17 @@ def resolve_patterns_new(metadata: ProjectMetadata, env: str) -> None:
         if pl_obj:
             datasets.update(pl_obj.datasets())

+    catalog_datasets = set(data_catalog.list())
     datasets_lst = [
         ds_name
         for ds_name in datasets
-        # Excluding parameters
+        # Excluding parameters and free outputs
         if not (ds_name.startswith("params:") or ds_name == "parameters")
-        # Excluding free outputs
-        and (
-            ds_name in data_catalog.resolved_ds_configs
-            or data_catalog.match_pattern(ds_name)
-        )
+        and (ds_name in catalog_datasets or data_catalog.match_pattern(ds_name))
     ]

     # Add datasets from catalog that are not used in target pipelines
-    for ds_name in data_catalog.resolved_ds_configs:
+    for ds_name in catalog_datasets:
         if not (ds_name in datasets or data_catalog.match_pattern(ds_name)):
             datasets_lst.append(ds_name)

From 6f8f54a8be705b9003591f140314c2b313644118 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 7 Aug 2024 13:47:19 +0100
Subject: [PATCH 09/14] Added public properties for dataset_patterns and default_pattern

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py    | 4 ++--
 kedro/io/data_catalog_redesign.py | 8 ++++++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 1157cb8646..1e330fc6cf 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -369,8 +369,8 @@ def rank_catalog_factories_new(metadata: ProjectMetadata, env: str) -> None:
     data_catalog = context.catalog_new

     catalog_factories = {
-        **data_catalog._dataset_patterns,
-        **data_catalog._default_pattern,
+        **data_catalog.dataset_patterns,
+        **data_catalog.default_pattern,
     }
     if catalog_factories:
         click.echo(yaml.dump(list(catalog_factories.keys())))
diff --git a/kedro/io/data_catalog_redesign.py b/kedro/io/data_catalog_redesign.py
index 5d2f629cc2..6e15cec62c 100644
--- a/kedro/io/data_catalog_redesign.py
+++ b/kedro/io/data_catalog_redesign.py
@@ -152,6 +152,14 @@ def resolved_ds_configs(self, value: Any):
         msg = "Operation not allowed! Please change datasets through configuration."
         raise AttributeError(msg)

+    @property
+    def dataset_patterns(self):
+        return self._dataset_patterns
+
+    @property
+    def default_pattern(self):
+        return self._default_pattern
+
     def __iter__(self):
         yield from self._datasets.values()

From 1af02a4a2ef6efa4dc95bdd55bd9b22c9855fa39 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 7 Aug 2024 13:52:19 +0100
Subject: [PATCH 10/14] Updated catalog create

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 1e330fc6cf..a6aad3d7c8 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -264,7 +264,7 @@ def create_catalog_new(metadata: ProjectMetadata, pipeline_name: str, env: str)
     data_catalog = context.catalog_new
     catalog_datasets = {
         ds_name
-        for ds_name in data_catalog.datasets.keys()
+        for ds_name in data_catalog.list()
         if not ds_name.startswith("params:") and ds_name != "parameters"
     }

From 32c76070bb9984c43235197902abc9b86f559b8a Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 7 Aug 2024 19:07:32 +0100
Subject: [PATCH 11/14] Added __contains__ and comments on lazy loading

Signed-off-by: Elena Khaustova
---
 kedro/io/data_catalog_redesign.py | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/kedro/io/data_catalog_redesign.py b/kedro/io/data_catalog_redesign.py
index 6e15cec62c..775264115f 100644
--- a/kedro/io/data_catalog_redesign.py
+++ b/kedro/io/data_catalog_redesign.py
@@ -166,6 +166,13 @@ def __iter__(self):
     def __getitem__(self, ds_name: str) -> Any:
         return self.get_dataset(ds_name)

+    def __contains__(self, dataset_name: str) -> bool:
+        """Check if an item is in the catalog as a materialised dataset or pattern"""
+        matched_pattern = self.match_pattern(dataset_name)
+        if dataset_name in self._datasets or matched_pattern:
+            return True
+        return False
+
     def _ipython_key_completions_(self) -> list[str]:
         return list(self._datasets.keys())
@@ -249,9 +256,7 @@ def _init_datasets(
         for ds_name, ds_config in config.items():
             if not self._is_pattern(ds_name):
                 validate_dataset_config(ds_name, ds_config)
-                resolved_ds_config = _resolve_credentials(  # noqa: PLW2901
-                    ds_config, credentials
-                )
+                resolved_ds_config = _resolve_credentials(ds_config, credentials)
                 self._init_dataset(ds_name, resolved_ds_config)
@@ -268,9 +273,7 @@ def _get_patterns(
         for ds_name, ds_config in config.items():
             if cls._is_pattern(ds_name):
                 validate_dataset_config(ds_name, ds_config)
-                resolved_ds_config = _resolve_credentials(  # noqa: PLW2901
-                    ds_config, credentials
-                )
+                resolved_ds_config = _resolve_credentials(ds_config, credentials)
                 dataset_patterns[ds_name] = resolved_ds_config

         sorted_patterns = cls._sort_patterns(dataset_patterns)
@@ -418,6 +421,9 @@ def _validate_missing_keys(self) -> None:
             )

     def _init_dataset(self, ds_name: str, config: dict[str, Any]):
+        # Add LazyAbstractDataset to store the configuration but not to init actual dataset
+        # Initialise actual dataset when load or save
+        # Add is_ds_init property
         self._datasets[ds_name] = AbstractDataset.from_config(
             ds_name,
             config,
             self._load_versions.get(ds_name),

From df4b569917c1225917fdfb82a20898bb7c1c77e0 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Mon, 12 Aug 2024 19:35:48 +0100
Subject: [PATCH 12/14] Updated resolve_new

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 245f14d9e7..c3b1e4e2bf 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -417,25 +417,25 @@ def resolve_patterns_new(metadata: ProjectMetadata, env: str) -> None:
     )

     target_pipelines = pipelines.keys()
-    datasets = set()
+    pipeline_datasets = set()
     for pipe in target_pipelines:
         pl_obj = pipelines.get(pipe)
         if pl_obj:
-            datasets.update(pl_obj.datasets())
+            pipeline_datasets.update(pl_obj.datasets())

     catalog_datasets = set(data_catalog.list())
     datasets_lst = [
-        ds_name
-        for ds_name in datasets
+        pp_ds_name
+        for pp_ds_name in pipeline_datasets
         # Excluding parameters and free outputs
-        if not (ds_name.startswith("params:") or ds_name == "parameters")
-        and (ds_name in catalog_datasets or data_catalog.match_pattern(ds_name))
+        if not (pp_ds_name.startswith("params:") or pp_ds_name == "parameters")
+        and (pp_ds_name in catalog_datasets or data_catalog.match_pattern(pp_ds_name))
     ]

     # Add datasets from catalog that are not used in target pipelines
-    for ds_name in catalog_datasets:
-        if not (ds_name in datasets or data_catalog.match_pattern(ds_name)):
-            datasets_lst.append(ds_name)
+    for cat_ds_name in catalog_datasets:
+        if cat_ds_name not in pipeline_datasets:
+            datasets_lst.append(cat_ds_name)

     resolved_configs = data_catalog.resolve_patterns(datasets_lst)

From 0f784d9e06be7e78d1774efe5fc38bd114e1297f Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Mon, 12 Aug 2024 19:46:13 +0100
Subject: [PATCH 13/14] Added TODO for create new

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index c3b1e4e2bf..941dba8816 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -239,6 +239,7 @@ def _map_type_to_datasets_new(
 )
 @click.pass_obj
 def create_catalog_new(metadata: ProjectMetadata, pipeline_name: str, env: str) -> None:
+    # TODO: consider patterns? Currently datasets matching patterns are added as MemoryDataset
     """Create Data Catalog YAML configuration with missing datasets.

     Add ``MemoryDataset`` datasets to Data Catalog YAML configuration

From 3babfe68f27cd14c6cc5f441aca319b7772a4e3e Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Mon, 12 Aug 2024 20:10:29 +0100
Subject: [PATCH 14/14] Added TODO for create new

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 941dba8816..99395ba77a 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -239,7 +239,7 @@ def _map_type_to_datasets_new(
 )
 @click.pass_obj
 def create_catalog_new(metadata: ProjectMetadata, pipeline_name: str, env: str) -> None:
-    # TODO: consider patterns? Currently datasets matching patterns are added as MemoryDataset
+    # TODO: consider patterns? Currently datasets matching patterns are shown as MemoryDataset
     """Create Data Catalog YAML configuration with missing datasets.

     Add ``MemoryDataset`` datasets to Data Catalog YAML configuration
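
Reviewer notes (appended; not part of the patch series above)

The redesigned catalog is opt-in. Below is a minimal sketch of how a project could
enable it, assuming this branch is installed. The `DATA_CATALOG_CLASS_NEW` setting
is the one validated by `_ProjectSettings` in patch 03, and `KedroDataCatalog` is
exported from `kedro.io` in the same patch; everything else here is ordinary Kedro
project scaffolding.

    # settings.py of a Kedro project (sketch)
    from kedro.io import KedroDataCatalog

    # Must be a subclass of AbstractDataCatalog; enforced by
    # _IsSubclassValidator("DATA_CATALOG_CLASS_NEW", ...) in
    # kedro/framework/project/__init__.py.
    DATA_CATALOG_CLASS_NEW = KedroDataCatalog

With the setting in place, `KedroContext.catalog_new` builds this class via
`_get_catalog_new`, and the parallel CLI commands added in patches 01-05 can be
exercised alongside the existing ones: `kedro catalog resolve_new`,
`kedro catalog rank_new`, `kedro catalog list_new` and
`kedro catalog create_new -p <pipeline_name>`.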
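The API the new commands lean on (`match_pattern`, `resolve_patterns`,
`__getitem__`, `__contains__`) can also be driven directly. A sketch under stated
assumptions: the config dict and dataset names are invented for illustration, and
the `pandas.CSVDataset` type comes from the separate kedro-datasets package.

    from kedro.io import KedroDataCatalog

    config = {
        # Concrete entry, materialised eagerly by _init_datasets.
        "reviews": {"type": "pandas.CSVDataset", "filepath": "data/reviews.csv"},
        # Dataset factory pattern; any key containing "{" counts as a pattern.
        "{name}_data": {"type": "pandas.CSVDataset", "filepath": "data/{name}.csv"},
    }
    catalog = KedroDataCatalog(config=config)

    catalog.match_pattern("cars_data")       # -> "{name}_data"
    print("cars_data" in catalog)            # True, via __contains__ (patch 11)
    catalog.resolve_patterns(["cars_data"])  # resolved config for the match
    dataset = catalog["cars_data"]           # __getitem__ -> get_dataset (patch 07)

    # `datasets` and `resolved_ds_configs` are read-only deep copies (patch 06);
    # assigning to them, e.g. catalog.datasets = {}, raises AttributeError.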
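Finally, a sketch of a project-side implementation of the `after_catalog_created_new`
spec from patch 03. The class name and the logging body are invented for
illustration; hook implementations may declare any subset of the spec's parameters,
as is usual for pluggy-based Kedro hooks.

    import logging

    from kedro.framework.hooks import hook_impl
    from kedro.io import AbstractDataCatalog


    class CatalogCreatedHooks:
        @hook_impl
        def after_catalog_created_new(self, catalog: AbstractDataCatalog) -> None:
            # Runs once KedroContext._get_catalog_new has fully built the catalog.
            logging.getLogger(__name__).info(
                "Redesigned catalog created with datasets: %s", catalog.list()
            )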