Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions portia/plan_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@
- **ExecutionContext**: Provides contextual metadata useful for logging and performance analysis.
"""

from __future__ import annotations

from pydantic import BaseModel, ConfigDict, Field

from portia.clarification import (
Clarification,
ClarificationCategory,
ClarificationListType,
)
from portia.common import PortiaEnum
from portia.execution_agents.output import LocalDataValue, Output
from portia.prefixed_uuid import PlanRunUUID, PlanUUID

# Import TYPE_CHECKING to avoid circular imports
from typing import TYPE_CHECKING

if TYPE_CHECKING:

Check failure on line 34 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (I001)

portia/plan_run.py:18:1: I001 Import block is un-sorted or un-formatted
from portia.builder.plan_v2 import PlanV2
from portia.config import Config
from portia.end_user import EndUser


class PlanRunState(PortiaEnum):
"""The current state of the Plan Run.
Expand Down Expand Up @@ -198,6 +206,135 @@
)


class PlanRunV2(BaseModel):
"""Consolidated plan run data structure for the V2 migration.

This class consolidates plan execution data into a single structure,
containing all fields needed for plan execution and management.

Attributes:

Check failure on line 215 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (D413)

portia/plan_run.py:215:5: D413 Missing blank line after last section ("Attributes")
id (PlanRunUUID): A unique ID for this plan run.
state (PlanRunState): The current state of the PlanRun.
current_step_index (int): The current step that is being executed.
plan (PlanV2 | None): The PlanV2 instance being executed (V2 only).
end_user (EndUser): The end user executing the plan.
step_output_values (list[LocalDataValue]): Step output values from execution.
final_output (Output | None): The final consolidated output.
plan_run_inputs (dict[str, LocalDataValue]): Plan input values.
config (Config): The Portia configuration.
"""

model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)

id: PlanRunUUID = Field(
default_factory=PlanRunUUID,
description="A unique ID for this plan run.",
)
state: PlanRunState = Field(
default=PlanRunState.NOT_STARTED,
description="The current state of the PlanRun.",
)
current_step_index: int = Field(
default=0,
description="The current step that is being executed",
)
plan: "PlanV2 | None" = Field(
default=None,
description="The PlanV2 instance being executed (V2 only).",
)
end_user: "EndUser" = Field(
description="The end user executing the plan."
)
step_output_values: list["LocalDataValue"] = Field(
default_factory=list,
description="Step output values from execution.",
)
final_output: Output | None = Field(
default=None,
description="The final consolidated output of the PlanRun if available.",
)
plan_run_inputs: dict[str, "LocalDataValue"] = Field(
default_factory=dict,
description="Dict mapping plan input names to their values.",
)
config: "Config" = Field(
description="The Portia configuration."
)

# Legacy compatibility fields for migration
_legacy_outputs: PlanRunOutputs = Field(
default=PlanRunOutputs(),
description="Legacy outputs structure for compatibility.",
exclude=True,
)

def get_outstanding_clarifications(self) -> ClarificationListType:
"""Return all outstanding clarifications.

Returns:

Check failure on line 274 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (D413)

portia/plan_run.py:274:9: D413 Missing blank line after last section ("Returns")
ClarificationListType: A list of outstanding clarifications that have not been resolved.
"""
return [
clarification
for clarification in self._legacy_outputs.clarifications
if not clarification.resolved
]

def get_clarifications_for_step(self, step: int | None = None) -> ClarificationListType:
"""Return clarifications for the given step.

Args:
step (int | None): the step to get clarifications for. Defaults to current step.

Returns:

Check failure on line 289 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (D413)

portia/plan_run.py:289:9: D413 Missing blank line after last section ("Returns")
ClarificationListType: A list of clarifications for the given step.
"""
if step is None:
step = self.current_step_index
return [
clarification
for clarification in self._legacy_outputs.clarifications
if clarification.step == step
]

def get_clarification_for_step(
self, category: ClarificationCategory, step: int | None = None
) -> Clarification | None:
"""Return a clarification of the given category for the given step if it exists.

Args:

Check failure on line 305 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (D413)

portia/plan_run.py:305:9: D413 Missing blank line after last section ("Args")
step (int | None): the step to get a clarification for. Defaults to current step.
category (ClarificationCategory | None): the category of the clarification to get.
"""
if step is None:
step = self.current_step_index
return next(
(
clarification
for clarification in self._legacy_outputs.clarifications
if clarification.step == step and clarification.category == category
),
None,
)

def get_potential_step_inputs(self) -> dict[str, Output]:
"""Return a dictionary of potential step inputs for future steps."""
return self._legacy_outputs.step_outputs | self.plan_run_inputs

def __str__(self) -> str:
"""Return the string representation of the PlanRunV2.

Returns:

Check failure on line 327 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (D413)

portia/plan_run.py:327:9: D413 Missing blank line after last section ("Returns")
str: A string representation containing key run attributes.
"""
plan_id = self.plan.id if self.plan else "unknown"
return (
f"PlanRunV2(id={self.id}, plan_id={plan_id}, "
f"state={self.state}, current_step_index={self.current_step_index}, "
f"final_output={'set' if self.final_output else 'unset'})"
)


class ReadOnlyPlanRun(PlanRun):
"""A read-only copy of a Plan Run passed to agents for reference.

Expand Down Expand Up @@ -228,3 +365,63 @@
plan_run_inputs=plan_run.plan_run_inputs,
structured_output_schema=plan_run.structured_output_schema,
)


# Migration helpers for transitioning between old and new structures
def migrate_plan_run_to_v2(
legacy_plan_run: PlanRun,
plan_v2: "PlanV2 | None",

Check failure on line 373 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP037)

portia/plan_run.py:373:14: UP037 Remove quotes from type annotation
end_user: "EndUser",

Check failure on line 374 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP037)

portia/plan_run.py:374:15: UP037 Remove quotes from type annotation
config: "Config"

Check failure on line 375 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP037)

portia/plan_run.py:375:13: UP037 Remove quotes from type annotation
) -> "PlanRunV2":

Check failure on line 376 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP037)

portia/plan_run.py:376:6: UP037 Remove quotes from type annotation
"""Convert a legacy PlanRun to PlanRunV2.

Args:
legacy_plan_run: The legacy PlanRun instance to convert.
plan_v2: The PlanV2 instance if available.
end_user: The EndUser instance.
config: The Config instance.

Returns:
PlanRunV2: The converted PlanRunV2 instance.
"""
# Convert legacy step outputs to step_output_values
step_output_values = []
for output in legacy_plan_run.outputs.step_outputs.values():
if hasattr(output, 'value'):
step_output_values.append(LocalDataValue(value=output.value))

return PlanRunV2(
id=legacy_plan_run.id,
state=legacy_plan_run.state,
current_step_index=legacy_plan_run.current_step_index,
plan=plan_v2,
end_user=end_user,
step_output_values=step_output_values,
final_output=legacy_plan_run.outputs.final_output,
plan_run_inputs=legacy_plan_run.plan_run_inputs,
config=config,
_legacy_outputs=legacy_plan_run.outputs,
)


def migrate_v2_to_plan_run(plan_run_v2: "PlanRunV2") -> PlanRun:
"""Convert a PlanRunV2 back to legacy PlanRun for compatibility.

Args:
plan_run_v2: The PlanRunV2 instance to convert.

Returns:
PlanRun: The converted legacy PlanRun instance.
"""
plan_id = plan_run_v2.plan.id if plan_run_v2.plan else "unknown"

return PlanRun(
id=plan_run_v2.id,
plan_id=plan_id,
current_step_index=plan_run_v2.current_step_index,
state=plan_run_v2.state,
end_user_id=plan_run_v2.end_user.external_id,
outputs=plan_run_v2._legacy_outputs,
plan_run_inputs=plan_run_v2.plan_run_inputs,
)
115 changes: 114 additions & 1 deletion portia/run_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from portia.end_user import EndUser
from portia.execution_hooks import ExecutionHooks
from portia.plan import Plan
from portia.plan_run import PlanRun
from portia.plan_run import PlanRun, PlanRunV2
from portia.storage import Storage
from portia.telemetry.telemetry_service import BaseProductTelemetry
from portia.tool import ToolRunContext
Expand Down Expand Up @@ -52,3 +52,116 @@ def get_tool_run_ctx(self) -> ToolRunContext:
config=self.config,
clarifications=self.plan_run.get_clarifications_for_step(),
)


class RunContextV2(BaseModel):
"""Updated context for PlanV2 runs using consolidated data structures.

This is the V2 version of RunContext that uses the new consolidated
PlanRunV2 structure instead of separate plan/legacy_plan/plan_run fields.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

plan_run: PlanRunV2 = Field(description="The consolidated plan run instance.")
storage: Storage = Field(description="The Portia storage.")
tool_registry: ToolRegistry = Field(description="The Portia tool registry.")
execution_hooks: ExecutionHooks = Field(description="The Portia execution hooks.")
telemetry: BaseProductTelemetry = Field(description="The Portia telemetry service.")

def get_tool_run_ctx(self) -> ToolRunContext:
"""Get the tool run context for V2."""
# Create a legacy plan if needed for backwards compatibility
legacy_plan = None
if self.plan_run.plan:
from portia.plan import PlanContext
# Create a minimal plan context for legacy compatibility
plan_context = PlanContext(query="", tool_registry=self.tool_registry)
legacy_plan = self.plan_run.plan.to_legacy_plan(plan_context)

# Create a legacy PlanRun for ToolRunContext compatibility
legacy_plan_run = PlanRun(
id=self.plan_run.id,
plan_id=self.plan_run.plan.id if self.plan_run.plan else "unknown",
current_step_index=self.plan_run.current_step_index,
state=self.plan_run.state,
end_user_id=self.plan_run.end_user.external_id,
outputs=self.plan_run._legacy_outputs,
plan_run_inputs=self.plan_run.plan_run_inputs,
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: PlanRun Constructor Receives Incorrect Type

The PlanRun constructor's plan_id field expects a PlanUUID. However, in RunContextV2.get_tool_run_ctx and migrate_v2_to_plan_run, it's passed the string "unknown" when the PlanV2 object is None. This type mismatch could lead to validation errors.

Additional Locations (1)

Fix in Cursor Fix in Web


return ToolRunContext(
end_user=self.plan_run.end_user,
plan_run=legacy_plan_run,
plan=legacy_plan,
config=self.plan_run.config,
clarifications=self.plan_run.get_clarifications_for_step(),
)


# Migration helpers for RunContext structures
def migrate_run_context_to_v2(
legacy_context: RunContext,
) -> RunContextV2:
"""Convert a legacy RunContext to RunContextV2.

Args:
legacy_context: The legacy RunContext instance to convert.

Returns:
RunContextV2: The converted RunContextV2 instance.
"""
from portia.plan_run import migrate_plan_run_to_v2

# Convert RunContext to RunContextV2 by migrating the PlanRun
plan_run_v2 = migrate_plan_run_to_v2(
legacy_plan_run=legacy_context.plan_run,
plan_v2=legacy_context.plan,
end_user=legacy_context.end_user,
config=legacy_context.config,
)

return RunContextV2(
plan_run=plan_run_v2,
storage=legacy_context.storage,
tool_registry=legacy_context.tool_registry,
execution_hooks=legacy_context.execution_hooks,
telemetry=legacy_context.telemetry,
)


def migrate_v2_to_run_context(
context_v2: RunContextV2,
) -> RunContext:
"""Convert a RunContextV2 back to legacy RunContext for compatibility.

Args:
context_v2: The RunContextV2 instance to convert.

Returns:
RunContext: The converted legacy RunContext instance.
"""
from portia.plan_run import migrate_v2_to_plan_run

# Convert back to legacy structures
legacy_plan_run = migrate_v2_to_plan_run(context_v2.plan_run)

# Create legacy plan if available
legacy_plan = None
if context_v2.plan_run.plan:
from portia.plan import PlanContext
plan_context = PlanContext(query="", tool_registry=context_v2.tool_registry)
legacy_plan = context_v2.plan_run.plan.to_legacy_plan(plan_context)

return RunContext(
plan=context_v2.plan_run.plan if context_v2.plan_run.plan else None,
legacy_plan=legacy_plan,
plan_run=legacy_plan_run,
end_user=context_v2.plan_run.end_user,
step_output_values=context_v2.plan_run.step_output_values,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: RunContext Validation Fails Due to Type Mismatches

The migrate_v2_to_run_context function has type and nullability mismatches. It assigns None to the non-nullable RunContext.plan field, causing a Pydantic validation error. Additionally, the step_output_values field receives an incompatible list type (list[LocalDataValue] instead of list[StepOutputValue]), which will cause runtime type errors.

Fix in Cursor Fix in Web

config=context_v2.plan_run.config,
storage=context_v2.storage,
tool_registry=context_v2.tool_registry,
execution_hooks=context_v2.execution_hooks,
telemetry=context_v2.telemetry,
)
Loading
Loading