|
| 1 | +# Streaming Tools |
| 2 | + |
| 3 | +!!! info |
| 4 | + |
| 5 | + Streaming tools are only supported in streaming (live) agents/APIs. |
| 6 | + |
| 7 | +Streaming tools allow tools (functions) to stream intermediate results back to agents, and agents can respond to those intermediate results. |
| 8 | +For example, we can use a streaming tool to monitor changes in a stock price and have the agent react to them. As another example, we can have the agent monitor a video stream and report whenever something in the stream changes. |
| 9 | + |
| 10 | +To define a streaming tool, you must adhere to the following: |
| 11 | + |
| 12 | +1. **Asynchronous Function:** The tool must be an `async` Python function. |
| 13 | +2. **AsyncGenerator Return Type:** The function must be typed to return an `AsyncGenerator`. The first type parameter to `AsyncGenerator` is the type of the data you `yield` (e.g., `str` for text messages, or a custom object for structured data). The second type parameter is typically `None` if the generator doesn't receive values via `send()`. |
| 14 | + |
| 15 | + |
| 16 | +We support two types of streaming tools: |
| 17 | +- Simple streaming tools. These take only non-video/audio inputs; they do not receive the video/audio streams that you feed to adk web or the adk runner. |
| 18 | +- Video streaming tools. These work only with video streaming; the video stream (the stream that you feed to adk web or the adk runner) is passed into the function. |
| 19 | + |
| 20 | +Now let's define an agent that can monitor stock price changes and monitor the video stream changes. |
| 21 | + |
| 22 | +```python |
| 23 | +import asyncio |
| 24 | +from typing import AsyncGenerator |
| 25 | + |
| 26 | +from agents.agents import LiveRequestQueue |
| 27 | +from agents.agents.llm_agent import Agent |
| 28 | +from agents.tools.function_tool import FunctionTool |
| 29 | +from google.genai import Client |
| 30 | +from google.genai import types as genai_types |
| 31 | + |
| 32 | + |
async def monitor_stock_price(stock_symbol: str) -> AsyncGenerator[str, None]:
    """Monitor the price of the given stock_symbol continuously, streaming updates asynchronously.

    The price feed is mocked: a fixed schedule of price alerts is emitted with
    a pause before each one.

    Args:
        stock_symbol: Symbol of the stock to watch. It is only interpolated
            into the emitted messages.

    Yields:
        One alert string per mocked price change.
    """
    print(f"Start monitor stock price for {stock_symbol}!")

    # Mocked feed: (seconds to wait before the update, price to report).
    mocked_updates = ((4, 300), (4, 400), (20, 900), (20, 500))

    for delay_seconds, price in mocked_updates:
        await asyncio.sleep(delay_seconds)
        alert = f"the price for {stock_symbol} is {price}"
        yield alert
        # Runs only once the consumer resumes the generator after the yield.
        print(alert)
| 57 | + |
| 58 | + |
# For video streaming tools, `input_stream: LiveRequestQueue` is a required,
# reserved parameter: ADK uses it to pass the video stream in.
async def monitor_video_stream(
    input_stream: LiveRequestQueue,
) -> AsyncGenerator[str, None]:
    """Monitor how many people are in the video streams.

    Polls `input_stream` roughly every half second, keeps only the most recent
    JPEG frame, asks the model to count the people in it, and reports the count
    whenever it differs from the previously observed one. The first observed
    count only establishes the baseline and is not reported.

    Args:
        input_stream: Queue of live requests carrying the video frames.

    Yields:
        The new people count, as the text returned by the model, each time the
        count changes.
    """
    print("start monitor_video_stream!")
    client = Client(vertexai=False)
    prompt_text = (
        "Count the number of people in this image. Just respond with a numeric"
        " number."
    )
    # The prompt part and the generation config never change, so build them
    # once instead of on every frame. `text=` is passed by keyword because
    # newer google-genai releases make it keyword-only.
    prompt_part = genai_types.Part.from_text(text=prompt_text)
    generation_config = genai_types.GenerateContentConfig(
        system_instruction=(
            "You are a helpful video analysis assistant. You can count"
            " the number of people in this image or video. Just respond"
            " with a numeric number."
        )
    )
    last_count = None
    while True:
        last_valid_req = None
        print("Start monitoring loop")

        # Drain the queue so that only the latest image is processed and the
        # older ones are discarded.
        # NOTE(review): `_queue` is a private attribute of LiveRequestQueue;
        # switch to a public accessor if one is available.
        while input_stream._queue.qsize() != 0:
            live_req = await input_stream.get()

            if (
                live_req.blob is not None
                and live_req.blob.mime_type == "image/jpeg"
            ):
                last_valid_req = live_req

        # If we found a valid image, process it.
        if last_valid_req is not None:
            print("Processing the most recent frame from the queue")

            # Create an image part using the blob's data and mime type.
            image_part = genai_types.Part.from_bytes(
                data=last_valid_req.blob.data,
                mime_type=last_valid_req.blob.mime_type,
            )
            contents = genai_types.Content(
                role="user",
                parts=[image_part, prompt_part],
            )

            # Use the async client so the model call does not block the event
            # loop that is also driving the live session.
            response = await client.aio.models.generate_content(
                model="gemini-2.0-flash-exp",
                contents=contents,
                config=generation_config,
            )

            # `response.text` may be None when the model returns no text;
            # in that case keep the last known count and try the next frame.
            count_text = (response.text or "").strip()
            if count_text:
                if last_count is None:
                    # First observation: establish the baseline silently.
                    last_count = count_text
                elif count_text != last_count:
                    last_count = count_text
                    # Yield plain text, matching the declared
                    # AsyncGenerator[str, None] return type.
                    yield count_text
                    print("response:", response)

        # Wait before checking for new images.
        await asyncio.sleep(0.5)
| 117 | + |
| 118 | + |
# Use this exact function to help ADK stop your streaming tools when requested.
# For example, to stop `monitor_stock_price`, the agent will invoke
# stop_streaming(function_name="monitor_stock_price").
def stop_streaming(function_name: str):
    """Stop the streaming.

    Args:
        function_name: The name of the streaming function to stop.
    """
    # Intentionally a no-op: the function body itself performs no work.
    return None
| 129 | + |
| 130 | + |
# Agent wired with both streaming tools plus the stop_streaming helper.
root_agent = Agent(
    name="video_streaming_agent",
    model="gemini-2.0-flash-exp",
    # The two streaming tools are passed as plain functions; stop_streaming is
    # wrapped in a FunctionTool.
    tools=[
        monitor_video_stream,
        monitor_stock_price,
        FunctionTool(stop_streaming),
    ],
    instruction="""
    You are a monitoring agent. You can do video monitoring and stock price monitoring
    using the provided tools/functions.
    When users want to monitor a video stream,
    You can use monitor_video_stream function to do that. When monitor_video_stream
    returns the alert, you should tell the users.
    When users want to monitor a stock price, you can use monitor_stock_price.
    Don't ask too many questions. Don't be too talkative.
    """,
)
| 149 | +``` |
| 150 | + |
| 151 | +Here are some sample queries to test: |
| 152 | +- Help me monitor the stock price for $XYZ stock. |
| 153 | +- Help me monitor how many people are there in the video stream. |
0 commit comments