diff --git a/docs/tech-specs/confidence-based-agents.md b/docs/tech-specs/confidence-based-agents.md new file mode 100644 index 00000000..28195c0e --- /dev/null +++ b/docs/tech-specs/confidence-based-agents.md @@ -0,0 +1,871 @@ +# TrustGraph Confidence-Based Agent Architecture +## Technical Specification v1.0 + +### Executive Summary + +This document specifies a new agent architecture for TrustGraph that introduces confidence-based execution control as an alternative to the existing ReAct-based agent system. The architecture will be implemented as a new module set under `trustgraph-flow/trustgraph/agent/confidence/` to provide enhanced reliability, auditability, and reduced hallucinations for critical knowledge graph operations. + +### 1. Architecture Overview + +#### 1.1 Design Principles + +- **Modularity**: New confidence-based agent lives alongside existing ReAct agent +- **Service-Oriented**: Follows TrustGraph's existing Pulsar-based service patterns +- **Schema-Driven**: Leverages existing schema definitions with minimal extensions +- **Tool Agnostic**: Works with existing tools (KnowledgeQuery, TextCompletion, McpTool) + +#### 1.2 High-Level Architecture + +``` +┌──────────────────────────────────────────────────────────────────┐ +│ Gateway Service Layer │ +│ (dispatch/agent_confidence.py) │ +└─────────────────────────────┬─────────────────────────────────────┘ + │ + Pulsar Message Bus + │ +┌─────────────────────────────┴────────────────────────────────────┐ +│ Confidence Agent Service │ +│ (agent/confidence/service.py) │ +│ │ +│ ┌──────────────┐ ┌─────────────────┐ ┌────────────────┐ │ +│ │ Planner │ │ Flow Controller │ │ Confidence │ │ +│ │ Module │─▶│ Module │─▶│ Evaluator │ │ +│ └──────────────┘ └─────────────────┘ └────────────────┘ │ +│ │ │ │ │ +│ ▼ ▼ ▼ │ +│ ┌──────────────┐ ┌───────────────┐ ┌────────────────┐ │ +│ │ Execution │ │ Memory │ │ Audit │ │ +│ │ Engine │◄──│ Manager │ │ Logger │ │ +│ └──────────────┘ └───────────────┘ └────────────────┘ │ +└──────────────────────────────────────────────────────────────────┘ + │ + Tool Service Clients + │ + ┌───────────────┬───────┴─────────┬─────────────────┐ + ▼ ▼ ▼ ▼ +KnowledgeQuery TextCompletion McpTool PromptService +``` + +### 2. Module Specifications + +#### 2.1 Core Modules Location + +All new modules will be created under: +``` +trustgraph-flow/trustgraph/agent/confidence/ +├── __init__.py +├── __main__.py +├── service.py # Main service entry point +├── planner.py # Planning module +├── flow_controller.py # Flow orchestration +├── confidence.py # Confidence evaluation +├── memory.py # Memory management +├── executor.py # Step execution +├── audit.py # Audit logging +└── types.py # Type definitions +``` + +#### 2.2 External Interface - Drop-in Replacement + +The confidence-based agent uses the existing `AgentRequest` and `AgentResponse` schemas as its external interface, making it a drop-in replacement for the ReAct agent: + +**Input:** `AgentRequest` (from `trustgraph-base/trustgraph/schema/services/agent.py`) +**Output:** `AgentResponse` (from `trustgraph-base/trustgraph/schema/services/agent.py`) + +This ensures complete compatibility with existing gateway dispatchers and client code. 
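+
+A minimal client-side sketch of this shared interface (the import path is an assumption; the field names follow Appendix B):
+
+```python
+# Hypothetical sketch, not the actual client code: the confidence agent accepts
+# the same AgentRequest and emits the same AgentResponse stream as the ReAct agent.
+from trustgraph.schema import AgentRequest, AgentResponse   # assumed import path
+
+request = AgentRequest(
+    question="What connects Entity X and Entity Y?",
+    plan="",          # optional serialized overrides, see Appendix B
+    state="initial",
+    history=[],
+)
+
+def handle(response: AgentResponse) -> bool:
+    """Return True once the final answer has arrived."""
+    if response.error:
+        raise RuntimeError(str(response.error))
+    if response.answer:
+        print("answer:", response.answer)
+        return True
+    # Interim messages carry only thought/observation, mirroring ReAct's streaming.
+    print("thought:", response.thought, "| observation:", response.observation)
+    return False
+```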
+ +#### 2.3 Internal Schemas + +New internal schemas in `trustgraph-base/trustgraph/schema/services/agent_confidence.py`: + +**ConfidenceMetrics** +- `score`: Float - Confidence score (0.0 to 1.0) +- `reasoning`: String - Explanation of score calculation +- `retry_count`: Integer - Number of retries attempted + +**ExecutionStep** +- `id`: String - Unique step identifier +- `function`: String - Tool/function to execute +- `arguments`: Map(String) - Arguments for the function +- `dependencies`: Array(String) - IDs of prerequisite steps +- `confidence_threshold`: Float - Minimum acceptable confidence +- `timeout_ms`: Integer - Execution timeout + +**ExecutionPlan** +- `id`: String - Plan identifier +- `steps`: Array(ExecutionStep) - Ordered execution steps +- `context`: Map(String) - Global context for plan + +**StepResult** +- `step_id`: String - Reference to ExecutionStep +- `success`: Boolean - Execution success status +- `output`: String - Step execution output +- `confidence`: ConfidenceMetrics - Confidence evaluation +- `execution_time_ms`: Integer - Actual execution time + +These internal schemas are used for: +- Passing structured data between confidence agent modules +- Storing execution state and metrics +- Audit logging and debugging + +#### 2.4 Communication Pattern + +The confidence agent sends multiple `AgentResponse` messages during execution, similar to ReAct's thought/observation pattern: + +1. **Planning Phase**: Sends responses with planning thoughts and observations about the generated execution plan +2. **Execution Phase**: For each step, sends responses with: + - `thought`: Current step being executed and confidence reasoning + - `observation`: Tool output and confidence evaluation +3. **Final Response**: Sends the final answer with overall confidence assessment + +This streaming approach provides real-time visibility into the agent's reasoning and confidence evaluations while maintaining compatibility with existing clients. + +### 3. Module Implementation Details + +#### 3.1 Planner Module (`planner.py`) + +The Planner Module generates structured execution plans from user requests using an LLM to create confidence-scored step sequences. + +**Key Responsibilities:** +- Parse user requests into structured plans +- Assign confidence thresholds based on operation criticality +- Determine step dependencies +- Select appropriate tool combinations + +#### 3.2 Flow Controller (`flow_controller.py`) + +The Flow Controller orchestrates plan execution with confidence-based control flow, managing step dependencies and retry logic. + +**Key Capabilities:** +- Step dependency resolution +- Confidence-based retry logic +- User override handling +- Graceful failure modes + +**Configuration Schema:** +```yaml +confidence_agent: + default_confidence_threshold: 0.7 + max_retries: 3 + retry_backoff_factor: 2.0 + override_enabled: true + step_timeout_ms: 30000 + parallel_execution: false +``` + +#### 3.3 Confidence Evaluator (`confidence.py`) + +The Confidence Evaluator calculates confidence scores for execution results based on multiple factors to ensure reliability. + +**Confidence Scoring Factors:** +- Graph query result size and consistency +- Entity extraction precision scores +- Vector search similarity thresholds +- LLM response coherence metrics + +#### 3.4 Memory Manager (`memory.py`) + +The Memory Manager handles inter-step data flow and context preservation, ensuring efficient memory usage while maintaining necessary state. 
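+
+A rough sketch of the kind of dependency-aware context store this module could provide (class and method names here are illustrative assumptions, not a final API); the strategies it applies are listed below:
+
+```python
+# Illustrative sketch only: a TTL-bounded store keyed by step id, so each step
+# receives just the outputs of the steps it declares as dependencies.
+import time
+
+class MemoryManager:
+    def __init__(self, cache_ttl_seconds: float = 300.0):
+        self.cache_ttl = cache_ttl_seconds
+        self._results: dict[str, tuple[float, str]] = {}  # step_id -> (stored_at, output)
+
+    def store_result(self, step_id: str, output: str) -> None:
+        """Record a completed step's output for downstream steps."""
+        self._results[step_id] = (time.monotonic(), output)
+
+    def get_context(self, dependencies: list[str]) -> dict[str, str]:
+        """Return outputs of prerequisite steps, skipping expired cache entries."""
+        now = time.monotonic()
+        return {
+            dep: output
+            for dep, (stored_at, output) in self._results.items()
+            if dep in dependencies and now - stored_at <= self.cache_ttl
+        }
+```
+
+The 300-second default mirrors the `cache_ttl_seconds` default listed in Appendix A.2.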
+ +**Memory Strategies:** +- Selective context passing based on dependencies +- Graph data serialization for efficiency +- Automatic context window management +- Result caching with TTL + +#### 3.5 Executor Module (`executor.py`) + +The Step Executor handles individual plan step execution using registered tools, managing tool selection, error handling, and result transformation. + +**Tool Mapping:** +- `GraphQuery` → GraphRagClient +- `TextCompletion` → TextCompletionClient +- `McpTool` → McpToolClient +- `Prompt` → PromptClient + +#### 3.6 Service Implementation (`service.py`) + +The main service class coordinates all confidence agent components and handles request/response flow through the Pulsar message bus. + +**Service Workflow:** +1. Generate execution plan via Planner Module +2. Execute plan with confidence control via Flow Controller +3. Generate response with confidence metrics and audit trail + +**Client Specifications:** +- TextCompletionClientSpec for LLM operations +- GraphRagClientSpec for knowledge graph queries +- ToolClientSpec for MCP tool invocations + +### 4. Integration Points + +#### 4.1 Gateway Integration + +The confidence agent reuses the existing gateway dispatcher `trustgraph-flow/trustgraph/gateway/dispatch/agent.py` since it uses the same AgentRequest and AgentResponse schemas. No new dispatcher is needed, making it a true drop-in replacement. + +#### 4.2 Configuration Integration + +Configuration in deployment YAML: + +```yaml +services: + - name: confidence-agent + module: trustgraph.agent.confidence + instances: 2 + config: + max_iterations: 15 + confidence_threshold: 0.75 + + # Existing react agent continues to work + - name: react-agent + module: trustgraph.agent.react + instances: 2 +``` + +#### 4.3 Tool Integration + +The confidence agent reuses existing tool implementations: +- `KnowledgeQueryImpl` for graph RAG operations +- `TextCompletionImpl` for LLM completions +- `McpToolImpl` for MCP tool invocations +- `PromptImpl` for prompt-based operations + +No changes required to existing tools. + +### 5. End-to-End Execution Flow + +#### 5.1 Module Interaction Overview + +When an `AgentRequest` arrives, the confidence agent orchestrates the following flow: + +1. **Service Entry**: The main service receives the `AgentRequest` via Pulsar +2. **Planning Phase**: Service invokes Planner Module to generate an `ExecutionPlan` +3. **Execution Loop**: Service passes plan to Flow Controller, which: + - Resolves step dependencies + - For each step, calls Executor with context from Memory Manager + - Evaluator assesses confidence after each execution + - Retry logic triggered if confidence below threshold +4. **Response Stream**: Service sends `AgentResponse` messages at key points +5. **Audit Trail**: Logger records all decisions and confidence scores + +#### 5.2 Detailed Message Flow + +```mermaid +sequenceDiagram + participant Client + participant Service as ConfidenceAgent
+    participant Planner
+    participant FlowCtrl as Flow Controller
+    participant Memory
+    participant Executor
+    participant Evaluator
+    participant Tools
+
+    Client->>Service: AgentRequest
+    Service->>Service: Parse request, extract config
+
+    %% Planning Phase
+    Service->>Planner: generate_plan(request)
+    Planner->>Tools: Query available tools
+    Planner->>Planner: LLM generates ExecutionPlan
+    Planner-->>Service: ExecutionPlan
+    Service->>Client: AgentResponse (planning thought)
+
+    %% Execution Phase
+    Service->>FlowCtrl: execute_plan(plan)
+
+    loop For each ExecutionStep
+        FlowCtrl->>Memory: get_context(step)
+        Memory-->>FlowCtrl: context + dependencies
+
+        FlowCtrl->>Executor: execute_step(step, context)
+        Executor->>Tools: invoke_tool(name, args)
+        Tools-->>Executor: raw_result
+
+        Executor->>Evaluator: evaluate(result)
+        Evaluator-->>Executor: ConfidenceMetrics
+
+        alt Confidence >= threshold
+            Executor-->>FlowCtrl: StepResult (success)
+            FlowCtrl->>Memory: store_result(step, result)
+            FlowCtrl->>Service: Send progress
+            Service->>Client: AgentResponse (step observation)
+        else Confidence < threshold
+            FlowCtrl->>FlowCtrl: Retry with backoff
+            Note over FlowCtrl: Max 3 retries by default
+            alt After max retries
+                FlowCtrl->>Service: Request override
+                Service->>Client: AgentResponse (override request)
+            end
+        end
+    end
+
+    FlowCtrl-->>Service: All StepResults
+    Service->>Service: Generate final answer
+    Service->>Client: AgentResponse
(final answer) +``` + +#### 5.3 Confidence Decision Points + +The confidence mechanism affects execution at three critical points: + +**1. Planning Confidence** +- Planner assigns confidence thresholds to each step based on: + - Operation criticality (graph mutations = higher threshold) + - Tool reliability history + - Query complexity +- Default thresholds: GraphQuery (0.8), TextCompletion (0.7), McpTool (0.6) + +**2. Execution Confidence** +- After each tool execution, Evaluator calculates confidence based on: + - Output completeness and structure + - Consistency with expected schemas + - Semantic coherence (for text outputs) + - Result size and validity (for graph queries) + +**3. Retry Decision** +- If confidence < threshold: + - First retry: Same parameters with backoff + - Second retry: Adjusted parameters (e.g., broader query) + - Third retry: Simplified approach + - After max retries: User override or graceful failure + +#### 5.4 Example: Graph Query with Low Confidence + +**Scenario**: User asks "What are the connections between Entity X and Entity Y?" + +**Step 1: Planning** +``` +AgentRequest arrives: + question: "What are the connections between Entity X and Entity Y?" + +Planner generates ExecutionPlan: + Step 1: GraphQuery + function: "GraphQuery" + arguments: {"query": "MATCH path=(x:Entity {name:'X'})-[*..3]-(y:Entity {name:'Y'}) RETURN path"} + confidence_threshold: 0.8 +``` + +**Step 2: First Execution** +``` +Executor runs GraphQuery: + Result: Empty result set [] + +Evaluator assesses confidence: + Score: 0.3 (low - empty results suspicious) + Reasoning: "Empty result may indicate entities don't exist or query too restrictive" + +Flow Controller decides: + 0.3 < 0.8 threshold → RETRY +``` + +**Step 3: Retry with Adjusted Query** +``` +Flow Controller adjusts parameters: + New query: "MATCH (x:Entity), (y:Entity) WHERE x.name CONTAINS 'X' AND y.name CONTAINS 'Y' RETURN x, y" + +Executor runs adjusted query: + Result: Found 2 entities but no connections + +Evaluator assesses confidence: + Score: 0.85 + Reasoning: "Entities exist but genuinely unconnected" + +Flow Controller decides: + 0.85 >= 0.8 threshold → SUCCESS +``` + +**Step 4: Response Stream** +``` +AgentResponse 1 (planning): + thought: "Planning graph traversal query to find connections" + observation: "Generated query with 3-hop path search" + +AgentResponse 2 (retry): + thought: "Initial query returned empty, adjusting search parameters" + observation: "Retrying with broader entity matching" + +AgentResponse 3 (final): + answer: "Entity X and Entity Y exist in the graph but have no direct or indirect connections within 3 hops" + thought: "Query successful with high confidence after parameter adjustment" + observation: "Confidence: 0.85 - Entities verified to exist but unconnected" +``` + +#### 5.5 Example: Multi-Step Plan with Dependencies + +**Scenario**: "Summarize the main topics discussed about AI regulation" + +**ExecutionPlan Generated**: +``` +Step 1: GraphQuery - Find documents about AI regulation + confidence_threshold: 0.75 + +Step 2: TextCompletion - Extract key topics from documents + dependencies: [Step 1] + confidence_threshold: 0.7 + +Step 3: TextCompletion - Generate summary + dependencies: [Step 2] + confidence_threshold: 0.8 +``` + +**Execution Flow**: +1. **Step 1 Success** (confidence: 0.9) + - Found 15 relevant documents + - Memory Manager stores document list + +2. 
**Step 2 Initial Failure** (confidence: 0.5) + - Topics extraction unclear + - Retry with more specific prompt + - **Retry Success** (confidence: 0.75) + - Memory Manager stores topics list + +3. **Step 3 Success** (confidence: 0.85) + - Uses topics from memory + - Generates coherent summary + +**Total AgentResponses sent**: 6 +- 1 for planning +- 2 for Step 1 (execution + success) +- 2 for Step 2 (failure + retry success) +- 1 for Step 3 +- 1 final response + +### 6. Monitoring and Observability + +#### 6.1 Metrics + +New metrics to expose via Prometheus: + +**Confidence Metrics:** +- `agent_confidence_score` - Histogram of confidence scores with buckets [0.1, 0.3, 0.5, 0.7, 0.9, 1.0] +- `agent_confidence_failures` - Counter of steps failing confidence thresholds + +**Retry Metrics:** +- `agent_retry_count` - Counter of retries by function name +- `agent_retry_success_rate` - Gauge of retry success percentage + +**Plan Execution Metrics:** +- `agent_plan_execution_seconds` - Histogram of total plan execution time +- `agent_step_execution_seconds` - Histogram of individual step execution time +- `agent_plan_complexity` - Histogram of number of steps per plan + +#### 6.2 Audit Trail + +Structured audit logging format: + +```json +{ + "execution_id": "550e8400-e29b-41d4-a716-446655440000", + "timestamp": "2024-01-15T10:30:00Z", + "request": { + "question": "Find relationships between entities X and Y", + "confidence_threshold": 0.75 + }, + "plan": { + "steps": [ + { + "id": "step-1", + "function": "GraphQuery", + "confidence_threshold": 0.8 + } + ] + }, + "execution": [ + { + "step_id": "step-1", + "start_time": "2024-01-15T10:30:01Z", + "end_time": "2024-01-15T10:30:02Z", + "confidence_score": 0.85, + "retry_count": 0, + "success": true + } + ], + "final_confidence": 0.85, + "total_duration_ms": 1500 +} +``` + +### 7. Testing Strategy + +#### 7.1 Unit Tests + +Location: `tests/unit/test_agent/test_confidence/` + +**Test Coverage Areas:** +- Plan generation with various request types +- Confidence score calculation and validation +- Memory manager context handling +- Flow controller retry logic +- Executor tool mapping and error handling + +#### 7.2 Integration Tests + +Location: `tests/integration/test_agent_confidence/` + +**Test Scenarios:** +- End-to-end confidence flow with mock services +- Multi-step plan execution with dependencies +- Retry behavior under various confidence scores +- User override flow simulation +- Fallback to ReAct agent on failure + +#### 7.3 Contract Tests + +**Contract Validation:** +- Pulsar message schema serialization/deserialization +- Compatibility with existing tool service interfaces +- Gateway dispatcher protocol compliance +- Response format consistency with ReAct agent where applicable + +### 8. Performance Considerations + +#### 8.1 Expected Performance Impact + +| Metric | ReAct Agent | Confidence Agent | Impact | +|--------|------------|------------------|--------| +| Latency (p50) | 500ms | 650ms | +30% due to planning | +| Latency (p99) | 2000ms | 3000ms | +50% with retries | +| Success Rate | 85% | 92% | +7% improvement | +| Memory Usage | 512MB | 768MB | +50% for context | + +#### 8.2 Optimization Strategies + +- **Plan Caching**: Cache plans for similar requests +- **Parallel Execution**: Execute independent steps concurrently +- **Confidence Precomputation**: Pre-calculate confidence for common operations +- **Context Pruning**: Aggressive memory management for large contexts + +### 9. 
Security Considerations + +#### 9.1 Data Protection + +- Confidence scores must not leak sensitive information +- Audit trails sanitized before logging +- Memory manager respects data classification levels + +#### 9.2 Access Control + +- Inherit existing TrustGraph RBAC policies +- Override functionality requires elevated privileges +- Audit trail access restricted to administrators + +### 10. Phase 2: Microservices Architecture + +#### 10.1 Evolution from Monolithic to Microservices + +The initial implementation of the confidence-based agent will be a single monolithic service containing all modules (Planner, Flow Controller, Executor, Evaluator, Memory Manager, and Audit Logger) as described in this specification. This approach simplifies initial development and testing. + +In Phase 2, the monolithic service will be decomposed into separate microservices, each exposed as independent Pulsar request/response services. This evolution provides: +- Better scalability and resource allocation +- Independent deployment and versioning +- Fault isolation and resilience +- Reusability across different agent architectures + +#### 10.2 Proposed Microservice Decomposition + +The following microservices will be created, each with its own request/response schema: + +**1. Planning Service** (`trustgraph-flow/trustgraph/planning/`) +- **Purpose**: Generate execution plans from natural language requests +- **Request Schema**: `PlanningRequest` (question, context, available_tools) +- **Response Schema**: `PlanningResponse` (execution_plan, confidence_thresholds) +- **Queue**: `planning-request` / `planning-response` +- **Reusability**: Can be used by other agent architectures needing structured plans + +**2. Confidence Evaluation Service** (`trustgraph-flow/trustgraph/confidence/`) +- **Purpose**: Evaluate confidence scores for any execution result +- **Request Schema**: `ConfidenceRequest` (function_name, input, output, context) +- **Response Schema**: `ConfidenceResponse` (score, reasoning, recommendations) +- **Queue**: `confidence-request` / `confidence-response` +- **Reusability**: Can evaluate confidence for any service output + +**3. Execution Context Service** (`trustgraph-flow/trustgraph/context/`) +- **Purpose**: Manage execution context and memory across steps +- **Request Schema**: `ContextRequest` (operation: store/retrieve, step_id, data) +- **Response Schema**: `ContextResponse` (context_data, dependencies) +- **Queue**: `context-request` / `context-response` +- **Reusability**: General-purpose context management for workflows + +**4. Flow Orchestration Service** (`trustgraph-flow/trustgraph/orchestration/`) +- **Purpose**: Execute plans with dependency resolution and retry logic +- **Request Schema**: `OrchestrationRequest` (execution_plan, config) +- **Response Schema**: `OrchestrationResponse` (step_results, status) +- **Queue**: `orchestration-request` / `orchestration-response` +- **Reusability**: Can orchestrate any structured execution plan + +**5. 
Audit Service** (`trustgraph-flow/trustgraph/audit/`) +- **Purpose**: Centralized audit logging for all agent operations +- **Request Schema**: `AuditRequest` (execution_id, event_type, details) +- **Response Schema**: `AuditResponse` (logged, audit_id) +- **Queue**: `audit-request` / `audit-response` +- **Reusability**: System-wide audit trail service + +#### 10.3 Phase 2 Architecture + +``` +┌──────────────────────────────────────────────────────────────────┐ +│ Gateway Service Layer │ +└─────────────────────────────┬─────────────────────────────────────┘ + │ + Pulsar Message Bus + │ +┌─────────────────────────────┴────────────────────────────────────┐ +│ Confidence Agent Coordinator Service │ +│ (thin orchestration layer) │ +└─────────────────────────────┬────────────────────────────────────┘ + │ + ┌─────────────────┼─────────────────┐ + │ │ │ + ┌──────▼───────┐ ┌─────▼──────┐ ┌──────▼───────┐ + │ Planning │ │ Confidence │ │ Context │ + │ Service │ │ Evaluation │ │ Service │ + └──────────────┘ │ Service │ └──────────────┘ + └────────────┘ + │ │ │ + ┌──────▼───────┐ ┌─────▼──────┐ + │ Flow │ │ Audit │ + │ Orchestration│ │ Service │ + │ Service │ └────────────┘ + └──────────────┘ +``` + +#### 10.4 Migration Strategy + +**Phase 1 to Phase 2 Migration Path:** + +1. **Extract Interfaces**: Define clean interfaces between existing modules +2. **Create Service Wrappers**: Wrap each module with Pulsar service endpoints +3. **Gradual Extraction**: Extract one service at a time: + - Start with Audit Service (least coupled) + - Then Confidence Evaluation Service + - Follow with Context Service + - Extract Planning Service + - Finally, Flow Orchestration Service +4. **Coordinator Simplification**: Reduce main service to thin coordination layer +5. **Testing**: Ensure backward compatibility at each step + +#### 10.5 Benefits of Microservices Architecture + +**Operational Benefits:** +- **Independent Scaling**: Scale planning service during peak analysis +- **Fault Isolation**: Confidence service failure doesn't affect execution +- **Technology Flexibility**: Use specialized models for different services +- **Development Velocity**: Teams can work on services independently + +**Architectural Benefits:** +- **Reusability**: Planning service usable by other agents +- **Composability**: Mix and match services for different use cases +- **Versioning**: Deploy service updates without full system changes +- **Testing**: Easier unit and integration testing per service + +#### 10.6 Configuration for Phase 2 + +```yaml +# Phase 2 microservices configuration +services: + - name: confidence-agent-coordinator + module: trustgraph.agent.confidence_coordinator + dependencies: + - planning-service + - confidence-service + - context-service + - orchestration-service + - audit-service + + - name: planning-service + module: trustgraph.planning + instances: 3 + + - name: confidence-service + module: trustgraph.confidence + instances: 2 + + - name: context-service + module: trustgraph.context + instances: 2 + storage: redis + + - name: orchestration-service + module: trustgraph.orchestration + instances: 4 + + - name: audit-service + module: trustgraph.audit + instances: 1 + storage: postgresql +``` + +### 11. Open Questions and Future Work + +#### 11.1 Immediate Questions for Implementation + +1. **LLM Integration**: The Planning Module will use the existing prompt service for all LLM interactions, with prompt templates stored in the configuration service following TrustGraph's standard approach. 
This ensures consistency with existing patterns and centralized template management. + +2. **Confidence Calibration**: What specific calibration methodology should be used to ensure confidence scores are meaningful across different operation types? + +3. **Parallel Execution**: Should Phase 1 include parallel step execution, or defer to Phase 2? + +#### 11.2 Future Enhancements + +1. **Adaptive Thresholds**: Machine learning-based threshold adjustment based on historical performance + +2. **Plan Templates**: Pre-defined execution templates for common query patterns + +3. **Multi-Agent Coordination**: Support for confidence-based multi-agent workflows + +4. **Explainable Confidence**: Natural language explanations for confidence scores + +### 12. Conclusion + +This specification defines a confidence-based agent architecture that: + +- **Integrates seamlessly** with existing TrustGraph infrastructure +- **Provides enhanced reliability** through confidence-based control +- **Maintains compatibility** with existing tools and services +- **Enables gradual adoption** through side-by-side deployment + +The architecture is designed to be implemented incrementally, tested thoroughly, and deployed safely alongside the existing ReAct agent system. + +### Appendix A: Configuration Strategy + +#### A.1 Configuration Hierarchy + +The confidence-based agent follows TrustGraph's standard configuration approach with multiple override levels: + +**1. Command Line Parameters** (highest precedence) +```bash +# Example command line with overrides +tg-confidence-agent \ + --confidence-threshold=0.8 \ + --max-retries=5 \ + --timeout-ms=45000 \ + --audit-enabled=true +``` + +**2. Built-in Defaults** (lowest precedence) +- Ensures useful functionality without any configuration +- Default confidence threshold: 0.75 +- Default max retries: 3 +- Default timeout: 30000ms +- All core functionality enabled by default + +**3. Future Configuration Sources** (open question) +- Request-level overrides in AgentRequest.plan field +- Configuration service stored settings +- Runtime dynamic adjustment based on performance + +#### A.2 Default Configuration Values + +The service operates with sensible defaults when no parameters are specified: + +```yaml +# Built-in defaults (no configuration required) +defaults: + confidence_threshold: 0.75 + max_retries: 3 + retry_backoff_factor: 2.0 + step_timeout_ms: 30000 + max_iterations: 15 + + # Tool-specific defaults + tool_thresholds: + GraphQuery: 0.8 + TextCompletion: 0.7 + McpTool: 0.6 + + # Memory defaults + max_context_size: 8192 + cache_ttl_seconds: 300 + + # Audit defaults + audit_enabled: true + audit_level: INFO + + # Performance defaults + parallel_execution: false + plan_cache_size: 100 +``` + +#### A.3 Command Line Override Examples + +```bash +# High confidence mode +tg-confidence-agent --confidence-threshold=0.9 --max-retries=1 + +# Development mode with verbose audit +tg-confidence-agent --audit-level=DEBUG --timeout-ms=60000 + +# Performance optimized +tg-confidence-agent --parallel-execution=true --plan-cache-size=500 + +# Tool-specific threshold override +tg-confidence-agent --graph-query-threshold=0.85 +``` + +#### A.4 Open Questions for Future Configuration + +1. **Request-Level Overrides**: Should confidence thresholds be configurable per-request via the AgentRequest.plan field? + +2. **Dynamic Configuration**: Should the service support runtime configuration updates via the configuration service? + +3. 
**User Profiles**: Should different user types have different default confidence thresholds? + +4. **Context-Aware Thresholds**: Should confidence thresholds adapt based on query complexity or domain? + +5. **Configuration Persistence**: Should override settings be persisted across service restarts? + +#### A.5 Configuration Priority Resolution + +When multiple configuration sources are present: +1. Command line parameters override all others +2. Built-in defaults provide baseline functionality +3. Future sources (request/config service) will insert between these levels +4. Configuration validation ensures all values remain within acceptable ranges + +### Appendix B: API Examples + +#### Request Example (AgentRequest) + +```json +{ + "question": "What are the relationships between Company A and Company B in the knowledge graph?", + "plan": "{\"confidence_threshold\": 0.8, \"max_retries\": 3}", + "state": "initial", + "history": [] +} +``` + +#### Interim Response Example (AgentResponse - Planning) + +```json +{ + "answer": "", + "thought": "Creating execution plan with confidence thresholds for graph query", + "observation": "Plan generated: 1 step with GraphQuery function, confidence threshold 0.8", + "error": null +} +``` + +#### Interim Response Example (AgentResponse - Execution) + +```json +{ + "answer": "", + "thought": "Executing GraphQuery to find relationships between Company A and Company B", + "observation": "Query returned 3 relationships with confidence score 0.92", + "error": null +} +``` + +#### Final Response Example (AgentResponse) + +```json +{ + "answer": "Company A and Company B have 3 relationships: 1) Partnership agreement signed 2023, 2) Shared board member John Doe, 3) Joint venture in Project X", + "thought": "Analysis complete with high confidence (0.92)", + "observation": "All steps executed successfully. Audit trail available at: execution-log-789", + "error": null +} +``` diff --git a/tests/.coverage b/tests/.coverage new file mode 100644 index 00000000..6d556f2b Binary files /dev/null and b/tests/.coverage differ diff --git a/tests/unit/test_agent/test_confidence/__init__.py b/tests/unit/test_agent/test_confidence/__init__.py new file mode 100644 index 00000000..5d02039d --- /dev/null +++ b/tests/unit/test_agent/test_confidence/__init__.py @@ -0,0 +1 @@ +# Confidence agent unit tests \ No newline at end of file diff --git a/tests/unit/test_agent/test_confidence/test_confidence_evaluator.py b/tests/unit/test_agent/test_confidence/test_confidence_evaluator.py new file mode 100644 index 00000000..66f8f03e --- /dev/null +++ b/tests/unit/test_agent/test_confidence/test_confidence_evaluator.py @@ -0,0 +1,365 @@ +""" +Unit tests for the confidence evaluator module. + +Tests confidence scoring logic for different tool types following TrustGraph testing patterns. 
+""" + +import pytest +import json +from unittest.mock import Mock, AsyncMock + +from trustgraph.agent.confidence.confidence import ConfidenceEvaluator +from trustgraph.agent.confidence.types import ConfidenceMetrics + + +class TestConfidenceEvaluator: + """Test cases for the ConfidenceEvaluator class.""" + + @pytest.fixture + def evaluator(self): + """Create a ConfidenceEvaluator instance for testing.""" + return ConfidenceEvaluator() + + # Graph Query Tests + + def test_evaluate_graph_query_success_json_results(self, evaluator): + """Test confidence evaluation for successful GraphQuery with JSON results.""" + # Arrange + function_name = "GraphQuery" + arguments = {"query": "MATCH (n) RETURN n LIMIT 10"} + output = json.dumps([{"id": 1, "name": "entity1"}, {"id": 2, "name": "entity2"}]) + execution_time_ms = 500 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert 0.8 <= result.score <= 1.0 # Good result size should have high confidence + assert "2 items" in result.reasoning + assert result.retry_count == 0 + + def test_evaluate_graph_query_empty_results(self, evaluator): + """Test confidence evaluation for GraphQuery with empty results.""" + # Arrange + function_name = "GraphQuery" + arguments = {"query": "MATCH (n) WHERE false RETURN n"} + output = "[]" + execution_time_ms = 200 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score <= 0.4 # Empty results should have low confidence + assert "Empty result set" in result.reasoning + assert result.retry_count == 0 + + def test_evaluate_graph_query_large_results(self, evaluator): + """Test confidence evaluation for GraphQuery with very large results.""" + # Arrange + function_name = "GraphQuery" + arguments = {"query": "MATCH (n) RETURN n"} + large_result = [{"id": i} for i in range(1500)] + output = json.dumps(large_result) + execution_time_ms = 2000 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert 0.6 <= result.score <= 0.8 # Large results should have moderate confidence + assert "1500 items" in result.reasoning + + def test_evaluate_graph_query_non_json_output(self, evaluator): + """Test confidence evaluation for GraphQuery with non-JSON output.""" + # Arrange + function_name = "GraphQuery" + arguments = {"query": "RETURN 'hello world'"} + output = "hello world" + execution_time_ms = 100 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert 0.5 <= result.score <= 0.7 + assert "Non-JSON response" in result.reasoning + + def test_evaluate_graph_query_very_fast_execution(self, evaluator): + """Test confidence penalty for very fast execution.""" + # Arrange + function_name = "GraphQuery" + arguments = {"query": "RETURN 1"} + output = json.dumps([{"result": 1}]) + execution_time_ms = 50 # Very fast execution + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert "Very fast execution" in result.reasoning + # Score should be penalized for suspiciously fast execution + + # Text Completion Tests + + def test_evaluate_text_completion_success(self, evaluator): + """Test confidence evaluation for successful TextCompletion.""" + # Arrange + 
function_name = "TextCompletion" + arguments = {"prompt": "Explain quantum physics"} + output = "Quantum physics is the branch of physics that studies matter and energy at the smallest scales. It describes phenomena that occur at atomic and subatomic levels, where classical physics breaks down." + execution_time_ms = 1500 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score >= 0.7 + assert "Reasonable response length" in result.reasoning + assert "Well-structured response" in result.reasoning + assert result.retry_count == 0 + + def test_evaluate_text_completion_empty_response(self, evaluator): + """Test confidence evaluation for TextCompletion with empty response.""" + # Arrange + function_name = "TextCompletion" + arguments = {"prompt": "Tell me about AI"} + output = "" + execution_time_ms = 500 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score <= 0.2 + assert "Empty response" in result.reasoning + + def test_evaluate_text_completion_short_response(self, evaluator): + """Test confidence evaluation for TextCompletion with very short response.""" + # Arrange + function_name = "TextCompletion" + arguments = {"prompt": "What is AI?"} + output = "AI is AI." + execution_time_ms = 300 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score <= 0.5 + assert "Very short response" in result.reasoning + + def test_evaluate_text_completion_error_indicators(self, evaluator): + """Test confidence evaluation for TextCompletion with error indicators.""" + # Arrange + function_name = "TextCompletion" + arguments = {"prompt": "Solve this complex equation"} + output = "I don't know how to solve this equation. Sorry, I cannot help with this." + execution_time_ms = 800 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score <= 0.6 + assert "Response indicates uncertainty" in result.reasoning + + def test_evaluate_text_completion_very_long_response(self, evaluator): + """Test confidence evaluation for TextCompletion with suspiciously long response.""" + # Arrange + function_name = "TextCompletion" + arguments = {"prompt": "What is 2+2?"} + output = "Two plus two equals four. 
" * 1000 # Very long response for simple question + execution_time_ms = 3000 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score <= 0.8 # Updated to match actual scoring logic + assert "Very long response" in result.reasoning + + # MCP Tool Tests + + def test_evaluate_mcp_tool_success(self, evaluator): + """Test confidence evaluation for successful McpTool execution.""" + # Arrange + function_name = "McpTool" + arguments = {"name": "weather_tool", "parameters": {"location": "San Francisco"}} + output = json.dumps({"temperature": 72, "conditions": "sunny", "success": True}) + execution_time_ms = 1200 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score >= 0.7 + assert "Tool execution successful" in result.reasoning + + def test_evaluate_mcp_tool_error_response(self, evaluator): + """Test confidence evaluation for McpTool with error response.""" + # Arrange + function_name = "McpTool" + arguments = {"name": "broken_tool", "parameters": {}} + output = json.dumps({"error": "Tool execution failed", "success": False}) + execution_time_ms = 500 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score <= 0.3 + assert "Tool returned error" in result.reasoning + + def test_evaluate_mcp_tool_text_error(self, evaluator): + """Test confidence evaluation for McpTool with text error output.""" + # Arrange + function_name = "McpTool" + arguments = {"name": "api_tool", "parameters": {"endpoint": "/invalid"}} + output = "Error: API endpoint not found. Request failed." 
+ execution_time_ms = 800 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score <= 0.4 + assert "Error indicated in output" in result.reasoning + + def test_evaluate_mcp_tool_timeout_risk(self, evaluator): + """Test confidence evaluation for McpTool with long execution time.""" + # Arrange + function_name = "McpTool" + arguments = {"name": "slow_tool", "parameters": {}} + output = json.dumps({"result": "completed", "success": True}) + execution_time_ms = 65000 # Over 1 minute + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score <= 0.7 # Should be penalized for long execution + assert "Tool execution timeout risk" in result.reasoning + + # Retry Testing + + def test_evaluate_with_retries(self, evaluator): + """Test confidence evaluation with retry penalties.""" + # Arrange + function_name = "GraphQuery" + arguments = {"query": "MATCH (n) RETURN n"} + output = json.dumps([{"id": 1}]) + execution_time_ms = 500 + retry_count = 2 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms, retry_count) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.retry_count == retry_count + assert "Retry penalty applied (2 retries)" in result.reasoning + # Score should be lower than without retries due to penalty + + # Generic Function Tests + + def test_evaluate_unknown_function(self, evaluator): + """Test confidence evaluation for unknown function type.""" + # Arrange + function_name = "UnknownFunction" + arguments = {"param": "value"} + output = "Some output" + execution_time_ms = 1000 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert 0.5 <= result.score <= 0.7 + assert "Generic evaluation for unknown function" in result.reasoning + assert "Function returned output" in result.reasoning + + def test_evaluate_unknown_function_no_output(self, evaluator): + """Test confidence evaluation for unknown function with no output.""" + # Arrange + function_name = "UnknownFunction" + arguments = {"param": "value"} + output = "" + execution_time_ms = 500 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score <= 0.4 + assert "No output from function" in result.reasoning + + # Prompt Function Tests + + def test_evaluate_prompt_function(self, evaluator): + """Test confidence evaluation for Prompt function.""" + # Arrange + function_name = "Prompt" + arguments = {"prompt": "analyze this data"} + output = "The data shows a clear upward trend with seasonal variations." 
+ execution_time_ms = 1200 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score >= 0.7 # Should use text completion evaluation + assert "Reasonable response length" in result.reasoning + + # Edge Cases + + def test_evaluate_with_exception_in_evaluation(self, evaluator): + """Test confidence evaluation handles exceptions gracefully.""" + # Arrange + function_name = "GraphQuery" + arguments = {"query": "test"} + # Create problematic output that might cause JSON parsing issues + output = "{'invalid': json, syntax}" + execution_time_ms = 500 + + # Act + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + + # Assert + assert isinstance(result, ConfidenceMetrics) + assert result.score <= 0.7 # Updated - non-JSON response gets moderate score + # Should handle the exception and provide reasonable confidence score + + def test_confidence_score_bounds(self, evaluator): + """Test that confidence scores are always within valid bounds.""" + # Arrange + test_cases = [ + ("GraphQuery", {}, "[]", 100), + ("TextCompletion", {}, "Great response!", 1000), + ("McpTool", {}, '{"success": true}', 500), + ("UnknownFunction", {}, "", 0), + ] + + # Act & Assert + for function_name, arguments, output, execution_time_ms in test_cases: + result = evaluator.evaluate(function_name, arguments, output, execution_time_ms) + assert 0.0 <= result.score <= 1.0, f"Score out of bounds for {function_name}: {result.score}" + assert isinstance(result.reasoning, str) + assert len(result.reasoning) > 0 + assert result.retry_count >= 0 \ No newline at end of file diff --git a/trustgraph-base/trustgraph/schema/services/agent_confidence.py b/trustgraph-base/trustgraph/schema/services/agent_confidence.py new file mode 100644 index 00000000..648c373d --- /dev/null +++ b/trustgraph-base/trustgraph/schema/services/agent_confidence.py @@ -0,0 +1,47 @@ +""" +Schema definitions for the confidence-based agent internal communication. + +These schemas are used internally between confidence agent modules and are not +part of the external API. The confidence agent uses the existing AgentRequest +and AgentResponse schemas for external communication. 
+""" + +from pulsar.schema import Record, String, Array, Map, Float, Integer, Boolean + +############################################################################ + +# Confidence evaluation schemas + +class ConfidenceMetrics(Record): + """Confidence evaluation metrics for execution results.""" + score = Float() # Confidence score (0.0 to 1.0) + reasoning = String() # Explanation of score calculation + retry_count = Integer() # Number of retries attempted + + +class ExecutionStep(Record): + """Individual step in an execution plan.""" + id = String() # Unique step identifier + function = String() # Tool/function to execute + arguments = Map(String()) # Arguments for the function + dependencies = Array(String()) # IDs of prerequisite steps + confidence_threshold = Float() # Minimum acceptable confidence + timeout_ms = Integer() # Execution timeout + + +class ExecutionPlan(Record): + """Complete execution plan with ordered steps.""" + id = String() # Plan identifier + steps = Array(ExecutionStep()) # Ordered execution steps + context = Map(String()) # Global context for plan + + +class StepResult(Record): + """Result of executing a single step.""" + step_id = String() # Reference to ExecutionStep + success = Boolean() # Execution success status + output = String() # Step execution output + confidence = ConfidenceMetrics() # Confidence evaluation + execution_time_ms = Integer() # Actual execution time + +############################################################################ \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/agent/confidence/__init__.py b/trustgraph-flow/trustgraph/agent/confidence/__init__.py new file mode 100644 index 00000000..f43dee60 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/confidence/__init__.py @@ -0,0 +1,17 @@ +""" +TrustGraph Confidence-Based Agent + +This module implements a confidence-based agent architecture that provides +enhanced reliability and auditability compared to traditional ReAct agents. + +The agent uses structured execution plans with confidence-based control flow, +ensuring high-quality outputs through systematic evaluation and retry mechanisms. +""" + +__all__ = [ + "ConfidenceAgent", + "ConfidenceMetrics", + "ExecutionStep", + "ExecutionPlan", + "StepResult" +] \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/agent/confidence/__main__.py b/trustgraph-flow/trustgraph/agent/confidence/__main__.py new file mode 100644 index 00000000..c598e345 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/confidence/__main__.py @@ -0,0 +1,210 @@ +""" +Confidence Agent Entry Point + +Main entry point for running the confidence-based agent as a standalone service. 
+""" + +import sys +import asyncio +import logging +import argparse +from typing import Dict, Any + +from .service import Processor + + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="TrustGraph Confidence-Based Agent Service" + ) + + # Service configuration + parser.add_argument( + "--id", + type=str, + default="confidence-agent", + help="Service identifier" + ) + + parser.add_argument( + "--max-iterations", + type=int, + default=15, + help="Maximum number of agent iterations" + ) + + # Confidence thresholds + parser.add_argument( + "--confidence-threshold", + type=float, + default=0.75, + help="Default confidence threshold (0.0-1.0)" + ) + + parser.add_argument( + "--graph-query-threshold", + type=float, + default=0.8, + help="Confidence threshold for GraphQuery operations" + ) + + parser.add_argument( + "--text-completion-threshold", + type=float, + default=0.7, + help="Confidence threshold for TextCompletion operations" + ) + + parser.add_argument( + "--mcp-tool-threshold", + type=float, + default=0.6, + help="Confidence threshold for McpTool operations" + ) + + # Retry configuration + parser.add_argument( + "--max-retries", + type=int, + default=3, + help="Maximum number of retries for low confidence steps" + ) + + parser.add_argument( + "--retry-backoff-factor", + type=float, + default=2.0, + help="Backoff factor for retry delays" + ) + + # Step execution + parser.add_argument( + "--step-timeout-ms", + type=int, + default=30000, + help="Default timeout for step execution (milliseconds)" + ) + + parser.add_argument( + "--parallel-execution", + action="store_true", + help="Enable parallel execution of independent steps" + ) + + # User interaction + parser.add_argument( + "--override-enabled", + action="store_true", + default=True, + help="Enable user override for low confidence results" + ) + + # Logging + parser.add_argument( + "--log-level", + type=str, + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + default="INFO", + help="Logging level" + ) + + parser.add_argument( + "--audit-level", + type=str, + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + default="INFO", + help="Audit logging level" + ) + + # Pulsar configuration (standard TrustGraph parameters) + parser.add_argument( + "--pulsar-endpoint", + type=str, + default="pulsar://localhost:6650", + help="Pulsar broker endpoint" + ) + + return parser.parse_args() + + +def setup_logging(log_level: str, audit_level: str): + """Setup logging configuration.""" + + # Configure main logging + logging.basicConfig( + level=getattr(logging, log_level.upper()), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # Configure audit logging with separate level + audit_logger = logging.getLogger("confidence_agent_audit") + audit_logger.setLevel(getattr(logging, audit_level.upper())) + + logger = logging.getLogger(__name__) + logger.info(f"Logging configured: main={log_level}, audit={audit_level}") + + +async def main(): + """Main entry point.""" + args = parse_args() + + # Setup logging + setup_logging(args.log_level, args.audit_level) + + logger = logging.getLogger(__name__) + logger.info("Starting TrustGraph Confidence-Based Agent Service") + + # Build service parameters + params = { + "id": args.id, + "max_iterations": args.max_iterations, + "confidence_threshold": args.confidence_threshold, + "graph_query_threshold": args.graph_query_threshold, + "text_completion_threshold": args.text_completion_threshold, + "mcp_tool_threshold": 
args.mcp_tool_threshold, + "max_retries": args.max_retries, + "retry_backoff_factor": args.retry_backoff_factor, + "step_timeout_ms": args.step_timeout_ms, + "parallel_execution": args.parallel_execution, + "override_enabled": args.override_enabled, + "pulsar_endpoint": args.pulsar_endpoint, + } + + # Log configuration + logger.info("Service configuration:") + for key, value in params.items(): + logger.info(f" {key}: {value}") + + try: + # Create and run the service + processor = Processor(**params) + + logger.info("Confidence agent service initialized successfully") + + # Run the service (this will block) + await processor.run() + + except KeyboardInterrupt: + logger.info("Received interrupt signal, shutting down...") + except Exception as e: + logger.error(f"Service failed: {e}", exc_info=True) + sys.exit(1) + finally: + logger.info("Confidence agent service stopped") + + +def cli_main(): + """Entry point for command line execution.""" + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nShutdown requested by user") + except Exception as e: + print(f"Fatal error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + cli_main() \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/agent/confidence/audit.py b/trustgraph-flow/trustgraph/agent/confidence/audit.py new file mode 100644 index 00000000..73c75b72 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/confidence/audit.py @@ -0,0 +1,387 @@ +""" +Audit Logger Module + +Provides structured audit logging for all confidence agent operations. +Records execution decisions, confidence scores, and complete audit trails. +""" + +import json +import time +import logging +from typing import Dict, Any, Optional, TYPE_CHECKING + +from .types import ExecutionPlan, ExecutionStep, StepResult, PlanExecution + +if TYPE_CHECKING: + pass + + +class AuditLogger: + """ + Centralized audit logging for confidence agent operations. + + Records structured audit trails including: + - Plan generation and validation + - Step execution with confidence scores + - Retry decisions and reasoning + - User overrides and manual interventions + - Performance metrics and timing + """ + + def __init__(self, logger_name: str = "confidence_agent_audit"): + # Create dedicated audit logger + self.audit_logger = logging.getLogger(logger_name) + self.audit_logger.setLevel(logging.INFO) + + # Prevent propagation to avoid duplication in main logs + self.audit_logger.propagate = False + + # Add handler if not already present + if not self.audit_logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter( + '%(asctime)s - AUDIT - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + handler.setFormatter(formatter) + self.audit_logger.addHandler(handler) + + self.main_logger = logging.getLogger(__name__) + + # Track current execution context + self.current_execution_id: Optional[str] = None + self.execution_start_time: Optional[float] = None + + async def log_plan_start(self, plan: ExecutionPlan, execution: PlanExecution) -> None: + """ + Log the start of plan execution. 
+ + Args: + plan: Execution plan being started + execution: Plan execution state + """ + self.current_execution_id = execution.plan_id + self.execution_start_time = time.time() + + audit_entry = { + "event": "plan_start", + "execution_id": execution.plan_id, + "timestamp": self._get_timestamp(), + "plan": { + "id": plan.id, + "step_count": len(plan.steps), + "steps": [ + { + "id": step.id, + "function": step.function, + "dependencies": step.dependencies, + "confidence_threshold": step.confidence_threshold, + "timeout_ms": step.timeout_ms + } + for step in plan.steps + ], + "context": plan.context + } + } + + self.audit_logger.info(json.dumps(audit_entry)) + self.main_logger.debug(f"Audit: Plan '{plan.id}' execution started") + + async def log_plan_complete(self, plan: ExecutionPlan, execution: PlanExecution) -> None: + """ + Log the completion of plan execution. + + Args: + plan: Execution plan that completed + execution: Final execution state + """ + total_duration = 0 + if self.execution_start_time: + total_duration = int((time.time() - self.execution_start_time) * 1000) + + # Calculate summary statistics + successful_steps = len(execution.completed_steps) + failed_steps = len(execution.failed_steps) + total_retries = sum(r.confidence.retry_count for r in execution.results) + + avg_confidence = 0.0 + if execution.results: + avg_confidence = sum(r.confidence.score for r in execution.results) / len(execution.results) + + audit_entry = { + "event": "plan_complete", + "execution_id": execution.plan_id, + "timestamp": self._get_timestamp(), + "status": execution.status.value, + "duration_ms": total_duration, + "summary": { + "total_steps": len(plan.steps), + "successful_steps": successful_steps, + "failed_steps": failed_steps, + "success_rate": successful_steps / len(plan.steps) if plan.steps else 0.0, + "total_retries": total_retries, + "average_confidence": avg_confidence + }, + "completed_steps": execution.completed_steps, + "failed_steps": execution.failed_steps + } + + self.audit_logger.info(json.dumps(audit_entry)) + self.main_logger.debug( + f"Audit: Plan '{plan.id}' completed with status {execution.status.value} " + f"({successful_steps}/{len(plan.steps)} successful)" + ) + + # Clear execution context + self.current_execution_id = None + self.execution_start_time = None + + async def log_step_start(self, step: ExecutionStep, retry_count: int = 0) -> None: + """ + Log the start of step execution. + + Args: + step: Execution step being started + retry_count: Current retry attempt number + """ + audit_entry = { + "event": "step_start", + "execution_id": self.current_execution_id, + "timestamp": self._get_timestamp(), + "step": { + "id": step.id, + "function": step.function, + "arguments": step.arguments, + "dependencies": step.dependencies, + "confidence_threshold": step.confidence_threshold, + "timeout_ms": step.timeout_ms, + "retry_count": retry_count + } + } + + self.audit_logger.info(json.dumps(audit_entry)) + + if retry_count > 0: + self.main_logger.debug(f"Audit: Step '{step.id}' retry {retry_count} started") + else: + self.main_logger.debug(f"Audit: Step '{step.id}' execution started") + + async def log_step_complete(self, step: ExecutionStep, result: StepResult) -> None: + """ + Log the completion of step execution. 
+ + Args: + step: Execution step that completed + result: Step execution result + """ + audit_entry = { + "event": "step_complete", + "execution_id": self.current_execution_id, + "timestamp": self._get_timestamp(), + "step_id": step.id, + "result": { + "success": result.success, + "confidence": { + "score": result.confidence.score, + "reasoning": result.confidence.reasoning, + "retry_count": result.confidence.retry_count, + "threshold": step.confidence_threshold, + "threshold_met": result.confidence.score >= step.confidence_threshold + }, + "execution_time_ms": result.execution_time_ms, + "output_length": len(result.output) if result.output else 0, + "output_preview": result.output[:200] if result.output else "" + } + } + + self.audit_logger.info(json.dumps(audit_entry)) + self.main_logger.debug( + f"Audit: Step '{step.id}' completed " + f"(success: {result.success}, confidence: {result.confidence.score:.3f})" + ) + + async def log_retry_decision( + self, + step: ExecutionStep, + result: StepResult, + retry_count: int, + will_retry: bool, + reason: str + ) -> None: + """ + Log retry decision making. + + Args: + step: Step being retried + result: Current step result + retry_count: Current retry count + will_retry: Whether step will be retried + reason: Reason for retry decision + """ + audit_entry = { + "event": "retry_decision", + "execution_id": self.current_execution_id, + "timestamp": self._get_timestamp(), + "step_id": step.id, + "retry_count": retry_count, + "will_retry": will_retry, + "reason": reason, + "current_result": { + "success": result.success, + "confidence_score": result.confidence.score, + "confidence_threshold": step.confidence_threshold, + "confidence_reasoning": result.confidence.reasoning + } + } + + self.audit_logger.info(json.dumps(audit_entry)) + self.main_logger.debug( + f"Audit: Retry decision for step '{step.id}' - will_retry: {will_retry} ({reason})" + ) + + async def log_user_override( + self, + step: ExecutionStep, + result: StepResult, + override_granted: bool, + override_reason: Optional[str] = None + ) -> None: + """ + Log user override decisions. + + Args: + step: Step requiring override + result: Step result with low confidence + override_granted: Whether override was granted + override_reason: Optional reason provided by user + """ + audit_entry = { + "event": "user_override", + "execution_id": self.current_execution_id, + "timestamp": self._get_timestamp(), + "step_id": step.id, + "override_granted": override_granted, + "override_reason": override_reason, + "trigger": { + "confidence_score": result.confidence.score, + "confidence_threshold": step.confidence_threshold, + "confidence_reasoning": result.confidence.reasoning, + "retry_count": result.confidence.retry_count + } + } + + self.audit_logger.info(json.dumps(audit_entry)) + self.main_logger.info( + f"Audit: User override for step '{step.id}' - " + f"{'GRANTED' if override_granted else 'DENIED'}" + ) + + async def log_error(self, context_id: str, error_message: str, error_details: Optional[Dict[str, Any]] = None) -> None: + """ + Log errors during execution. 
+ + Args: + context_id: ID of step or plan where error occurred + error_message: Error message + error_details: Optional additional error details + """ + audit_entry = { + "event": "error", + "execution_id": self.current_execution_id, + "timestamp": self._get_timestamp(), + "context_id": context_id, + "error_message": error_message, + "error_details": error_details or {} + } + + self.audit_logger.error(json.dumps(audit_entry)) + self.main_logger.error(f"Audit: Error in {context_id} - {error_message}") + + async def log_confidence_analysis( + self, + step_id: str, + function_name: str, + confidence_factors: Dict[str, Any] + ) -> None: + """ + Log detailed confidence analysis for debugging. + + Args: + step_id: ID of step being analyzed + function_name: Function that was executed + confidence_factors: Detailed confidence analysis factors + """ + audit_entry = { + "event": "confidence_analysis", + "execution_id": self.current_execution_id, + "timestamp": self._get_timestamp(), + "step_id": step_id, + "function_name": function_name, + "confidence_factors": confidence_factors + } + + self.audit_logger.debug(json.dumps(audit_entry)) + + async def log_memory_operation( + self, + operation: str, + key: str, + step_id: str, + details: Optional[Dict[str, Any]] = None + ) -> None: + """ + Log memory manager operations for debugging. + + Args: + operation: Type of memory operation (store, retrieve, etc.) + key: Memory key + step_id: Step performing the operation + details: Optional operation details + """ + audit_entry = { + "event": "memory_operation", + "execution_id": self.current_execution_id, + "timestamp": self._get_timestamp(), + "operation": operation, + "key": key, + "step_id": step_id, + "details": details or {} + } + + self.audit_logger.debug(json.dumps(audit_entry)) + + def _get_timestamp(self) -> str: + """Get ISO format timestamp.""" + return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + + def get_audit_summary(self, execution_id: str) -> Optional[Dict[str, Any]]: + """ + Get audit summary for an execution (placeholder for future implementation). + + Args: + execution_id: Execution ID to summarize + + Returns: + Audit summary or None if not available + """ + # In a full implementation, this would query stored audit logs + # For Phase 1, this is a placeholder + self.main_logger.debug(f"Audit summary requested for execution {execution_id}") + return None + + def set_log_level(self, level: str) -> None: + """ + Set audit logging level. + + Args: + level: Logging level (DEBUG, INFO, WARNING, ERROR) + """ + numeric_level = getattr(logging, level.upper(), logging.INFO) + self.audit_logger.setLevel(numeric_level) + self.main_logger.debug(f"Audit log level set to {level}") + + def flush_logs(self) -> None: + """Flush any buffered audit logs.""" + for handler in self.audit_logger.handlers: + if hasattr(handler, 'flush'): + handler.flush() \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/agent/confidence/confidence.py b/trustgraph-flow/trustgraph/agent/confidence/confidence.py new file mode 100644 index 00000000..1664f6ea --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/confidence/confidence.py @@ -0,0 +1,312 @@ +""" +Confidence Evaluator Module + +Calculates confidence scores for execution results based on multiple factors +to ensure reliability and guide retry decisions. 
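+
+Scores are clamped to the range [0.0, 1.0], and retries reduce a score
+multiplicatively (for example, graph-query scores are scaled by 0.9 ** retry_count).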
+""" + +import json +import logging +from typing import Dict, Any, Optional +from .types import ConfidenceMetrics + + +class ConfidenceEvaluator: + """ + Evaluates confidence scores for tool execution results. + + Uses multiple scoring factors: + - Graph query result size and consistency + - Entity extraction precision scores + - Vector search similarity thresholds + - LLM response coherence metrics + """ + + def __init__(self): + self.logger = logging.getLogger(__name__) + + # Default scoring weights + self.weights = { + "output_completeness": 0.3, + "structure_validity": 0.2, + "semantic_coherence": 0.2, + "result_size": 0.2, + "error_indicators": 0.1 + } + + def evaluate( + self, + function_name: str, + arguments: Dict[str, Any], + output: str, + execution_time_ms: int, + retry_count: int = 0 + ) -> ConfidenceMetrics: + """ + Evaluate confidence score for execution result. + + Args: + function_name: Name of the function/tool executed + arguments: Arguments passed to the function + output: Raw output from function execution + execution_time_ms: Time taken for execution + retry_count: Number of retries attempted + + Returns: + ConfidenceMetrics with score, reasoning, and retry count + """ + # Function-specific confidence evaluation + if function_name == "GraphQuery": + return self._evaluate_graph_query(output, arguments, execution_time_ms, retry_count) + elif function_name == "TextCompletion": + return self._evaluate_text_completion(output, arguments, execution_time_ms, retry_count) + elif function_name == "McpTool": + return self._evaluate_mcp_tool(output, arguments, execution_time_ms, retry_count) + elif function_name == "Prompt": + return self._evaluate_prompt(output, arguments, execution_time_ms, retry_count) + else: + return self._evaluate_generic(output, arguments, execution_time_ms, retry_count) + + def _evaluate_graph_query( + self, + output: str, + arguments: Dict[str, Any], + execution_time_ms: int, + retry_count: int + ) -> ConfidenceMetrics: + """Evaluate confidence for graph query results.""" + + score = 0.5 # Base score + reasons = [] + + try: + # Try to parse as JSON result + if output.strip(): + try: + result = json.loads(output) + if isinstance(result, list): + result_count = len(result) + + if result_count == 0: + score = 0.3 + reasons.append("Empty result set - may indicate query issues") + elif result_count > 0 and result_count < 1000: + score = 0.8 + min(0.15, result_count / 100 * 0.1) + reasons.append(f"Good result size: {result_count} items") + else: + score = 0.7 + reasons.append(f"Large result set: {result_count} items") + + elif isinstance(result, dict): + if result: + score = 0.85 + reasons.append("Structured result with data") + else: + score = 0.4 + reasons.append("Empty structured result") + else: + score = 0.75 + reasons.append("Valid structured response") + + except json.JSONDecodeError: + # Not JSON, treat as text response + if len(output.strip()) > 0: + score = 0.6 + reasons.append("Non-JSON response with content") + else: + score = 0.2 + reasons.append("Empty text response") + else: + score = 0.2 + reasons.append("No output returned") + + except Exception as e: + score = 0.3 + reasons.append(f"Error evaluating output: {str(e)}") + + # Adjust for execution time (very fast might indicate cached/empty result) + if execution_time_ms < 100: + score *= 0.9 + reasons.append("Very fast execution - possible empty result") + elif execution_time_ms > 30000: + score *= 0.85 + reasons.append("Slow execution - possible complexity issues") + + # Penalty for retries + 
score *= (0.9 ** retry_count) + if retry_count > 0: + reasons.append(f"Retry penalty applied ({retry_count} retries)") + + return ConfidenceMetrics( + score=max(0.0, min(1.0, score)), + reasoning="; ".join(reasons), + retry_count=retry_count + ) + + def _evaluate_text_completion( + self, + output: str, + arguments: Dict[str, Any], + execution_time_ms: int, + retry_count: int + ) -> ConfidenceMetrics: + """Evaluate confidence for text completion results.""" + + score = 0.5 + reasons = [] + + if not output or not output.strip(): + score = 0.1 + reasons.append("Empty response") + else: + text = output.strip() + + # Basic coherence checks + if len(text) < 10: + score = 0.4 + reasons.append("Very short response") + elif len(text) > 10000: + score = 0.6 + reasons.append("Very long response - may contain hallucinations") + else: + score = 0.75 + reasons.append("Reasonable response length") + + # Check for common error patterns + error_patterns = [ + "i don't know", + "i cannot", + "i'm unable to", + "error:", + "failed to", + "sorry, i" + ] + + text_lower = text.lower() + error_found = any(pattern in text_lower for pattern in error_patterns) + + if error_found: + score = 0.5 + reasons.append("Response indicates uncertainty or error") + else: + score += 0.1 + reasons.append("No obvious error indicators") + + # Check for structure (sentences, paragraphs) + sentences = text.count('.') + text.count('!') + text.count('?') + if sentences >= 2: + score += 0.05 + reasons.append("Well-structured response") + + # Execution time considerations + if execution_time_ms < 500: + score *= 0.95 + reasons.append("Very fast completion") + elif execution_time_ms > 45000: + score *= 0.9 + reasons.append("Slow completion") + + # Retry penalty + score *= (0.85 ** retry_count) + if retry_count > 0: + reasons.append(f"Retry penalty applied ({retry_count} retries)") + + return ConfidenceMetrics( + score=max(0.0, min(1.0, score)), + reasoning="; ".join(reasons), + retry_count=retry_count + ) + + def _evaluate_mcp_tool( + self, + output: str, + arguments: Dict[str, Any], + execution_time_ms: int, + retry_count: int + ) -> ConfidenceMetrics: + """Evaluate confidence for MCP tool results.""" + + score = 0.6 # Default for tools + reasons = [] + + if not output: + score = 0.3 + reasons.append("No tool output") + else: + try: + # Try to parse tool result + result = json.loads(output) + if isinstance(result, dict) and "error" in result: + score = 0.2 + reasons.append("Tool returned error") + elif isinstance(result, dict) and result.get("success", True): + score = 0.8 + reasons.append("Tool execution successful") + else: + score = 0.65 + reasons.append("Tool returned result") + except json.JSONDecodeError: + if "error" in output.lower() or "failed" in output.lower(): + score = 0.3 + reasons.append("Error indicated in output") + else: + score = 0.7 + reasons.append("Text output from tool") + + # Time-based adjustments + if execution_time_ms > 60000: + score *= 0.8 + reasons.append("Tool execution timeout risk") + + # Retry penalty + score *= (0.8 ** retry_count) + if retry_count > 0: + reasons.append(f"Retry penalty applied ({retry_count} retries)") + + return ConfidenceMetrics( + score=max(0.0, min(1.0, score)), + reasoning="; ".join(reasons), + retry_count=retry_count + ) + + def _evaluate_prompt( + self, + output: str, + arguments: Dict[str, Any], + execution_time_ms: int, + retry_count: int + ) -> ConfidenceMetrics: + """Evaluate confidence for prompt service results.""" + + # Similar to text completion but with prompt-specific 
logic + return self._evaluate_text_completion(output, arguments, execution_time_ms, retry_count) + + def _evaluate_generic( + self, + output: str, + arguments: Dict[str, Any], + execution_time_ms: int, + retry_count: int + ) -> ConfidenceMetrics: + """Generic confidence evaluation for unknown functions.""" + + score = 0.5 + reasons = ["Generic evaluation for unknown function"] + + if output and len(output.strip()) > 0: + score = 0.6 + reasons.append("Function returned output") + else: + score = 0.3 + reasons.append("No output from function") + + # Retry penalty + score *= (0.9 ** retry_count) + if retry_count > 0: + reasons.append(f"Retry penalty applied ({retry_count} retries)") + + return ConfidenceMetrics( + score=max(0.0, min(1.0, score)), + reasoning="; ".join(reasons), + retry_count=retry_count + ) \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/agent/confidence/executor.py b/trustgraph-flow/trustgraph/agent/confidence/executor.py new file mode 100644 index 00000000..dbf3de19 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/confidence/executor.py @@ -0,0 +1,313 @@ +""" +Executor Module + +Handles individual plan step execution using registered tools. +Manages tool selection, error handling, and result transformation. +""" + +import json +import asyncio +import logging +import time +from typing import Dict, Any, Optional, TYPE_CHECKING + +from trustgraph.base import ( + TextCompletionClient, + GraphRagClient, + ToolClient, + PromptClient +) + +from .types import ExecutionStep, StepResult, ContextEntry +from .confidence import ConfidenceEvaluator + +if TYPE_CHECKING: + from .memory import MemoryManager + + +class StepExecutor: + """ + Executes individual execution steps using the appropriate tools. + + Tool Mapping: + - GraphQuery → GraphRagClient + - TextCompletion → TextCompletionClient + - McpTool → ToolClient + - Prompt → PromptClient + """ + + def __init__( + self, + text_completion_client: Optional[TextCompletionClient] = None, + graph_rag_client: Optional[GraphRagClient] = None, + tool_client: Optional[ToolClient] = None, + prompt_client: Optional[PromptClient] = None + ): + self.logger = logging.getLogger(__name__) + + # Tool clients (will be injected by service) + self.text_completion_client = text_completion_client + self.graph_rag_client = graph_rag_client + self.tool_client = tool_client + self.prompt_client = prompt_client + + # Confidence evaluator + self.confidence_evaluator = ConfidenceEvaluator() + + async def execute_step( + self, + step: ExecutionStep, + context: Dict[str, Any], + memory_manager: "MemoryManager" + ) -> StepResult: + """ + Execute a single step with the given context. 
+ + Args: + step: The execution step to run + context: Context data from memory manager + memory_manager: Memory manager for storing results + + Returns: + StepResult with execution outcome and confidence + """ + start_time = time.time() + + try: + self.logger.info(f"Executing step '{step.id}' with function '{step.function}'") + + # Execute the step based on function type + output = await self._execute_function(step, context) + + execution_time_ms = int((time.time() - start_time) * 1000) + + # Evaluate confidence + confidence = self.confidence_evaluator.evaluate( + function_name=step.function, + arguments=step.arguments, + output=output, + execution_time_ms=execution_time_ms + ) + + # Create result + result = StepResult( + step_id=step.id, + success=True, + output=output, + confidence=confidence, + execution_time_ms=execution_time_ms + ) + + self.logger.info( + f"Step '{step.id}' completed successfully " + f"(confidence: {confidence.score:.2f}, time: {execution_time_ms}ms)" + ) + + return result + + except asyncio.TimeoutError: + execution_time_ms = int((time.time() - start_time) * 1000) + self.logger.error(f"Step '{step.id}' timed out after {execution_time_ms}ms") + + return StepResult( + step_id=step.id, + success=False, + output=f"Execution timed out after {execution_time_ms}ms", + confidence=self.confidence_evaluator._evaluate_generic( + "", step.arguments, execution_time_ms, 0 + ), + execution_time_ms=execution_time_ms + ) + + except Exception as e: + execution_time_ms = int((time.time() - start_time) * 1000) + error_msg = f"Step execution failed: {str(e)}" + self.logger.error(f"Step '{step.id}' failed: {error_msg}") + + return StepResult( + step_id=step.id, + success=False, + output=error_msg, + confidence=self.confidence_evaluator._evaluate_generic( + "", step.arguments, execution_time_ms, 0 + ), + execution_time_ms=execution_time_ms + ) + + async def _execute_function(self, step: ExecutionStep, context: Dict[str, Any]) -> str: + """ + Execute the specific function based on step type. 
+ + Args: + step: Execution step + context: Available context data + + Returns: + Raw output from function execution + """ + function_name = step.function + args = step.arguments + timeout_seconds = step.timeout_ms / 1000.0 + + # Substitute context variables in arguments + resolved_args = self._resolve_arguments(args, context) + + if function_name == "GraphQuery": + return await self._execute_graph_query(resolved_args, timeout_seconds) + elif function_name == "TextCompletion": + return await self._execute_text_completion(resolved_args, timeout_seconds) + elif function_name == "McpTool": + return await self._execute_mcp_tool(resolved_args, timeout_seconds) + elif function_name == "Prompt": + return await self._execute_prompt(resolved_args, timeout_seconds) + else: + raise ValueError(f"Unknown function type: {function_name}") + + async def _execute_graph_query(self, args: Dict[str, Any], timeout: float) -> str: + """Execute graph query using GraphRagClient.""" + if not self.graph_rag_client: + raise RuntimeError("GraphRagClient not configured") + + query = args.get("query", "") + user = args.get("user", "trustgraph") + collection = args.get("collection", "default") + + result = await self.graph_rag_client.rag( + query=query, + user=user, + collection=collection, + timeout=timeout + ) + + # Convert result to JSON string for consistent handling + if isinstance(result, (dict, list)): + return json.dumps(result) + else: + return str(result) + + async def _execute_text_completion(self, args: Dict[str, Any], timeout: float) -> str: + """Execute text completion using TextCompletionClient.""" + if not self.text_completion_client: + raise RuntimeError("TextCompletionClient not configured") + + system = args.get("system", "") + prompt = args.get("prompt", "") + + result = await self.text_completion_client.text_completion( + system=system, + prompt=prompt, + timeout=timeout + ) + + return str(result) + + async def _execute_mcp_tool(self, args: Dict[str, Any], timeout: float) -> str: + """Execute MCP tool using ToolClient.""" + if not self.tool_client: + raise RuntimeError("ToolClient not configured") + + name = args.get("name", "") + parameters = args.get("parameters", {}) + + result = await self.tool_client.invoke( + name=name, + parameters=parameters, + timeout=timeout + ) + + # Convert result to string for consistent handling + if isinstance(result, (dict, list)): + return json.dumps(result) + else: + return str(result) + + async def _execute_prompt(self, args: Dict[str, Any], timeout: float) -> str: + """Execute prompt using PromptClient.""" + if not self.prompt_client: + raise RuntimeError("PromptClient not configured") + + # Note: This is a simplified implementation + # The actual prompt client interface may differ + prompt = args.get("prompt", "") + + # For now, delegate to text completion + # In practice, this would use the prompt service + if self.text_completion_client: + result = await self.text_completion_client.text_completion( + system="", + prompt=prompt, + timeout=timeout + ) + return str(result) + else: + raise RuntimeError("No text completion client for prompt execution") + + def _resolve_arguments(self, args: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """ + Resolve context variables in step arguments. + + Supports simple variable substitution like {{variable_name}}. 
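+
+        Example (illustrative): with context {"user_name": "Alice"}, an argument
+        value of "Hello {{user_name}}" resolves to "Hello Alice"; variables that
+        are not present in the context are left unchanged.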
+ + Args: + args: Original arguments + context: Available context variables + + Returns: + Arguments with context variables resolved + """ + resolved = {} + + for key, value in args.items(): + if isinstance(value, str): + resolved[key] = self._substitute_variables(value, context) + else: + resolved[key] = value + + return resolved + + def _substitute_variables(self, text: str, context: Dict[str, Any]) -> str: + """ + Substitute {{variable}} patterns with context values. + + Args: + text: Text potentially containing variables + context: Context dictionary + + Returns: + Text with variables substituted + """ + import re + + def replace_var(match): + var_name = match.group(1) + return str(context.get(var_name, f"{{{{{var_name}}}}}")) # Keep original if not found + + # Replace {{variable}} patterns + return re.sub(r'\{\{(\w+)\}\}', replace_var, text) + + def set_clients( + self, + text_completion_client: Optional[TextCompletionClient] = None, + graph_rag_client: Optional[GraphRagClient] = None, + tool_client: Optional[ToolClient] = None, + prompt_client: Optional[PromptClient] = None + ) -> None: + """ + Set tool clients (used by service for dependency injection). + + Args: + text_completion_client: Text completion client + graph_rag_client: Graph RAG client + tool_client: MCP tool client + prompt_client: Prompt service client + """ + if text_completion_client: + self.text_completion_client = text_completion_client + if graph_rag_client: + self.graph_rag_client = graph_rag_client + if tool_client: + self.tool_client = tool_client + if prompt_client: + self.prompt_client = prompt_client + + self.logger.debug("Tool clients configured") \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/agent/confidence/flow_controller.py b/trustgraph-flow/trustgraph/agent/confidence/flow_controller.py new file mode 100644 index 00000000..c885e2cf --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/confidence/flow_controller.py @@ -0,0 +1,383 @@ +""" +Flow Controller Module + +Orchestrates plan execution with confidence-based control flow. +Manages step dependencies, retry logic, and user overrides. +""" + +import asyncio +import logging +import time +from typing import List, Dict, Set, Optional, Callable, Any, TYPE_CHECKING + +from .types import ( + ExecutionPlan, ExecutionStep, StepResult, ExecutionStatus, + PlanExecution, AgentConfig, ConfidenceMetrics +) + +if TYPE_CHECKING: + from .memory import MemoryManager + from .executor import StepExecutor + from .audit import AuditLogger + + +class FlowControlError(Exception): + """Exception raised when flow control encounters unrecoverable errors.""" + pass + + +class FlowController: + """ + Orchestrates execution plan flow with confidence-based control. + + Key Capabilities: + - Step dependency resolution + - Confidence-based retry logic + - User override handling + - Graceful failure modes + """ + + def __init__(self, config: AgentConfig): + self.logger = logging.getLogger(__name__) + self.config = config + + # Callback for requesting user overrides (set by service) + self.override_callback: Optional[Callable[[str, StepResult], bool]] = None + + # Callback for progress updates (set by service) + self.progress_callback: Optional[Callable[[str, StepResult], None]] = None + + async def execute_plan( + self, + plan: ExecutionPlan, + memory_manager: "MemoryManager", + executor: "StepExecutor", + audit_logger: Optional["AuditLogger"] = None + ) -> PlanExecution: + """ + Execute a complete execution plan with confidence control. 
+ + Args: + plan: Execution plan to run + memory_manager: Memory manager for context + executor: Step executor + audit_logger: Optional audit logger + + Returns: + PlanExecution with results and status + """ + start_time = time.time() + + # Initialize plan execution tracking + execution = PlanExecution( + plan_id=plan.id, + status=ExecutionStatus.IN_PROGRESS, + current_step=None, + completed_steps=[], + failed_steps=[], + results=[], + total_execution_time_ms=0 + ) + + try: + self.logger.info(f"Starting execution of plan '{plan.id}' with {len(plan.steps)} steps") + + if audit_logger: + await audit_logger.log_plan_start(plan, execution) + + # Build dependency graph + self._register_dependencies(plan, memory_manager) + + # Execute steps with dependency resolution + if self.config.parallel_execution: + await self._execute_parallel(plan, execution, memory_manager, executor, audit_logger) + else: + await self._execute_sequential(plan, execution, memory_manager, executor, audit_logger) + + # Final status determination + if execution.failed_steps: + execution.status = ExecutionStatus.FAILED + else: + execution.status = ExecutionStatus.COMPLETED + + execution.total_execution_time_ms = int((time.time() - start_time) * 1000) + + self.logger.info( + f"Plan '{plan.id}' execution completed: {execution.status.value} " + f"({len(execution.completed_steps)}/{len(plan.steps)} steps, " + f"{execution.total_execution_time_ms}ms)" + ) + + if audit_logger: + await audit_logger.log_plan_complete(plan, execution) + + return execution + + except Exception as e: + execution.status = ExecutionStatus.FAILED + execution.total_execution_time_ms = int((time.time() - start_time) * 1000) + + error_msg = f"Plan execution failed: {str(e)}" + self.logger.error(error_msg) + + if audit_logger: + await audit_logger.log_error(plan.id, error_msg) + + raise FlowControlError(error_msg) + + async def _execute_sequential( + self, + plan: ExecutionPlan, + execution: PlanExecution, + memory_manager: "MemoryManager", + executor: "StepExecutor", + audit_logger: Optional["AuditLogger"] + ) -> None: + """Execute steps sequentially with dependency resolution.""" + + remaining_steps = set(step.id for step in plan.steps) + + while remaining_steps: + # Get steps ready to execute (dependencies satisfied) + ready_steps = memory_manager.get_ready_steps( + [step for step in plan.steps if step.id in remaining_steps], + set(execution.completed_steps) + ) + + if not ready_steps: + # Check if we're stuck (circular dependencies or failed dependencies) + if execution.failed_steps: + self.logger.warning("Cannot proceed - some steps failed and others depend on them") + break + else: + raise FlowControlError("Circular dependency detected - no steps can proceed") + + # Execute ready steps one by one + for step in ready_steps: + await self._execute_single_step(step, execution, memory_manager, executor, audit_logger) + remaining_steps.discard(step.id) + + async def _execute_parallel( + self, + plan: ExecutionPlan, + execution: PlanExecution, + memory_manager: "MemoryManager", + executor: "StepExecutor", + audit_logger: Optional["AuditLogger"] + ) -> None: + """Execute steps in parallel where possible (Phase 2 feature).""" + # For now, fall back to sequential - parallel execution is Phase 2 + await self._execute_sequential(plan, execution, memory_manager, executor, audit_logger) + + async def _execute_single_step( + self, + step: ExecutionStep, + execution: PlanExecution, + memory_manager: "MemoryManager", + executor: "StepExecutor", + audit_logger: 
Optional["AuditLogger"]
+    ) -> None:
+        """
+        Execute a single step with retry logic and confidence control.
+
+        Args:
+            step: Step to execute
+            execution: Plan execution state
+            memory_manager: Memory manager
+            executor: Step executor
+            audit_logger: Optional audit logger
+        """
+        execution.current_step = step.id
+        retry_count = 0
+
+        while retry_count <= self.config.max_retries:
+            try:
+                self.logger.info(f"Executing step '{step.id}' (attempt {retry_count + 1})")
+
+                if audit_logger:
+                    await audit_logger.log_step_start(step, retry_count)
+
+                # Get context for this step
+                context = memory_manager.get_context_for_step(step)
+
+                # Execute the step
+                result = await executor.execute_step(step, context, memory_manager)
+
+                # Update confidence with retry count
+                result.confidence.retry_count = retry_count
+
+                # Store result
+                execution.results.append(result)
+                memory_manager.store_step_result(result)
+
+                if audit_logger:
+                    await audit_logger.log_step_complete(step, result)
+
+                # Check confidence against threshold
+                confidence_ok = result.confidence.score >= step.confidence_threshold
+
+                if result.success and confidence_ok:
+                    # Step succeeded with good confidence
+                    execution.completed_steps.append(step.id)
+                    self.logger.info(
+                        f"Step '{step.id}' completed successfully "
+                        f"(confidence: {result.confidence.score:.3f})"
+                    )
+
+                    # Notify progress callback (the registered callback may be a
+                    # coroutine function, so await it if necessary)
+                    if self.progress_callback:
+                        try:
+                            maybe_coro = self.progress_callback("step_success", result)
+                            if asyncio.iscoroutine(maybe_coro):
+                                await maybe_coro
+                        except Exception as e:
+                            self.logger.warning(f"Progress callback error: {e}")
+
+                    break
+
+                elif result.success and not confidence_ok:
+                    # Step succeeded but confidence too low
+                    self.logger.warning(
+                        f"Step '{step.id}' succeeded but confidence too low "
+                        f"({result.confidence.score:.3f} < {step.confidence_threshold})"
+                    )
+
+                    if retry_count < self.config.max_retries:
+                        # Retry with backoff
+                        backoff_delay = self.config.retry_backoff_factor ** retry_count
+                        self.logger.info(f"Retrying step '{step.id}' after {backoff_delay}s backoff")
+                        await asyncio.sleep(backoff_delay)
+                        retry_count += 1
+                        continue
+                    else:
+                        # Max retries reached - check for user override
+                        if await self._handle_low_confidence(step, result):
+                            execution.completed_steps.append(step.id)
+                            self.logger.info(f"Step '{step.id}' accepted via user override")
+                            break
+                        else:
+                            execution.failed_steps.append(step.id)
+                            self.logger.error(f"Step '{step.id}' failed - low confidence after retries")
+                            break
+
+                else:
+                    # Step execution failed
+                    self.logger.warning(f"Step '{step.id}' execution failed: {result.output}")
+
+                    if retry_count < self.config.max_retries:
+                        # Retry with backoff
+                        backoff_delay = self.config.retry_backoff_factor ** retry_count
+                        self.logger.info(f"Retrying failed step '{step.id}' after {backoff_delay}s backoff")
+                        await asyncio.sleep(backoff_delay)
+                        retry_count += 1
+                        continue
+                    else:
+                        # Max retries reached
+                        execution.failed_steps.append(step.id)
+                        self.logger.error(f"Step '{step.id}' failed after {retry_count + 1} attempts")
+                        break
+
+            except Exception as e:
+                error_msg = f"Unexpected error executing step '{step.id}': {str(e)}"
+                self.logger.error(error_msg)
+
+                if audit_logger:
+                    await audit_logger.log_error(step.id, error_msg)
+
+                if retry_count < self.config.max_retries:
+                    backoff_delay = self.config.retry_backoff_factor ** retry_count
+                    self.logger.info(f"Retrying step '{step.id}' after error, {backoff_delay}s backoff")
+                    await asyncio.sleep(backoff_delay)
+                    retry_count += 1
+                    continue
+                else:
+                    execution.failed_steps.append(step.id)
+                    break
+
+        execution.current_step = None
+
+ async def _handle_low_confidence(self, step: ExecutionStep, result: StepResult) -> bool: + """ + Handle low confidence scenario - potentially request user override. + + Args: + step: Step that had low confidence + result: Step result with low confidence + + Returns: + True if step should be accepted anyway, False to fail + """ + if not self.config.override_enabled: + return False + + if not self.override_callback: + self.logger.warning("No override callback configured - cannot request user override") + return False + + try: + # Request user override via callback + override_granted = self.override_callback( + f"Step '{step.id}' has low confidence ({result.confidence.score:.3f} < " + f"{step.confidence_threshold}). Reasoning: {result.confidence.reasoning}. " + f"Accept anyway?", + result + ) + + if override_granted: + self.logger.info(f"User override granted for step '{step.id}'") + return True + else: + self.logger.info(f"User override denied for step '{step.id}'") + return False + + except Exception as e: + self.logger.error(f"Error requesting user override: {e}") + return False + + def _register_dependencies(self, plan: ExecutionPlan, memory_manager: "MemoryManager") -> None: + """Register step dependencies with memory manager.""" + for step in plan.steps: + for dep_step_id in step.dependencies: + memory_manager.register_dependency(step.id, dep_step_id) + + def set_override_callback(self, callback: Callable[[str, StepResult], bool]) -> None: + """Set callback for requesting user overrides.""" + self.override_callback = callback + self.logger.debug("Override callback configured") + + def set_progress_callback(self, callback: Callable[[str, StepResult], None]) -> None: + """Set callback for progress updates.""" + self.progress_callback = callback + self.logger.debug("Progress callback configured") + + def get_execution_stats(self, execution: PlanExecution) -> Dict[str, Any]: + """ + Get execution statistics. + + Args: + execution: Plan execution + + Returns: + Dictionary with execution statistics + """ + total_steps = len(execution.results) + successful_steps = sum(1 for r in execution.results if r.success) + high_confidence_steps = sum( + 1 for r in execution.results + if r.success and r.confidence.score >= 0.8 + ) + + avg_confidence = 0.0 + if execution.results: + avg_confidence = sum(r.confidence.score for r in execution.results) / len(execution.results) + + total_retries = sum(r.confidence.retry_count for r in execution.results) + + return { + "total_steps": total_steps, + "successful_steps": successful_steps, + "failed_steps": len(execution.failed_steps), + "high_confidence_steps": high_confidence_steps, + "average_confidence": avg_confidence, + "total_retries": total_retries, + "execution_time_ms": execution.total_execution_time_ms, + "status": execution.status.value + } \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/agent/confidence/memory.py b/trustgraph-flow/trustgraph/agent/confidence/memory.py new file mode 100644 index 00000000..7d8740d4 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/confidence/memory.py @@ -0,0 +1,282 @@ +""" +Memory Manager Module + +Handles inter-step data flow and context preservation for the confidence agent. +Manages execution context, result caching, and dependency resolution. +""" + +import json +import time +import logging +from typing import Dict, Any, Optional, List, Set +from .types import ContextEntry, ExecutionStep, StepResult + + +class MemoryManager: + """ + Manages execution context and inter-step data flow. 
+ + Responsibilities: + - Store and retrieve execution context between steps + - Manage step dependencies and result passing + - Handle context window management + - Provide result caching with TTL + """ + + def __init__(self, max_context_size: int = 8192, cache_ttl_seconds: int = 300): + self.logger = logging.getLogger(__name__) + self.max_context_size = max_context_size + self.cache_ttl_seconds = cache_ttl_seconds + + # In-memory storage for Phase 1 (could be Redis/external in Phase 2) + self._context: Dict[str, ContextEntry] = {} + self._step_results: Dict[str, StepResult] = {} + self._dependency_graph: Dict[str, Set[str]] = {} # step_id -> dependent_step_ids + + def store_context(self, key: str, value: Any, step_id: str, ttl_seconds: Optional[int] = None) -> None: + """ + Store a context entry. + + Args: + key: Context key + value: Value to store + step_id: ID of step that created this entry + ttl_seconds: Time to live (defaults to cache_ttl_seconds) + """ + ttl = ttl_seconds or self.cache_ttl_seconds + + entry = ContextEntry( + key=key, + value=value, + step_id=step_id, + timestamp=int(time.time()), + ttl_seconds=ttl + ) + + self._context[key] = entry + self.logger.debug(f"Stored context key '{key}' from step '{step_id}'") + + # Clean up expired entries + self._cleanup_expired() + + # Manage context size + self._manage_context_size() + + def get_context(self, key: str) -> Optional[Any]: + """ + Retrieve a context value. + + Args: + key: Context key to retrieve + + Returns: + Context value or None if not found/expired + """ + entry = self._context.get(key) + if not entry: + return None + + # Check if expired + if self._is_expired(entry): + del self._context[key] + return None + + return entry.value + + def get_context_for_step(self, step: ExecutionStep) -> Dict[str, Any]: + """ + Get all relevant context for a step based on its dependencies. + + Args: + step: Execution step needing context + + Returns: + Dictionary of relevant context entries + """ + context = {} + + # Include results from dependency steps + for dep_step_id in step.dependencies: + result = self._step_results.get(dep_step_id) + if result and result.success: + context[f"step_{dep_step_id}_output"] = result.output + context[f"step_{dep_step_id}_confidence"] = result.confidence.score + + # Include global context entries (filter by relevance if needed) + for key, entry in self._context.items(): + if not self._is_expired(entry): + context[key] = entry.value + + self.logger.debug(f"Retrieved context for step '{step.id}': {len(context)} entries") + return context + + def store_step_result(self, result: StepResult) -> None: + """ + Store result from step execution. + + Args: + result: Step execution result + """ + self._step_results[result.step_id] = result + + # Store key outputs in context for easy access + if result.success: + self.store_context( + key=f"result_{result.step_id}", + value=result.output, + step_id=result.step_id + ) + + self.logger.debug(f"Stored result for step '{result.step_id}' (success: {result.success})") + + def get_step_result(self, step_id: str) -> Optional[StepResult]: + """ + Get stored result for a step. + + Args: + step_id: ID of step + + Returns: + StepResult or None if not found + """ + return self._step_results.get(step_id) + + def register_dependency(self, step_id: str, depends_on: str) -> None: + """ + Register a dependency relationship between steps. 
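+
+        For example (illustrative), register_dependency("step-2", "step-1")
+        records that "step-2" must wait for "step-1" to complete.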
+ + Args: + step_id: Step that depends on another + depends_on: Step that must complete first + """ + if depends_on not in self._dependency_graph: + self._dependency_graph[depends_on] = set() + + self._dependency_graph[depends_on].add(step_id) + + def get_ready_steps(self, all_steps: List[ExecutionStep], completed_steps: Set[str]) -> List[ExecutionStep]: + """ + Get steps that are ready to execute (all dependencies completed). + + Args: + all_steps: All steps in the plan + completed_steps: Set of completed step IDs + + Returns: + List of steps ready for execution + """ + ready = [] + + for step in all_steps: + if step.id in completed_steps: + continue + + # Check if all dependencies are completed + deps_completed = all(dep_id in completed_steps for dep_id in step.dependencies) + + if deps_completed: + ready.append(step) + + return ready + + def serialize_context(self) -> str: + """ + Serialize current context for debugging/audit. + + Returns: + JSON string of current context + """ + serializable = {} + + for key, entry in self._context.items(): + if not self._is_expired(entry): + # Convert complex objects to strings for serialization + try: + value = entry.value + if not isinstance(value, (str, int, float, bool, list, dict)): + value = str(value) + + serializable[key] = { + "value": value, + "step_id": entry.step_id, + "timestamp": entry.timestamp + } + except Exception as e: + self.logger.warning(f"Could not serialize context key '{key}': {e}") + + return json.dumps(serializable, indent=2) + + def clear_context(self) -> None: + """Clear all stored context (for cleanup between requests).""" + self._context.clear() + self._step_results.clear() + self._dependency_graph.clear() + self.logger.debug("Cleared all context") + + def get_memory_usage(self) -> Dict[str, int]: + """ + Get memory usage statistics. 
+ + Returns: + Dictionary with usage statistics + """ + return { + "context_entries": len(self._context), + "step_results": len(self._step_results), + "dependencies": sum(len(deps) for deps in self._dependency_graph.values()), + "estimated_size_bytes": self._estimate_memory_size() + } + + def _is_expired(self, entry: ContextEntry) -> bool: + """Check if a context entry has expired.""" + if entry.ttl_seconds is None: + return False + + age_seconds = int(time.time()) - entry.timestamp + return age_seconds > entry.ttl_seconds + + def _cleanup_expired(self) -> None: + """Remove expired context entries.""" + expired_keys = [ + key for key, entry in self._context.items() + if self._is_expired(entry) + ] + + for key in expired_keys: + del self._context[key] + + if expired_keys: + self.logger.debug(f"Cleaned up {len(expired_keys)} expired context entries") + + def _manage_context_size(self) -> None: + """Manage context size by removing oldest entries if needed.""" + current_size = self._estimate_memory_size() + + if current_size > self.max_context_size: + # Remove oldest entries first + sorted_entries = sorted( + self._context.items(), + key=lambda x: x[1].timestamp + ) + + removed_count = 0 + for key, entry in sorted_entries: + del self._context[key] + removed_count += 1 + + # Check size again + if self._estimate_memory_size() <= self.max_context_size * 0.8: + break + + self.logger.debug(f"Removed {removed_count} context entries to manage size") + + def _estimate_memory_size(self) -> int: + """Rough estimate of memory usage in bytes.""" + total = 0 + + for key, entry in self._context.items(): + total += len(key) * 2 # Unicode chars + total += len(str(entry.value)) * 2 # Rough estimate + total += 100 # Overhead + + return total \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/agent/confidence/planner.py b/trustgraph-flow/trustgraph/agent/confidence/planner.py new file mode 100644 index 00000000..4c36292b --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/confidence/planner.py @@ -0,0 +1,337 @@ +""" +Planner Module + +Generates structured execution plans from user requests using an LLM. +Creates confidence-scored step sequences with appropriate tool combinations. +""" + +import uuid +import json +import logging +from typing import Dict, Any, List, Optional, TYPE_CHECKING + +from trustgraph.base import TextCompletionClient +from .types import ExecutionPlan, ExecutionStep, AgentConfig + +if TYPE_CHECKING: + pass + + +class PlanningError(Exception): + """Exception raised when plan generation fails.""" + pass + + +class ExecutionPlanner: + """ + Generates structured execution plans from natural language requests. + + Key Responsibilities: + - Parse user requests into structured plans + - Assign confidence thresholds based on operation criticality + - Determine step dependencies + - Select appropriate tool combinations + """ + + def __init__(self, text_completion_client: Optional[TextCompletionClient] = None): + self.logger = logging.getLogger(__name__) + self.text_completion_client = text_completion_client + + # Planning prompt template + self.planning_prompt = """You are an AI planning assistant that converts user questions into structured execution plans. + +Available tools and their capabilities: +- GraphQuery: Query knowledge graphs using natural language. Returns structured data about entities and relationships. +- TextCompletion: Generate text, analyze content, or answer questions using LLM capabilities. +- McpTool: Execute external tools and APIs for specialized tasks. 
+- Prompt: Use pre-defined prompts for specific analysis tasks.
+
+Given a user question, create an execution plan with the following structure:
+
+{
+  "steps": [
+    {
+      "id": "step-1",
+      "function": "GraphQuery|TextCompletion|McpTool|Prompt",
+      "arguments": {"key": "value", "key2": "{{context_variable}}"},
+      "dependencies": ["step-id1", "step-id2"],
+      "confidence_threshold": 0.7,
+      "timeout_ms": 30000,
+      "reasoning": "Why this step is needed and what it accomplishes"
+    }
+  ],
+  "context": {
+    "user_question": "Original question",
+    "approach": "High-level approach description"
+  }
+}
+
+Important guidelines:
+1. Use GraphQuery for factual information retrieval from knowledge graphs
+2. Use TextCompletion for analysis, summarization, and text generation
+3. Break complex tasks into logical steps with clear dependencies
+4. Set higher confidence thresholds (0.8-0.9) for critical operations
+5. Use context variables like {{step_1_output}} to pass data between steps
+6. Keep steps focused and atomic - each step should do one thing well
+7. Include reasoning for each step to explain its purpose
+
+User Question: {question}
+
+Generate the execution plan as JSON:"""
+
+    async def generate_plan(
+        self,
+        question: str,
+        config: AgentConfig,
+        additional_context: Optional[Dict[str, Any]] = None
+    ) -> ExecutionPlan:
+        """
+        Generate an execution plan from a user question.
+
+        Args:
+            question: User's question or request
+            config: Agent configuration with thresholds
+            additional_context: Optional additional context
+
+        Returns:
+            ExecutionPlan with structured steps
+
+        Raises:
+            PlanningError: If plan generation fails
+        """
+        if not self.text_completion_client:
+            raise PlanningError("TextCompletionClient not configured")
+
+        try:
+            self.logger.info(f"Generating execution plan for question: {question[:100]}...")
+
+            # Substitute the question placeholder directly; str.format() would
+            # fail on the literal JSON braces in the prompt template
+            prompt = self.planning_prompt.replace("{question}", question)
+
+            # Get plan from LLM
+            response = await self.text_completion_client.text_completion(
+                system="You are a precise AI planning assistant. Always respond with valid JSON only.",
+                prompt=prompt,
+                timeout=30
+            )
+
+            # Parse the response
+            plan_data = self._parse_plan_response(response)
+
+            # Create ExecutionPlan object
+            plan = self._create_execution_plan(plan_data, config)
+
+            self.logger.info(f"Generated plan '{plan.id}' with {len(plan.steps)} steps")
+            return plan
+
+        except Exception as e:
+            error_msg = f"Failed to generate execution plan: {str(e)}"
+            self.logger.error(error_msg)
+            raise PlanningError(error_msg)
+
+    def _parse_plan_response(self, response: str) -> Dict[str, Any]:
+        """
+        Parse LLM response into plan data structure.
+ + Args: + response: Raw response from LLM + + Returns: + Parsed plan data + + Raises: + PlanningError: If parsing fails + """ + try: + # Try to extract JSON from response + response = response.strip() + + # Handle cases where LLM adds extra text + if not response.startswith('{'): + # Look for JSON block + start = response.find('{') + end = response.rfind('}') + 1 + if start != -1 and end > start: + response = response[start:end] + else: + raise ValueError("No JSON found in response") + + plan_data = json.loads(response) + + # Validate required fields + if "steps" not in plan_data: + raise ValueError("Missing 'steps' field in plan") + + if not isinstance(plan_data["steps"], list): + raise ValueError("'steps' must be a list") + + return plan_data + + except json.JSONDecodeError as e: + raise PlanningError(f"Invalid JSON in plan response: {e}") + except ValueError as e: + raise PlanningError(f"Invalid plan structure: {e}") + + def _create_execution_plan( + self, + plan_data: Dict[str, Any], + config: AgentConfig + ) -> ExecutionPlan: + """ + Create ExecutionPlan object from parsed plan data. + + Args: + plan_data: Parsed plan data from LLM + config: Agent configuration + + Returns: + ExecutionPlan object + """ + plan_id = str(uuid.uuid4()) + steps = [] + + for step_data in plan_data["steps"]: + step = self._create_execution_step(step_data, config) + steps.append(step) + + # Get context, defaulting to empty dict + context = plan_data.get("context", {}) + + return ExecutionPlan( + id=plan_id, + steps=steps, + context=context + ) + + def _create_execution_step( + self, + step_data: Dict[str, Any], + config: AgentConfig + ) -> ExecutionStep: + """ + Create ExecutionStep object from step data. + + Args: + step_data: Step data from plan + config: Agent configuration + + Returns: + ExecutionStep object + """ + # Required fields + step_id = step_data.get("id", str(uuid.uuid4())) + function = step_data.get("function", "") + arguments = step_data.get("arguments", {}) + + # Optional fields with defaults + dependencies = step_data.get("dependencies", []) + + # Get confidence threshold - use function-specific or default + confidence_threshold = step_data.get("confidence_threshold") + if confidence_threshold is None: + confidence_threshold = config.tool_thresholds.get( + function, + config.default_confidence_threshold + ) + + timeout_ms = step_data.get("timeout_ms", config.step_timeout_ms) + + return ExecutionStep( + id=step_id, + function=function, + arguments=arguments, + dependencies=dependencies, + confidence_threshold=confidence_threshold, + timeout_ms=timeout_ms + ) + + def create_simple_plan( + self, + function: str, + arguments: Dict[str, Any], + config: AgentConfig + ) -> ExecutionPlan: + """ + Create a simple single-step plan programmatically. + + Useful for testing or simple operations. 
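+
+        Example (illustrative):
+
+            plan = planner.create_simple_plan(
+                "TextCompletion", {"prompt": "Summarise the findings"}, config)
+            assert len(plan.steps) == 1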
+ + Args: + function: Function name to execute + arguments: Function arguments + config: Agent configuration + + Returns: + ExecutionPlan with single step + """ + step = ExecutionStep( + id="step-1", + function=function, + arguments=arguments, + dependencies=[], + confidence_threshold=config.tool_thresholds.get( + function, config.default_confidence_threshold + ), + timeout_ms=config.step_timeout_ms + ) + + plan = ExecutionPlan( + id=str(uuid.uuid4()), + steps=[step], + context={ + "approach": f"Single {function} execution", + "generated_by": "create_simple_plan" + } + ) + + self.logger.debug(f"Created simple plan for {function}") + return plan + + def validate_plan(self, plan: ExecutionPlan) -> List[str]: + """ + Validate an execution plan for common issues. + + Args: + plan: Plan to validate + + Returns: + List of validation error messages (empty if valid) + """ + errors = [] + + if not plan.steps: + errors.append("Plan has no steps") + return errors + + step_ids = {step.id for step in plan.steps} + + for step in plan.steps: + # Check step has required fields + if not step.id: + errors.append("Step missing ID") + if not step.function: + errors.append(f"Step '{step.id}' missing function") + + # Check dependencies exist + for dep_id in step.dependencies: + if dep_id not in step_ids: + errors.append(f"Step '{step.id}' depends on non-existent step '{dep_id}'") + + # Check for circular dependencies (basic check) + if step.id in step.dependencies: + errors.append(f"Step '{step.id}' has circular dependency on itself") + + # Validate confidence threshold + if not (0.0 <= step.confidence_threshold <= 1.0): + errors.append(f"Step '{step.id}' has invalid confidence threshold: {step.confidence_threshold}") + + # Validate timeout + if step.timeout_ms <= 0: + errors.append(f"Step '{step.id}' has invalid timeout: {step.timeout_ms}") + + return errors + + def set_text_completion_client(self, client: TextCompletionClient) -> None: + """Set the text completion client (dependency injection).""" + self.text_completion_client = client + self.logger.debug("TextCompletionClient configured for planner") \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/agent/confidence/service.py b/trustgraph-flow/trustgraph/agent/confidence/service.py new file mode 100644 index 00000000..2b7ed068 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/confidence/service.py @@ -0,0 +1,373 @@ +""" +Confidence Agent Service + +Main service class that coordinates all confidence agent components +and handles request/response flow through the Pulsar message bus. +""" + +import json +import logging +import functools +from typing import Dict, Any, Optional, Callable + +from trustgraph.base import ( + AgentService, + TextCompletionClientSpec, + GraphRagClientSpec, + ToolClientSpec, + PromptClientSpec +) +from trustgraph.schema import AgentRequest, AgentResponse, Error + +from .types import AgentConfig +from .planner import ExecutionPlanner, PlanningError +from .flow_controller import FlowController, FlowControlError +from .memory import MemoryManager +from .executor import StepExecutor +from .audit import AuditLogger + +# Module logger +logger = logging.getLogger(__name__) + +default_ident = "confidence-agent" +default_max_iterations = 15 + + +class ConfidenceAgentService(AgentService): + """ + Main service class for the confidence-based agent. + + Service Workflow: + 1. Generate execution plan via Planner Module + 2. Execute plan with confidence control via Flow Controller + 3. 
Generate response with confidence metrics and audit trail + + Uses existing AgentRequest and AgentResponse schemas for compatibility. + """ + + def __init__(self, **params): + id = params.get("id", default_ident) + + # Extract configuration parameters + self.max_iterations = int(params.get("max_iterations", default_max_iterations)) + self.confidence_threshold = float(params.get("confidence_threshold", 0.75)) + self.max_retries = int(params.get("max_retries", 3)) + self.retry_backoff_factor = float(params.get("retry_backoff_factor", 2.0)) + self.override_enabled = bool(params.get("override_enabled", True)) + self.step_timeout_ms = int(params.get("step_timeout_ms", 30000)) + self.parallel_execution = bool(params.get("parallel_execution", False)) + + # Tool-specific thresholds + self.graph_query_threshold = float(params.get("graph_query_threshold", 0.8)) + self.text_completion_threshold = float(params.get("text_completion_threshold", 0.7)) + self.mcp_tool_threshold = float(params.get("mcp_tool_threshold", 0.6)) + + # Create agent configuration + self.config = AgentConfig( + default_confidence_threshold=self.confidence_threshold, + max_retries=self.max_retries, + retry_backoff_factor=self.retry_backoff_factor, + override_enabled=self.override_enabled, + step_timeout_ms=self.step_timeout_ms, + parallel_execution=self.parallel_execution, + max_iterations=self.max_iterations, + tool_thresholds={ + "GraphQuery": self.graph_query_threshold, + "TextCompletion": self.text_completion_threshold, + "McpTool": self.mcp_tool_threshold, + "Prompt": self.text_completion_threshold + } + ) + + super(ConfidenceAgentService, self).__init__(**params | {"id": id}) + + # Initialize core modules + self.memory_manager = MemoryManager() + self.executor = StepExecutor() + self.planner = ExecutionPlanner() + self.flow_controller = FlowController(self.config) + self.audit_logger = AuditLogger() + + # Set up flow controller callbacks + self.flow_controller.set_override_callback(self._request_user_override) + self.flow_controller.set_progress_callback(self._handle_progress_update) + + # Track current request for callbacks + self._current_respond_callback: Optional[Callable] = None + + # Register client specifications + self.register_specification( + TextCompletionClientSpec( + request_name="text-completion-request", + response_name="text-completion-response", + ) + ) + + self.register_specification( + GraphRagClientSpec( + request_name="graph-rag-request", + response_name="graph-rag-response", + ) + ) + + self.register_specification( + ToolClientSpec( + request_name="mcp-tool-request", + response_name="mcp-tool-response", + ) + ) + + self.register_specification( + PromptClientSpec( + request_name="prompt-request", + response_name="prompt-response", + ) + ) + + logger.info(f"ConfidenceAgentService initialized with config: {self.config.__dict__}") + + async def agent_request(self, request, respond, next, flow): + """ + Process an agent request with confidence-based execution. 
+ + Args: + request: AgentRequest message + respond: Response callback function + next: Next request callback (for chaining) + flow: Flow context + """ + self._current_respond_callback = respond + + try: + logger.info(f"Processing confidence agent request: {request.question[:100]}...") + + # Parse plan overrides from request if provided + plan_overrides = self._parse_plan_overrides(request.plan) + if plan_overrides: + logger.debug(f"Plan overrides: {plan_overrides}") + + # Clear previous context + self.memory_manager.clear_context() + + # Set up tool clients + self._configure_tool_clients(flow) + + # Send planning thought + await self._send_thought(respond, "Creating execution plan with confidence thresholds") + + # Generate execution plan + planner = self.planner + try: + plan = await planner.generate_plan( + request.question, + self.config, + additional_context=plan_overrides + ) + + # Validate plan + validation_errors = planner.validate_plan(plan) + if validation_errors: + error_msg = f"Plan validation failed: {'; '.join(validation_errors)}" + logger.error(error_msg) + await self._send_error(respond, error_msg) + return + + await self._send_observation( + respond, + f"Plan generated: {len(plan.steps)} steps with confidence thresholds" + ) + + except PlanningError as e: + error_msg = f"Plan generation failed: {str(e)}" + logger.error(error_msg) + await self._send_error(respond, error_msg) + return + + # Execute the plan + try: + await self._send_thought(respond, "Executing plan with confidence-based control") + + execution = await self.flow_controller.execute_plan( + plan, + self.memory_manager, + self.executor, + self.audit_logger + ) + + # Generate final response based on execution results + final_answer = self._generate_final_answer(plan, execution, request.question) + + # Send final response + await self._send_final_answer(respond, final_answer, execution) + + except FlowControlError as e: + error_msg = f"Plan execution failed: {str(e)}" + logger.error(error_msg) + await self._send_error(respond, error_msg) + return + + except Exception as e: + error_msg = f"Unexpected error in confidence agent: {str(e)}" + logger.error(error_msg, exc_info=True) + await self._send_error(respond, error_msg) + + finally: + self._current_respond_callback = None + + def _configure_tool_clients(self, flow): + """Configure tool clients from flow context.""" + + # Get clients from flow context + text_completion_client = getattr(flow, 'text_completion_client', None) + graph_rag_client = getattr(flow, 'graph_rag_client', None) + tool_client = getattr(flow, 'tool_client', None) + prompt_client = getattr(flow, 'prompt_client', None) + + # Configure executor + self.executor.set_clients( + text_completion_client=text_completion_client, + graph_rag_client=graph_rag_client, + tool_client=tool_client, + prompt_client=prompt_client + ) + + # Configure planner + if text_completion_client: + self.planner.set_text_completion_client(text_completion_client) + + def _parse_plan_overrides(self, plan_str: str) -> Optional[Dict[str, Any]]: + """Parse plan overrides from request.plan field.""" + if not plan_str or not plan_str.strip(): + return None + + try: + return json.loads(plan_str) + except json.JSONDecodeError: + logger.warning(f"Invalid JSON in plan field: {plan_str}") + return None + + def _generate_final_answer(self, plan, execution, original_question: str) -> str: + """Generate final answer from execution results.""" + + if not execution.results: + return "No results were generated." 
+
+        # Get the last successful result as the primary answer
+        successful_results = [r for r in execution.results if r.success]
+
+        if not successful_results:
+            return "All execution steps failed. Please try rephrasing your question."
+
+        # Use the output from the last successful step
+        primary_result = successful_results[-1]
+        answer = primary_result.output
+
+        # If the result looks like JSON, try to make it more readable
+        try:
+            parsed = json.loads(answer)
+            if isinstance(parsed, list) and len(parsed) > 0:
+                if len(parsed) == 1:
+                    answer = f"Found 1 result: {json.dumps(parsed[0], indent=2)}"
+                else:
+                    answer = f"Found {len(parsed)} results:\n" + "\n".join([
+                        f"{i+1}. {json.dumps(item, indent=2)}"
+                        for i, item in enumerate(parsed[:3])
+                    ])
+                    if len(parsed) > 3:
+                        answer += f"\n... and {len(parsed) - 3} more results"
+            elif isinstance(parsed, dict):
+                answer = json.dumps(parsed, indent=2)
+        except json.JSONDecodeError:
+            # Keep original answer if not JSON
+            pass
+
+        return answer
+
+    async def _send_thought(self, respond, thought: str):
+        """Send a thought response."""
+        response = AgentResponse(
+            answer="",
+            error=None,
+            thought=thought,
+            observation=""
+        )
+        await respond(response)
+
+    async def _send_observation(self, respond, observation: str):
+        """Send an observation response."""
+        response = AgentResponse(
+            answer="",
+            error=None,
+            thought="",
+            observation=observation
+        )
+        await respond(response)
+
+    async def _send_final_answer(self, respond, answer: str, execution):
+        """Send the final answer with confidence information."""
+
+        # Calculate overall confidence
+        avg_confidence = 0.0
+        if execution.results:
+            avg_confidence = sum(r.confidence.score for r in execution.results) / len(execution.results)
+
+        # Create final thought with confidence summary
+        final_thought = (
+            f"Analysis complete with average confidence {avg_confidence:.2f} "
+            f"({len(execution.completed_steps)}/{len(execution.results)} steps successful)"
+        )
+
+        response = AgentResponse(
+            answer=answer,
+            error=None,
+            thought=final_thought,
+            observation=""
+        )
+        await respond(response)
+
+    async def _send_error(self, respond, error_message: str):
+        """Send an error response."""
+        response = AgentResponse(
+            answer="",
+            error=Error(message=error_message),
+            thought="",
+            observation=""
+        )
+        await respond(response)
+
+    def _request_user_override(self, message: str, result) -> bool:
+        """
+        Request user override for low confidence results.
+
+        For Phase 1, this automatically denies overrides.
+        In Phase 2, this could integrate with UI for user input.
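+
+        Returns:
+            True to accept the low-confidence result and continue, False to
+            decline and let the flow controller apply its retry and failure
+            handling.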
+ """ + logger.info(f"User override requested: {message}") + + # For Phase 1, automatically deny overrides + # In Phase 2, this could send a special response asking for user input + return False + + async def _handle_progress_update(self, event_type: str, result): + """Handle progress updates from flow controller.""" + if not self._current_respond_callback: + return + + try: + if event_type == "step_success": + observation = ( + f"Step '{result.step_id}' completed successfully " + f"(confidence: {result.confidence.score:.2f})" + ) + await self._send_observation(self._current_respond_callback, observation) + elif event_type == "step_retry": + thought = f"Retrying step '{result.step_id}' - confidence too low" + await self._send_thought(self._current_respond_callback, thought) + except Exception as e: + logger.warning(f"Error sending progress update: {e}") + + +# Create the main processor class for service registration +class Processor(ConfidenceAgentService): + """Main processor class for service registration.""" + pass \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/agent/confidence/types.py b/trustgraph-flow/trustgraph/agent/confidence/types.py new file mode 100644 index 00000000..06564653 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/confidence/types.py @@ -0,0 +1,110 @@ +""" +Internal type definitions for the confidence-based agent. + +These types are used internally between confidence agent modules for +structured data flow, execution state, and metrics tracking. +""" + +from dataclasses import dataclass +from typing import Dict, List, Optional, Any +from enum import Enum + + +@dataclass +class ConfidenceMetrics: + """Confidence evaluation metrics for execution results.""" + + score: float # Confidence score (0.0 to 1.0) + reasoning: str # Explanation of score calculation + retry_count: int # Number of retries attempted + + +@dataclass +class ExecutionStep: + """Individual step in an execution plan.""" + + id: str # Unique step identifier + function: str # Tool/function to execute + arguments: Dict[str, Any] # Arguments for the function + dependencies: List[str] # IDs of prerequisite steps + confidence_threshold: float # Minimum acceptable confidence + timeout_ms: int # Execution timeout + + +@dataclass +class ExecutionPlan: + """Complete execution plan with ordered steps.""" + + id: str # Plan identifier + steps: List[ExecutionStep] # Ordered execution steps + context: Dict[str, Any] # Global context for plan + + +@dataclass +class StepResult: + """Result of executing a single step.""" + + step_id: str # Reference to ExecutionStep + success: bool # Execution success status + output: str # Step execution output + confidence: ConfidenceMetrics # Confidence evaluation + execution_time_ms: int # Actual execution time + + +class ExecutionStatus(Enum): + """Status of plan or step execution.""" + + PENDING = "pending" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + FAILED = "failed" + RETRYING = "retrying" + + +@dataclass +class PlanExecution: + """Tracks execution state of an entire plan.""" + + plan_id: str + status: ExecutionStatus + current_step: Optional[str] # ID of currently executing step + completed_steps: List[str] # IDs of completed steps + failed_steps: List[str] # IDs of failed steps + results: List[StepResult] # All step results + total_execution_time_ms: int + + +@dataclass +class AgentConfig: + """Configuration for the confidence agent.""" + + default_confidence_threshold: float = 0.75 + max_retries: int = 3 + retry_backoff_factor: float = 
2.0 + override_enabled: bool = True + step_timeout_ms: int = 30000 + parallel_execution: bool = False + max_iterations: int = 15 + + # Tool-specific thresholds + tool_thresholds: Dict[str, float] = None + + def __post_init__(self): + if self.tool_thresholds is None: + self.tool_thresholds = { + "GraphQuery": 0.8, + "TextCompletion": 0.7, + "McpTool": 0.6, + "Prompt": 0.7 + } + + +@dataclass +class ContextEntry: + """Entry in the execution context/memory.""" + + key: str + value: Any + step_id: str # Step that created this entry + timestamp: int # Unix timestamp + ttl_seconds: Optional[int] = None # Time to live \ No newline at end of file
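
As a quick orientation for reviewers, the sketch below shows how the internal types defined in `types.py` are intended to compose: an `AgentConfig` supplies per-tool confidence thresholds, `ExecutionStep`s declare dependencies between steps, and an `ExecutionPlan` groups the ordered steps. This is illustrative only and not part of the change set; the step ids, function arguments, and the import path (assuming `trustgraph-flow` is on the Python path) are hypothetical.

```python
# Illustrative sketch only: composing the internal confidence-agent types.
# Step ids, argument keys, and values below are made-up examples.

from trustgraph.agent.confidence.types import (
    AgentConfig, ExecutionPlan, ExecutionStep,
)

config = AgentConfig(default_confidence_threshold=0.75)

steps = [
    ExecutionStep(
        id="step-1",
        function="GraphQuery",
        arguments={"query": "people mentioned in the latest document"},
        dependencies=[],
        # Per-tool threshold populated by AgentConfig.__post_init__
        confidence_threshold=config.tool_thresholds["GraphQuery"],
        timeout_ms=config.step_timeout_ms,
    ),
    ExecutionStep(
        id="step-2",
        function="TextCompletion",
        arguments={"prompt": "Summarise the query results"},
        dependencies=["step-1"],   # runs only after step-1 succeeds
        confidence_threshold=config.tool_thresholds["TextCompletion"],
        timeout_ms=config.step_timeout_ms,
    ),
]

plan = ExecutionPlan(id="plan-1", steps=steps, context={})
```

In practice the planner module, not the caller, builds these objects; the sketch only clarifies the data flow between `AgentConfig`, `ExecutionStep`, and `ExecutionPlan`.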