diff --git a/RELEASE.md b/RELEASE.md index 2c12870828..d50e1b936d 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -6,6 +6,7 @@ * Kedro commands now work from any subdirectory within a Kedro project. * Kedro CLI now provides a better error message when project commands are run outside of a project i.e. `kedro run`. * Dropped the dependency on `toposort` in favour of the built-in `graphlib` module. +* Improve the performance of `Pipeline` object creation and summing. ## Bug fixes and other changes * Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings. diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index f873cb80df..29d647b387 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -143,7 +143,10 @@ def __init__( _validate_transcoded_inputs_outputs(nodes_chain) _tags = set(_to_list(tags)) - tagged_nodes = [n.tag(_tags) for n in nodes_chain] + if _tags: + tagged_nodes = [n.tag(_tags) for n in nodes_chain] + else: + tagged_nodes = nodes_chain self._nodes_by_name = {node.name: node for node in tagged_nodes} _validate_unique_outputs(tagged_nodes) @@ -162,7 +165,17 @@ def __init__( self._nodes_by_output[_strip_transcoding(output)] = node self._nodes = tagged_nodes - self._toposorted_nodes = _toposort(self.node_dependencies) + self._toposorter = TopologicalSorter(self.node_dependencies) + + # test for circular dependencies without executing the toposort for efficiency + try: + self._toposorter.prepare() + except CycleError as exc: + message = f"Circular dependencies exist among these items: {exc.args[1]}" + raise CircularDependencyError(message) from exc + + self._toposorted_nodes: list[Node] = [] + self._toposorted_groups: list[list[Node]] = [] def __repr__(self) -> str: # pragma: no cover """Pipeline ([node1, ..., node10 ...], name='pipeline_name')""" @@ -179,7 +192,7 @@ def __repr__(self) -> str: # pragma: no cover def __add__(self, other: Any) -> Pipeline: if not isinstance(other, Pipeline): return NotImplemented - return Pipeline(set(self.nodes + other.nodes)) + return Pipeline(set(self._nodes + other._nodes)) def __radd__(self, other: Any) -> Pipeline: if isinstance(other, int) and other == 0: @@ -189,17 +202,17 @@ def __radd__(self, other: Any) -> Pipeline: def __sub__(self, other: Any) -> Pipeline: if not isinstance(other, Pipeline): return NotImplemented - return Pipeline(set(self.nodes) - set(other.nodes)) + return Pipeline(set(self._nodes) - set(other._nodes)) def __and__(self, other: Any) -> Pipeline: if not isinstance(other, Pipeline): return NotImplemented - return Pipeline(set(self.nodes) & set(other.nodes)) + return Pipeline(set(self._nodes) & set(other._nodes)) def __or__(self, other: Any) -> Pipeline: if not isinstance(other, Pipeline): return NotImplemented - return Pipeline(set(self.nodes + other.nodes)) + return Pipeline(set(self._nodes + other._nodes)) def all_inputs(self) -> set[str]: """All inputs for all nodes in the pipeline. @@ -208,7 +221,7 @@ def all_inputs(self) -> set[str]: All node input names as a Set. """ - return set.union(set(), *(node.inputs for node in self.nodes)) + return set.union(set(), *(node.inputs for node in self._nodes)) def all_outputs(self) -> set[str]: """All outputs of all nodes in the pipeline. @@ -217,7 +230,7 @@ def all_outputs(self) -> set[str]: All node outputs. """ - return set.union(set(), *(node.outputs for node in self.nodes)) + return set.union(set(), *(node.outputs for node in self._nodes)) def _remove_intermediates(self, datasets: set[str]) -> set[str]: intermediate = {_strip_transcoding(i) for i in self.all_inputs()} & { @@ -347,6 +360,9 @@ def nodes(self) -> list[Node]: The list of all pipeline nodes in topological order. """ + if not self._toposorted_nodes: + self._toposorted_nodes = [n for group in self.grouped_nodes for n in group] + return list(self._toposorted_nodes) @property @@ -360,7 +376,13 @@ def grouped_nodes(self) -> list[list[Node]]: """ - return _group_toposorted(self._toposorted_nodes, self.node_dependencies) + if not self._toposorted_groups: + while self._toposorter: + group = sorted(self._toposorter.get_ready()) + self._toposorted_groups.append(group) + self._toposorter.done(*group) + + return [list(group) for group in self._toposorted_groups] def only_nodes(self, *node_names: str) -> Pipeline: """Create a new ``Pipeline`` which will contain only the specified @@ -416,7 +438,7 @@ def only_nodes_with_namespace(self, node_namespace: str) -> Pipeline: """ nodes = [ n - for n in self.nodes + for n in self._nodes if n.namespace and n.namespace.startswith(node_namespace) ] if not nodes: @@ -675,7 +697,7 @@ def only_nodes_with_tags(self, *tags: str) -> Pipeline: of the tags provided are being copied. """ unique_tags = set(tags) - nodes = [node for node in self.nodes if unique_tags & node.tags] + nodes = [node for node in self._nodes if unique_tags & node.tags] return Pipeline(nodes) def filter( # noqa: PLR0913 @@ -759,7 +781,7 @@ def filter( # noqa: PLR0913 # would give different outcomes depending on the order of filter methods: # only_nodes and then from_inputs would give node1, while only_nodes and then # from_inputs would give node1 and node3. - filtered_pipeline = Pipeline(self.nodes) + filtered_pipeline = Pipeline(self._nodes) for subset_pipeline in subset_pipelines: filtered_pipeline &= subset_pipeline @@ -778,7 +800,7 @@ def tag(self, tags: str | Iterable[str]) -> Pipeline: Returns: New ``Pipeline`` object with nodes tagged. """ - nodes = [n.tag(tags) for n in self.nodes] + nodes = [n.tag(tags) for n in self._nodes] return Pipeline(nodes) def to_json(self) -> str: @@ -790,7 +812,7 @@ def to_json(self) -> str: "outputs": list(n.outputs), "tags": list(n.tags), } - for n in self.nodes + for n in self._nodes ] pipeline_versioned = { "kedro_version": kedro.__version__, @@ -883,47 +905,6 @@ def _validate_transcoded_inputs_outputs(nodes: list[Node]) -> None: ) -def _group_toposorted( - toposorted: Iterable[Node], deps: dict[Node, set[Node]] -) -> list[list[Node]]: - """Group already toposorted nodes into independent toposorted groups""" - processed: set[Node] = set() - groups = [] - group = [] - for x in toposorted: - if set(deps.get(x, set())) <= processed: - group.append(x) - elif group: - processed |= set(group) - groups.append(sorted(group)) - group = [x] - - if group: - groups.append(sorted(group)) - return groups - - -def _toposort(node_dependencies: dict[Node, set[Node]]) -> list[Node]: - """Topologically sort (order) nodes such that no node depends on - a node that appears earlier in the list. - - Raises: - CircularDependencyError: When it is not possible to topologically order - provided nodes. - - Returns: - The list of nodes in order of execution. - """ - try: - sorter = TopologicalSorter(node_dependencies) - # Ensure stable toposort by sorting the nodes in a group - groups = _group_toposorted(sorter.static_order(), node_dependencies) - return [n for group in groups for n in group] - except CycleError as exc: - message = f"Circular dependencies exist among these items: {exc.args[1]}" - raise CircularDependencyError(message) from exc - - class CircularDependencyError(Exception): """Raised when it is not possible to provide a topological execution order for nodes, due to a circular dependency existing in the node