diff --git a/docs/runtime/index.md b/docs/runtime/index.md index e6da486c5..598e9852e 100644 --- a/docs/runtime/index.md +++ b/docs/runtime/index.md @@ -43,61 +43,10 @@ The `Runner` acts as the central coordinator for a single user invocation. Its r *Conceptual Runner Loop:* -=== "Python" - - ```py - # Simplified view of Runner's main loop logic - def run(new_query, ...) -> Generator[Event]: - # 1. Append new_query to session event history (via SessionService) - session_service.append_event(session, Event(author='user', content=new_query)) - - # 2. Kick off event loop by calling the agent - agent_event_generator = agent_to_run.run_async(context) - - async for event in agent_event_generator: - # 3. Process the generated event and commit changes - session_service.append_event(session, event) # Commits state/artifact deltas etc. - # memory_service.update_memory(...) # If applicable - # artifact_service might have already been called via context during agent run - - # 4. Yield event for upstream processing (e.g., UI rendering) - yield event - # Runner implicitly signals agent generator can continue after yielding - ``` - -=== "Java" - - ```java - // Simplified conceptual view of the Runner's main loop logic in Java. - public Flowable runConceptual( - Session session, - InvocationContext invocationContext, - Content newQuery - ) { - - // 1. Append new_query to session event history (via SessionService) - // ... - sessionService.appendEvent(session, userEvent).blockingGet(); - - // 2. Kick off event stream by calling the agent - Flowable agentEventStream = agentToRun.runAsync(invocationContext); - - // 3. Process each generated event, commit changes, and "yield" or "emit" - return agentEventStream.map(event -> { - // This mutates the session object (adds event, applies stateDelta). - // The return value of appendEvent (a Single) is conceptually - // just the event itself after processing. - sessionService.appendEvent(session, event).blockingGet(); // Simplified blocking call - - // memory_service.update_memory(...) // If applicable - conceptual - // artifact_service might have already been called via context during agent run - - // 4. "Yield" event for upstream processing - // In RxJava, returning the event in map effectively yields it to the next operator or subscriber. - return event; - }); - } - ``` + + + + ### Execution Logic's Role (Agent, Tool, Callback) @@ -111,112 +60,11 @@ Your code within agents, tools, and callbacks is responsible for the actual comp *Conceptual Execution Logic:* -=== "Python" - - ```py - # Simplified view of logic inside Agent.run_async, callbacks, or tools - - # ... previous code runs based on current state ... - - # 1. Determine a change or output is needed, construct the event - # Example: Updating state - update_data = {'field_1': 'value_2'} - event_with_state_change = Event( - author=self.name, - actions=EventActions(state_delta=update_data), - content=types.Content(parts=[types.Part(text="State updated.")]) - # ... other event fields ... - ) - - # 2. Yield the event to the Runner for processing & commit - yield event_with_state_change - # <<<<<<<<<<<< EXECUTION PAUSES HERE >>>>>>>>>>>> - - # <<<<<<<<<<<< RUNNER PROCESSES & COMMITS THE EVENT >>>>>>>>>>>> - - # 3. Resume execution ONLY after Runner is done processing the above event. - # Now, the state committed by the Runner is reliably reflected. - # Subsequent code can safely assume the change from the yielded event happened. - val = ctx.session.state['field_1'] - # here `val` is guaranteed to be "value_2" (assuming Runner committed successfully) - print(f"Resumed execution. Value of field_1 is now: {val}") - - # ... subsequent code continues ... - # Maybe yield another event later... - ``` - -=== "Java" - - ```java - // Simplified view of logic inside Agent.runAsync, callbacks, or tools - // ... previous code runs based on current state ... - - // 1. Determine a change or output is needed, construct the event - // Example: Updating state - ConcurrentMap updateData = new ConcurrentHashMap<>(); - updateData.put("field_1", "value_2"); - - EventActions actions = EventActions.builder().stateDelta(updateData).build(); - Content eventContent = Content.builder().parts(Part.fromText("State updated.")).build(); - - Event eventWithStateChange = Event.builder() - .author(self.name()) - .actions(actions) - .content(Optional.of(eventContent)) - // ... other event fields ... - .build(); - - // 2. "Yield" the event. In RxJava, this means emitting it into the stream. - // The Runner (or upstream consumer) will subscribe to this Flowable. - // When the Runner receives this event, it will process it (e.g., call sessionService.appendEvent). - // The 'appendEvent' in Java ADK mutates the 'Session' object held within 'ctx' (InvocationContext). - - // <<<<<<<<<<<< CONCEPTUAL PAUSE POINT >>>>>>>>>>>> - // In RxJava, the emission of 'eventWithStateChange' happens, and then the stream - // might continue with a 'flatMap' or 'concatMap' operator that represents - // the logic *after* the Runner has processed this event. - - // To model the "resume execution ONLY after Runner is done processing": - // The Runner's `appendEvent` is usually an async operation itself (returns Single). - // The agent's flow needs to be structured such that subsequent logic - // that depends on the committed state runs *after* that `appendEvent` completes. - - // This is how the Runner typically orchestrates it: - // Runner: - // agent.runAsync(ctx) - // .concatMapEager(eventFromAgent -> - // sessionService.appendEvent(ctx.session(), eventFromAgent) // This updates ctx.session().state() - // .toFlowable() // Emits the event after it's processed - // ) - // .subscribe(processedEvent -> { /* UI renders processedEvent */ }); - - // So, within the agent's own logic, if it needs to do something *after* an event it yielded - // has been processed and its state changes are reflected in ctx.session().state(), - // that subsequent logic would typically be in another step of its reactive chain. - - // For this conceptual example, we'll emit the event, and then simulate the "resume" - // as a subsequent operation in the Flowable chain. - - return Flowable.just(eventWithStateChange) // Step 2: Yield the event - .concatMap(yieldedEvent -> { - // <<<<<<<<<<<< RUNNER CONCEPTUALLY PROCESSES & COMMITS THE EVENT >>>>>>>>>>>> - // At this point, in a real runner, ctx.session().appendEvent(yieldedEvent) would have been called - // by the Runner, and ctx.session().state() would be updated. - // Since we are *inside* the agent's conceptual logic trying to model this, - // we assume the Runner's action has implicitly updated our 'ctx.session()'. - - // 3. Resume execution. - // Now, the state committed by the Runner (via sessionService.appendEvent) - // is reliably reflected in ctx.session().state(). - Object val = ctx.session().state().get("field_1"); - // here `val` is guaranteed to be "value_2" because the `sessionService.appendEvent` - // called by the Runner would have updated the session state within the `ctx` object. - - System.out.println("Resumed execution. Value of field_1 is now: " + val); - - // ... subsequent code continues ... - // If this subsequent code needs to yield another event, it would do so here. - ``` + + + + + This cooperative yield/pause/resume cycle between the `Runner` and your Execution Logic, mediated by `Event` objects, forms the core of the ADK Runtime. @@ -305,102 +153,17 @@ Understanding a few key aspects of how the ADK Runtime handles state, streaming, * **Implication:** Code that runs *after* resuming from a `yield` can reliably assume that the state changes signaled in the *yielded event* have been committed. -=== "Python" - - ```py - # Inside agent logic (conceptual) - - # 1. Modify state - ctx.session.state['status'] = 'processing' - event1 = Event(..., actions=EventActions(state_delta={'status': 'processing'})) - - # 2. Yield event with the delta - yield event1 - # --- PAUSE --- Runner processes event1, SessionService commits 'status' = 'processing' --- - - # 3. Resume execution - # Now it's safe to rely on the committed state - current_status = ctx.session.state['status'] # Guaranteed to be 'processing' - print(f"Status after resuming: {current_status}") - ``` - -=== "Java" - - ```java - // Inside agent logic (conceptual) - // ... previous code runs based on current state ... - - // 1. Prepare state modification and construct the event - ConcurrentHashMap stateChanges = new ConcurrentHashMap<>(); - stateChanges.put("status", "processing"); - - EventActions actions = EventActions.builder().stateDelta(stateChanges).build(); - Content content = Content.builder().parts(Part.fromText("Status update: processing")).build(); - - Event event1 = Event.builder() - .actions(actions) - // ... - .build(); - - // 2. Yield event with the delta - return Flowable.just(event1) - .map( - emittedEvent -> { - // --- CONCEPTUAL PAUSE & RUNNER PROCESSING --- - // 3. Resume execution (conceptually) - // Now it's safe to rely on the committed state. - String currentStatus = (String) ctx.session().state().get("status"); - System.out.println("Status after resuming (inside agent logic): " + currentStatus); // Guaranteed to be 'processing' - - // The event itself (event1) is passed on. - // If subsequent logic within this agent step produced *another* event, - // you'd use concatMap to emit that new event. - return emittedEvent; - }); - - // ... subsequent agent logic might involve further reactive operators - // or emitting more events based on the now-updated `ctx.session().state()`. - ``` + + + + ### "Dirty Reads" of Session State * **Definition:** While commitment happens *after* the yield, code running *later within the same invocation*, but *before* the state-changing event is actually yielded and processed, **can often see the local, uncommitted changes**. This is sometimes called a "dirty read". * **Example:** -=== "Python" - - ```py - # Code in before_agent_callback - callback_context.state['field_1'] = 'value_1' - # State is locally set to 'value_1', but not yet committed by Runner - - # ... agent runs ... - - # Code in a tool called later *within the same invocation* - # Readable (dirty read), but 'value_1' isn't guaranteed persistent yet. - val = tool_context.state['field_1'] # 'val' will likely be 'value_1' here - print(f"Dirty read value in tool: {val}") - - # Assume the event carrying the state_delta={'field_1': 'value_1'} - # is yielded *after* this tool runs and is processed by the Runner. - ``` - -=== "Java" - - ```java - // Modify state - Code in BeforeAgentCallback - // AND stages this change in callbackContext.eventActions().stateDelta(). - callbackContext.state().put("field_1", "value_1"); - - // --- agent runs ... --- - - // --- Code in a tool called later *within the same invocation* --- - // Readable (dirty read), but 'value_1' isn't guaranteed persistent yet. - Object val = toolContext.state().get("field_1"); // 'val' will likely be 'value_1' here - System.out.println("Dirty read value in tool: " + val); - // Assume the event carrying the state_delta={'field_1': 'value_1'} - // is yielded *after* this tool runs and is processed by the Runner. - ``` + * **Implications:** * **Benefit:** Allows different parts of your logic within a single complex step (e.g., multiple callbacks or tool calls before the next LLM turn) to coordinate using state without waiting for a full yield/commit cycle. @@ -418,14 +181,22 @@ This primarily relates to how responses from the LLM are handled, especially whe * **Non-Streaming:** The LLM generates the entire response at once. The framework yields a single event marked as non-partial, which the `Runner` processes fully. * **Why it Matters:** Ensures that state changes are applied atomically and only once based on the *complete* response from the LLM, while still allowing the UI to display text progressively as it's generated. -## Async is Primary (`run_async`) +### Async is Primary (`run_async`) -* **Core Design:** The ADK Runtime is fundamentally built on asynchronous libraries (like Python's `asyncio` and Java's `RxJava`) to handle concurrent operations (like waiting for LLM responses or tool executions) efficiently without blocking. +* **Core Design:** The ADK Runtime is fundamentally built on asynchronous libraries (like Python's `asyncio`) to handle concurrent operations (like waiting for LLM responses or tool executions) efficiently without blocking. * **Main Entry Point:** `Runner.run_async` is the primary method for executing agent invocations. All core runnable components (Agents, specific flows) use `asynchronous` methods internally. * **Synchronous Convenience (`run`):** A synchronous `Runner.run` method exists mainly for convenience (e.g., in simple scripts or testing environments). However, internally, `Runner.run` typically just calls `Runner.run_async` and manages the async event loop execution for you. -* **Developer Experience:** We recommend designing your applications (e.g., web servers using ADK) to be asynchronous for best performance. In Python, this means using `asyncio`; in Java, leverage `RxJava`'s reactive programming model. +* **Developer Experience:** We recommend designing your applications (e.g., web servers using ADK) to be asynchronous for best performance. In Python, this means using `asyncio`. * **Sync Callbacks/Tools:** The ADK framework supports both asynchronous and synchronous functions for tools and callbacks. - * **Blocking I/O:** For long-running synchronous I/O operations, the framework attempts to prevent stalls. Python ADK may use asyncio.to_thread, while Java ADK often relies on appropriate RxJava schedulers or wrappers for blocking calls. - * **CPU-Bound Work:** Purely CPU-intensive synchronous tasks will still block their execution thread in both environments. + * **Blocking I/O:** For long-running synchronous I/O operations, the framework attempts to prevent stalls. Python ADK may use asyncio.to_thread. + * **CPU-Bound Work:** Purely CPU-intensive synchronous tasks will still block their execution thread. + +### Pausing and Resuming Invocations + +The ADK runtime supports the ability to pause and resume agent invocations. This is particularly useful for tools that need to perform long-running operations or wait for external input, such as human-in-the-loop scenarios. + +The `LongRunningFunctionTool` is designed for this purpose. When an agent calls a `LongRunningFunctionTool`, the tool can signal to the runtime to pause the invocation. This is done by returning a result that indicates the operation is pending. The agent's execution is then suspended, and the agent client is notified. + +The client can then, at a later time, resume the invocation by sending back an intermediate or final response for the long-running operation. The ADK runtime will then continue the agent's execution from where it was paused. -Understanding these behaviors helps you write more robust ADK applications and debug issues related to state consistency, streaming updates, and asynchronous execution. +This mechanism allows agents to handle tasks that take a significant amount of time without blocking the entire system. You can find a more detailed explanation and code examples in the [Long Running Function Tools documentation](../tools/function-tools.md#long-run-tool). \ No newline at end of file