Skip to content

Commit 6d8d499

Browse files
authored
feat: add audio Streaming sample (#244)
* feat: add audio Streaming sample add audio Streaming sample * fix: added license headers added license headers
1 parent 99e3903 commit 6d8d499

File tree

16 files changed

+1349
-346
lines changed

16 files changed

+1349
-346
lines changed
122 KB
Loading

docs/assets/adk-streaming-mic.png

20.6 KB
Loading

docs/assets/adk-streaming-text.png

71.6 KB
Loading

docs/get-started/quickstart-streaming.md

Lines changed: 2 additions & 342 deletions
Original file line numberDiff line numberDiff line change
@@ -166,348 +166,8 @@ Stop `adk web` by pressing `Ctrl-C` on the console.
166166

167167
The following features will be supported in the future versions of the ADK Streaming: Callback, LongRunningTool, ExampleTool, and Shell agent (e.g. SequentialAgent).
168168

169-
## 5. Building a Custom Streaming App (Optional) {#5.-build-custom-app}
170-
171-
In the previous section, we have checked that our basic search agent works with the ADK Streaming using `adk web` tool. In the this section, we will learn how to build your own web application capable of the streaming communication using [FastAPI](https://fastapi.tiangolo.com/).
172-
173-
Add `static` directory under `app`, and add `main.py` and `index.html` as empty files, as in the following structure:
174-
175-
```console
176-
adk-streaming/ # Project folder
177-
└── app/ # the web app folder
178-
├── main.py # FastAPI web app
179-
└── static/ # Static content folder
180-
└── index.html # The web client page
181-
```
182-
183-
By adding the directories and files above, the entire directory structure and files will look like:
184-
185-
```console
186-
adk-streaming/ # Project folder
187-
└── app/ # the web app folder
188-
├── main.py # FastAPI web app
189-
├── static/ # Static content folder
190-
| └── index.html # The web client page
191-
├── .env # Gemini API key
192-
└── google_search_agent/ # Agent folder
193-
├── __init__.py # Python package
194-
└── agent.py # Agent definition
195-
```
196-
197-
**main.py**
198-
199-
Copy-paste the following code block to the main.py file.
200-
201-
```py
202-
import os
203-
import json
204-
import asyncio
205-
206-
from pathlib import Path
207-
from dotenv import load_dotenv
208-
209-
from google.genai.types import (
210-
Part,
211-
Content,
212-
)
213-
214-
from google.adk.runners import Runner
215-
from google.adk.agents import LiveRequestQueue
216-
from google.adk.agents.run_config import RunConfig
217-
from google.adk.sessions.in_memory_session_service import InMemorySessionService
218-
219-
from fastapi import FastAPI, WebSocket
220-
from fastapi.staticfiles import StaticFiles
221-
from fastapi.responses import FileResponse
222-
223-
from google_search_agent.agent import root_agent
224-
225-
#
226-
# ADK Streaming
227-
#
228-
229-
# Load Gemini API Key
230-
load_dotenv()
231-
232-
APP_NAME = "ADK Streaming example"
233-
session_service = InMemorySessionService()
234-
235-
236-
def start_agent_session(session_id: str):
237-
"""Starts an agent session"""
238-
239-
# Create a Session
240-
session = session_service.create_session(
241-
app_name=APP_NAME,
242-
user_id=session_id,
243-
session_id=session_id,
244-
)
245-
246-
# Create a Runner
247-
runner = Runner(
248-
app_name=APP_NAME,
249-
agent=root_agent,
250-
session_service=session_service,
251-
)
252-
253-
# Set response modality = TEXT
254-
run_config = RunConfig(response_modalities=["TEXT"])
255-
256-
# Create a LiveRequestQueue for this session
257-
live_request_queue = LiveRequestQueue()
258-
259-
# Start agent session
260-
live_events = runner.run_live(
261-
session=session,
262-
live_request_queue=live_request_queue,
263-
run_config=run_config,
264-
)
265-
return live_events, live_request_queue
266-
267-
268-
async def agent_to_client_messaging(websocket, live_events):
269-
"""Agent to client communication"""
270-
while True:
271-
async for event in live_events:
272-
# turn_complete
273-
if event.turn_complete:
274-
await websocket.send_text(json.dumps({"turn_complete": True}))
275-
print("[TURN COMPLETE]")
276-
277-
if event.interrupted:
278-
await websocket.send_text(json.dumps({"interrupted": True}))
279-
print("[INTERRUPTED]")
280-
281-
# Read the Content and its first Part
282-
part: Part = (
283-
event.content and event.content.parts and event.content.parts[0]
284-
)
285-
if not part or not event.partial:
286-
continue
287-
288-
# Get the text
289-
text = event.content and event.content.parts and event.content.parts[0].text
290-
if not text:
291-
continue
292-
293-
# Send the text to the client
294-
await websocket.send_text(json.dumps({"message": text}))
295-
print(f"[AGENT TO CLIENT]: {text}")
296-
await asyncio.sleep(0)
297-
298-
299-
async def client_to_agent_messaging(websocket, live_request_queue):
300-
"""Client to agent communication"""
301-
while True:
302-
text = await websocket.receive_text()
303-
content = Content(role="user", parts=[Part.from_text(text=text)])
304-
live_request_queue.send_content(content=content)
305-
print(f"[CLIENT TO AGENT]: {text}")
306-
await asyncio.sleep(0)
307-
308-
309-
#
310-
# FastAPI web app
311-
#
312-
313-
app = FastAPI()
314-
315-
STATIC_DIR = Path("static")
316-
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
317-
318-
319-
@app.get("/")
320-
async def root():
321-
"""Serves the index.html"""
322-
return FileResponse(os.path.join(STATIC_DIR, "index.html"))
323-
324-
325-
@app.websocket("/ws/{session_id}")
326-
async def websocket_endpoint(websocket: WebSocket, session_id: int):
327-
"""Client websocket endpoint"""
328-
329-
# Wait for client connection
330-
await websocket.accept()
331-
print(f"Client #{session_id} connected")
332-
333-
# Start agent session
334-
session_id = str(session_id)
335-
live_events, live_request_queue = start_agent_session(session_id)
336-
337-
# Start tasks
338-
agent_to_client_task = asyncio.create_task(
339-
agent_to_client_messaging(websocket, live_events)
340-
)
341-
client_to_agent_task = asyncio.create_task(
342-
client_to_agent_messaging(websocket, live_request_queue)
343-
)
344-
await asyncio.gather(agent_to_client_task, client_to_agent_task)
345-
346-
# Disconnected
347-
print(f"Client #{session_id} disconnected")
348-
349-
```
350-
351-
This code creates a real-time chat application using ADK and FastAPI. It sets up a WebSocket endpoint where clients can connect and interact with a Google Search Agent.
352-
353-
Key functionalities:
354-
355-
* Loads the Gemini API key.
356-
* Uses ADK to manage agent sessions and run the \`google\_search\_agent\`.
357-
* \`start\_agent\_session\` initializes an agent session with a live request queue for real-time communication.
358-
* \`agent\_to\_client\_messaging\` asynchronously streams the agent's text responses and status updates (turn complete, interrupted) to the connected WebSocket client.
359-
* \`client\_to\_agent\_messaging\` asynchronously receives text messages from the WebSocket client and sends them as user input to the agent.
360-
* FastAPI serves a static frontend and handles WebSocket connections at \`/ws/{session\_id}\`.
361-
* When a client connects, it starts an agent session and creates concurrent tasks for bidirectional communication between the client and the agent via WebSockets.
362-
363-
Copy-paste the following code block to the `index.html` file.
364-
365-
```javascript title="index.html"
366-
<!doctype html>
367-
<html>
368-
<head>
369-
<title>ADK Streaming Test</title>
370-
</head>
371-
372-
<body>
373-
<h1>ADK Streaming Test</h1>
374-
<div
375-
id="messages"
376-
style="height: 300px; overflow-y: auto; border: 1px solid black"></div>
377-
<br />
378-
379-
<form id="messageForm">
380-
<label for="message">Message:</label>
381-
<input type="text" id="message" name="message" />
382-
<button type="submit" id="sendButton" disabled>Send</button>
383-
</form>
384-
</body>
385-
386-
<script>
387-
// Connect the server with a WebSocket connection
388-
const sessionId = Math.random().toString().substring(10);
389-
const ws_url = "ws://" + window.location.host + "/ws/" + sessionId;
390-
let ws = new WebSocket(ws_url);
391-
392-
// Get DOM elements
393-
const messageForm = document.getElementById("messageForm");
394-
const messageInput = document.getElementById("message");
395-
const messagesDiv = document.getElementById("messages");
396-
let currentMessageId = null;
397-
398-
// WebSocket handlers
399-
function addWebSocketHandlers(ws) {
400-
ws.onopen = function () {
401-
console.log("WebSocket connection opened.");
402-
document.getElementById("sendButton").disabled = false;
403-
document.getElementById("messages").textContent = "Connection opened";
404-
addSubmitHandler(this);
405-
};
406-
407-
ws.onmessage = function (event) {
408-
// Parse the incoming message
409-
const packet = JSON.parse(event.data);
410-
console.log(packet);
411-
412-
// Check if the turn is complete
413-
// if turn complete, add new message
414-
if (packet.turn_complete && packet.turn_complete == true) {
415-
currentMessageId = null;
416-
return;
417-
}
418-
419-
// add a new message for a new turn
420-
if (currentMessageId == null) {
421-
currentMessageId = Math.random().toString(36).substring(7);
422-
const message = document.createElement("p");
423-
message.id = currentMessageId;
424-
// Append the message element to the messagesDiv
425-
messagesDiv.appendChild(message);
426-
}
427-
428-
// Add message text to the existing message element
429-
const message = document.getElementById(currentMessageId);
430-
message.textContent += packet.message;
431-
432-
// Scroll down to the bottom of the messagesDiv
433-
messagesDiv.scrollTop = messagesDiv.scrollHeight;
434-
};
435-
436-
// When the connection is closed, try reconnecting
437-
ws.onclose = function () {
438-
console.log("WebSocket connection closed.");
439-
document.getElementById("sendButton").disabled = true;
440-
document.getElementById("messages").textContent = "Connection closed";
441-
setTimeout(function () {
442-
console.log("Reconnecting...");
443-
ws = new WebSocket(ws_url);
444-
addWebSocketHandlers(ws);
445-
}, 5000);
446-
};
447-
448-
ws.onerror = function (e) {
449-
console.log("WebSocket error: ", e);
450-
};
451-
}
452-
addWebSocketHandlers(ws);
453-
454-
// Add submit handler to the form
455-
function addSubmitHandler(ws) {
456-
messageForm.onsubmit = function (e) {
457-
e.preventDefault();
458-
const message = messageInput.value;
459-
if (message) {
460-
const p = document.createElement("p");
461-
p.textContent = "> " + message;
462-
messagesDiv.appendChild(p);
463-
ws.send(message);
464-
messageInput.value = "";
465-
}
466-
return false;
467-
};
468-
}
469-
</script>
470-
</html>
471-
```
472-
473-
This HTML file sets up a basic webpage with:
474-
475-
* A form (\`messageForm\`) with an input field for typing messages and a "Send" button.
476-
* JavaScript that:
477-
* Connects to a WebSocket server at \`wss://\[current host\]/ws/\[random session ID\]\`.
478-
* Enables the "Send" button upon successful connection.
479-
* Appends received messages from the WebSocket to the \`messages\` div, handling streaming responses and turn completion.
480-
* Sends the text entered in the input field to the WebSocket server when the form is submitted.
481-
* Attempts to reconnect if the WebSocket connection closes.
482-
483-
## 6\. Interact with Your Streaming app {#4.-interact-with-your-streaming-app}
484-
485-
1\. **Navigate to the Correct Directory:**
486-
487-
To run your agent effectively, you need to be in the **app folder (`adk-streaming/app`)**
488-
489-
2\. **Start the Fast API**: Run the following command to start CLI interface with
490-
491-
```console
492-
uvicorn main:app --reload
493-
```
494-
495-
3\. **Access the UI:** Once the UI server starts, the terminal will display a local URL (e.g., [http://localhost:8000](http://localhost:8501)). Click this link to open the UI in your browser.
496-
497-
Now you should see the UI like this:
498-
499-
<img src="../../assets/adk-streaming.png" alt="ADK Streaming Test">
500-
501-
Try asking a question `What is Gemini?`. The agent will use Google Search to respond to your queries. You would notice that the UI shows the agent's response as streaming text. You can also send messages to the agent at any time, even while the agent is still responding. This demonstrates the bidirectional communication capability of ADK Streaming.
502-
503-
Benefits over conventional synchronous web apps:
504-
505-
* Real-time two-way communication: Seamless interaction.
506-
* More responsive and engaging: No need to wait for full responses or constant refreshing. Feels like a live conversation.
507-
* Can be extended to multimodal apps with audio, image and video streaming support.
508-
509169
Congratulations\! You've successfully created and interacted with your first Streaming agent using ADK\!
510170

511-
## Next steps
171+
## Next steps: build custom streaming app
512172

513-
* **Add audio/image modality:** with the Streaming, you can also have real-time communication with the agent using audio and image. We will add more samples for the multimodal support in the future. Stay tuned!
173+
In [Custom Audio Streaming app](../streaming/custom-streaming.md) tutorial, it overviews the server and client code for a custom asynchronous web app built with ADK Streaming and [FastAPI](https://fastapi.tiangolo.com/), enabling real-time, bidirectional audio and text communication.

0 commit comments

Comments
 (0)