diff --git a/portia/builder/__init__.py b/portia/builder/__init__.py index 054a7a75..eb414f30 100644 --- a/portia/builder/__init__.py +++ b/portia/builder/__init__.py @@ -1 +1,6 @@ """Builder module for constructing Portia plans.""" + +from portia.builder.parallel_step import ParallelStep +from portia.builder.plan_builder_v2 import PlanBuilderV2 + +__all__ = ["ParallelStep", "PlanBuilderV2"] diff --git a/portia/builder/parallel_step.py b/portia/builder/parallel_step.py new file mode 100644 index 00000000..09f0ec9b --- /dev/null +++ b/portia/builder/parallel_step.py @@ -0,0 +1,101 @@ +"""Implementation of the parallel step for concurrent task execution.""" + +from __future__ import annotations + +import asyncio +import sys +from typing import TYPE_CHECKING, Any + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override # pragma: no cover + +from langsmith import traceable +from pydantic import Field + +from portia.builder.step_v2 import StepV2 +from portia.plan import Step + +if TYPE_CHECKING: + from portia.builder.plan_v2 import PlanV2 + from portia.run_context import RunContext + + +class ParallelStep(StepV2): + """A step that executes multiple child steps concurrently. + + This step allows developers to define a block of steps that should be executed + in parallel rather than sequentially. This is useful for performance optimization + when multiple independent tasks (e.g., calling different tools, running multiple + LLM queries) can be performed simultaneously. + + The output of a ParallelStep is a list containing the outputs of its child steps, + in the order they were provided. Subsequent steps can reference the outputs of + individual child steps via indexing (e.g., StepOutput("parallel_step", path="0")). + + If any child step fails, all other concurrently running steps are cancelled, + and the ParallelStep as a whole fails, propagating the error. + """ + + steps: list[StepV2] = Field( + description="List of steps to execute in parallel. These steps should be independent " + "with no data dependencies on one another." + ) + + def __str__(self) -> str: + """Return a description of this step for logging purposes.""" + step_types = [step.__class__.__name__ for step in self.steps] + return f"ParallelStep(steps=[{', '.join(step_types)}])" + + @override + @traceable(name="Parallel Step - Run") + async def run(self, run_data: RunContext) -> list[Any]: # pyright: ignore[reportIncompatibleMethodOverride] - needed due to Langsmith decorator + """Execute all child steps concurrently and return their outputs as a list. + + Uses asyncio.gather to run all child steps in parallel. If any step fails, + all other steps are cancelled and the exception is propagated. + + Args: + run_data: The runtime context containing step outputs, inputs, and other + execution data. + + Returns: + A list of outputs from each child step, in the order they were provided. + + Raises: + Exception: If any child step fails during execution. + + """ + # Create tasks for all child steps + tasks = [step.run(run_data) for step in self.steps] + + # Execute all tasks concurrently + # If any task fails, gather will cancel remaining tasks and raise the exception + results = await asyncio.gather(*tasks) + return list(results) + + @override + def to_legacy_step(self, plan: PlanV2) -> Step: + """Convert this ParallelStep to a legacy Step. + + This creates a legacy step representation primarily for dashboard display. + + Args: + plan: The PlanV2 instance containing this step. + + Returns: + A legacy Step object representing this parallel step. + + """ + child_step_descriptions = [str(step) for step in self.steps] + task_description = ( + f"Execute {len(self.steps)} steps in parallel: {', '.join(child_step_descriptions)}" + ) + + return Step( + task=task_description, + inputs=[], + output=plan.step_output_name(self), + condition=self._get_legacy_condition(plan), + ) diff --git a/portia/builder/plan_builder_v2.py b/portia/builder/plan_builder_v2.py index a1b3cd92..7868c170 100644 --- a/portia/builder/plan_builder_v2.py +++ b/portia/builder/plan_builder_v2.py @@ -18,6 +18,7 @@ from portia.builder.llm_step import LLMStep from portia.builder.loop_step import LoopStep from portia.builder.loops import LoopBlock, LoopStepType, LoopType +from portia.builder.parallel_step import ParallelStep from portia.builder.plan_v2 import PlanV2 from portia.builder.react_agent_step import ReActAgentStep from portia.builder.reference import Reference, default_step_name @@ -779,6 +780,66 @@ def add_sub_plan( return self + def add_parallel( + self, + steps: list[StepV2], + step_name: str | None = None, + ) -> PlanBuilderV2: + """Add a parallel execution step to the plan. + + This creates a step that executes multiple child steps concurrently using + asyncio.gather. All child steps are executed in parallel, and their outputs + are collected into a list. This is useful for performance optimization when + you have independent operations that can run simultaneously. + + The output of the parallel step is a list containing the outputs of all child + steps in the order they were provided. You can reference individual outputs + using path notation: StepOutput("parallel_step_name", path="0") for the first + child step's output, path="1" for the second, and so on. + + If any child step fails during execution, all other running steps are cancelled + and the parallel step fails, propagating the error. + + Args: + steps: List of fully defined StepV2 instances to execute in parallel. + These steps should be independent with no data dependencies on each other. + step_name: Optional explicit name for the step. This allows its output to be + referenced via StepOutput("name_of_step") rather than by index. + + Returns: + Self for method chaining. + + Example: + ```python + from portia.builder import PlanBuilderV2 + from portia.builder.invoke_tool_step import InvokeToolStep + from portia.builder.reference import StepOutput + + builder = PlanBuilderV2() + builder.add_parallel( + steps=[ + InvokeToolStep(tool="search_tool", step_name="search1", + args={"query": "Python"}), + InvokeToolStep(tool="search_tool", step_name="search2", + args={"query": "JavaScript"}), + ], + step_name="parallel_search" + ) + # Reference first result: StepOutput("parallel_search", path="0") + # Reference second result: StepOutput("parallel_search", path="1") + ``` + + """ + self.plan.steps.append( + ParallelStep( + steps=steps, + step_name=step_name or default_step_name(len(self.plan.steps)), + conditional_block=self._current_conditional_block, + loop_block=self._current_loop_block, + ) + ) + return self + def exit( self, *, diff --git a/tests/integration/test_plan_v2.py b/tests/integration/test_plan_v2.py index 6841c253..9cd5f034 100644 --- a/tests/integration/test_plan_v2.py +++ b/tests/integration/test_plan_v2.py @@ -9,6 +9,7 @@ from pydantic import BaseModel, Field from portia import Config, LogLevel, Portia +from portia.builder.invoke_tool_step import InvokeToolStep from portia.builder.plan_builder_v2 import PlanBuilderError, PlanBuilderV2 from portia.builder.reference import Input, StepOutput from portia.clarification import UserVerificationClarification @@ -2004,3 +2005,124 @@ def before_tool_call_execution_hook( # Verify step has summary assert weather_step_output.get_summary() is not None + + +@pytest.mark.asyncio +async def test_parallel_step_concurrent_execution() -> None: + """Test parallel step executes multiple independent steps concurrently. + + This test demonstrates the concurrent execution of two independent tool invocations + and validates the correctness of the aggregated output. The test uses search tools + to fetch different information in parallel and then uses the results. + """ + config = Config.from_default( + default_log_level=LogLevel.DEBUG, + storage_class=StorageClass.MEMORY, + ) + + portia = Portia(config=config) + + # Create a plan with parallel steps + plan = ( + PlanBuilderV2("Search for Python and JavaScript information in parallel") + .add_parallel( + steps=[ + InvokeToolStep( + tool="search_tool", + step_name="search_python", + args={"search_query": "What is Python programming language?"}, + ), + InvokeToolStep( + tool="search_tool", + step_name="search_javascript", + args={"search_query": "What is JavaScript programming language?"}, + ), + ], + step_name="parallel_search", + ) + .llm_step( + task="Compare the two programming languages based on the search results", + inputs=[ + StepOutput("parallel_search", path="0"), + StepOutput("parallel_search", path="1"), + ], + step_name="compare_languages", + ) + .build() + ) + + plan_run = await portia.arun_plan(plan) + + assert plan_run.state == PlanRunState.COMPLETE + assert plan_run.outputs.final_output is not None + + # Verify the parallel step output exists and is a list + parallel_output = plan_run.outputs.step_outputs["$parallel_search_output"] + assert parallel_output is not None + parallel_value = parallel_output.get_value() + assert isinstance(parallel_value, list) + assert len(parallel_value) == 2 + + # Verify both search results contain relevant content + python_result = parallel_value[0] + javascript_result = parallel_value[1] + + assert isinstance(python_result, str) + assert isinstance(javascript_result, str) + assert len(python_result) > 0 + assert len(javascript_result) > 0 + + # The results should mention the respective languages + python_lower = python_result.lower() + javascript_lower = javascript_result.lower() + assert "python" in python_lower + assert "javascript" in javascript_lower or "js" in javascript_lower + + # Verify the comparison step used the parallel outputs + comparison_output = plan_run.outputs.step_outputs["$compare_languages_output"] + assert comparison_output is not None + comparison_value = comparison_output.get_value() + assert isinstance(comparison_value, str) + assert len(comparison_value) > 0 + + # The comparison should mention both languages + comparison_lower = comparison_value.lower() + assert "python" in comparison_lower + assert "javascript" in comparison_lower or "js" in comparison_lower + + +@pytest.mark.asyncio +async def test_parallel_step_with_error_handling() -> None: + """Test that parallel step handles errors correctly by cancelling other tasks.""" + config = Config.from_default( + default_log_level=LogLevel.DEBUG, + storage_class=StorageClass.MEMORY, + ) + + portia = Portia(config=config) + + # Create a plan where one parallel step will fail + plan = ( + PlanBuilderV2("Test parallel execution with error") + .add_parallel( + steps=[ + InvokeToolStep( + tool="search_tool", + step_name="valid_search", + args={"search_query": "Valid query"}, + ), + InvokeToolStep( + tool="nonexistent_tool", # This will fail + step_name="failing_step", + args={"arg": "value"}, + ), + ], + step_name="parallel_with_error", + ) + .build() + ) + + plan_run = await portia.arun_plan(plan) + + # The plan should fail due to the error in one of the parallel steps + assert plan_run.state == PlanRunState.FAILED diff --git a/tests/unit/builder/test_parallel_step.py b/tests/unit/builder/test_parallel_step.py new file mode 100644 index 00000000..aacf91e7 --- /dev/null +++ b/tests/unit/builder/test_parallel_step.py @@ -0,0 +1,275 @@ +"""Test the parallel step.""" + +from __future__ import annotations + +import asyncio +from typing import Any +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from portia.builder.conditionals import ConditionalBlock +from portia.builder.invoke_tool_step import InvokeToolStep +from portia.builder.llm_step import LLMStep +from portia.builder.loops import LoopBlock +from portia.builder.parallel_step import ParallelStep +from portia.execution_agents.output import LocalDataValue +from portia.plan import Step as PlanStep + + +def test_parallel_step_initialization() -> None: + """Test ParallelStep initialization with child steps.""" + step1 = InvokeToolStep(tool="tool1", step_name="step1") + step2 = InvokeToolStep(tool="tool2", step_name="step2") + + parallel_step = ParallelStep(steps=[step1, step2], step_name="parallel_execution") + + assert parallel_step.step_name == "parallel_execution" + assert len(parallel_step.steps) == 2 + assert parallel_step.steps[0] is step1 + assert parallel_step.steps[1] is step2 + + +def test_parallel_step_str() -> None: + """Test ParallelStep str method.""" + step1 = InvokeToolStep(tool="tool1", step_name="step1") + step2 = LLMStep(task="test task", step_name="step2") + + parallel_step = ParallelStep(steps=[step1, step2], step_name="parallel_execution") + + result = str(parallel_step) + assert "ParallelStep" in result + assert "InvokeToolStep" in result + assert "LLMStep" in result + + +@pytest.mark.asyncio +async def test_parallel_step_run_success() -> None: + """Test ParallelStep run with successful execution of all child steps.""" + # Create mock steps + step1 = Mock() + step1.run = AsyncMock(return_value="result1") + + step2 = Mock() + step2.run = AsyncMock(return_value="result2") + + step3 = Mock() + step3.run = AsyncMock(return_value="result3") + + parallel_step = ParallelStep(steps=[step1, step2, step3], step_name="parallel_execution") + + mock_run_data = Mock() + + result = await parallel_step.run(mock_run_data) + + assert result == ["result1", "result2", "result3"] + step1.run.assert_called_once_with(mock_run_data) + step2.run.assert_called_once_with(mock_run_data) + step3.run.assert_called_once_with(mock_run_data) + + +@pytest.mark.asyncio +async def test_parallel_step_run_with_real_steps() -> None: + """Test ParallelStep run with real InvokeToolStep instances.""" + # Create real steps but with mocked execution + step1 = InvokeToolStep(tool="tool1", step_name="step1", args={"arg1": "value1"}) + step2 = InvokeToolStep(tool="tool2", step_name="step2", args={"arg2": "value2"}) + + parallel_step = ParallelStep(steps=[step1, step2], step_name="parallel_execution") + + mock_run_data = Mock() + mock_tool1 = Mock() + mock_tool1.structured_output_schema = None + mock_output1 = Mock() + mock_output1.get_value.return_value = "tool1_result" + mock_tool1._arun = AsyncMock(return_value=mock_output1) + + mock_tool2 = Mock() + mock_tool2.structured_output_schema = None + mock_output2 = Mock() + mock_output2.get_value.return_value = "tool2_result" + mock_tool2._arun = AsyncMock(return_value=mock_output2) + + with ( + patch("portia.builder.invoke_tool_step.ToolCallWrapper.from_tool_id") as mock_get_tool, + patch.object(mock_run_data, "get_tool_run_ctx") as mock_get_tool_run_ctx, + ): + + def get_tool_side_effect(tool_id: str, *args: Any, **kwargs: Any) -> Any: # noqa: ANN401, ARG001 + if tool_id == "tool1": + return mock_tool1 + return mock_tool2 + + mock_get_tool.side_effect = get_tool_side_effect + mock_tool_ctx = Mock() + mock_get_tool_run_ctx.return_value = mock_tool_ctx + + result = await parallel_step.run(mock_run_data) + + assert result == ["tool1_result", "tool2_result"] + assert mock_tool1._arun.call_count == 1 + assert mock_tool2._arun.call_count == 1 + + +@pytest.mark.asyncio +async def test_parallel_step_run_with_failure() -> None: + """Test ParallelStep run when one child step fails.""" + # Create mock steps where one fails + step1 = Mock() + step1.run = AsyncMock(return_value="result1") + + step2 = Mock() + step2.run = AsyncMock(side_effect=RuntimeError("Step 2 failed")) + + step3 = Mock() + + # This step should be cancelled + async def slow_step(run_data: Any) -> str: # noqa: ANN401, ARG001 + await asyncio.sleep(10) + return "result3" + + step3.run = slow_step + + parallel_step = ParallelStep(steps=[step1, step2, step3], step_name="parallel_execution") + + mock_run_data = Mock() + + with pytest.raises(RuntimeError, match="Step 2 failed"): + await parallel_step.run(mock_run_data) + + # Verify step1 was called + step1.run.assert_called_once_with(mock_run_data) + step2.run.assert_called_once_with(mock_run_data) + # Step3 should have been started but may have been cancelled + + +@pytest.mark.asyncio +async def test_parallel_step_run_empty_steps() -> None: + """Test ParallelStep run with no child steps.""" + parallel_step = ParallelStep(steps=[], step_name="empty_parallel") + + mock_run_data = Mock() + + result = await parallel_step.run(mock_run_data) + + assert result == [] + + +@pytest.mark.asyncio +async def test_parallel_step_run_single_step() -> None: + """Test ParallelStep run with a single child step.""" + step1 = Mock() + step1.run = AsyncMock(return_value="single_result") + + parallel_step = ParallelStep(steps=[step1], step_name="single_parallel") + + mock_run_data = Mock() + + result = await parallel_step.run(mock_run_data) + + assert result == ["single_result"] + step1.run.assert_called_once_with(mock_run_data) + + +@pytest.mark.asyncio +async def test_parallel_step_run_preserves_order() -> None: + """Test that ParallelStep preserves the order of outputs regardless of completion time.""" + + # Create steps that complete in reverse order + async def fast_step(run_data: Any) -> str: # noqa: ANN401, ARG001 + await asyncio.sleep(0.001) + return "fast" + + async def slow_step(run_data: Any) -> str: # noqa: ANN401, ARG001 + await asyncio.sleep(0.005) + return "slow" + + async def medium_step(run_data: Any) -> str: # noqa: ANN401, ARG001 + await asyncio.sleep(0.003) + return "medium" + + step1 = Mock() + step1.run = slow_step + + step2 = Mock() + step2.run = medium_step + + step3 = Mock() + step3.run = fast_step + + parallel_step = ParallelStep(steps=[step1, step2, step3], step_name="ordered_parallel") + + mock_run_data = Mock() + + result = await parallel_step.run(mock_run_data) + + # Results should be in the order of steps, not completion order + assert result == ["slow", "medium", "fast"] + + +def test_parallel_step_to_legacy_step() -> None: + """Test ParallelStep to_legacy_step method.""" + step1 = InvokeToolStep(tool="tool1", step_name="step1") + step2 = LLMStep(task="test task", step_name="step2") + + parallel_step = ParallelStep(steps=[step1, step2], step_name="parallel_execution") + + mock_plan = Mock() + mock_plan.step_output_name.return_value = "$parallel_execution_output" + + legacy_step = parallel_step.to_legacy_step(mock_plan) + + assert isinstance(legacy_step, PlanStep) + assert "Execute 2 steps in parallel" in legacy_step.task + assert legacy_step.output == "$parallel_execution_output" + assert legacy_step.inputs == [] + + +@pytest.mark.asyncio +async def test_parallel_step_with_local_data_value_outputs() -> None: + """Test ParallelStep run when child steps return LocalDataValue.""" + step1 = Mock() + step1.run = AsyncMock(return_value=LocalDataValue(value="value1", summary="Summary 1")) + + step2 = Mock() + step2.run = AsyncMock(return_value=LocalDataValue(value="value2", summary="Summary 2")) + + parallel_step = ParallelStep(steps=[step1, step2], step_name="parallel_with_local_data") + + mock_run_data = Mock() + + result = await parallel_step.run(mock_run_data) + + assert len(result) == 2 + assert isinstance(result[0], LocalDataValue) + assert result[0].value == "value1" + assert result[0].summary == "Summary 1" + assert isinstance(result[1], LocalDataValue) + assert result[1].value == "value2" + assert result[1].summary == "Summary 2" + + +def test_parallel_step_with_conditional_and_loop_blocks() -> None: + """Test ParallelStep initialization with conditional and loop blocks.""" + step1 = InvokeToolStep(tool="tool1", step_name="step1") + step2 = InvokeToolStep(tool="tool2", step_name="step2") + + conditional_block = ConditionalBlock( + clause_step_indexes=[0, 5], + parent_conditional_block=None, + ) + + loop_block = LoopBlock( + start_step_index=0, + end_step_index=10, + ) + + parallel_step = ParallelStep( + steps=[step1, step2], + step_name="parallel_in_control_flow", + conditional_block=conditional_block, + loop_block=loop_block, + ) + + assert parallel_step.conditional_block is conditional_block + assert parallel_step.loop_block is loop_block