Skip to content

Commit

Permalink
Drop dependency on toposort in favour of built-in graphlib (#3728)
Browse files Browse the repository at this point in the history
* Replace toposort with graphlib (built-in from Python 3.9)

Signed-off-by: Ivan Danov <[email protected]>

* Create toposort groups only when needed

Signed-off-by: Ivan Danov <[email protected]>

* Update RELEASE.md and graphlib version constraints

Signed-off-by: Ivan Danov <[email protected]>

* Remove mypy-toposort

Signed-off-by: Ivan Danov <[email protected]>

* Ensure that the suggest resume test has no node ordering requirement

Signed-off-by: Ivan Danov <[email protected]>

* Ensure stable toposorting by grouping and ungrouping the result

Signed-off-by: Ivan Danov <[email protected]>

---------

Signed-off-by: Ivan Danov <[email protected]>
  • Loading branch information
idanov authored Mar 26, 2024
1 parent 0fc8089 commit 2095e24
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 31 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Cookiecutter errors are shown in short format without the `--verbose` flag.
* Kedro commands now work from any subdirectory within a Kedro project.
* Kedro CLI now provides a better error message when project commands are run outside of a project, e.g. `kedro run`.
* Dropped the dependency on `toposort` in favour of the built-in `graphlib` module.

## Bug fixes and other changes
* Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings.
Expand Down
62 changes: 35 additions & 27 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
"""
from __future__ import annotations

import copy
import json
from collections import Counter, defaultdict
from itertools import chain
from typing import Any, Iterable

from toposort import CircularDependencyError as ToposortCircleError
from toposort import toposort
from graphlib import CycleError, TopologicalSorter

import kedro
from kedro.pipeline.node import Node, _to_list
Expand Down Expand Up @@ -164,7 +162,7 @@ def __init__(
self._nodes_by_output[_strip_transcoding(output)] = node

self._nodes = tagged_nodes
self._topo_sorted_nodes = _topologically_sorted(self.node_dependencies)
self._toposorted_nodes = _toposort(self.node_dependencies)

def __repr__(self) -> str: # pragma: no cover
"""Pipeline ([node1, ..., node10 ...], name='pipeline_name')"""
Expand Down Expand Up @@ -349,7 +347,7 @@ def nodes(self) -> list[Node]:
The list of all pipeline nodes in topological order.
"""
return list(chain.from_iterable(self._topo_sorted_nodes))
return list(self._toposorted_nodes)

@property
def grouped_nodes(self) -> list[list[Node]]:
Expand All @@ -361,7 +359,8 @@ def grouped_nodes(self) -> list[list[Node]]:
The pipeline nodes in topologically ordered groups.
"""
return copy.copy(self._topo_sorted_nodes)

return _group_toposorted(self._toposorted_nodes, self.node_dependencies)

def only_nodes(self, *node_names: str) -> Pipeline:
"""Create a new ``Pipeline`` which will contain only the specified
Expand Down Expand Up @@ -884,35 +883,44 @@ def _validate_transcoded_inputs_outputs(nodes: list[Node]) -> None:
)


def _topologically_sorted(node_dependencies: dict[Node, set[Node]]) -> list[list[Node]]:
"""Topologically group and sort (order) nodes such that no node depends on
a node that appears in the same or a later group.
def _group_toposorted(
toposorted: Iterable[Node], deps: dict[Node, set[Node]]
) -> list[list[Node]]:
"""Group already toposorted nodes into independent toposorted groups"""
processed: set[Node] = set()
groups = []
group = []
for x in toposorted:
if set(deps.get(x, set())) <= processed:
group.append(x)
elif group:
processed |= set(group)
groups.append(sorted(group))
group = [x]

if group:
groups.append(sorted(group))
return groups


def _toposort(node_dependencies: dict[Node, set[Node]]) -> list[Node]:
"""Topologically sort (order) nodes such that no node depends on
a node that appears later in the list.
Raises:
CircularDependencyError: When it is not possible to topologically order
provided nodes.
Returns:
The list of node sets in order of execution. First set is nodes that should
be executed first (no dependencies), second set are nodes that should be
executed on the second step, etc.
The list of nodes in order of execution.
"""

def _circle_error_message(error_data: dict[Any, set]) -> str:
"""Error messages provided by the toposort library will
refer to indices that are used as an intermediate step.
This method can be used to replace that message with
one that refers to the nodes' string representations.
"""
circular = [str(node) for node in error_data.keys()]
return f"Circular dependencies exist among these items: {circular}"

try:
# Sort it so it has consistent order when run with SequentialRunner
result = [sorted(dependencies) for dependencies in toposort(node_dependencies)]
return result
except ToposortCircleError as exc:
message = _circle_error_message(exc.data)
sorter = TopologicalSorter(node_dependencies)
# Ensure stable toposort by sorting the nodes in a group
groups = _group_toposorted(sorter.static_order(), node_dependencies)
return [n for group in groups for n in group]
except CycleError as exc:
message = f"Circular dependencies exist among these items: {exc.args[1]}"
raise CircularDependencyError(message) from exc


Expand Down
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dependencies = [
"rich>=12.0,<14.0",
"rope>=0.21,<2.0", # subject to LGPLv3 license
"toml>=0.10.0",
"toposort>=1.5", # Needs to be at least 1.5 to be able to raise CircularDependencyError
"graphlib_backport>=1.0.0; python_version < '3.9'",
]
keywords = [
"pipelines",
Expand Down Expand Up @@ -81,8 +81,7 @@ test = [
"pandas-stubs",
"types-PyYAML",
"types-cachetools",
"types-toml",
"types-toposort"
"types-toml"
]
docs = [
"docutils<0.21",
Expand Down
2 changes: 1 addition & 1 deletion tests/runner/test_sequential_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def test_confirms(self, mocker, test_pipeline, is_async):
@pytest.mark.parametrize(
"failing_node_names,expected_pattern",
[
(["node1_A"], r"No nodes ran."),
(["node1_A", "node1_B"], r"No nodes ran."),
(["node2"], r"(node1_A,node1_B|node1_B,node1_A)"),
(["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"),
(["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"),
Expand Down

0 comments on commit 2095e24

Please sign in to comment.