
[DataCatalog2.0]: Refactor catalog CLI (work in progress) #4071

Closed

Changes from all commits (28 commits)
e3168db
Refactored catalog resolve
ElenaKhaustova Aug 6, 2024
54c6e9a
Removed debug output
ElenaKhaustova Aug 6, 2024
318f583
Merge branch 'refactor-pattern-logic' into refactor-pattern-logic-cat…
ElenaKhaustova Aug 6, 2024
055aa95
Merge branch 'refactor-pattern-logic' into refactor-pattern-logic-cat…
ElenaKhaustova Aug 6, 2024
eb24284
Reimplemented catalog rank
ElenaKhaustova Aug 6, 2024
e0082af
Merge branch 'refactor-pattern-logic' into refactor-pattern-logic-cat…
ElenaKhaustova Aug 6, 2024
45c7c5e
Reimplemented catalog list
ElenaKhaustova Aug 6, 2024
9248205
Reimplemented catalog create
ElenaKhaustova Aug 6, 2024
af89de2
Merge branch 'refactor-pattern-logic' into refactor-pattern-logic-cat…
ElenaKhaustova Aug 6, 2024
29412f2
Added datasets and resolved_ds_configs properties
ElenaKhaustova Aug 7, 2024
a527442
Removed TODOs
ElenaKhaustova Aug 7, 2024
5581956
Updated catalog resolve
ElenaKhaustova Aug 7, 2024
6f8f54a
Added public properties for dataset_patterns and default_pattern
ElenaKhaustova Aug 7, 2024
1af02a4
Updated catalog create
ElenaKhaustova Aug 7, 2024
4579724
Merge branch 'refactor-pattern-logic' into refactor-pattern-logic-cat…
ElenaKhaustova Aug 7, 2024
f025922
Merge branch 'main' into refactor-datasets-access-logic
ElenaKhaustova Aug 7, 2024
9c2c236
Merge branch 'refactor-datasets-access-logic' into refactor-pattern-l…
ElenaKhaustova Aug 7, 2024
32c7607
Added __contains__ and comments on lazy loading
ElenaKhaustova Aug 7, 2024
44a257d
Merge branch 'refactor-pattern-logic' into refactor-pattern-logic-cat…
ElenaKhaustova Aug 7, 2024
2a1de49
Merge branch 'refactor-datasets-access-logic' into refactor-pattern-l…
ElenaKhaustova Aug 7, 2024
20d2b91
Merge branch 'refactor-pattern-logic' into refactor-pattern-logic-cat…
ElenaKhaustova Aug 8, 2024
13e6da9
Merge branch 'main' into refactor-pattern-logic-catalog-cli
ElenaKhaustova Aug 12, 2024
306c696
Merge branch 'refactor-pattern-logic' into refactor-pattern-logic-cat…
ElenaKhaustova Aug 12, 2024
df4b569
Updated resolve_new
ElenaKhaustova Aug 12, 2024
0f784d9
Added TODO for create new
ElenaKhaustova Aug 12, 2024
3babfe6
Added TODO for create new
ElenaKhaustova Aug 12, 2024
d2ad7c9
Merge branch 'refactor-pattern-logic' into refactor-pattern-logic-cat…
ElenaKhaustova Aug 13, 2024
b745181
Merge branch 'refactor-pattern-logic' into refactor-pattern-logic-cat…
ElenaKhaustova Aug 19, 2024
220 changes: 220 additions & 0 deletions kedro/framework/cli/catalog.py
@@ -14,6 +14,7 @@
from kedro.framework.project import pipelines, settings
from kedro.framework.session import KedroSession
from kedro.io.data_catalog import DataCatalog
from kedro.io.data_catalog_redesign import KedroDataCatalog

if TYPE_CHECKING:
from pathlib import Path
@@ -37,6 +38,80 @@ def catalog() -> None:
"""Commands for working with catalog."""


@catalog.command("list_new")
@env_option
@click.option(
"--pipeline",
"-p",
type=str,
default="",
help="Name of the modular pipeline to run. If not set, "
"the project pipeline is run by default.",
callback=split_string,
)
@click.pass_obj
def list_datasets_new(metadata: ProjectMetadata, pipeline: str, env: str) -> None:
"""Show datasets per type."""
title = "Datasets in '{}' pipeline"
not_mentioned = "Datasets not mentioned in pipeline"
mentioned = "Datasets mentioned in pipeline"
factories = "Datasets generated from factories"

session = _create_session(metadata.package_name, env=env)
context = session.load_context()

try:
data_catalog = context.catalog_new
datasets_meta = data_catalog.datasets
catalog_ds = set(data_catalog.list())
except Exception as exc:
raise KedroCliError(
f"Unable to instantiate Kedro Catalog.\nError: {exc}"
) from exc

target_pipelines = pipeline or pipelines.keys()

result = {}
for pipe in target_pipelines:
pl_obj = pipelines.get(pipe)
if pl_obj:
pipeline_ds = pl_obj.datasets()
else:
existing_pls = ", ".join(sorted(pipelines.keys()))
raise KedroCliError(
f"'{pipe}' pipeline not found! Existing pipelines: {existing_pls}"
)

unused_ds = catalog_ds - pipeline_ds
default_ds = pipeline_ds - catalog_ds
used_ds = catalog_ds - unused_ds

# resolve any factory datasets in the pipeline
factory_ds_by_type = defaultdict(list)
resolved_configs = data_catalog.resolve_patterns(default_ds)
for ds_name, ds_config in zip(default_ds, resolved_configs):
if data_catalog.match_pattern(ds_name):
factory_ds_by_type[ds_config.get("type", "DefaultDataset")].append(
ds_name
)

default_ds = default_ds - set(chain.from_iterable(factory_ds_by_type.values()))

unused_by_type = _map_type_to_datasets_new(unused_ds, datasets_meta)
used_by_type = _map_type_to_datasets_new(used_ds, datasets_meta)

if default_ds:
used_by_type["DefaultDataset"].extend(default_ds)

data = (
(mentioned, dict(used_by_type)),
(factories, dict(factory_ds_by_type)),
(not_mentioned, dict(unused_by_type)),
)
result[title.format(pipe)] = {key: value for key, value in data if value}
secho(yaml.dump(result))
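
# Illustration: a minimal sketch of exercising `list_new` with click's test
# runner. It assumes it is run from a Kedro project root, so that
# bootstrap_project can build the ProjectMetadata object `@click.pass_obj`
# expects; it is not part of this diff.
from pathlib import Path

from click.testing import CliRunner

from kedro.framework.cli.catalog import catalog as catalog_cli
from kedro.framework.startup import bootstrap_project

metadata = bootstrap_project(Path.cwd())
result = CliRunner().invoke(catalog_cli, ["list_new"], obj=metadata)
print(result.output)  # YAML grouped per pipeline by dataset type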


@catalog.command("list")
@env_option
@click.option(
@@ -136,6 +211,82 @@ def _map_type_to_datasets(
return mapping


def _map_type_to_datasets_new(
datasets: set[str], datasets_meta: dict[str, type]
) -> dict:
"""Build dictionary with a dataset type as a key and list of
datasets of the specific type as a value.
"""
mapping = defaultdict(list) # type: ignore[var-annotated]
for dataset in datasets:
is_param = dataset.startswith("params:") or dataset == "parameters"
if not is_param:
ds_type = datasets_meta[dataset].__class__.__name__
if dataset not in mapping[ds_type]:
mapping[ds_type].append(dataset)
return mapping
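
# Illustration: a sketch of what _map_type_to_datasets_new (defined above)
# returns, assuming datasets_meta maps names to dataset *instances*, which is
# why the lookup uses __class__.__name__. MemoryDataset is a stand-in type.
from kedro.io import MemoryDataset

meta = {"reviews": MemoryDataset(), "params:alpha": MemoryDataset()}
print(dict(_map_type_to_datasets_new({"reviews", "params:alpha"}, meta)))
# {'MemoryDataset': ['reviews']} -- "params:" names and "parameters" are skipped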


@catalog.command("create_new")
@env_option(help="Environment to create Data Catalog YAML file in. Defaults to `base`.")
@click.option(
"--pipeline",
"-p",
"pipeline_name",
type=str,
required=True,
help="Name of a pipeline.",
)
@click.pass_obj
def create_catalog_new(metadata: ProjectMetadata, pipeline_name: str, env: str) -> None:
# TODO: consider patterns? Currently datasets matching patterns are shown as MemoryDataset
"""Create Data Catalog YAML configuration with missing datasets.

Add ``MemoryDataset`` datasets to Data Catalog YAML configuration
file for each dataset in a registered pipeline if it is missing from
the ``DataCatalog``.

The catalog configuration will be saved to
`<conf_source>/<env>/catalog_<pipeline_name>.yml` file.
"""
env = env or "base"
session = _create_session(metadata.package_name, env=env)
context = session.load_context()

pipeline = pipelines.get(pipeline_name)
if not pipeline:
existing_pipelines = ", ".join(sorted(pipelines.keys()))
raise KedroCliError(
f"'{pipeline_name}' pipeline not found! Existing pipelines: {existing_pipelines}"
)
pipeline_datasets = {
ds_name
for ds_name in pipeline.datasets()
if not ds_name.startswith("params:") and ds_name != "parameters"
}

data_catalog = context.catalog_new
catalog_datasets = {
ds_name
for ds_name in data_catalog.list()
if not ds_name.startswith("params:") and ds_name != "parameters"
}

# Datasets that are missing in Data Catalog
missing_ds = sorted(pipeline_datasets - catalog_datasets)
if missing_ds:
catalog_path = (
context.project_path
/ settings.CONF_SOURCE
/ env
/ f"catalog_{pipeline_name}.yml"
)
_add_missing_datasets_to_catalog(missing_ds, catalog_path)
click.echo(f"Data Catalog YAML configuration was created: {catalog_path}")
else:
click.echo("All datasets are already configured.")


@catalog.command("create")
@env_option(help="Environment to create Data Catalog YAML file in. Defaults to `base`.")
@click.option(
@@ -212,6 +363,25 @@ def _add_missing_datasets_to_catalog(missing_ds: list[str], catalog_path: Path)
yaml.safe_dump(catalog_config, catalog_file, default_flow_style=False)


@catalog.command("rank_new")
@env_option
@click.pass_obj
def rank_catalog_factories_new(metadata: ProjectMetadata, env: str) -> None:
"""List all dataset factories in the catalog, ranked by priority by which they are matched."""
session = _create_session(metadata.package_name, env=env)
context = session.load_context()
data_catalog = context.catalog_new

catalog_factories = {
**data_catalog.dataset_patterns,
**data_catalog.default_pattern,
}
if catalog_factories:
click.echo(yaml.dump(list(catalog_factories.keys())))
else:
click.echo("There are no dataset factories in the catalog.")


@catalog.command("rank")
@env_option
@click.pass_obj
@@ -230,6 +400,56 @@ def rank_catalog_factories(metadata: ProjectMetadata, env: str) -> None:
click.echo("There are no dataset factories in the catalog.")


@catalog.command("resolve_new")
@env_option
@click.pass_obj
def resolve_patterns_new(metadata: ProjectMetadata, env: str) -> None:
"""Resolve catalog factories against pipeline datasets. Note that this command is runner
agnostic and thus won't take into account any default dataset creation defined in the runner."""

session = _create_session(metadata.package_name, env=env)
context = session.load_context()

catalog_config = context.config_loader["catalog"]
credentials_config = context.config_loader.get("credentials", None)

data_catalog = KedroDataCatalog(
config=catalog_config, credentials=credentials_config
)

target_pipelines = pipelines.keys()
pipeline_datasets = set()
for pipe in target_pipelines:
pl_obj = pipelines.get(pipe)
if pl_obj:
pipeline_datasets.update(pl_obj.datasets())

catalog_datasets = set(data_catalog.list())
datasets_lst = [
pp_ds_name
for pp_ds_name in pipeline_datasets
# Excluding parameters and free outputs
if not (pp_ds_name.startswith("params:") or pp_ds_name == "parameters")
and (pp_ds_name in catalog_datasets or data_catalog.match_pattern(pp_ds_name))
]

# Add datasets from catalog that are not used in target pipelines
for cat_ds_name in catalog_datasets:
if cat_ds_name not in pipeline_datasets:
datasets_lst.append(cat_ds_name)

resolved_configs = data_catalog.resolve_patterns(datasets_lst)

secho(
yaml.dump(
{
ds_name: ds_config
for ds_name, ds_config in zip(datasets_lst, resolved_configs)
}
)
)
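
# Illustration: the resolution step in isolation, assuming the KedroDataCatalog
# API introduced by this PR (config/credentials constructor plus
# resolve_patterns returning one config per requested name). The pattern and
# dataset names are made up.
from kedro.io.data_catalog_redesign import KedroDataCatalog

config = {"{name}#csv": {"type": "pandas.CSVDataset", "filepath": "data/{name}.csv"}}
data_catalog = KedroDataCatalog(config=config, credentials=None)

names = ["reviews#csv"]
for ds_name, ds_config in zip(names, data_catalog.resolve_patterns(names)):
    print(ds_name, "->", ds_config)  # filepath resolves to data/reviews.csv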


@catalog.command("resolve")
@env_option
@click.pass_obj
52 changes: 51 additions & 1 deletion kedro/framework/context/context.py
@@ -13,7 +13,11 @@

from kedro.config import AbstractConfigLoader, MissingConfigException
from kedro.framework.project import settings
from kedro.io import DataCatalog # noqa: TCH001
from kedro.io import (
AbstractDataCatalog,
DataCatalog,
KedroDataCatalog,
)
from kedro.pipeline.transcoding import _transcode_split

if TYPE_CHECKING:
@@ -139,6 +143,12 @@ def _validate_transcoded_datasets(catalog: DataCatalog) -> None:
_transcode_split(dataset_name)


def _validate_transcoded_datasets_new(catalog: AbstractDataCatalog) -> None:
"""Validates transcoded datasets are correctly named"""
for dataset_name in catalog.datasets.keys():
_transcode_split(dataset_name)


def _expand_full_path(project_path: str | Path) -> Path:
return Path(project_path).expanduser().resolve()

@@ -188,6 +198,10 @@ def catalog(self) -> DataCatalog:
"""
return self._get_catalog()

@property
def catalog_new(self) -> AbstractDataCatalog:
return self._get_catalog_new()

@property
def params(self) -> dict[str, Any]:
"""Read-only property referring to Kedro's parameters for this context.
@@ -208,6 +222,42 @@ def params(self) -> dict[str, Any]:

return OmegaConf.to_container(params) if OmegaConf.is_config(params) else params # type: ignore[no-any-return]

def _get_catalog_new(
self,
save_version: str | None = None,
load_versions: dict[str, str] | None = None,
) -> AbstractDataCatalog:
conf_catalog = self.config_loader["catalog"]
conf_catalog = _convert_paths_to_absolute_posix(
project_path=self.project_path, conf_dictionary=conf_catalog
)
conf_creds = self._get_config_credentials()

if issubclass(settings.DATA_CATALOG_CLASS_NEW, KedroDataCatalog):
catalog = settings.DATA_CATALOG_CLASS_NEW(
config=conf_catalog,
credentials=conf_creds,
load_versions=load_versions,
save_version=save_version,
)
else:
catalog = settings.DATA_CATALOG_CLASS_NEW(
config=conf_catalog,
credentials=conf_creds,
)
feed_dict = self._get_feed_dict()
catalog.add_from_dict(feed_dict)
_validate_transcoded_datasets_new(catalog)
self._hook_manager.hook.after_catalog_created_new(
catalog=catalog,
conf_catalog=conf_catalog,
conf_creds=conf_creds,
feed_dict=feed_dict,
save_version=save_version,
load_versions=load_versions,
)
return catalog
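
# Illustration: how a project would reach the redesigned catalog, assuming
# settings.py points DATA_CATALOG_CLASS_NEW at the new class (see the
# kedro/framework/project changes below) and this runs from a project root.
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())
with KedroSession.create() as session:
    context = session.load_context()
    new_catalog = context.catalog_new
    print(list(new_catalog.datasets))  # names known to the redesigned catalog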

def _get_catalog(
self,
save_version: str | None = None,
28 changes: 27 additions & 1 deletion kedro/framework/hooks/specs.py
@@ -10,7 +10,7 @@

if TYPE_CHECKING:
from kedro.framework.context import KedroContext
from kedro.io import DataCatalog
from kedro.io import AbstractDataCatalog, DataCatalog
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node

@@ -44,6 +44,32 @@ def after_catalog_created( # noqa: PLR0913
"""
pass

@hook_spec
def after_catalog_created_new( # noqa: PLR0913
self,
catalog: AbstractDataCatalog,
conf_catalog: dict[str, Any],
conf_creds: dict[str, Any],
feed_dict: dict[str, Any],
save_version: str,
load_versions: dict[str, str],
) -> None:
"""Hooks to be invoked after a data catalog is created.
It receives the ``catalog`` as well as
all the arguments for ``KedroContext._get_catalog_new``.

Args:
catalog: The catalog that was created.
conf_catalog: The config from which the catalog was created.
conf_creds: The credentials conf from which the catalog was created.
feed_dict: The feed_dict that was added to the catalog after creation.
save_version: The save_version used in ``save`` operations
for all datasets in the catalog.
load_versions: The load_versions used in ``load`` operations
for each dataset in the catalog.
"""
pass
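
# Illustration: a minimal project-side implementation of the new spec; the
# AbstractDataCatalog import path follows this PR's kedro.io exports.
from typing import Any

from kedro.framework.hooks import hook_impl
from kedro.io import AbstractDataCatalog


class CatalogAuditHooks:
    @hook_impl
    def after_catalog_created_new(
        self,
        catalog: AbstractDataCatalog,
        conf_catalog: dict[str, Any],
        conf_creds: dict[str, Any],
        feed_dict: dict[str, Any],
        save_version: str,
        load_versions: dict[str, str],
    ) -> None:
        # Report configured entries vs. those fed in after creation.
        print(f"Catalog created: {len(conf_catalog)} configured, {len(feed_dict)} fed in")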


class NodeSpecs:
"""Namespace that defines all specifications for a node's lifecycle hooks."""
5 changes: 5 additions & 0 deletions kedro/framework/project/__init__.py
@@ -117,6 +117,10 @@ class _ProjectSettings(LazySettings):
_DATA_CATALOG_CLASS = _IsSubclassValidator(
"DATA_CATALOG_CLASS", default=_get_default_class("kedro.io.DataCatalog")
)
_DATA_CATALOG_CLASS_NEW = _IsSubclassValidator(
"DATA_CATALOG_CLASS_NEW",
default=_get_default_class("kedro.io.AbstractDataCatalog"),
)

def __init__(self, *args: Any, **kwargs: Any):
kwargs.update(
@@ -130,6 +134,7 @@ def __init__(self, *args: Any, **kwargs: Any):
self._CONFIG_LOADER_CLASS,
self._CONFIG_LOADER_ARGS,
self._DATA_CATALOG_CLASS,
self._DATA_CATALOG_CLASS_NEW,
]
)
super().__init__(*args, **kwargs)
Expand Down