diff --git a/portia/__init__.py b/portia/__init__.py index 4a2ce014..a1af8e21 100644 --- a/portia/__init__.py +++ b/portia/__init__.py @@ -91,7 +91,7 @@ # Plan and execution related classes from portia.plan import Plan, PlanBuilder, PlanContext, PlanInput, PlanUUID, Step, Variable -from portia.plan_run import PlanRun, PlanRunState +from portia.plan_run import PlanRun, PlanRunState, PlanRunV2 # Core classes from portia.portia import ExecutionHooks, Portia @@ -163,6 +163,7 @@ "PlanRun", "PlanRunNotFoundError", "PlanRunState", + "PlanRunV2", "PlanUUID", "PlanV2", "PlanningAgentType", diff --git a/portia/plan_run.py b/portia/plan_run.py index 016619cd..559e5a56 100644 --- a/portia/plan_run.py +++ b/portia/plan_run.py @@ -228,3 +228,197 @@ def from_plan_run(cls, plan_run: PlanRun) -> ReadOnlyPlanRun: plan_run_inputs=plan_run.plan_run_inputs, structured_output_schema=plan_run.structured_output_schema, ) + + +class PlanRunV2(BaseModel): + """A V2 plan run represents a running instance of a PlanV2. + + This is the successor to PlanRun and is designed to work specifically with PlanV2 + instances created via PlanBuilderV2. It provides improved performance, better + structure, and enhanced functionality while maintaining backward compatibility. + + Attributes: + id (PlanRunUUID): A unique ID for this plan_run. + plan_id (PlanUUID): The ID of the Plan this run uses. + current_step_index (int): The current step that is being executed. + state (PlanRunState): The current state of the PlanRun. + outputs (PlanRunOutputs): Outputs of the PlanRun including clarifications. + plan_run_inputs (dict[str, LocalDataValue]): Dict mapping plan input names to their values. + end_user_id (str): The id of the end user this plan was run for. + + """ + + model_config = ConfigDict(extra="forbid") + + id: PlanRunUUID = Field( + default_factory=PlanRunUUID, + description="A unique ID for this plan_run.", + ) + plan_id: PlanUUID = Field( + description="The ID of the Plan this run uses.", + ) + current_step_index: int = Field( + default=0, + description="The current step that is being executed", + ) + state: PlanRunState = Field( + default=PlanRunState.NOT_STARTED, + description="The current state of the PlanRun.", + ) + end_user_id: str = Field( + ..., + description="The id of the end user this plan was run for", + ) + outputs: PlanRunOutputs = Field( + default=PlanRunOutputs(), + description="Outputs of the run including clarifications.", + ) + plan_run_inputs: dict[str, LocalDataValue] = Field( + default_factory=dict, + description="Dict mapping plan input names to their values.", + ) + + structured_output_schema: type[BaseModel] | None = Field( + default=None, + exclude=True, + description="The optional structured output schema for the plan run.", + ) + + def get_outstanding_clarifications(self) -> ClarificationListType: + """Return all outstanding clarifications. + + Returns: + ClarificationListType: A list of outstanding clarifications that have not been resolved. + + """ + return [ + clarification + for clarification in self.outputs.clarifications + if not clarification.resolved + ] + + def get_clarifications_for_step(self, step: int | None = None) -> ClarificationListType: + """Return clarifications for the given step. + + Args: + step (int | None): the step to get clarifications for. Defaults to current step. + + Returns: + ClarificationListType: A list of clarifications for the given step. + + """ + if step is None: + step = self.current_step_index + return [ + clarification + for clarification in self.outputs.clarifications + if clarification.step == step + ] + + def get_clarification_for_step( + self, category: ClarificationCategory, step: int | None = None + ) -> Clarification | None: + """Return a clarification of the given category for the given step if it exists. + + Args: + step (int | None): the step to get a clarification for. Defaults to current step. + category (ClarificationCategory | None): the category of the clarification to get. + + """ + if step is None: + step = self.current_step_index + return next( + ( + clarification + for clarification in self.outputs.clarifications + if clarification.step == step and clarification.category == category + ), + None, + ) + + def get_potential_step_inputs(self) -> dict[str, Output]: + """Return a dictionary of potential step inputs for future steps.""" + return self.outputs.step_outputs | self.plan_run_inputs + + def __str__(self) -> str: + """Return the string representation of the PlanRunV2. + + Returns: + str: A string representation containing key run attributes. + + """ + return ( + f"RunV2(id={self.id}, plan_id={self.plan_id}, " + f"state={self.state}, current_step_index={self.current_step_index}, " + f"final_output={'set' if self.outputs.final_output else 'unset'})" + ) + + @classmethod + def from_plan_run(cls, plan_run: PlanRun) -> PlanRunV2: + """Create a PlanRunV2 from a legacy PlanRun. + + Args: + plan_run (PlanRun): The original run instance to convert. + + Returns: + PlanRunV2: A new PlanRunV2 instance with the same data. + """ + return cls( + id=plan_run.id, + plan_id=plan_run.plan_id, + current_step_index=plan_run.current_step_index, + outputs=plan_run.outputs, + state=plan_run.state, + end_user_id=plan_run.end_user_id, + plan_run_inputs=plan_run.plan_run_inputs, + structured_output_schema=plan_run.structured_output_schema, + ) + + def to_legacy_plan_run(self) -> PlanRun: + """Convert this PlanRunV2 to a legacy PlanRun for backward compatibility. + + Returns: + PlanRun: A legacy PlanRun instance with the same data. + """ + return PlanRun( + id=self.id, + plan_id=self.plan_id, + current_step_index=self.current_step_index, + outputs=self.outputs, + state=self.state, + end_user_id=self.end_user_id, + plan_run_inputs=self.plan_run_inputs, + structured_output_schema=self.structured_output_schema, + ) + + +class ReadOnlyPlanRunV2(PlanRunV2): + """A read-only copy of a Plan Run V2 passed to agents for reference. + + This class provides a non-modifiable view of a plan run instance, + ensuring that agents can access run details without altering them. + """ + + model_config = ConfigDict(frozen=True, extra="forbid") + + @classmethod + def from_plan_run_v2(cls, plan_run: PlanRunV2) -> ReadOnlyPlanRunV2: + """Create a read-only plan run from a normal PlanRunV2. + + Args: + plan_run (PlanRunV2): The original run instance to create a read-only copy from. + + Returns: + ReadOnlyPlanRunV2: A new read-only instance of the provided PlanRunV2. + + """ + return cls( + id=plan_run.id, + plan_id=plan_run.plan_id, + current_step_index=plan_run.current_step_index, + outputs=plan_run.outputs, + state=plan_run.state, + end_user_id=plan_run.end_user_id, + plan_run_inputs=plan_run.plan_run_inputs, + structured_output_schema=plan_run.structured_output_schema, + ) diff --git a/portia/portia.py b/portia/portia.py index 84196564..b09153a5 100644 --- a/portia/portia.py +++ b/portia/portia.py @@ -73,7 +73,14 @@ from portia.logger import logger, logger_manager, truncate_message from portia.open_source_tools.llm_tool import LLMTool from portia.plan import Plan, PlanContext, PlanInput, PlanUUID, ReadOnlyPlan, ReadOnlyStep, Step -from portia.plan_run import PlanRun, PlanRunState, PlanRunUUID, ReadOnlyPlanRun +from portia.plan_run import ( + PlanRun, + PlanRunState, + PlanRunUUID, + PlanRunV2, + ReadOnlyPlanRun, + ReadOnlyPlanRunV2, +) from portia.planning_agents.default_planning_agent import DefaultPlanningAgent from portia.run_context import RunContext, StepOutputValue from portia.storage import ( @@ -1040,10 +1047,10 @@ async def _aget_plan_run_from_plan( def resume( self, - plan_run: PlanRun | None = None, + plan_run: PlanRun | PlanRunV2 | None = None, plan_run_id: PlanRunUUID | str | None = None, plan: PlanV2 | None = None, - ) -> PlanRun: + ) -> PlanRun | PlanRunV2: """Resume a PlanRun. If a clarification handler was provided as part of the execution hooks, it will be used @@ -1081,16 +1088,19 @@ def resume( raise NotImplementedError( "We do not yet support retrieving plan runs by ID with PlanV2" ) + # Convert PlanRun to PlanRunV2 if needed + if isinstance(plan_run, PlanRun): + plan_run = PlanRunV2.from_plan_run(plan_run) with asyncio.Runner() as runner: return runner.run(self.resume_builder_plan(plan, plan_run=plan_run)) return self._resume(plan_run, plan_run_id) async def aresume( self, - plan_run: PlanRun | None = None, + plan_run: PlanRun | PlanRunV2 | None = None, plan_run_id: PlanRunUUID | str | None = None, plan: PlanV2 | None = None, - ) -> PlanRun: + ) -> PlanRun | PlanRunV2: """Resume a PlanRun. If a clarification handler was provided as part of the execution hooks, it will be used @@ -1129,6 +1139,9 @@ async def aresume( raise NotImplementedError( "We do not yet support retrieving plan runs by ID with PlanV2" ) + # Convert PlanRun to PlanRunV2 if needed + if isinstance(plan_run, PlanRun): + plan_run = PlanRunV2.from_plan_run(plan_run) return await self.resume_builder_plan(plan, plan_run=plan_run) return await self._aresume(plan_run, plan_run_id) @@ -1616,10 +1629,13 @@ def wait_for_ready( # noqa: C901 return plan_run - def _set_plan_run_state(self, plan_run: PlanRun, state: PlanRunState) -> None: + def _set_plan_run_state(self, plan_run: PlanRun | PlanRunV2, state: PlanRunState) -> None: """Set the state of a plan run and persist it to storage.""" plan_run.state = state - self.storage.save_plan_run(plan_run) + if isinstance(plan_run, PlanRunV2): + self.storage.save_plan_run(plan_run.to_legacy_plan_run()) + else: + self.storage.save_plan_run(plan_run) def create_plan_run( self, @@ -1709,6 +1725,103 @@ def _create_plan_run( self.storage.save_plan_run(plan_run) return plan_run + async def _create_plan_run_v2_from_plan_v2( + self, + plan: PlanV2, + end_user: str | EndUser | None = None, + plan_run_inputs: list[PlanInput] + | list[dict[str, Serializable]] + | dict[str, Serializable] + | None = None, + structured_output_schema: type[BaseModel] | None = None, + ) -> PlanRunV2: + """Create a PlanRunV2 from a PlanV2. + + Args: + plan (PlanV2): The plan to create a plan run from. + end_user (str | EndUser | None = None): The end user this plan run is for. + plan_run_inputs: The plan inputs for the plan run with their values. + structured_output_schema: Optional structured output schema. + + Returns: + PlanRunV2: The created PlanRunV2 object. + + """ + end_user = self.initialize_end_user(end_user) + coerced_plan_run_inputs = self._coerce_plan_run_inputs(plan_run_inputs) + + plan_run_v2 = PlanRunV2( + plan_id=plan.id, + state=PlanRunState.NOT_STARTED, + end_user_id=end_user.external_id, + structured_output_schema=structured_output_schema or plan.final_output_schema, + ) + + # Process plan inputs for V2 + await self._process_plan_input_values_v2(plan, plan_run_v2, coerced_plan_run_inputs) + + # Save the plan run + self.storage.save_plan_run(plan_run_v2.to_legacy_plan_run()) + return plan_run_v2 + + async def _process_plan_input_values_v2( + self, + plan: PlanV2, + plan_run: PlanRunV2, + plan_run_inputs: list[PlanInput] | None = None, + ) -> None: + """Process plan input values and add them to the plan run V2. + + Args: + plan (PlanV2): The plan containing required inputs. + plan_run (PlanRunV2): The plan run to update with input values. + plan_run_inputs (list[PlanInput] | None): Values for plan inputs. + + Raises: + ValueError: If required plan inputs are missing. + + """ + if plan.plan_inputs and not plan_run_inputs: + missing_inputs = [ + input_obj.name for input_obj in plan.plan_inputs if input_obj.value is None + ] + if missing_inputs: + raise ValueError(f"Missing required plan input values: {', '.join(missing_inputs)}") + + for plan_input in plan.plan_inputs: + if plan_input.value is not None: + plan_run.plan_run_inputs[plan_input.name] = LocalDataValue( + value=plan_input.value + ) + return + + if plan_run_inputs and not plan.plan_inputs: + logger().warning( + "Inputs are not required for this plan but plan inputs were provided", + ) + + if plan_run_inputs and plan.plan_inputs: + input_values_by_name = {input_obj.name: input_obj for input_obj in plan_run_inputs} + + # Validate all required inputs are provided or have default values + missing_inputs = [ + input_obj.name + for input_obj in plan.plan_inputs + if input_obj.name not in input_values_by_name and input_obj.value is None + ] + if missing_inputs: + raise ValueError(f"Missing required plan input values: {', '.join(missing_inputs)}") + + for plan_input in plan.plan_inputs: + if plan_input.name in input_values_by_name: + plan_run.plan_run_inputs[plan_input.name] = LocalDataValue( + value=input_values_by_name[plan_input.name].value + ) + elif plan_input.value is not None: + plan_run.plan_run_inputs[plan_input.name] = LocalDataValue( + value=plan_input.value + ) + async def _acreate_plan_run( self, plan: Plan, @@ -2109,7 +2222,7 @@ def _handle_new_clarifications( combined_clarifications = new_clarifications + ready_clarifications return combined_clarifications - def _log_execute_start(self, plan_run: PlanRun, plan: Plan) -> None: + def _log_execute_start(self, plan_run: PlanRun | PlanRunV2, plan: Plan) -> None: dashboard_url = self.config.must_get("portia_dashboard_url", str) dashboard_message = ( ( @@ -2195,7 +2308,7 @@ def _log_final_output(self, plan_run: PlanRun, plan: Plan) -> None: f"Final output: {truncate_message(summary)!s}", ) - def _get_last_executed_step_output(self, plan: Plan, plan_run: PlanRun) -> Output | None: + def _get_last_executed_step_output(self, plan: Plan, plan_run: PlanRun | PlanRunV2) -> Output | None: """Get the output of the last executed step. Args: @@ -2651,7 +2764,7 @@ async def run_builder_plan( | dict[str, Serializable] | None = None, structured_output_schema: type[BaseModel] | None = None, - ) -> PlanRun: + ) -> PlanRunV2: """Run a Portia plan.""" if structured_output_schema: if plan.final_output_schema: @@ -2666,24 +2779,25 @@ async def run_builder_plan( tool_ids=[tool.id for tool in self.tool_registry.get_tools()], ), ) - plan_run = await self._aget_plan_run_from_plan( - legacy_plan, end_user, plan_run_inputs, structured_output_schema + # Create PlanRunV2 directly instead of legacy PlanRun + plan_run_v2 = await self._create_plan_run_v2_from_plan_v2( + plan, end_user, plan_run_inputs, structured_output_schema ) - plan_run = await self.resume_builder_plan( - plan, plan_run, end_user=end_user, legacy_plan=legacy_plan + plan_run_v2 = await self.resume_builder_plan( + plan, plan_run_v2, end_user=end_user, legacy_plan=legacy_plan ) rt = get_current_run_tree() if rt: - rt.add_metadata({"plan_run_id": str(plan_run.id)}) - return plan_run + rt.add_metadata({"plan_run_id": str(plan_run_v2.id)}) + return plan_run_v2 async def resume_builder_plan( self, plan: PlanV2, - plan_run: PlanRun, + plan_run: PlanRunV2, end_user: EndUser | None = None, legacy_plan: Plan | None = None, - ) -> PlanRun: + ) -> PlanRunV2: """Resume a Portia plan.""" if not legacy_plan: legacy_plan = plan.to_legacy_plan( @@ -2695,7 +2809,8 @@ async def resume_builder_plan( if not end_user: end_user = self.storage.get_end_user(plan_run.end_user_id) - ready, plan_run = self._check_initial_readiness(legacy_plan, plan_run) + ready, legacy_plan_run = self._check_initial_readiness(legacy_plan, plan_run.to_legacy_plan_run()) + plan_run = PlanRunV2.from_plan_run(legacy_plan_run) if not ready: return plan_run @@ -2718,7 +2833,8 @@ async def resume_builder_plan( ]: plan_run = await self._execute_builder_plan(plan, run_data) - plan_run = self._handle_clarifications(plan_run) + legacy_plan_run = self._handle_clarifications(plan_run.to_legacy_plan_run()) + plan_run = PlanRunV2.from_plan_run(legacy_plan_run) if len(plan_run.get_outstanding_clarifications()) > 0: return plan_run @@ -2728,7 +2844,7 @@ async def resume_builder_plan( return plan_run - async def _execute_builder_plan(self, plan: PlanV2, run_data: RunContext) -> PlanRun: # noqa: C901, PLR0912 + async def _execute_builder_plan(self, plan: PlanV2, run_data: RunContext) -> PlanRunV2: # noqa: C901, PLR0912 """Execute a Portia plan.""" self._set_plan_run_state(run_data.plan_run, PlanRunState.IN_PROGRESS) self._log_execute_start(run_data.plan_run, run_data.legacy_plan) diff --git a/portia/run_context.py b/portia/run_context.py index f8881ea1..0ca6e3e9 100644 --- a/portia/run_context.py +++ b/portia/run_context.py @@ -9,7 +9,7 @@ from portia.end_user import EndUser from portia.execution_hooks import ExecutionHooks from portia.plan import Plan -from portia.plan_run import PlanRun +from portia.plan_run import PlanRun, PlanRunV2 from portia.storage import Storage from portia.telemetry.telemetry_service import BaseProductTelemetry from portia.tool import ToolRunContext @@ -32,7 +32,7 @@ class RunContext(BaseModel): plan: PlanV2 = Field(description="The Portia plan being executed.") legacy_plan: Plan = Field(description="The legacy plan representation.") - plan_run: PlanRun = Field(description="The current plan run instance.") + plan_run: PlanRunV2 = Field(description="The current plan run instance.") end_user: EndUser = Field(description="The end user executing the plan.") step_output_values: list[StepOutputValue] = Field( default_factory=list, description="Outputs set by the step." @@ -47,7 +47,7 @@ def get_tool_run_ctx(self) -> ToolRunContext: """Get the tool run context.""" return ToolRunContext( end_user=self.end_user, - plan_run=self.plan_run, + plan_run=self.plan_run.to_legacy_plan_run(), plan=self.legacy_plan, config=self.config, clarifications=self.plan_run.get_clarifications_for_step(),