Skip to content

Commit be72bd6

Browse files
authored
Merge branch 'main' into feat/list-sessions-pagination
2 parents 8ee2974 + 4b677e7 commit be72bd6

File tree

4 files changed

+544
-82
lines changed

4 files changed

+544
-82
lines changed

src/google/adk/a2a/converters/to_adk_event.py

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
from collections.abc import Callable
18+
import json
1819
import logging
1920
from typing import Any
2021
from typing import List
@@ -28,9 +29,11 @@
2829
from a2a.types import TaskState
2930
from a2a.types import TaskStatusUpdateEvent
3031
from google.genai import types as genai_types
32+
from pydantic import ValidationError
3133

3234
from ...agents.invocation_context import InvocationContext
3335
from ...events.event import Event
36+
from ...events.event_actions import EventActions
3437
from ..experimental import a2a_experimental
3538
from .part_converter import A2A_DATA_PART_METADATA_IS_LONG_RUNNING_KEY
3639
from .part_converter import A2APartToGenAIPartConverter
@@ -171,11 +174,15 @@ def _create_event(
171174
output_parts: List[genai_types.Part],
172175
invocation_context: Optional[InvocationContext],
173176
author: Optional[str],
177+
actions: Optional[EventActions] = None,
174178
long_running_function_ids: Optional[set[str]] = None,
175179
partial: bool = False,
176180
) -> Optional[Event]:
177181
"""Creates an ADK event from parts and metadata."""
178-
if not output_parts:
182+
event_actions = actions or EventActions()
183+
if not output_parts and not event_actions.model_dump(
184+
exclude_none=True, exclude_defaults=True
185+
):
179186
return None
180187

181188
event = Event(
@@ -186,19 +193,89 @@ def _create_event(
186193
),
187194
author=author or "a2a agent",
188195
branch=invocation_context.branch if invocation_context else None,
196+
actions=event_actions,
189197
long_running_tool_ids=(
190198
long_running_function_ids if long_running_function_ids else None
191199
),
192-
content=genai_types.Content(
193-
role="model",
194-
parts=output_parts,
200+
content=(
201+
genai_types.Content(
202+
role="model",
203+
parts=output_parts,
204+
)
205+
if output_parts
206+
else None
195207
),
196208
partial=partial,
197209
)
198210

199211
return event
200212

201213

214+
def _parse_adk_metadata_value(value: Any) -> Any:
215+
"""Parses ADK metadata values serialized through A2A."""
216+
if not isinstance(value, str):
217+
return value
218+
219+
try:
220+
return json.loads(value)
221+
except json.JSONDecodeError:
222+
return value
223+
224+
225+
def _extract_event_actions(
226+
metadata: Optional[dict[str, Any]],
227+
) -> EventActions:
228+
"""Extracts ADK event actions from A2A metadata."""
229+
if not metadata:
230+
return EventActions()
231+
232+
raw_actions = metadata.get(_get_adk_metadata_key("actions"))
233+
if raw_actions is None:
234+
return EventActions()
235+
236+
parsed_actions = _parse_adk_metadata_value(raw_actions)
237+
if not isinstance(parsed_actions, dict):
238+
logger.warning(
239+
"Ignoring invalid ADK actions metadata of type %s",
240+
type(parsed_actions).__name__,
241+
)
242+
return EventActions()
243+
244+
try:
245+
return EventActions.model_validate(parsed_actions)
246+
except ValidationError as error:
247+
logger.warning("Ignoring invalid ADK actions metadata: %s", error)
248+
return EventActions()
249+
250+
251+
def _merge_top_level_dicts(
252+
base: dict[str, Any], new_values: dict[str, Any]
253+
) -> dict[str, Any]:
254+
"""Merges dictionaries while preserving top-level overwrite semantics."""
255+
merged = dict(base)
256+
for key, value in new_values.items():
257+
if (
258+
key in merged
259+
and isinstance(merged[key], dict)
260+
and isinstance(value, dict)
261+
):
262+
merged[key] = {**merged[key], **value}
263+
else:
264+
merged[key] = value
265+
return merged
266+
267+
268+
def _merge_event_actions(
269+
existing_actions: EventActions, new_actions: EventActions
270+
) -> EventActions:
271+
"""Merges action metadata from multiple A2A sources."""
272+
merged_actions_data = _merge_top_level_dicts(
273+
existing_actions.model_dump(exclude_none=True, by_alias=True),
274+
new_actions.model_dump(exclude_none=True, by_alias=True),
275+
)
276+
return EventActions.model_validate(merged_actions_data)
277+
278+
202279
@a2a_experimental
203280
def convert_a2a_task_to_event(
204281
a2a_task: Task,
@@ -226,19 +303,28 @@ def convert_a2a_task_to_event(
226303
raise ValueError("A2A task cannot be None")
227304

228305
try:
306+
event_actions = EventActions()
229307
output_parts = []
230308
long_running_function_ids = set()
231309
if a2a_task.artifacts:
232310
artifact_parts = [
233311
part for artifact in a2a_task.artifacts for part in artifact.parts
234312
]
313+
for artifact in a2a_task.artifacts:
314+
event_actions = _merge_event_actions(
315+
event_actions, _extract_event_actions(artifact.metadata)
316+
)
235317
output_parts, _ = _convert_a2a_parts_to_adk_parts(
236318
artifact_parts, part_converter
237319
)
238320
if (
239321
a2a_task.status.message
240322
and a2a_task.status.state == TaskState.input_required
241323
):
324+
event_actions = _merge_event_actions(
325+
event_actions,
326+
_extract_event_actions(a2a_task.status.message.metadata),
327+
)
242328
parts, ids = _convert_a2a_parts_to_adk_parts(
243329
a2a_task.status.message.parts, part_converter
244330
)
@@ -249,6 +335,7 @@ def convert_a2a_task_to_event(
249335
output_parts,
250336
invocation_context,
251337
author,
338+
event_actions,
252339
long_running_function_ids,
253340
)
254341

@@ -288,7 +375,12 @@ def convert_a2a_message_to_event(
288375
output_parts, _ = _convert_a2a_parts_to_adk_parts(
289376
a2a_message.parts, part_converter
290377
)
291-
return _create_event(output_parts, invocation_context, author)
378+
return _create_event(
379+
output_parts,
380+
invocation_context,
381+
author,
382+
_extract_event_actions(a2a_message.metadata),
383+
)
292384

293385
except Exception as e:
294386
logger.error("Failed to convert A2A message to event: %s", e)
@@ -319,7 +411,11 @@ def convert_a2a_status_update_to_event(
319411
try:
320412
output_parts = []
321413
long_running_function_ids = set()
414+
event_actions = EventActions()
322415
if a2a_status_update.status.message:
416+
event_actions = _extract_event_actions(
417+
a2a_status_update.status.message.metadata
418+
)
323419
parts, ids = _convert_a2a_parts_to_adk_parts(
324420
a2a_status_update.status.message.parts, part_converter
325421
)
@@ -330,6 +426,7 @@ def convert_a2a_status_update_to_event(
330426
output_parts,
331427
invocation_context,
332428
author,
429+
event_actions,
333430
long_running_function_ids,
334431
)
335432
except Exception as e:
@@ -367,6 +464,7 @@ def convert_a2a_artifact_update_to_event(
367464
output_parts,
368465
invocation_context,
369466
author,
467+
_extract_event_actions(a2a_artifact_update.artifact.metadata),
370468
partial=not a2a_artifact_update.last_chunk,
371469
)
372470
except Exception as e:

src/google/adk/a2a/utils/agent_to_a2a.py

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414

1515
from __future__ import annotations
1616

17+
from contextlib import asynccontextmanager
1718
import logging
19+
from typing import AsyncIterator
20+
from typing import Callable
1821
from typing import Optional
1922
from typing import Union
2023

@@ -82,6 +85,7 @@ def to_a2a(
8285
agent_card: Optional[Union[AgentCard, str]] = None,
8386
push_config_store: Optional[PushNotificationConfigStore] = None,
8487
runner: Optional[Runner] = None,
88+
lifespan: Optional[Callable[[Starlette], AsyncIterator[None]]] = None,
8589
) -> Starlette:
8690
"""Convert an ADK agent to a A2A Starlette application.
8791
@@ -98,6 +102,11 @@ def to_a2a(
98102
config RPC methods are supported.
99103
runner: Optional pre-built Runner object. If not provided, a default
100104
runner will be created using in-memory services.
105+
lifespan: Optional async context manager for Starlette lifespan
106+
events. Use this to run startup/shutdown logic (e.g. initializing
107+
database connections or loading resources). The context manager
108+
receives the Starlette app instance and can set state on
109+
``app.state``.
101110
102111
Returns:
103112
A Starlette application that can be run with uvicorn
@@ -109,6 +118,15 @@ def to_a2a(
109118
110119
# Or with custom agent card:
111120
app = to_a2a(agent, agent_card=my_custom_agent_card)
121+
122+
# Or with lifespan:
123+
@asynccontextmanager
124+
async def lifespan(app):
125+
app.state.db = await init_db()
126+
yield
127+
await app.state.db.close()
128+
129+
app = to_a2a(agent, lifespan=lifespan)
112130
"""
113131
# Set up ADK logging to ensure logs are visible when using uvicorn directly
114132
adk_logger = logging.getLogger("google_adk")
@@ -151,11 +169,8 @@ async def create_runner() -> Runner:
151169
rpc_url=rpc_url,
152170
)
153171

154-
# Create a Starlette app that will be configured during startup
155-
app = Starlette()
156-
157-
# Add startup handler to build the agent card and configure A2A routes
158-
async def setup_a2a():
172+
# Build the agent card and configure A2A routes
173+
async def setup_a2a(app: Starlette):
159174
# Use provided agent card or build one asynchronously
160175
if provided_agent_card is not None:
161176
final_agent_card = provided_agent_card
@@ -173,7 +188,19 @@ async def setup_a2a():
173188
app,
174189
)
175190

176-
# Store the setup function to be called during startup
177-
app.add_event_handler("startup", setup_a2a)
191+
# Compose a lifespan that runs A2A setup and the user's lifespan
192+
@asynccontextmanager
193+
async def _combined_lifespan(
194+
app: Starlette,
195+
) -> AsyncIterator[None]:
196+
await setup_a2a(app)
197+
if lifespan:
198+
async with lifespan(app):
199+
yield
200+
else:
201+
yield
202+
203+
# Create a Starlette app with the composed lifespan
204+
app = Starlette(lifespan=_combined_lifespan)
178205

179206
return app

0 commit comments

Comments
 (0)