Skip to content

Commit

Permalink
Improve resume suggestions (#3719)
Browse files Browse the repository at this point in the history
* Improve suggestions to resume a failed pipeline

- if dataset (or param) is persistent & shared, don't keep looking for ancestors
- only look for ancestors producing impersistent inputs
- minimize number of suggested nodes (= shorter message for the same pipeline)
- testable logic, tests cases outside of scenarios for sequential runner

- Use _EPHEMERAL attribute
- Move tests to separate file
- Docstring updates

---------

Signed-off-by: Ondrej Zacha <[email protected]>
Co-authored-by: Nok Lam Chan <[email protected]>
  • Loading branch information
ondrejzacha and noklam authored Apr 2, 2024
1 parent 5b2ba2e commit 44b2ad8
Show file tree
Hide file tree
Showing 5 changed files with 467 additions and 68 deletions.
4 changes: 4 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Kedro CLI now provides a better error message when project commands are run outside of a project i.e. `kedro run`.
* Dropped the dependency on `toposort` in favour of the built-in `graphlib` module.
* Improve the performance of `Pipeline` object creation and summing.
* Improve suggestions to resume failed pipeline runs.

## Bug fixes and other changes
* Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings.
Expand All @@ -18,6 +19,9 @@
## Documentation changes

## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:

* [ondrejzacha](https://github.com/ondrejzacha)

# Release 0.19.3

Expand Down
180 changes: 131 additions & 49 deletions kedro/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
as_completed,
wait,
)
from typing import Any, Iterable, Iterator
from typing import Any, Collection, Iterable, Iterator

from more_itertools import interleave
from pluggy import PluginManager
Expand Down Expand Up @@ -198,98 +198,180 @@ def _suggest_resume_scenario(

postfix = ""
if done_nodes:
node_names = (n.name for n in remaining_nodes)
resume_p = pipeline.only_nodes(*node_names)
start_p = resume_p.only_nodes_with_inputs(*resume_p.inputs())

# find the nearest persistent ancestors of the nodes in start_p
start_p_persistent_ancestors = _find_persistent_ancestors(
pipeline, start_p.nodes, catalog
start_node_names = _find_nodes_to_resume_from(
pipeline=pipeline,
unfinished_nodes=remaining_nodes,
catalog=catalog,
)

start_node_names = (n.name for n in start_p_persistent_ancestors)
postfix += f" --from-nodes \"{','.join(start_node_names)}\""
start_nodes_str = ",".join(sorted(start_node_names))
postfix += f' --from-nodes "{start_nodes_str}"'

if not postfix:
self._logger.warning(
"No nodes ran. Repeat the previous command to attempt a new run."
)
else:
self._logger.warning(
"There are %d nodes that have not run.\n"
f"There are {len(remaining_nodes)} nodes that have not run.\n"
"You can resume the pipeline run from the nearest nodes with "
"persisted inputs by adding the following "
"argument to your previous command:\n%s",
len(remaining_nodes),
postfix,
f"argument to your previous command:\n{postfix}"
)


def _find_persistent_ancestors(
pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog
def _find_nodes_to_resume_from(
pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: DataCatalog
) -> set[str]:
"""Given a collection of unfinished nodes in a pipeline using
a certain catalog, find the node names to pass to pipeline.from_nodes()
to cover all unfinished nodes, including any additional nodes
that should be re-run if their outputs are not persisted.
Args:
pipeline: the ``Pipeline`` to find starting nodes for.
unfinished_nodes: collection of ``Node``s that have not finished yet
catalog: the ``DataCatalog`` of the run.
Returns:
Set of node names to pass to pipeline.from_nodes() to continue
the run.
"""
nodes_to_be_run = _find_all_nodes_for_resumed_pipeline(
pipeline, unfinished_nodes, catalog
)

# Find which of the remaining nodes would need to run first (in topo sort)
persistent_ancestors = _find_initial_node_group(pipeline, nodes_to_be_run)

return {n.name for n in persistent_ancestors}


def _find_all_nodes_for_resumed_pipeline(
pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: DataCatalog
) -> set[Node]:
"""Breadth-first search approach to finding the complete set of
persistent ancestors of an iterable of ``Node``s. Persistent
ancestors exclusively have persisted ``Dataset``s as inputs.
``Node``s which need to run to cover all unfinished nodes,
including any additional nodes that should be re-run if their outputs
are not persisted.
Args:
pipeline: the ``Pipeline`` to find ancestors in.
children: the iterable containing ``Node``s to find ancestors of.
pipeline: the ``Pipeline`` to analyze.
unfinished_nodes: the iterable of ``Node``s which have not finished yet.
catalog: the ``DataCatalog`` of the run.
Returns:
A set containing first persistent ancestors of the given
``Node``s.
A set containing all input unfinished ``Node``s and all remaining
``Node``s that need to run in case their outputs are not persisted.
"""
ancestor_nodes_to_run = set()
queue, visited = deque(children), set(children)
nodes_to_run = set(unfinished_nodes)
initial_nodes = _nodes_with_external_inputs(unfinished_nodes)

queue, visited = deque(initial_nodes), set(initial_nodes)
while queue:
current_node = queue.popleft()
if _has_persistent_inputs(current_node, catalog):
ancestor_nodes_to_run.add(current_node)
continue
for parent in _enumerate_parents(pipeline, current_node):
if parent in visited:
nodes_to_run.add(current_node)
# Look for parent nodes which produce non-persistent inputs (if those exist)
non_persistent_inputs = _enumerate_non_persistent_inputs(current_node, catalog)
for node in _enumerate_nodes_with_outputs(pipeline, non_persistent_inputs):
if node in visited:
continue
visited.add(parent)
queue.append(parent)
return ancestor_nodes_to_run
visited.add(node)
queue.append(node)

# Make sure no downstream tasks are skipped
nodes_to_run = set(pipeline.from_nodes(*(n.name for n in nodes_to_run)).nodes)

def _enumerate_parents(pipeline: Pipeline, child: Node) -> list[Node]:
"""For a given ``Node``, returns a list containing the direct parents
of that ``Node`` in the given ``Pipeline``.
return nodes_to_run


def _nodes_with_external_inputs(nodes_of_interest: Iterable[Node]) -> set[Node]:
"""For given ``Node``s , find their subset which depends on
external inputs of the ``Pipeline`` they constitute. External inputs
are pipeline inputs not produced by other ``Node``s in the ``Pipeline``.
Args:
pipeline: the ``Pipeline`` to search for direct parents in.
child: the ``Node`` to find parents of.
nodes_of_interest: the ``Node``s to analyze.
Returns:
A list of all ``Node``s that are direct parents of ``child``.
A set of ``Node``s that depend on external inputs
of nodes of interest.
"""
parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs)
return parent_pipeline.nodes
p_nodes_of_interest = Pipeline(nodes_of_interest)
p_nodes_with_external_inputs = p_nodes_of_interest.only_nodes_with_inputs(
*p_nodes_of_interest.inputs()
)
return set(p_nodes_with_external_inputs.nodes)


def _has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool:
"""Check if a ``Node`` exclusively has persisted Datasets as inputs.
If at least one input is a ``MemoryDataset``, return False.
def _enumerate_non_persistent_inputs(node: Node, catalog: DataCatalog) -> set[str]:
"""Enumerate non-persistent input datasets of a ``Node``.
Args:
node: the ``Node`` to check the inputs of.
catalog: the ``DataCatalog`` of the run.
Returns:
True if the ``Node`` being checked exclusively has inputs that
are not ``MemoryDataset``, else False.
Set of names of non-persistent inputs of given ``Node``.
"""
# We use _datasets because they pertain parameter name format
catalog_datasets = catalog._datasets
non_persistent_inputs: set[str] = set()
for node_input in node.inputs:
if isinstance(catalog._datasets[node_input], MemoryDataset):
return False
return True
if node_input.startswith("params:"):
continue

if (
node_input not in catalog_datasets
or catalog_datasets[node_input]._EPHEMERAL
):
non_persistent_inputs.add(node_input)

return non_persistent_inputs


def _enumerate_nodes_with_outputs(
pipeline: Pipeline, outputs: Collection[str]
) -> list[Node]:
"""For given outputs, returns a list containing nodes that
generate them in the given ``Pipeline``.
Args:
pipeline: the ``Pipeline`` to search for nodes in.
outputs: the dataset names to find source nodes for.
Returns:
A list of all ``Node``s that are producing ``outputs``.
"""
parent_pipeline = pipeline.only_nodes_with_outputs(*outputs)
return parent_pipeline.nodes


def _find_initial_node_group(pipeline: Pipeline, nodes: Iterable[Node]) -> list[Node]:
"""Given a collection of ``Node``s in a ``Pipeline``,
find the initial group of ``Node``s to be run (in topological order).
This can be used to define a sub-pipeline with the smallest possible
set of nodes to pass to --from-nodes.
Args:
pipeline: the ``Pipeline`` to search for initial ``Node``s in.
nodes: the ``Node``s to find initial group for.
Returns:
A list of initial ``Node``s to run given inputs (in topological order).
"""
node_names = set(n.name for n in nodes)
if len(node_names) == 0:
return []
sub_pipeline = pipeline.only_nodes(*node_names)
initial_nodes = sub_pipeline.grouped_nodes[0]
return initial_nodes


def run_node(
Expand Down
Loading

0 comments on commit 44b2ad8

Please sign in to comment.