diff --git a/portia/plan_run.py b/portia/plan_run.py index 016619cd..e09015a5 100644 --- a/portia/plan_run.py +++ b/portia/plan_run.py @@ -28,6 +28,14 @@ from portia.execution_agents.output import LocalDataValue, Output from portia.prefixed_uuid import PlanRunUUID, PlanUUID +# Import TYPE_CHECKING to avoid circular imports +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from portia.builder.plan_v2 import PlanV2 + from portia.config import Config + from portia.end_user import EndUser + class PlanRunState(PortiaEnum): """The current state of the Plan Run. @@ -198,6 +206,135 @@ def __str__(self) -> str: ) +class PlanRunV2(BaseModel): + """Consolidated plan run data structure for the V2 migration. + + This class consolidates plan execution data into a single structure, + containing all fields needed for plan execution and management. + + Attributes: + id (PlanRunUUID): A unique ID for this plan run. + state (PlanRunState): The current state of the PlanRun. + current_step_index (int): The current step that is being executed. + plan (PlanV2 | None): The PlanV2 instance being executed (V2 only). + end_user (EndUser): The end user executing the plan. + step_output_values (list[LocalDataValue]): Step output values from execution. + final_output (Output | None): The final consolidated output. + plan_run_inputs (dict[str, LocalDataValue]): Plan input values. + config (Config): The Portia configuration. + """ + + model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True) + + id: PlanRunUUID = Field( + default_factory=PlanRunUUID, + description="A unique ID for this plan run.", + ) + state: PlanRunState = Field( + default=PlanRunState.NOT_STARTED, + description="The current state of the PlanRun.", + ) + current_step_index: int = Field( + default=0, + description="The current step that is being executed", + ) + plan: "PlanV2 | None" = Field( + default=None, + description="The PlanV2 instance being executed (V2 only).", + ) + end_user: "EndUser" = Field( + description="The end user executing the plan." + ) + step_output_values: list["LocalDataValue"] = Field( + default_factory=list, + description="Step output values from execution.", + ) + final_output: Output | None = Field( + default=None, + description="The final consolidated output of the PlanRun if available.", + ) + plan_run_inputs: dict[str, "LocalDataValue"] = Field( + default_factory=dict, + description="Dict mapping plan input names to their values.", + ) + config: "Config" = Field( + description="The Portia configuration." + ) + + # Legacy compatibility fields for migration + _legacy_outputs: PlanRunOutputs = Field( + default=PlanRunOutputs(), + description="Legacy outputs structure for compatibility.", + exclude=True, + ) + + def get_outstanding_clarifications(self) -> ClarificationListType: + """Return all outstanding clarifications. + + Returns: + ClarificationListType: A list of outstanding clarifications that have not been resolved. + """ + return [ + clarification + for clarification in self._legacy_outputs.clarifications + if not clarification.resolved + ] + + def get_clarifications_for_step(self, step: int | None = None) -> ClarificationListType: + """Return clarifications for the given step. + + Args: + step (int | None): the step to get clarifications for. Defaults to current step. + + Returns: + ClarificationListType: A list of clarifications for the given step. + """ + if step is None: + step = self.current_step_index + return [ + clarification + for clarification in self._legacy_outputs.clarifications + if clarification.step == step + ] + + def get_clarification_for_step( + self, category: ClarificationCategory, step: int | None = None + ) -> Clarification | None: + """Return a clarification of the given category for the given step if it exists. + + Args: + step (int | None): the step to get a clarification for. Defaults to current step. + category (ClarificationCategory | None): the category of the clarification to get. + """ + if step is None: + step = self.current_step_index + return next( + ( + clarification + for clarification in self._legacy_outputs.clarifications + if clarification.step == step and clarification.category == category + ), + None, + ) + + def get_potential_step_inputs(self) -> dict[str, Output]: + """Return a dictionary of potential step inputs for future steps.""" + return self._legacy_outputs.step_outputs | self.plan_run_inputs + + def __str__(self) -> str: + """Return the string representation of the PlanRunV2. + + Returns: + str: A string representation containing key run attributes. + """ + plan_id = self.plan.id if self.plan else "unknown" + return ( + f"PlanRunV2(id={self.id}, plan_id={plan_id}, " + f"state={self.state}, current_step_index={self.current_step_index}, " + f"final_output={'set' if self.final_output else 'unset'})" + ) + + class ReadOnlyPlanRun(PlanRun): """A read-only copy of a Plan Run passed to agents for reference. @@ -228,3 +365,63 @@ def from_plan_run(cls, plan_run: PlanRun) -> ReadOnlyPlanRun: plan_run_inputs=plan_run.plan_run_inputs, structured_output_schema=plan_run.structured_output_schema, ) + + +# Migration helpers for transitioning between old and new structures +def migrate_plan_run_to_v2( + legacy_plan_run: PlanRun, + plan_v2: "PlanV2 | None", + end_user: "EndUser", + config: "Config" +) -> "PlanRunV2": + """Convert a legacy PlanRun to PlanRunV2. + + Args: + legacy_plan_run: The legacy PlanRun instance to convert. + plan_v2: The PlanV2 instance if available. + end_user: The EndUser instance. + config: The Config instance. + + Returns: + PlanRunV2: The converted PlanRunV2 instance. + """ + # Convert legacy step outputs to step_output_values + step_output_values = [] + for output in legacy_plan_run.outputs.step_outputs.values(): + if hasattr(output, 'value'): + step_output_values.append(LocalDataValue(value=output.value)) + + return PlanRunV2( + id=legacy_plan_run.id, + state=legacy_plan_run.state, + current_step_index=legacy_plan_run.current_step_index, + plan=plan_v2, + end_user=end_user, + step_output_values=step_output_values, + final_output=legacy_plan_run.outputs.final_output, + plan_run_inputs=legacy_plan_run.plan_run_inputs, + config=config, + _legacy_outputs=legacy_plan_run.outputs, + ) + + +def migrate_v2_to_plan_run(plan_run_v2: "PlanRunV2") -> PlanRun: + """Convert a PlanRunV2 back to legacy PlanRun for compatibility. + + Args: + plan_run_v2: The PlanRunV2 instance to convert. + + Returns: + PlanRun: The converted legacy PlanRun instance. + """ + plan_id = plan_run_v2.plan.id if plan_run_v2.plan else "unknown" + + return PlanRun( + id=plan_run_v2.id, + plan_id=plan_id, + current_step_index=plan_run_v2.current_step_index, + state=plan_run_v2.state, + end_user_id=plan_run_v2.end_user.external_id, + outputs=plan_run_v2._legacy_outputs, + plan_run_inputs=plan_run_v2.plan_run_inputs, + ) diff --git a/portia/run_context.py b/portia/run_context.py index f8881ea1..914a211a 100644 --- a/portia/run_context.py +++ b/portia/run_context.py @@ -9,7 +9,7 @@ from portia.end_user import EndUser from portia.execution_hooks import ExecutionHooks from portia.plan import Plan -from portia.plan_run import PlanRun +from portia.plan_run import PlanRun, PlanRunV2 from portia.storage import Storage from portia.telemetry.telemetry_service import BaseProductTelemetry from portia.tool import ToolRunContext @@ -52,3 +52,116 @@ def get_tool_run_ctx(self) -> ToolRunContext: config=self.config, clarifications=self.plan_run.get_clarifications_for_step(), ) + + +class RunContextV2(BaseModel): + """Updated context for PlanV2 runs using consolidated data structures. + + This is the V2 version of RunContext that uses the new consolidated + PlanRunV2 structure instead of separate plan/legacy_plan/plan_run fields. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + plan_run: PlanRunV2 = Field(description="The consolidated plan run instance.") + storage: Storage = Field(description="The Portia storage.") + tool_registry: ToolRegistry = Field(description="The Portia tool registry.") + execution_hooks: ExecutionHooks = Field(description="The Portia execution hooks.") + telemetry: BaseProductTelemetry = Field(description="The Portia telemetry service.") + + def get_tool_run_ctx(self) -> ToolRunContext: + """Get the tool run context for V2.""" + # Create a legacy plan if needed for backwards compatibility + legacy_plan = None + if self.plan_run.plan: + from portia.plan import PlanContext + # Create a minimal plan context for legacy compatibility + plan_context = PlanContext(query="", tool_registry=self.tool_registry) + legacy_plan = self.plan_run.plan.to_legacy_plan(plan_context) + + # Create a legacy PlanRun for ToolRunContext compatibility + legacy_plan_run = PlanRun( + id=self.plan_run.id, + plan_id=self.plan_run.plan.id if self.plan_run.plan else "unknown", + current_step_index=self.plan_run.current_step_index, + state=self.plan_run.state, + end_user_id=self.plan_run.end_user.external_id, + outputs=self.plan_run._legacy_outputs, + plan_run_inputs=self.plan_run.plan_run_inputs, + ) + + return ToolRunContext( + end_user=self.plan_run.end_user, + plan_run=legacy_plan_run, + plan=legacy_plan, + config=self.plan_run.config, + clarifications=self.plan_run.get_clarifications_for_step(), + ) + + +# Migration helpers for RunContext structures +def migrate_run_context_to_v2( + legacy_context: RunContext, +) -> RunContextV2: + """Convert a legacy RunContext to RunContextV2. + + Args: + legacy_context: The legacy RunContext instance to convert. + + Returns: + RunContextV2: The converted RunContextV2 instance. + """ + from portia.plan_run import migrate_plan_run_to_v2 + + # Convert RunContext to RunContextV2 by migrating the PlanRun + plan_run_v2 = migrate_plan_run_to_v2( + legacy_plan_run=legacy_context.plan_run, + plan_v2=legacy_context.plan, + end_user=legacy_context.end_user, + config=legacy_context.config, + ) + + return RunContextV2( + plan_run=plan_run_v2, + storage=legacy_context.storage, + tool_registry=legacy_context.tool_registry, + execution_hooks=legacy_context.execution_hooks, + telemetry=legacy_context.telemetry, + ) + + +def migrate_v2_to_run_context( + context_v2: RunContextV2, +) -> RunContext: + """Convert a RunContextV2 back to legacy RunContext for compatibility. + + Args: + context_v2: The RunContextV2 instance to convert. + + Returns: + RunContext: The converted legacy RunContext instance. + """ + from portia.plan_run import migrate_v2_to_plan_run + + # Convert back to legacy structures + legacy_plan_run = migrate_v2_to_plan_run(context_v2.plan_run) + + # Create legacy plan if available + legacy_plan = None + if context_v2.plan_run.plan: + from portia.plan import PlanContext + plan_context = PlanContext(query="", tool_registry=context_v2.tool_registry) + legacy_plan = context_v2.plan_run.plan.to_legacy_plan(plan_context) + + return RunContext( + plan=context_v2.plan_run.plan if context_v2.plan_run.plan else None, + legacy_plan=legacy_plan, + plan_run=legacy_plan_run, + end_user=context_v2.plan_run.end_user, + step_output_values=context_v2.plan_run.step_output_values, + config=context_v2.plan_run.config, + storage=context_v2.storage, + tool_registry=context_v2.tool_registry, + execution_hooks=context_v2.execution_hooks, + telemetry=context_v2.telemetry, + ) diff --git a/tests/unit/test_plan_run.py b/tests/unit/test_plan_run.py index 39035164..37323213 100644 --- a/tests/unit/test_plan_run.py +++ b/tests/unit/test_plan_run.py @@ -9,7 +9,15 @@ from portia.errors import ToolHardError, ToolSoftError from portia.execution_agents.output import LocalDataValue from portia.plan import PlanUUID, ReadOnlyStep, Step -from portia.plan_run import PlanRun, PlanRunOutputs, PlanRunState, ReadOnlyPlanRun +from portia.plan_run import ( + PlanRun, + PlanRunOutputs, + PlanRunState, + PlanRunV2, + ReadOnlyPlanRun, + migrate_plan_run_to_v2, + migrate_v2_to_plan_run, +) from portia.prefixed_uuid import PlanRunUUID @@ -190,3 +198,229 @@ def test_get_clarification_for_step_without_matching_clarification(plan_run: Pla # Try to get clarification for step 1 result = plan_run.get_clarification_for_step(ClarificationCategory.INPUT) assert result is None + + +# PlanRunV2 Tests +@pytest.fixture +def mock_end_user(): + """Create a mock EndUser for testing.""" + from portia.end_user import EndUser + return EndUser(external_id="test_user_123", name="Test User", email="test@example.com") + + +@pytest.fixture +def mock_config(): + """Create a mock Config for testing.""" + from portia.config import Config + return Config( + llm_provider=None, + storage_class="MEMORY", + ) + + +@pytest.fixture +def mock_plan_v2(): + """Create a mock PlanV2 for testing.""" + from portia.builder.plan_v2 import PlanV2 + from portia.builder.step_v2 import StepV2 + from portia.prefixed_uuid import PlanUUID + + return PlanV2( + id=PlanUUID(), + steps=[ + StepV2(step_name="test_step", task="Test task"), + ], + label="Test Plan V2" + ) + + +@pytest.fixture +def plan_run_v2(mock_end_user, mock_config, mock_plan_v2) -> PlanRunV2: + """Create PlanRunV2 instance for testing.""" + return PlanRunV2( + end_user=mock_end_user, + config=mock_config, + plan=mock_plan_v2, + current_step_index=1, + state=PlanRunState.IN_PROGRESS, + step_output_values=[LocalDataValue(value="test_output")], + plan_run_inputs={"$input1": LocalDataValue(value="test_input")}, + ) + + +def test_plan_run_v2_initialization(mock_end_user, mock_config, mock_plan_v2) -> None: + """Test initialization of PlanRunV2 instance.""" + plan_run_inputs = {"$input1": LocalDataValue(value="test_input_value")} + plan_run_v2 = PlanRunV2( + end_user=mock_end_user, + config=mock_config, + plan=mock_plan_v2, + plan_run_inputs=plan_run_inputs, + ) + + assert plan_run_v2.id is not None + assert plan_run_v2.plan == mock_plan_v2 + assert plan_run_v2.end_user == mock_end_user + assert plan_run_v2.config == mock_config + assert isinstance(plan_run_v2.plan.id.uuid, UUID) + assert plan_run_v2.current_step_index == 0 + assert plan_run_v2.state == PlanRunState.NOT_STARTED + assert plan_run_v2.step_output_values == [] + assert plan_run_v2.final_output is None + assert len(plan_run_v2.plan_run_inputs) == 1 + assert plan_run_v2.plan_run_inputs["$input1"].get_value() == "test_input_value" + + +def test_plan_run_v2_str_representation(plan_run_v2: PlanRunV2) -> None: + """Test string representation of PlanRunV2.""" + result = str(plan_run_v2) + expected = ( + f"PlanRunV2(id={plan_run_v2.id}, plan_id={plan_run_v2.plan.id}, " + f"state={plan_run_v2.state}, current_step_index={plan_run_v2.current_step_index}, " + f"final_output=unset)" + ) + assert result == expected + + +def test_plan_run_v2_str_with_final_output(plan_run_v2: PlanRunV2) -> None: + """Test string representation of PlanRunV2 with final output set.""" + plan_run_v2.final_output = LocalDataValue(value="final result") + result = str(plan_run_v2) + expected = ( + f"PlanRunV2(id={plan_run_v2.id}, plan_id={plan_run_v2.plan.id}, " + f"state={plan_run_v2.state}, current_step_index={plan_run_v2.current_step_index}, " + f"final_output=set)" + ) + assert result == expected + + +def test_plan_run_v2_str_without_plan(mock_end_user, mock_config) -> None: + """Test string representation of PlanRunV2 without plan.""" + plan_run_v2 = PlanRunV2( + end_user=mock_end_user, + config=mock_config, + plan=None, + ) + result = str(plan_run_v2) + expected = ( + f"PlanRunV2(id={plan_run_v2.id}, plan_id=unknown, " + f"state={plan_run_v2.state}, current_step_index={plan_run_v2.current_step_index}, " + f"final_output=unset)" + ) + assert result == expected + + +def test_plan_run_v2_clarifications(plan_run_v2: PlanRunV2) -> None: + """Test clarification methods in PlanRunV2.""" + # Add a clarification to the legacy outputs + clarification = InputClarification( + plan_run_id=plan_run_v2.id, + step=1, + argument_name="test_arg", + user_guidance="test guidance", + resolved=False, + source="Test plan run", + ) + plan_run_v2._legacy_outputs.clarifications = [clarification] + + # Test get_outstanding_clarifications + outstanding = plan_run_v2.get_outstanding_clarifications() + assert len(outstanding) == 1 + assert outstanding[0] == clarification + + # Test get_clarifications_for_step + step_clarifications = plan_run_v2.get_clarifications_for_step(1) + assert len(step_clarifications) == 1 + assert step_clarifications[0] == clarification + + # Test get_clarification_for_step + result = plan_run_v2.get_clarification_for_step(ClarificationCategory.INPUT, 1) + assert result == clarification + + +def test_plan_run_v2_potential_step_inputs(plan_run_v2: PlanRunV2) -> None: + """Test get_potential_step_inputs method in PlanRunV2.""" + # Add step outputs to legacy outputs + step_output = LocalDataValue(value="step_output_value") + plan_run_v2._legacy_outputs.step_outputs = {"step1": step_output} + + potential_inputs = plan_run_v2.get_potential_step_inputs() + + # Should contain both step outputs and plan run inputs + assert "step1" in potential_inputs + assert "$input1" in potential_inputs + assert potential_inputs["step1"] == step_output + assert potential_inputs["$input1"] == plan_run_v2.plan_run_inputs["$input1"] + + +# Migration Tests +def test_migrate_plan_run_to_v2(plan_run: PlanRun, mock_end_user, mock_config, mock_plan_v2) -> None: + """Test migration from legacy PlanRun to PlanRunV2.""" + plan_run_v2 = migrate_plan_run_to_v2( + legacy_plan_run=plan_run, + plan_v2=mock_plan_v2, + end_user=mock_end_user, + config=mock_config, + ) + + # Check all fields are migrated correctly + assert plan_run_v2.id == plan_run.id + assert plan_run_v2.state == plan_run.state + assert plan_run_v2.current_step_index == plan_run.current_step_index + assert plan_run_v2.plan == mock_plan_v2 + assert plan_run_v2.end_user == mock_end_user + assert plan_run_v2.config == mock_config + assert plan_run_v2.final_output == plan_run.outputs.final_output + assert plan_run_v2.plan_run_inputs == plan_run.plan_run_inputs + assert plan_run_v2._legacy_outputs == plan_run.outputs + + # Check step outputs are converted + assert len(plan_run_v2.step_output_values) == len(plan_run.outputs.step_outputs) + + +def test_migrate_v2_to_plan_run(plan_run_v2: PlanRunV2) -> None: + """Test migration from PlanRunV2 back to legacy PlanRun.""" + legacy_plan_run = migrate_v2_to_plan_run(plan_run_v2) + + # Check all fields are migrated correctly + assert legacy_plan_run.id == plan_run_v2.id + assert legacy_plan_run.plan_id == plan_run_v2.plan.id + assert legacy_plan_run.current_step_index == plan_run_v2.current_step_index + assert legacy_plan_run.state == plan_run_v2.state + assert legacy_plan_run.end_user_id == plan_run_v2.end_user.external_id + assert legacy_plan_run.outputs == plan_run_v2._legacy_outputs + assert legacy_plan_run.plan_run_inputs == plan_run_v2.plan_run_inputs + + +def test_migrate_v2_to_plan_run_without_plan(mock_end_user, mock_config) -> None: + """Test migration from PlanRunV2 to legacy PlanRun when plan is None.""" + plan_run_v2 = PlanRunV2( + end_user=mock_end_user, + config=mock_config, + plan=None, + ) + + legacy_plan_run = migrate_v2_to_plan_run(plan_run_v2) + + assert legacy_plan_run.plan_id == "unknown" + assert legacy_plan_run.end_user_id == mock_end_user.external_id + + +def test_migration_roundtrip(plan_run: PlanRun, mock_end_user, mock_config, mock_plan_v2) -> None: + """Test that migration works correctly in both directions.""" + # Migrate to V2 and back + plan_run_v2 = migrate_plan_run_to_v2( + legacy_plan_run=plan_run, + plan_v2=mock_plan_v2, + end_user=mock_end_user, + config=mock_config, + ) + migrated_back = migrate_v2_to_plan_run(plan_run_v2) + + # Check that key fields are preserved after roundtrip + assert migrated_back.id == plan_run.id + assert migrated_back.current_step_index == plan_run.current_step_index + assert migrated_back.state == plan_run.state + assert migrated_back.outputs.clarifications == plan_run.outputs.clarifications + assert migrated_back.outputs.step_outputs == plan_run.outputs.step_outputs + assert migrated_back.plan_run_inputs == plan_run.plan_run_inputs diff --git a/tests/unit/test_run_context.py b/tests/unit/test_run_context.py new file mode 100644 index 00000000..0b786748 --- /dev/null +++ b/tests/unit/test_run_context.py @@ -0,0 +1,347 @@ +"""Tests for RunContext primitives and migrations.""" + +from __future__ import annotations + +import pytest +from pydantic import ValidationError + +from portia.builder.plan_v2 import PlanV2 +from portia.builder.step_v2 import StepV2 +from portia.clarification import InputClarification +from portia.config import Config +from portia.end_user import EndUser +from portia.execution_agents.output import LocalDataValue +from portia.execution_hooks import ExecutionHooks +from portia.plan import Plan, PlanContext +from portia.plan_run import PlanRun, PlanRunOutputs, PlanRunState, PlanRunV2 +from portia.prefixed_uuid import PlanRunUUID, PlanUUID +from portia.run_context import ( + RunContext, + RunContextV2, + StepOutputValue, + migrate_run_context_to_v2, + migrate_v2_to_run_context, +) +from portia.storage import Storage +from portia.telemetry.telemetry_service import BaseProductTelemetry +from portia.tool_registry import ToolRegistry + + +@pytest.fixture +def mock_end_user() -> EndUser: + """Create a mock EndUser for testing.""" + return EndUser(external_id="test_user_123", name="Test User", email="test@example.com") + + +@pytest.fixture +def mock_config() -> Config: + """Create a mock Config for testing.""" + return Config( + llm_provider=None, + storage_class="MEMORY", + ) + + +@pytest.fixture +def mock_plan_v2() -> PlanV2: + """Create a mock PlanV2 for testing.""" + return PlanV2( + id=PlanUUID(), + steps=[ + StepV2(step_name="test_step", task="Test task"), + ], + label="Test Plan V2" + ) + + +@pytest.fixture +def mock_legacy_plan() -> Plan: + """Create a mock legacy Plan for testing.""" + from portia.plan import Step + + return Plan( + id=PlanUUID(), + plan_context=PlanContext(query="Test query"), + steps=[ + Step(task="Test task", output="$output"), + ], + ) + + +@pytest.fixture +def mock_storage() -> Storage: + """Create a mock Storage for testing.""" + from portia.storage import InMemoryStorage + return InMemoryStorage() + + +@pytest.fixture +def mock_tool_registry() -> ToolRegistry: + """Create a mock ToolRegistry for testing.""" + return ToolRegistry() + + +@pytest.fixture +def mock_execution_hooks() -> ExecutionHooks: + """Create a mock ExecutionHooks for testing.""" + return ExecutionHooks() + + +@pytest.fixture +def mock_telemetry() -> BaseProductTelemetry: + """Create a mock BaseProductTelemetry for testing.""" + from portia.telemetry.telemetry_service import NoOpTelemetry + return NoOpTelemetry() + + +@pytest.fixture +def mock_plan_run(mock_legacy_plan, mock_end_user) -> PlanRun: + """Create a mock PlanRun for testing.""" + return PlanRun( + plan_id=mock_legacy_plan.id, + end_user_id=mock_end_user.external_id, + current_step_index=1, + state=PlanRunState.IN_PROGRESS, + outputs=PlanRunOutputs( + step_outputs={"step1": LocalDataValue(value="test_output")}, + ), + plan_run_inputs={"$input1": LocalDataValue(value="test_input")}, + ) + + +@pytest.fixture +def legacy_run_context( + mock_plan_v2, + mock_legacy_plan, + mock_plan_run, + mock_end_user, + mock_config, + mock_storage, + mock_tool_registry, + mock_execution_hooks, + mock_telemetry, +) -> RunContext: + """Create a legacy RunContext for testing.""" + return RunContext( + plan=mock_plan_v2, + legacy_plan=mock_legacy_plan, + plan_run=mock_plan_run, + end_user=mock_end_user, + step_output_values=[StepOutputValue(value="test", description="test", step_name="test", step_num=1)], + config=mock_config, + storage=mock_storage, + tool_registry=mock_tool_registry, + execution_hooks=mock_execution_hooks, + telemetry=mock_telemetry, + ) + + +@pytest.fixture +def plan_run_v2(mock_end_user, mock_config, mock_plan_v2) -> PlanRunV2: + """Create PlanRunV2 instance for testing.""" + return PlanRunV2( + end_user=mock_end_user, + config=mock_config, + plan=mock_plan_v2, + current_step_index=1, + state=PlanRunState.IN_PROGRESS, + step_output_values=[LocalDataValue(value="test_output")], + plan_run_inputs={"$input1": LocalDataValue(value="test_input")}, + ) + + +@pytest.fixture +def run_context_v2( + plan_run_v2, + mock_storage, + mock_tool_registry, + mock_execution_hooks, + mock_telemetry, +) -> RunContextV2: + """Create RunContextV2 instance for testing.""" + return RunContextV2( + plan_run=plan_run_v2, + storage=mock_storage, + tool_registry=mock_tool_registry, + execution_hooks=mock_execution_hooks, + telemetry=mock_telemetry, + ) + + +def test_step_output_value_creation(): + """Test creation of StepOutputValue.""" + step_output = StepOutputValue( + value="test_value", + description="test description", + step_name="test_step", + step_num=1 + ) + + assert step_output.value == "test_value" + assert step_output.description == "test description" + assert step_output.step_name == "test_step" + assert step_output.step_num == 1 + + +def test_legacy_run_context_initialization(legacy_run_context: RunContext): + """Test initialization of legacy RunContext.""" + assert legacy_run_context.plan is not None + assert legacy_run_context.legacy_plan is not None + assert legacy_run_context.plan_run is not None + assert legacy_run_context.end_user is not None + assert legacy_run_context.config is not None + assert legacy_run_context.storage is not None + assert legacy_run_context.tool_registry is not None + assert legacy_run_context.execution_hooks is not None + assert legacy_run_context.telemetry is not None + assert len(legacy_run_context.step_output_values) == 1 + + +def test_legacy_run_context_get_tool_run_ctx(legacy_run_context: RunContext): + """Test get_tool_run_ctx method of legacy RunContext.""" + tool_run_ctx = legacy_run_context.get_tool_run_ctx() + + assert tool_run_ctx.end_user == legacy_run_context.end_user + assert tool_run_ctx.plan_run == legacy_run_context.plan_run + assert tool_run_ctx.plan == legacy_run_context.legacy_plan + assert tool_run_ctx.config == legacy_run_context.config + + +def test_run_context_v2_initialization(run_context_v2: RunContextV2): + """Test initialization of RunContextV2.""" + assert run_context_v2.plan_run is not None + assert run_context_v2.storage is not None + assert run_context_v2.tool_registry is not None + assert run_context_v2.execution_hooks is not None + assert run_context_v2.telemetry is not None + + +def test_run_context_v2_get_tool_run_ctx(run_context_v2: RunContextV2): + """Test get_tool_run_ctx method of RunContextV2.""" + tool_run_ctx = run_context_v2.get_tool_run_ctx() + + assert tool_run_ctx.end_user == run_context_v2.plan_run.end_user + assert tool_run_ctx.config == run_context_v2.plan_run.config + # The plan_run should be a converted legacy plan run + assert tool_run_ctx.plan_run.id == run_context_v2.plan_run.id + assert tool_run_ctx.plan_run.current_step_index == run_context_v2.plan_run.current_step_index + + +def test_run_context_v2_get_tool_run_ctx_without_plan( + mock_end_user, + mock_config, + mock_storage, + mock_tool_registry, + mock_execution_hooks, + mock_telemetry, +): + """Test get_tool_run_ctx method when PlanRunV2 has no plan.""" + plan_run_v2 = PlanRunV2( + end_user=mock_end_user, + config=mock_config, + plan=None, + ) + + run_context_v2 = RunContextV2( + plan_run=plan_run_v2, + storage=mock_storage, + tool_registry=mock_tool_registry, + execution_hooks=mock_execution_hooks, + telemetry=mock_telemetry, + ) + + tool_run_ctx = run_context_v2.get_tool_run_ctx() + + assert tool_run_ctx.end_user == mock_end_user + assert tool_run_ctx.config == mock_config + assert tool_run_ctx.plan is None + assert tool_run_ctx.plan_run.plan_id == "unknown" + + +# Migration Tests +def test_migrate_run_context_to_v2(legacy_run_context: RunContext): + """Test migration from legacy RunContext to RunContextV2.""" + context_v2 = migrate_run_context_to_v2(legacy_run_context) + + # Check that all fields are migrated correctly + assert context_v2.storage == legacy_run_context.storage + assert context_v2.tool_registry == legacy_run_context.tool_registry + assert context_v2.execution_hooks == legacy_run_context.execution_hooks + assert context_v2.telemetry == legacy_run_context.telemetry + + # Check that PlanRunV2 is created with correct data + assert context_v2.plan_run.id == legacy_run_context.plan_run.id + assert context_v2.plan_run.state == legacy_run_context.plan_run.state + assert context_v2.plan_run.current_step_index == legacy_run_context.plan_run.current_step_index + assert context_v2.plan_run.plan == legacy_run_context.plan + assert context_v2.plan_run.end_user == legacy_run_context.end_user + assert context_v2.plan_run.config == legacy_run_context.config + + +def test_migrate_v2_to_run_context(run_context_v2: RunContextV2): + """Test migration from RunContextV2 back to legacy RunContext.""" + legacy_context = migrate_v2_to_run_context(run_context_v2) + + # Check that all fields are migrated correctly + assert legacy_context.storage == run_context_v2.storage + assert legacy_context.tool_registry == run_context_v2.tool_registry + assert legacy_context.execution_hooks == run_context_v2.execution_hooks + assert legacy_context.telemetry == run_context_v2.telemetry + + # Check that legacy structures are recreated correctly + assert legacy_context.plan_run.id == run_context_v2.plan_run.id + assert legacy_context.plan_run.current_step_index == run_context_v2.plan_run.current_step_index + assert legacy_context.plan_run.state == run_context_v2.plan_run.state + assert legacy_context.plan == run_context_v2.plan_run.plan + assert legacy_context.end_user == run_context_v2.plan_run.end_user + assert legacy_context.config == run_context_v2.plan_run.config + assert legacy_context.step_output_values == run_context_v2.plan_run.step_output_values + + +def test_run_context_migration_roundtrip(legacy_run_context: RunContext): + """Test that RunContext migration works correctly in both directions.""" + # Migrate to V2 and back + context_v2 = migrate_run_context_to_v2(legacy_run_context) + migrated_back = migrate_v2_to_run_context(context_v2) + + # Check that key fields are preserved after roundtrip + assert migrated_back.plan == legacy_run_context.plan + assert migrated_back.end_user == legacy_run_context.end_user + assert migrated_back.config == legacy_run_context.config + assert migrated_back.storage == legacy_run_context.storage + assert migrated_back.tool_registry == legacy_run_context.tool_registry + assert migrated_back.execution_hooks == legacy_run_context.execution_hooks + assert migrated_back.telemetry == legacy_run_context.telemetry + + # Check plan run fields + assert migrated_back.plan_run.id == legacy_run_context.plan_run.id + assert migrated_back.plan_run.current_step_index == legacy_run_context.plan_run.current_step_index + assert migrated_back.plan_run.state == legacy_run_context.plan_run.state + assert migrated_back.plan_run.plan_run_inputs == legacy_run_context.plan_run.plan_run_inputs + + +def test_run_context_migration_with_clarifications(legacy_run_context: RunContext): + """Test RunContext migration preserves clarifications.""" + # Add a clarification to the legacy run context + clarification = InputClarification( + plan_run_id=legacy_run_context.plan_run.id, + step=1, + argument_name="test_arg", + user_guidance="test guidance", + resolved=False, + source="Test plan run", + ) + legacy_run_context.plan_run.outputs.clarifications = [clarification] + + # Migrate to V2 and back + context_v2 = migrate_run_context_to_v2(legacy_run_context) + migrated_back = migrate_v2_to_run_context(context_v2) + + # Check that clarifications are preserved + assert len(migrated_back.plan_run.outputs.clarifications) == 1 + assert migrated_back.plan_run.outputs.clarifications[0] == clarification + + # Check that V2 context can access clarifications + outstanding = context_v2.plan_run.get_outstanding_clarifications() + assert len(outstanding) == 1 + assert outstanding[0] == clarification \ No newline at end of file