Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 40 additions & 32 deletions portia/execution_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from pydantic import BaseModel, ConfigDict

from portia.builder.step_v2 import StepV2
from portia.clarification import (
Clarification,
ClarificationCategory,
Expand All @@ -18,8 +19,8 @@
from portia.errors import ToolHardError
from portia.execution_agents.output import Output
from portia.logger import logger
from portia.plan import Plan, Step

Check failure on line 22 in portia/execution_hooks.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (F401)

portia/execution_hooks.py:22:31: F401 `portia.plan.Step` imported but unused

Check failure on line 22 in portia/execution_hooks.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (F401)

portia/execution_hooks.py:22:25: F401 `portia.plan.Plan` imported but unused
from portia.plan_run import PlanRun
from portia.plan_run import PlanRun, PlanRunV2

Check failure on line 23 in portia/execution_hooks.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (F401)

portia/execution_hooks.py:23:29: F401 `portia.plan_run.PlanRun` imported but unused
from portia.tool import Tool


Expand Down Expand Up @@ -53,75 +54,71 @@
clarification_handler: ClarificationHandler | None = None
"""Handler for clarifications raised during execution."""

before_step_execution: Callable[[Plan, PlanRun, Step], BeforeStepExecutionOutcome] | None = None
before_step_execution: Callable[[PlanRunV2, StepV2], BeforeStepExecutionOutcome] | None = None
"""Called before executing each step.

Args:
plan: The plan being executed
plan_run: The current plan run
step: The step about to be executed
plan_run_v2: The V2 plan run being executed
step_v2: The V2 step about to be executed

Returns:
BeforeStepExecutionOutcome | None: Whether to continue with the step execution or skip it.
If None is returned, the default behaviour is to continue with the step execution.
"""

after_step_execution: Callable[[Plan, PlanRun, Step, Output], None] | None = None
after_step_execution: Callable[[PlanRunV2, StepV2, Output], None] | None = None
"""Called after executing each step.

When there's an error, this is called with the error as the output value.

Args:
plan: The plan being executed
plan_run: The current plan run
step: The step that was executed
plan_run_v2: The V2 plan run being executed
step_v2: The V2 step that was executed
output: The output from the step execution
"""

before_plan_run: Callable[[Plan, PlanRun], None] | None = None
before_plan_run: Callable[[PlanRunV2], None] | None = None
"""Called before executing the first step of the plan run.

Args:
plan: The plan being executed
plan_run: The current plan run
plan_run_v2: The V2 plan run being executed
"""

after_plan_run: Callable[[Plan, PlanRun, Output], None] | None = None
after_plan_run: Callable[[PlanRunV2, Output], None] | None = None
"""Called after executing the plan run.

This is not called if a clarification is raised, as it is expected that the plan
will be resumed after the clarification is handled.

Args:
plan: The plan that was executed
plan_run: The completed plan run
plan_run_v2: The V2 plan run that was executed
output: The final output from the plan execution
"""

before_tool_call: (
Callable[[Tool, dict[str, Any], PlanRun, Step], Clarification | None] | None
Callable[[Tool, dict[str, Any], PlanRunV2, StepV2], Clarification | None] | None
) = None
"""Called before the tool is called.

Args:
tool: The tool about to be called
args: The args for the tool call. These are mutable and so can be modified in place as
required.
plan_run: The current plan run
step: The step being executed
plan_run_v2: The V2 plan run being executed
step_v2: The V2 step being executed

Returns:
Clarification | None: A clarification to raise, or None to proceed with the tool call
"""

after_tool_call: Callable[[Tool, Any, PlanRun, Step], Clarification | None] | None = None
after_tool_call: Callable[[Tool, Any, PlanRunV2, StepV2], Clarification | None] | None = None
"""Called after the tool is called.

Args:
tool: The tool that was called
output: The output returned from the tool call
plan_run: The current plan run
step: The step being executed
plan_run_v2: The V2 plan run being executed
step_v2: The V2 step being executed

Returns:
Clarification | None: A clarification to raise, or None to proceed. If a clarification
Expand All @@ -135,8 +132,8 @@
def clarify_on_all_tool_calls(
tool: Tool,
args: dict[str, Any],
plan_run: PlanRun,
step: Step,
plan_run_v2: PlanRunV2,
step_v2: StepV2,
) -> Clarification | None:
"""Raise a clarification to check the user is happy with all tool calls before proceeding.

Expand All @@ -147,12 +144,12 @@
)
)
"""
return _clarify_on_tool_call_hook(tool, args, plan_run, step, tool_ids=None)
return _clarify_on_tool_call_hook(tool, args, plan_run_v2, step_v2, tool_ids=None)


def clarify_on_tool_calls(
tool: str | Tool | list[str] | list[Tool],
) -> Callable[[Tool, dict[str, Any], PlanRun, Step], Clarification | None]:
) -> Callable[[Tool, dict[str, Any], PlanRunV2, StepV2], Clarification | None]:
"""Return a hook that raises a clarification before calls to the specified tool.

Args:
Expand Down Expand Up @@ -185,15 +182,15 @@
def _clarify_on_tool_call_hook(
tool: Tool,
args: dict[str, Any],
plan_run: PlanRun,
step: Step, # noqa: ARG001
plan_run_v2: PlanRunV2,
step_v2: StepV2, # noqa: ARG001
tool_ids: list[str] | None,
) -> Clarification | None:
"""Raise a clarification to check the user is happy with all tool calls before proceeding."""
if tool_ids and tool.id not in tool_ids:
return None

previous_clarification = plan_run.get_clarification_for_step(
previous_clarification = plan_run_v2.get_clarification_for_step(
ClarificationCategory.USER_VERIFICATION
)
serialised_args = (
Expand All @@ -202,7 +199,7 @@

if not previous_clarification or not previous_clarification.resolved:
return UserVerificationClarification(
plan_run_id=plan_run.id,
plan_run_id=plan_run_v2.id,
user_guidance=f"Are you happy to proceed with the call to {tool.name} with args "
f"{serialised_args}? Enter 'y' or 'yes' to proceed",
source="User verification tool hook",
Expand All @@ -217,8 +214,19 @@
return None


def log_step_outputs(plan: Plan, plan_run: PlanRun, step: Step, output: Output) -> None: # noqa: ARG001
"""Log the output of a step in the plan."""
def log_step_outputs(plan_run_v2: PlanRunV2, step_v2: StepV2, output: Output) -> None: # noqa: ARG001

Check failure on line 217 in portia/execution_hooks.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (RUF100)

portia/execution_hooks.py:217:89: RUF100 Unused `noqa` directive (unused: `ARG001`)
"""Log the output of a step in the plan.

Example usage:
portia = Portia(
execution_hooks=ExecutionHooks(
after_step_execution=log_step_outputs,
)
)
"""
# Convert to legacy step for logging purposes
legacy_step = step_v2.to_legacy_step(plan_run_v2.plan)
logger().info(
f"Step with task {step.task} using tool {step.tool_id} completed with result: {output}"
f"Step with task {legacy_step.task} using tool {legacy_step.tool_id} "
f"completed with result: {output}"
)
118 changes: 118 additions & 0 deletions portia/plan_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from pydantic import BaseModel, ConfigDict, Field

from portia.clarification import (
Expand All @@ -28,6 +30,11 @@
from portia.execution_agents.output import LocalDataValue, Output
from portia.prefixed_uuid import PlanRunUUID, PlanUUID

if TYPE_CHECKING:
from portia.builder.plan_v2 import PlanV2

Check failure on line 34 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (TC004)

portia/plan_run.py:34:40: TC004 Move import `portia.builder.plan_v2.PlanV2` out of type-checking block. Import is used for more than type hinting.
from portia.config import Config

Check failure on line 35 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (TC004)

portia/plan_run.py:35:31: TC004 Move import `portia.config.Config` out of type-checking block. Import is used for more than type hinting.
from portia.end_user import EndUser

Check failure on line 36 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (TC004)

portia/plan_run.py:36:33: TC004 Move import `portia.end_user.EndUser` out of type-checking block. Import is used for more than type hinting.


class PlanRunState(PortiaEnum):
"""The current state of the Plan Run.
Expand Down Expand Up @@ -228,3 +235,114 @@
plan_run_inputs=plan_run.plan_run_inputs,
structured_output_schema=plan_run.structured_output_schema,
)


class PlanRunV2(BaseModel):
"""A V2-native plan run that holds execution state for PlanV2.

This class provides a modern representation of plan execution state that works
directly with PlanV2 objects, eliminating the need for legacy Plan objects.

Attributes:
id: A unique ID for this plan run.
state: The current state of the plan run.
current_step_index: The current step being executed.
plan: The PlanV2 being executed.
end_user: The end user executing the plan.
step_output_values: List of step output values from executed steps.
final_output: The final output from the plan execution.
plan_run_inputs: Dict mapping plan input names to their values.
config: The Portia config.

"""

model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)

id: PlanRunUUID = Field(
default_factory=PlanRunUUID,
description="A unique ID for this plan_run.",
)
state: PlanRunState = Field(
default=PlanRunState.NOT_STARTED,
description="The current state of the PlanRun.",
)
current_step_index: int = Field(
default=0,
description="The current step that is being executed",
)
plan: PlanV2 = Field(
description="The PlanV2 being executed.",
)
end_user: EndUser = Field(
description="The end user executing the plan.",
)
step_output_values: list[Any] = Field(
default_factory=list,
description="List of step output values from executed steps.",
)
final_output: Output | None = Field(
default=None,
description="The final consolidated output of the plan execution.",
)
plan_run_inputs: dict[str, LocalDataValue] = Field(
default_factory=dict,
description="Dict mapping plan input names to their values.",
)
config: Config = Field(
description="The Portia config.",
)

def get_outstanding_clarifications(self) -> ClarificationListType:
"""Return all outstanding clarifications.

Note: This method is provided for compatibility but PlanRunV2 stores
clarifications differently than legacy PlanRun.

Returns:
ClarificationListType: A list of outstanding clarifications that have not been resolved.

"""
# TODO: Implement clarification handling for V2

Check failure on line 305 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (FIX002)

portia/plan_run.py:305:11: FIX002 Line contains TODO, consider resolving the issue

Check failure on line 305 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (TD003)

portia/plan_run.py:305:11: TD003 Missing issue link on the line following this TODO

Check failure on line 305 in portia/plan_run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (TD002)

portia/plan_run.py:305:11: TD002 Missing author in TODO; try: `# TODO(<author_name>): ...` or `# TODO @<author_name>: ...`
return []

def get_clarifications_for_step(self, step: int | None = None) -> ClarificationListType:
"""Return clarifications for the given step.

Args:
step: the step to get clarifications for. Defaults to current step.

Returns:
ClarificationListType: A list of clarifications for the given step.

"""
# TODO: Implement clarification handling for V2
return []

def get_clarification_for_step(
self, category: ClarificationCategory, step: int | None = None
) -> Clarification | None:
"""Return a clarification of the given category for the given step if it exists.

Args:
step: the step to get a clarification for. Defaults to current step.
category: the category of the clarification to get.

Returns:
Clarification | None: The clarification if found, None otherwise.

"""
# TODO: Implement clarification handling for V2
return None

def __str__(self) -> str:
"""Return the string representation of the PlanRunV2.

Returns:
str: A string representation containing key run attributes.

"""
return (
f"PlanRunV2(id={self.id}, plan_id={self.plan.id}, "
f"state={self.state}, current_step_index={self.current_step_index}, "
f"final_output={'set' if self.final_output else 'unset'})"
)
28 changes: 16 additions & 12 deletions portia/run_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from portia.end_user import EndUser
from portia.execution_hooks import ExecutionHooks
from portia.plan import Plan
from portia.plan_run import PlanRun
from portia.plan_run import PlanRun, PlanRunV2
from portia.storage import Storage
from portia.telemetry.telemetry_service import BaseProductTelemetry
from portia.tool import ToolRunContext
Expand All @@ -26,29 +26,33 @@ class StepOutputValue(BaseModel):


class RunContext(BaseModel):
"""Data that is returned from a step."""
"""Data that is returned from a step.

This context object provides access to the PlanRunV2 execution state along with
all necessary services and configuration for plan execution.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

plan: PlanV2 = Field(description="The Portia plan being executed.")
legacy_plan: Plan = Field(description="The legacy plan representation.")
plan_run: PlanRun = Field(description="The current plan run instance.")
end_user: EndUser = Field(description="The end user executing the plan.")
plan_run_v2: PlanRunV2 = Field(description="The V2-native plan run instance.")
step_output_values: list[StepOutputValue] = Field(
default_factory=list, description="Outputs set by the step."
)
config: Config = Field(description="The Portia config.")
storage: Storage = Field(description="The Portia storage.")
tool_registry: ToolRegistry = Field(description="The Portia tool registry.")
execution_hooks: ExecutionHooks = Field(description="The Portia execution hooks.")
telemetry: BaseProductTelemetry = Field(description="The Portia telemetry service.")

# Legacy fields for backward compatibility
plan: PlanV2 | None = Field(default=None, description="The Portia plan being executed.")
legacy_plan: Plan | None = Field(default=None, description="The legacy plan representation.")
plan_run: PlanRun | None = Field(default=None, description="The current plan run instance.")
end_user: EndUser | None = Field(default=None, description="The end user executing the plan.")
config: Config | None = Field(default=None, description="The Portia config.")

def get_tool_run_ctx(self) -> ToolRunContext:
"""Get the tool run context."""
return ToolRunContext(
end_user=self.end_user,
plan_run=self.plan_run,
plan=self.legacy_plan,
config=self.config,
clarifications=self.plan_run.get_clarifications_for_step(),
plan_run_v2=self.plan_run_v2,
clarifications=self.plan_run_v2.get_clarifications_for_step(),
)
Loading
Loading