Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions portia/builder/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
"""Builder module for constructing Portia plans."""

from portia.builder.parallel_step import ParallelStep
from portia.builder.plan_builder_v2 import PlanBuilderV2

__all__ = ["ParallelStep", "PlanBuilderV2"]
101 changes: 101 additions & 0 deletions portia/builder/parallel_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Implementation of the parallel step for concurrent task execution."""

from __future__ import annotations

import asyncio
import sys
from typing import TYPE_CHECKING, Any

if sys.version_info >= (3, 12):
from typing import override
else:
from typing_extensions import override # pragma: no cover

from langsmith import traceable
from pydantic import Field

from portia.builder.step_v2 import StepV2
from portia.plan import Step

if TYPE_CHECKING:
from portia.builder.plan_v2 import PlanV2
from portia.run_context import RunContext


class ParallelStep(StepV2):
"""A step that executes multiple child steps concurrently.

This step allows developers to define a block of steps that should be executed
in parallel rather than sequentially. This is useful for performance optimization
when multiple independent tasks (e.g., calling different tools, running multiple
LLM queries) can be performed simultaneously.

The output of a ParallelStep is a list containing the outputs of its child steps,
in the order they were provided. Subsequent steps can reference the outputs of
individual child steps via indexing (e.g., StepOutput("parallel_step", path="0")).

If any child step fails, all other concurrently running steps are cancelled,
and the ParallelStep as a whole fails, propagating the error.
"""

steps: list[StepV2] = Field(
description="List of steps to execute in parallel. These steps should be independent "
"with no data dependencies on one another."
)

def __str__(self) -> str:
"""Return a description of this step for logging purposes."""
step_types = [step.__class__.__name__ for step in self.steps]
return f"ParallelStep(steps=[{', '.join(step_types)}])"

@override
@traceable(name="Parallel Step - Run")
async def run(self, run_data: RunContext) -> list[Any]: # pyright: ignore[reportIncompatibleMethodOverride] - needed due to Langsmith decorator
"""Execute all child steps concurrently and return their outputs as a list.

Uses asyncio.gather to run all child steps in parallel. If any step fails,
all other steps are cancelled and the exception is propagated.

Args:
run_data: The runtime context containing step outputs, inputs, and other
execution data.

Returns:
A list of outputs from each child step, in the order they were provided.

Raises:
Exception: If any child step fails during execution.

"""
# Create tasks for all child steps
tasks = [step.run(run_data) for step in self.steps]

# Execute all tasks concurrently
# If any task fails, gather will cancel remaining tasks and raise the exception
results = await asyncio.gather(*tasks)
return list(results)

@override
def to_legacy_step(self, plan: PlanV2) -> Step:
"""Convert this ParallelStep to a legacy Step.

This creates a legacy step representation primarily for dashboard display.

Args:
plan: The PlanV2 instance containing this step.

Returns:
A legacy Step object representing this parallel step.

"""
child_step_descriptions = [str(step) for step in self.steps]
task_description = (
f"Execute {len(self.steps)} steps in parallel: {', '.join(child_step_descriptions)}"
)

return Step(
task=task_description,
inputs=[],
output=plan.step_output_name(self),
condition=self._get_legacy_condition(plan),
)
61 changes: 61 additions & 0 deletions portia/builder/plan_builder_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from portia.builder.llm_step import LLMStep
from portia.builder.loop_step import LoopStep
from portia.builder.loops import LoopBlock, LoopStepType, LoopType
from portia.builder.parallel_step import ParallelStep
from portia.builder.plan_v2 import PlanV2
from portia.builder.react_agent_step import ReActAgentStep
from portia.builder.reference import Reference, default_step_name
Expand Down Expand Up @@ -779,6 +780,66 @@ def add_sub_plan(

return self

def add_parallel(
self,
steps: list[StepV2],
step_name: str | None = None,
) -> PlanBuilderV2:
"""Add a parallel execution step to the plan.

This creates a step that executes multiple child steps concurrently using
asyncio.gather. All child steps are executed in parallel, and their outputs
are collected into a list. This is useful for performance optimization when
you have independent operations that can run simultaneously.

The output of the parallel step is a list containing the outputs of all child
steps in the order they were provided. You can reference individual outputs
using path notation: StepOutput("parallel_step_name", path="0") for the first
child step's output, path="1" for the second, and so on.

If any child step fails during execution, all other running steps are cancelled
and the parallel step fails, propagating the error.

Args:
steps: List of fully defined StepV2 instances to execute in parallel.
These steps should be independent with no data dependencies on each other.
step_name: Optional explicit name for the step. This allows its output to be
referenced via StepOutput("name_of_step") rather than by index.

Returns:
Self for method chaining.

Example:
```python
from portia.builder import PlanBuilderV2
from portia.builder.invoke_tool_step import InvokeToolStep
from portia.builder.reference import StepOutput

builder = PlanBuilderV2()
builder.add_parallel(
steps=[
InvokeToolStep(tool="search_tool", step_name="search1",
args={"query": "Python"}),
InvokeToolStep(tool="search_tool", step_name="search2",
args={"query": "JavaScript"}),
],
step_name="parallel_search"
)
# Reference first result: StepOutput("parallel_search", path="0")
# Reference second result: StepOutput("parallel_search", path="1")
```

"""
self.plan.steps.append(
ParallelStep(
steps=steps,
step_name=step_name or default_step_name(len(self.plan.steps)),
conditional_block=self._current_conditional_block,
loop_block=self._current_loop_block,
)
)
return self

def exit(
self,
*,
Expand Down
122 changes: 122 additions & 0 deletions tests/integration/test_plan_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pydantic import BaseModel, Field

from portia import Config, LogLevel, Portia
from portia.builder.invoke_tool_step import InvokeToolStep
from portia.builder.plan_builder_v2 import PlanBuilderError, PlanBuilderV2
from portia.builder.reference import Input, StepOutput
from portia.clarification import UserVerificationClarification
Expand Down Expand Up @@ -2004,3 +2005,124 @@ def before_tool_call_execution_hook(

# Verify step has summary
assert weather_step_output.get_summary() is not None


@pytest.mark.asyncio
async def test_parallel_step_concurrent_execution() -> None:
"""Test parallel step executes multiple independent steps concurrently.

This test demonstrates the concurrent execution of two independent tool invocations
and validates the correctness of the aggregated output. The test uses search tools
to fetch different information in parallel and then uses the results.
"""
config = Config.from_default(
default_log_level=LogLevel.DEBUG,
storage_class=StorageClass.MEMORY,
)

portia = Portia(config=config)

# Create a plan with parallel steps
plan = (
PlanBuilderV2("Search for Python and JavaScript information in parallel")
.add_parallel(
steps=[
InvokeToolStep(
tool="search_tool",
step_name="search_python",
args={"search_query": "What is Python programming language?"},
),
InvokeToolStep(
tool="search_tool",
step_name="search_javascript",
args={"search_query": "What is JavaScript programming language?"},
),
],
step_name="parallel_search",
)
.llm_step(
task="Compare the two programming languages based on the search results",
inputs=[
StepOutput("parallel_search", path="0"),
StepOutput("parallel_search", path="1"),
],
step_name="compare_languages",
)
.build()
)

plan_run = await portia.arun_plan(plan)

assert plan_run.state == PlanRunState.COMPLETE
assert plan_run.outputs.final_output is not None

# Verify the parallel step output exists and is a list
parallel_output = plan_run.outputs.step_outputs["$parallel_search_output"]
assert parallel_output is not None
parallel_value = parallel_output.get_value()
assert isinstance(parallel_value, list)
assert len(parallel_value) == 2

# Verify both search results contain relevant content
python_result = parallel_value[0]
javascript_result = parallel_value[1]

assert isinstance(python_result, str)
assert isinstance(javascript_result, str)
assert len(python_result) > 0
assert len(javascript_result) > 0

# The results should mention the respective languages
python_lower = python_result.lower()
javascript_lower = javascript_result.lower()
assert "python" in python_lower
assert "javascript" in javascript_lower or "js" in javascript_lower

# Verify the comparison step used the parallel outputs
comparison_output = plan_run.outputs.step_outputs["$compare_languages_output"]
assert comparison_output is not None
comparison_value = comparison_output.get_value()
assert isinstance(comparison_value, str)
assert len(comparison_value) > 0

# The comparison should mention both languages
comparison_lower = comparison_value.lower()
assert "python" in comparison_lower
assert "javascript" in comparison_lower or "js" in comparison_lower


@pytest.mark.asyncio
async def test_parallel_step_with_error_handling() -> None:
"""Test that parallel step handles errors correctly by cancelling other tasks."""
config = Config.from_default(
default_log_level=LogLevel.DEBUG,
storage_class=StorageClass.MEMORY,
)

portia = Portia(config=config)

# Create a plan where one parallel step will fail
plan = (
PlanBuilderV2("Test parallel execution with error")
.add_parallel(
steps=[
InvokeToolStep(
tool="search_tool",
step_name="valid_search",
args={"search_query": "Valid query"},
),
InvokeToolStep(
tool="nonexistent_tool", # This will fail
step_name="failing_step",
args={"arg": "value"},
),
],
step_name="parallel_with_error",
)
.build()
)

plan_run = await portia.arun_plan(plan)

# The plan should fail due to the error in one of the parallel steps
assert plan_run.state == PlanRunState.FAILED
Loading
Loading