Agent: : Feat: Implement a 'Parallel' step in the Plan Builder DSL for concurrent task execution #829
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Original Task
Summary
This story involves implementing a new
Parallel
step type in the Plan Builder DSL. This will allow developers to define a block of steps that should be executed concurrently, rather than sequentially. This is a critical performance enhancement for workflows where multiple independent tasks (e.g., calling different tools, running multiple LLM queries) can be performed simultaneously. The implementation will leverage the SDK's async-first architecture to manage the concurrent execution of the child steps.Acceptance Criteria
ParallelStep
class is created in theportia/builder/
module, inheriting fromStepV2
.PlanBuilderV2
class is updated with a new method,add_parallel(steps: List[StepV2])
, that takes a list of fully defined steps and adds them to aParallelStep
.ParallelStep
and execute its child steps concurrently usingasyncio.gather
.ParallelStep
is a list containing the outputs of its child steps, in the order they were provided.ParallelStep
via indexing (e.g.,parallel_step.output[0]
,parallel_step.output[1]
).ParallelStep
fails, all other concurrently running steps within that block are cancelled, and theParallelStep
as a whole fails, propagating the error.tests/unit/builder/
to validate theParallelStep
's construction and its integration with thePlanBuilderV2
.tests/integration/test_plan_v2.py
that demonstrates the concurrent execution of at least two independent steps (e.g., twoInvokeToolStep
s) and validates the correctness of the aggregated output. This test must meet the 100% integration coverage requirement.Technical Considerations
asyncio.gather
to await all tasks concurrently.ParallelStep
itself will have anoutput
that is a list of the results from its child steps. The reference system must be able to handle a downstream step referencing an element from this list (e.g.,StepOutput(step_name="parallel_step_1", output_name="output[0]")
).asyncio.gather
call fails, the exception should be caught, and thegather
call should be cancelled, ensuring that other running tasks are terminated gracefully.ParallelStep
should be the sum of the estimated costs of all its child steps. Theplan_run.py
logic for cost and token tracking must be updated to correctly aggregate these values from the parallel execution.Dependencies or Assumptions
ParallelStep
are independent and have no data dependencies on one another. The DSL will not enforce this; it is the responsibility of the developer using the builder.asyncio
-based architecture for step execution.Additional Context
Here is a concise summary of the relevant context for the coding agent.
1. Relevant Technical Context
ParallelStep
class should be created in theportia/builder/
directory, inheriting fromStepV2
(defined inportia/builder/step_v2.py
). Existing step classes in this directory (e.g.,InvokeToolStep
,LLMStep
) serve as implementation patterns.add_parallel
method should be added to thePlanBuilderV2
class inportia/builder/plan_builder_v2.py
.portia/plan_run.py
. This file will need to be modified to add a handler for theParallelStep
type.StepOutput
) is defined inportia/builder/reference.py
. The implementation for indexed output access (output[0]
) must be compatible with this system.asyncio
. The use ofasyncio.gather
is the correct and idiomatic approach for this task.portia/plan_run.py
. The logic forParallelStep
should sum the costs of its child steps and be integrated here.2. Important Clarifications or Requirements from User Conversations
3. Relevant Information from Uploaded Documents
4. Architectural Considerations, Dependencies, and Integration Points
tests/unit/builder/
directory.tests/integration/test_plan_v2.py
.@pytest.mark.asyncio
.asyncio.gather
fails, the implementation must ensure all other sibling tasks are cancelled before propagating the exception.Agent Logs
Summary
Successfully implemented a new
ParallelStep
class in the Plan Builder DSL that enables concurrent execution of multiple independent steps. The implementation leverages Python'sasyncio.gather
to execute child steps in parallel, providing significant performance improvements for workflows with independent operations.Files Changed:
portia/builder/parallel_step.py
- New ParallelStep classportia/builder/plan_builder_v2.py
- Addedadd_parallel()
methodportia/builder/__init__.py
- Exported ParallelSteptests/unit/builder/test_parallel_step.py
- Comprehensive unit teststests/integration/test_plan_v2.py
- Integration tests for concurrent executionKey Features Added
1. ParallelStep Class (
portia/builder/parallel_step.py
)StepV2
following the existing architecture patternasyncio.gather
2. PlanBuilderV2.add_parallel() Method
StepV2
instances3. Output Referencing
StepOutput("parallel_step_name", path="0")
for first child outputStepOutput("parallel_step_name", path="1")
for second child outputpydash.get()
infrastructure for path resolution4. Error Handling
5. Legacy Plan Conversion
to_legacy_step()
for dashboard compatibilityTesting
Unit Tests (
tests/unit/builder/test_parallel_step.py
)Comprehensive test coverage including:
LocalDataValue
outputsTotal: 11 unit tests covering all critical functionality
Integration Tests (
tests/integration/test_plan_v2.py
)Added two integration tests:
test_parallel_step_concurrent_execution
:test_parallel_step_with_error_handling
:Code Quality
Performance Considerations
The implementation correctly uses
asyncio.gather
which:All acceptance criteria have been met: