diff --git a/apps/pipelines/exceptions.py b/apps/pipelines/exceptions.py index 7632dd87d..19ab7aef3 100644 --- a/apps/pipelines/exceptions.py +++ b/apps/pipelines/exceptions.py @@ -2,11 +2,34 @@ class PipelineBuildError(Exception): """Exception to raise for errors detected at build time.""" def __init__(self, message: str, node_id: str = None, edge_ids: list[str] = None): + """ + Initialize a PipelineBuildError with detailed error information. + + Parameters: + message (str): A descriptive error message explaining the pipeline build failure. + node_id (str, optional): Identifier of the specific node where the error occurred. Defaults to None. + edge_ids (list[str], optional): List of edge identifiers related to the error. Defaults to None. + + Attributes: + message (str): Stores the error description. + node_id (str): Stores the node identifier, if provided. + edge_ids (list[str]): Stores the related edge identifiers, if provided. + """ self.message = message self.node_id = node_id self.edge_ids = edge_ids def to_json(self): + """ + Convert the error details to a structured JSON-like dictionary. + + If a node_id is provided, returns a dictionary with the node details and its error message. + Otherwise, returns a dictionary with a general pipeline error message. + + Returns: + dict: A dictionary containing error details, with either node-specific or pipeline-level information. + Includes an optional list of edge IDs associated with the error. + """ if self.node_id: return {"node": {self.node_id: {"root": self.message}}, "edge": self.edge_ids} return {"pipeline": self.message, "edge": self.edge_ids} diff --git a/apps/pipelines/graph.py b/apps/pipelines/graph.py index e5f9cf0e6..411c875f9 100644 --- a/apps/pipelines/graph.py +++ b/apps/pipelines/graph.py @@ -48,6 +48,12 @@ class PipelineGraph(pydantic.BaseModel): @cached_property def nodes_by_id(self) -> dict[str, Node]: + """ + Generates a mapping of node IDs to their corresponding Node instances. 
+ + Returns: + dict[str, Node]: A dictionary where keys are node IDs and values are the corresponding Node objects from the graph's nodes. + """ return {node.id: node for node in self.nodes} @cached_property @@ -100,6 +106,29 @@ def build_from_pipeline(cls, pipeline: Pipeline) -> Self: return cls(nodes=node_data, edges=edge_data) def build_runnable(self) -> CompiledStateGraph: + """ + Build a runnable state graph from the pipeline graph. + + Validates the graph structure and compiles a state graph for execution. This method performs several key steps: + - Checks for the presence of nodes + - Validates start and end nodes + - Optionally validates parallel nodes + - Checks for graph cycles + - Constructs a state graph with entry and finish points + - Identifies reachable nodes + - Adds nodes and edges to the graph + - Compiles the graph into a runnable state + + Raises: + PipelineBuildError: If the graph is invalid due to: + - No nodes present + - Cycle detected + - Invalid start or end nodes + - Compilation errors + + Returns: + CompiledStateGraph: A compiled state graph ready for execution + """ from apps.pipelines.nodes.base import PipelineState if not self.nodes: @@ -127,8 +156,23 @@ def build_runnable(self) -> CompiledStateGraph: return compiled_graph def _validate_no_parallel_nodes(self): - """This is a simple check to ensure that no two edges are connected to the same output - which serves as a proxy for parallel nodes.""" + """ + Validate that no multiple edges are connected to the same output handle for a source node. + + This method checks for parallel nodes by ensuring no source node has multiple edges + connecting to the same output handle. If such a condition is detected, it raises + a PipelineBuildError with details about the conflicting edges. + + Raises: + PipelineBuildError: If more than one edge is found connected to the same output handle + for a single source node. The error includes the source node ID and the conflicting + edge IDs. 
+ + Notes: + - Uses a Counter to track the frequency of source handles + - Identifies the most common handle and checks if it appears more than once + - Helps prevent ambiguous graph structures with parallel node connections + """ outgoing_edges = defaultdict(list) for edge in self.edges: outgoing_edges[edge.source].append(edge) @@ -143,7 +187,20 @@ def _validate_no_parallel_nodes(self): ) def _check_for_cycles(self): - """Detect cycles in a directed graph.""" + """ + Detect cycles in the directed pipeline graph using depth-first search (DFS). + + This method checks for the presence of cycles in the graph by traversing nodes and tracking their visit states. + A cycle is detected if a node is revisited during the depth-first search before being fully processed. + + Returns: + bool: True if a cycle is detected in the graph, False otherwise. + + Notes: + - Uses a three-state tracking system: 'unvisited', 'visiting', and 'visited' + - Implements a recursive depth-first search algorithm + - Checks all nodes to ensure complete graph traversal + """ adjacency_list = defaultdict(list) for edge in self.edges: adjacency_list[edge.source].append(edge.target) @@ -181,6 +238,22 @@ def _get_reachable_nodes(self, start_node: Node) -> list[Node]: return list(self.nodes_by_id[node_id] for node_id in visited) def _add_nodes_to_graph(self, state_graph: StateGraph, nodes: list[Node]): + """ + Add nodes to the state graph with their processing functions, ensuring the end node is reachable and handling potential validation errors. 
+ + Parameters: + state_graph (StateGraph): The state graph to which nodes will be added + nodes (list[Node]): List of nodes to be added to the graph + + Raises: + PipelineBuildError: If the end node is not reachable from the start node + PipelineNodeBuildError: If node validation fails during graph construction + + Notes: + - Checks that the end node is present in the provided nodes + - Adds each node to the state graph with a partial function for processing + - Captures and re-raises validation errors with a more specific exception + """ if self.end_node not in nodes: raise PipelineBuildError( f"{EndNode.model_config['json_schema_extra'].label} node is not reachable " diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py index c8ef19c95..2bfe0b93f 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -132,7 +132,26 @@ def update_nodes_from_data(self) -> None: created_node.update_from_params() def validate(self) -> dict: - """Validate the pipeline nodes and return a dictionary of errors""" + """ + Validates the pipeline's nodes and their configurations. + + Performs comprehensive validation of all nodes in the pipeline by: + 1. Checking individual node parameter configurations + 2. Attempting to build a runnable pipeline graph + + Parameters: + None + + Returns: + dict: A dictionary containing validation errors, if any: + - Empty dictionary if validation succeeds + - Nested dictionary with node-specific validation errors + - JSON-formatted error if pipeline graph construction fails + + Raises: + pydantic.ValidationError: If node parameters fail validation + PipelineBuildError: If pipeline graph cannot be constructed + """ from apps.pipelines.graph import PipelineGraph from apps.pipelines.nodes import nodes as pipeline_nodes @@ -156,6 +175,22 @@ def validate(self) -> dict: @cached_property def flow_data(self) -> dict: + """ + Constructs and returns a flow data dictionary from the pipeline's nodes and data. 
+ + This cached property method transforms the pipeline's raw data into a structured flow representation by: + 1. Creating a base Flow object from the pipeline's data + 2. Mapping existing nodes to their corresponding flow node definitions + 3. Generating FlowNode instances with detailed node information + 4. Updating the flow's nodes with the generated node list + 5. Converting the flow object to a dictionary + + Parameters: + self (Pipeline): The current pipeline instance + + Returns: + dict: A dictionary representation of the flow, including nodes with their positions, types, and parameters + """ flow = Flow(**self.data) flow_nodes_by_id = {node.id: node for node in flow.nodes} nodes = [] diff --git a/apps/pipelines/tasks.py b/apps/pipelines/tasks.py index 9785771d7..9db65587a 100644 --- a/apps/pipelines/tasks.py +++ b/apps/pipelines/tasks.py @@ -20,6 +20,22 @@ def send_email_from_pipeline(recipient_list, subject, message): @shared_task def get_response_for_pipeline_test_message(pipeline_id: int, message_text: str, user_id: int): + """ + Retrieve a response from a pipeline for a test message, with error handling. + + Attempts to invoke a pipeline with a given message and user, handling potential pipeline build errors. 
+ + Parameters: + pipeline_id (int): Unique identifier of the pipeline to invoke + message_text (str): Text message to be processed by the pipeline + user_id (int): Identifier of the user invoking the pipeline + + Returns: + dict: Result of pipeline invocation or error details if pipeline build fails + + Raises: + Pipeline.DoesNotExist: If no pipeline is found with the given pipeline_id + """ pipeline = Pipeline.objects.get(id=pipeline_id) try: return pipeline.simple_invoke(message_text, user_id) diff --git a/apps/pipelines/tests/test_runnable_builder.py b/apps/pipelines/tests/test_runnable_builder.py index a1f65b7ff..8d08a5cb8 100644 --- a/apps/pipelines/tests/test_runnable_builder.py +++ b/apps/pipelines/tests/test_runnable_builder.py @@ -178,6 +178,24 @@ def test_render_template(pipeline): @django_db_with_data(available_apps=("apps.service_providers",)) @mock.patch("apps.pipelines.nodes.base.PipelineNode.logger", mock.Mock()) def test_branching_pipeline(pipeline, experiment_session): + """ + Test a pipeline with multiple branching paths to verify correct message processing and output generation. + + This test validates a pipeline that demonstrates branching behavior, where a single input can be processed through multiple paths simultaneously. 
It checks that: + - The input is correctly propagated through different template rendering nodes + - Branching paths can have different processing steps + - The end node collects outputs from multiple branches + + Parameters: + pipeline (Pipeline): The pipeline configuration factory + experiment_session (ExperimentSession): The current experiment session context + + Verifies: + - Branching pipeline creates multiple output paths + - Each branch processes the input through its specific template + - End node aggregates outputs from different branches + - Lenient mode allows complex graph structures + """ start = start_node() template_a = render_template_node("A ({{input }})") template_b = render_template_node("B ({{ input}})") @@ -703,6 +721,22 @@ def test_split_graphs_should_not_build(pipeline): @django_db_with_data(available_apps=("apps.service_providers",)) def test_cyclical_graph(pipeline): # Ensure that cyclical graphs throw an error + """ + Test the pipeline builder's handling of cyclical graph structures. + + This test verifies that the pipeline creation process correctly detects and prevents the creation of graphs with cyclic dependencies, even when using lenient mode. + + Parameters: + pipeline (Pipeline): A pytest fixture representing the pipeline configuration. + + Raises: + PipelineBuildError: An error indicating that a cycle was detected in the graph structure. 
+ + Notes: + - Creates a graph with nodes that form a cycle between passthrough_1 and passthrough_2 + - Uses lenient mode to allow more flexible graph validation + - Expects a specific error message when attempting to create a runnable with cyclic dependencies + """ start = start_node() passthrough_1 = passthrough_node() passthrough_2 = passthrough_node() @@ -737,6 +771,17 @@ def test_cyclical_graph(pipeline): @django_db_with_data(available_apps=("apps.service_providers",)) def test_parallel_nodes(pipeline): + """ + Test the pipeline's error handling when multiple edges are connected to the same output of a source node. + + This test verifies that attempting to create a runnable pipeline in which more than one edge leaves the same output of a single node raises a PipelineBuildError when lenient mode is disabled. + + Parameters: + pipeline (Pipeline): A pipeline fixture used for creating the runnable. + + Raises: + PipelineBuildError: An error indicating multiple edges are connected to the same output when lenient mode is False. + """ start = start_node() passthrough_1 = passthrough_node() passthrough_2 = passthrough_node() diff --git a/apps/pipelines/tests/utils.py b/apps/pipelines/tests/utils.py index f4fc8a221..936423eb9 100644 --- a/apps/pipelines/tests/utils.py +++ b/apps/pipelines/tests/utils.py @@ -24,6 +24,25 @@ def _make_edges(nodes) -> list[dict]: def create_runnable( pipeline: Pipeline, nodes: list[dict], edges: list[dict] | None = None, lenient=False ) -> CompiledStateGraph: + """ + Create a runnable graph from a pipeline with specified nodes and optional edges. + + Constructs a pipeline graph by populating the pipeline's data with nodes and edges, + and optionally enabling lenient validation mode. + + Parameters: + pipeline (Pipeline): The pipeline to transform into a runnable graph + nodes (list[dict]): List of node configurations to include in the graph + edges (list[dict], optional): List of edge connections between nodes. 
+ If not provided, edges are automatically generated using _make_edges() + lenient (bool, optional): Flag to enable lenient validation mode. Defaults to False. + + Returns: + CompiledStateGraph: A compiled and runnable state graph constructed from the pipeline + + Raises: + Potential exceptions from PipelineGraph.build_from_pipeline() or graph.build_runnable() + """ if edges is None: edges = _make_edges(nodes) flow_nodes = [] @@ -37,6 +56,12 @@ def create_runnable( def start_node(): + """ + Create a start node for a pipeline graph with a unique identifier. + + Returns: + dict: A dictionary representing a start node with a unique UUID and type. + """ return {"id": str(uuid4()), "type": nodes.StartNode.__name__} diff --git a/assets/javascript/apps/pipeline/stores/pipelineManagerStore.ts b/assets/javascript/apps/pipeline/stores/pipelineManagerStore.ts index 1bde6b0c0..fd35bbb94 100644 --- a/assets/javascript/apps/pipeline/stores/pipelineManagerStore.ts +++ b/assets/javascript/apps/pipeline/stores/pipelineManagerStore.ts @@ -93,6 +93,18 @@ const usePipelineManagerStore = create((set, get) => ( })); +/** + * Updates the class names of edges in a pipeline based on error status. + * + * @remarks + * This function modifies the edge classes to visually indicate error states. Edges with errors + * receive an "edge-error" class, while error-free edges have their class name removed. + * + * @param pipeline - The pipeline containing edges to be checked + * @param errors - An object containing error information for different pipeline components + * + * @returns Void. Modifies edges in-place by adding or removing "edge-error" class. 
+ */ function updateEdgeClasses(pipeline: PipelineType, errors: ErrorsType) { if (!pipeline.data) { return; diff --git a/assets/javascript/apps/pipeline/utils.ts b/assets/javascript/apps/pipeline/utils.ts index 7110702fa..92810a11e 100644 --- a/assets/javascript/apps/pipeline/utils.ts +++ b/assets/javascript/apps/pipeline/utils.ts @@ -8,10 +8,36 @@ export function getNodeId(nodeType: string) { return nodeType + "-" + uid.rnd(); } +/** + * Combines multiple class names into a single string. + * + * @param classes - A variable number of class names, which can include null or undefined values + * @returns A space-separated string of the truthy class names + * + * @remarks + * This utility function filters out every falsy value (null, undefined, and empty strings) and joins the remaining class names. + * + * @example + * classNames('btn', 'primary', null, 'active') // Returns 'btn primary active' + * classNames(undefined, 'disabled') // Returns 'disabled' + * classNames() // Returns an empty string + */ export function classNames(...classes: Array): string { return classes.filter(Boolean).join(" "); } +/** + * Determines the CSS border class for a node based on its error and selection state. + * + * @param nodeErrors - Indicates whether the node has errors + * @param selected - Indicates whether the node is currently selected + * @returns A string of CSS classes defining the node's border styling + * + * @remarks + * The border class is determined by two factors: + * - If the node has errors, it uses an error border + * - If the node is selected, it uses a different border color + */ export function nodeBorderClass(nodeErrors : boolean, selected : boolean ): string { const defaultBorder = nodeErrors ? "border-error " : "" const selectedBorder = nodeErrors ? "border-secondary" : "border-primary"