Skip to content

Commit

Permalink
📝 Add docstrings to sk/pipeline-validations
Browse files Browse the repository at this point in the history
Docstrings generation was requested by @snopoke.

* #1045 (comment)

The following files were modified:

* `apps/pipelines/exceptions.py`
* `apps/pipelines/graph.py`
* `apps/pipelines/models.py`
* `apps/pipelines/tasks.py`
* `apps/pipelines/tests/test_runnable_builder.py`
* `apps/pipelines/tests/utils.py`
* `assets/javascript/apps/pipeline/stores/pipelineManagerStore.ts`
* `assets/javascript/apps/pipeline/utils.ts`
  • Loading branch information
coderabbitai[bot] authored Jan 17, 2025
1 parent 49f60f7 commit 540a1c4
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 4 deletions.
23 changes: 23 additions & 0 deletions apps/pipelines/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,34 @@ class PipelineBuildError(Exception):
"""Exception to raise for errors detected at build time."""

def __init__(self, message: str, node_id: str = None, edge_ids: list[str] = None):
"""
Initialize a PipelineBuildError with detailed error information.
Parameters:
message (str): A descriptive error message explaining the pipeline build failure.
node_id (str, optional): Identifier of the specific node where the error occurred. Defaults to None.
edge_ids (list[str], optional): List of edge identifiers related to the error. Defaults to None.
Attributes:
message (str): Stores the error description.
node_id (str): Stores the node identifier, if provided.
edge_ids (list[str]): Stores the related edge identifiers, if provided.
"""
self.message = message
self.node_id = node_id
self.edge_ids = edge_ids

def to_json(self):
"""
Convert the error details to a structured JSON-like dictionary.
If a node_id is provided, returns a dictionary with the node details and its error message.
Otherwise, returns a dictionary with a general pipeline error message.
Returns:
dict: A dictionary containing error details, with either node-specific or pipeline-level information.
Includes an optional list of edge IDs associated with the error.
"""
if self.node_id:
return {"node": {self.node_id: {"root": self.message}}, "edge": self.edge_ids}
return {"pipeline": self.message, "edge": self.edge_ids}
Expand Down
79 changes: 76 additions & 3 deletions apps/pipelines/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ class PipelineGraph(pydantic.BaseModel):

@cached_property
def nodes_by_id(self) -> dict[str, Node]:
"""
Generates a mapping of node IDs to their corresponding Node instances.
Returns:
dict[str, Node]: A dictionary where keys are node IDs and values are the corresponding Node objects from the graph's nodes.
"""
return {node.id: node for node in self.nodes}

@cached_property
Expand Down Expand Up @@ -100,6 +106,29 @@ def build_from_pipeline(cls, pipeline: Pipeline) -> Self:
return cls(nodes=node_data, edges=edge_data)

def build_runnable(self) -> CompiledStateGraph:
"""
Build a runnable state graph from the pipeline graph.
Validates the graph structure and compiles a state graph for execution. This method performs several key steps:
- Checks for the presence of nodes
- Validates start and end nodes
- Optionally validates parallel nodes
- Checks for graph cycles
- Constructs a state graph with entry and finish points
- Identifies reachable nodes
- Adds nodes and edges to the graph
- Compiles the graph into a runnable state
Raises:
PipelineBuildError: If the graph is invalid due to:
- No nodes present
- Cycle detected
- Invalid start or end nodes
- Compilation errors
Returns:
CompiledStateGraph: A compiled state graph ready for execution
"""
from apps.pipelines.nodes.base import PipelineState

if not self.nodes:
Expand Down Expand Up @@ -127,8 +156,23 @@ def build_runnable(self) -> CompiledStateGraph:
return compiled_graph

def _validate_no_parallel_nodes(self):
"""This is a simple check to ensure that no two edges are connected to the same output
which serves as a proxy for parallel nodes."""
"""
Validate that no multiple edges are connected to the same output handle for a source node.
This method checks for parallel nodes by ensuring no source node has multiple edges
connecting to the same output handle. If such a condition is detected, it raises
a PipelineBuildError with details about the conflicting edges.
Raises:
PipelineBuildError: If more than one edge is found connected to the same output handle
for a single source node. The error includes the source node ID and the conflicting
edge IDs.
Notes:
- Uses a Counter to track the frequency of source handles
- Identifies the most common handle and checks if it appears more than once
- Helps prevent ambiguous graph structures with parallel node connections
"""
outgoing_edges = defaultdict(list)
for edge in self.edges:
outgoing_edges[edge.source].append(edge)
Expand All @@ -143,7 +187,20 @@ def _validate_no_parallel_nodes(self):
)

def _check_for_cycles(self):
"""Detect cycles in a directed graph."""
"""
Detect cycles in the directed pipeline graph using depth-first search (DFS).
This method checks for the presence of cycles in the graph by traversing nodes and tracking their visit states.
A cycle is detected if a node is revisited during the depth-first search before being fully processed.
Returns:
bool: True if a cycle is detected in the graph, False otherwise.
Notes:
- Uses a three-state tracking system: 'unvisited', 'visiting', and 'visited'
- Implements a recursive depth-first search algorithm
- Checks all nodes to ensure complete graph traversal
"""
adjacency_list = defaultdict(list)
for edge in self.edges:
adjacency_list[edge.source].append(edge.target)
Expand Down Expand Up @@ -181,6 +238,22 @@ def _get_reachable_nodes(self, start_node: Node) -> list[Node]:
return list(self.nodes_by_id[node_id] for node_id in visited)

def _add_nodes_to_graph(self, state_graph: StateGraph, nodes: list[Node]):
"""
Add nodes to the state graph with their processing functions, ensuring the end node is reachable and handling potential validation errors.
Parameters:
state_graph (StateGraph): The state graph to which nodes will be added
nodes (list[Node]): List of nodes to be added to the graph
Raises:
PipelineBuildError: If the end node is not reachable from the start node
PipelineNodeBuildError: If node validation fails during graph construction
Notes:
- Checks that the end node is present in the provided nodes
- Adds each node to the state graph with a partial function for processing
- Captures and re-raises validation errors with a more specific exception
"""
if self.end_node not in nodes:
raise PipelineBuildError(
f"{EndNode.model_config['json_schema_extra'].label} node is not reachable "
Expand Down
37 changes: 36 additions & 1 deletion apps/pipelines/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,26 @@ def update_nodes_from_data(self) -> None:
created_node.update_from_params()

def validate(self) -> dict:
"""Validate the pipeline nodes and return a dictionary of errors"""
"""
Validates the pipeline's nodes and their configurations.
Performs comprehensive validation of all nodes in the pipeline by:
1. Checking individual node parameter configurations
2. Attempting to build a runnable pipeline graph
Parameters:
None
Returns:
dict: A dictionary containing validation errors, if any:
- Empty dictionary if validation succeeds
- Nested dictionary with node-specific validation errors
- JSON-formatted error if pipeline graph construction fails
Raises:
pydantic.ValidationError: If node parameters fail validation
PipelineBuildError: If pipeline graph cannot be constructed
"""
from apps.pipelines.graph import PipelineGraph
from apps.pipelines.nodes import nodes as pipeline_nodes

Expand All @@ -156,6 +175,22 @@ def validate(self) -> dict:

@cached_property
def flow_data(self) -> dict:
"""
Constructs and returns a flow data dictionary from the pipeline's nodes and data.
This cached property method transforms the pipeline's raw data into a structured flow representation by:
1. Creating a base Flow object from the pipeline's data
2. Mapping existing nodes to their corresponding flow node definitions
3. Generating FlowNode instances with detailed node information
4. Updating the flow's nodes with the generated node list
5. Converting the flow object to a dictionary
Parameters:
self (Pipeline): The current pipeline instance
Returns:
dict: A dictionary representation of the flow, including nodes with their positions, types, and parameters
"""
flow = Flow(**self.data)
flow_nodes_by_id = {node.id: node for node in flow.nodes}
nodes = []
Expand Down
16 changes: 16 additions & 0 deletions apps/pipelines/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ def send_email_from_pipeline(recipient_list, subject, message):

@shared_task
def get_response_for_pipeline_test_message(pipeline_id: int, message_text: str, user_id: int):
"""
Retrieve a response from a pipeline for a test message, with error handling.
Attempts to invoke a pipeline with a given message and user, handling potential pipeline build errors.
Parameters:
pipeline_id (int): Unique identifier of the pipeline to invoke
message_text (str): Text message to be processed by the pipeline
user_id (int): Identifier of the user invoking the pipeline
Returns:
dict: Result of pipeline invocation or error details if pipeline build fails
Raises:
Pipeline.DoesNotExist: If no pipeline is found with the given pipeline_id
"""
pipeline = Pipeline.objects.get(id=pipeline_id)
try:
return pipeline.simple_invoke(message_text, user_id)
Expand Down
45 changes: 45 additions & 0 deletions apps/pipelines/tests/test_runnable_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,24 @@ def test_render_template(pipeline):
@django_db_with_data(available_apps=("apps.service_providers",))
@mock.patch("apps.pipelines.nodes.base.PipelineNode.logger", mock.Mock())
def test_branching_pipeline(pipeline, experiment_session):
"""
Test a pipeline with multiple branching paths to verify correct message processing and output generation.
This test validates a pipeline that demonstrates branching behavior, where a single input can be processed through multiple paths simultaneously. It checks that:
- The input is correctly propagated through different template rendering nodes
- Branching paths can have different processing steps
- The end node collects outputs from multiple branches
Parameters:
pipeline (Pipeline): The pipeline configuration factory
experiment_session (ExperimentSession): The current experiment session context
Verifies:
- Branching pipeline creates multiple output paths
- Each branch processes the input through its specific template
- End node aggregates outputs from different branches
- Lenient mode allows complex graph structures
"""
start = start_node()
template_a = render_template_node("A ({{input }})")
template_b = render_template_node("B ({{ input}})")
Expand Down Expand Up @@ -703,6 +721,22 @@ def test_split_graphs_should_not_build(pipeline):
@django_db_with_data(available_apps=("apps.service_providers",))
def test_cyclical_graph(pipeline):
# Ensure that cyclical graphs throw an error
"""
Test the pipeline builder's handling of cyclical graph structures.
This test verifies that the pipeline creation process correctly detects and prevents the creation of graphs with cyclic dependencies, even when using lenient mode.
Parameters:
pipeline (Pipeline): A pytest fixture representing the pipeline configuration.
Raises:
PipelineBuildError: An error indicating that a cycle was detected in the graph structure.
Notes:
- Creates a graph with nodes that form a cycle between passthrough_1 and passthrough_2
- Uses lenient mode to allow more flexible graph validation
- Expects a specific error message when attempting to create a runnable with cyclic dependencies
"""
start = start_node()
passthrough_1 = passthrough_node()
passthrough_2 = passthrough_node()
Expand Down Expand Up @@ -737,6 +771,17 @@ def test_cyclical_graph(pipeline):

@django_db_with_data(available_apps=("apps.service_providers",))
def test_parallel_nodes(pipeline):
"""
Test the pipeline's error handling when multiple edges are connected to the same output node.
This test verifies that attempting to create a runnable pipeline with multiple edges connecting to the same end node raises a PipelineBuildError when lenient mode is disabled.
Parameters:
pipeline (Pipeline): A pipeline fixture used for creating the runnable.
Raises:
PipelineBuildError: An error indicating multiple edges are connected to the same output when lenient mode is False.
"""
start = start_node()
passthrough_1 = passthrough_node()
passthrough_2 = passthrough_node()
Expand Down
25 changes: 25 additions & 0 deletions apps/pipelines/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@ def _make_edges(nodes) -> list[dict]:
def create_runnable(
pipeline: Pipeline, nodes: list[dict], edges: list[dict] | None = None, lenient=False
) -> CompiledStateGraph:
"""
Create a runnable graph from a pipeline with specified nodes and optional edges.
Constructs a pipeline graph by populating the pipeline's data with nodes and edges,
and optionally enabling lenient validation mode.
Parameters:
pipeline (Pipeline): The pipeline to transform into a runnable graph
nodes (list[dict]): List of node configurations to include in the graph
edges (list[dict], optional): List of edge connections between nodes.
If not provided, edges are automatically generated using _make_edges()
lenient (bool, optional): Flag to enable lenient validation mode. Defaults to False.
Returns:
CompiledStateGraph: A compiled and runnable state graph constructed from the pipeline
Raises:
Potential exceptions from PipelineGraph.build_from_pipeline() or graph.build_runnable()
"""
if edges is None:
edges = _make_edges(nodes)
flow_nodes = []
Expand All @@ -37,6 +56,12 @@ def create_runnable(


def start_node():
"""
Create a start node for a pipeline graph with a unique identifier.
Returns:
dict: A dictionary representing a start node with a unique UUID and type.
"""
return {"id": str(uuid4()), "type": nodes.StartNode.__name__}


Expand Down
12 changes: 12 additions & 0 deletions assets/javascript/apps/pipeline/stores/pipelineManagerStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ const usePipelineManagerStore = create<PipelineManagerStoreType>((set, get) => (
}));


/**
* Updates the class names of edges in a pipeline based on error status.
*
* @remarks
* This function modifies the edge classes to visually indicate error states. Edges with errors
* receive an "edge-error" class, while error-free edges have their class name removed.
*
* @param pipeline - The pipeline containing edges to be checked
* @param errors - An object containing error information for different pipeline components
*
* @returns Void. Modifies edges in-place by adding or removing "edge-error" class.
*/
function updateEdgeClasses(pipeline: PipelineType, errors: ErrorsType) {
if (!pipeline.data) {
return;
Expand Down
26 changes: 26 additions & 0 deletions assets/javascript/apps/pipeline/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,36 @@ export function getNodeId(nodeType: string) {
return nodeType + "-" + uid.rnd();
}

/**
* Combines multiple class names into a single string.
*
* @param classes - A variable number of class names, which can include null or undefined values
* @returns A space-separated string of non-null class names
*
* @remarks
* This utility function filters out falsy values (null, undefined) and joins the remaining class names.
*
* @example
* classNames('btn', 'primary', null, 'active') // Returns 'btn primary active'
* classNames(undefined, 'disabled') // Returns 'disabled'
* classNames() // Returns an empty string
*/
export function classNames(...classes: Array<string | null | undefined>): string {
return classes.filter(Boolean).join(" ");
}

/**
* Determines the CSS border class for a node based on its error and selection state.
*
* @param nodeErrors - Indicates whether the node has errors
* @param selected - Indicates whether the node is currently selected
* @returns A string of CSS classes defining the node's border styling
*
* @remarks
* The border class is determined by two factors:
* - If the node has errors, it uses an error border
* - If the node is selected, it uses a different border color
*/
export function nodeBorderClass(nodeErrors : boolean, selected : boolean ): string {
const defaultBorder = nodeErrors ? "border-error " : ""
const selectedBorder = nodeErrors ? "border-secondary" : "border-primary"
Expand Down

0 comments on commit 540a1c4

Please sign in to comment.