feat(a2a): add A2A protocol plugin for Genkit agents#5607
Conversation
Bump Next.js and use caret version ranges (^16.2.6) for next and eslint-config-next to allow compatible minor/patch updates.
Add a new test app demonstrating Genkit integration with Vercel AI Elements, including a weather agent with tool-calling and a banking agent showcasing the human-in-the-loop interrupt and restart flows. Track the `lib/` directory as source code by overriding the global `lib` gitignore rule, since this app uses it for source rather than build output.
Replace low-level streamBidi sessions with the higher-level chat() interface across agent testapps. Use chat.detach()/task.wait() for background turns, chat.sendStream() with turn.response/interrupts for streaming and approvals, and rely on the chat to track snapshotId. This simplifies the examples by removing manual session management, polling loops, and snapshot/state bookkeeping.
Add agent-core module and extend the Agent interface to expose the ergonomic, transport-agnostic AgentAPI (chat, loadChat, getSnapshot, abort) alongside the lower-level BidiAction surface. This lets server- and client-side code share one interface via an in-process transport that drives the agent action directly without HTTP. Also export the Media type from the ai package index and model module.
Apply streamed RFC 6902 JSON Patches (chunk.customPatch) to a locally tracked custom state and emit it as transient data-custom UI chunks, enabling React apps to render live agent progress.
Add `customAgentWithMultiCustomState` test agent and document the `customPatch` streaming behavior in the agents conformance testing guide. The new agent performs multiple sequential custom-state updates in a single turn to verify that the first patch is a whole-document replace and subsequent patches are incremental JSON Patch diffs. Update `expectChunks` assertion semantics to cover partial matching for `customPatch` chunks and bump the documented test count from 36 to 39.
Replace the untyped `status` field in AgentStreamChunkSchema with a typed `customPatch` field using RFC 6902 (JSON Patch) operations. This enables the runtime to emit deltas to the session's custom state during a turn, allowing clients to keep tracked custom state live mid-stream.
…atch
Replace `sendChunk({ status })` with `session.updateCustom` mutations in the
research agent so the runtime auto-emits `customPatch` chunks, keeping the
client's tracked custom state live mid-stream. Add a typed `status` field to
`ResearchState` and restore chat history (messages, tool calls/responses) in
the web app.
Remove the newSnapshotId property from AgentInit interface, SessionRunner options, and defineCustomAgent as it was unused throughout the codebase.
Consolidate duplicated logic in agent-core and agent modules: - Add `hydrateFromState` to centralize state/messages/artifacts hydration in AgentChatImpl, reused by both connect and snapshot loading paths - Extract `resolveSnapshot` and `runAbort` helpers shared between defineAction surfaces and ergonomic composite methods - Add `startBidi` helper for single-turn bidi stream handling Reduces code duplication and improves maintainability without changing behavior.
Replace the function-based ClientStateTransform type with a ClientTransform interface supporting both `state` and `chunk` projections. The `state` member reshapes/redacts session state at rest, while the new `chunk` member reshapes/redacts each stream chunk in flight (with nullish return dropping the chunk). Chunk emission is centralized through a single emitter so the transform applies consistently across all stream output.
There was a problem hiding this comment.
Code Review
This pull request introduces the @genkit-ai/a2a plugin, which exposes Genkit Agents over the Agent2Agent (A2A) protocol using the @a2a-js/sdk. It includes mapping utilities, an agent card generator, a custom request handler supporting streaming and interrupts, and a comprehensive travel concierge test application. The review feedback focuses on improving robustness and security, specifically recommending a FIFO eviction policy for the unbounded in-memory task map to prevent memory leaks, replacing a regular expression with string slicing to mitigate ReDoS risks, adding optional chaining to prevent potential null-pointer crashes on agent metadata, and simplifying the fresh message mapping to eliminate an unsafe non-null assertion.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| status: { state: 'submitted', timestamp: new Date().toISOString() }, | ||
| history: [userMessage], | ||
| }; | ||
| this.tasks.set(taskId, task); |
There was a problem hiding this comment.
The tasks map is an unbounded in-memory Map. In a long-running server environment, this map will grow indefinitely as new tasks are created, leading to a memory leak and potential Out-Of-Memory (OOM) errors. Since Map in JavaScript preserves insertion order, we can easily implement a simple FIFO eviction policy to keep only the most recent tasks (e.g., up to 10,000 tasks) and prevent memory exhaustion.
| this.tasks.set(taskId, task); | |
| this.tasks.set(taskId, task); | |
| if (this.tasks.size > 10000) { | |
| const oldestKey = this.tasks.keys().next().value; | |
| if (oldestKey !== undefined) { | |
| this.tasks.delete(oldestKey); | |
| } | |
| } |
| function parseDataUri( | ||
| url: string | ||
| ): { mimeType?: string; bytes: string } | undefined { | ||
| const match = /^data:([^;,]*)?(;base64)?,(.*)$/s.exec(url); | ||
| if (!match) return undefined; | ||
| const [, mimeType, isBase64, data] = match; | ||
| // Only base64 inline data maps cleanly to A2A `bytes`; percent-encoded text | ||
| // data uris are left as a uri reference. | ||
| if (!isBase64) return undefined; | ||
| return { mimeType: mimeType || undefined, bytes: data }; | ||
| } |
There was a problem hiding this comment.
Using a regular expression to parse potentially very large base64 data URIs can lead to high CPU usage or catastrophic backtracking (ReDoS) under load. Replacing the regex with simple string slicing and splitting is significantly faster, safer, and immune to regex-based performance issues.
| function parseDataUri( | |
| url: string | |
| ): { mimeType?: string; bytes: string } | undefined { | |
| const match = /^data:([^;,]*)?(;base64)?,(.*)$/s.exec(url); | |
| if (!match) return undefined; | |
| const [, mimeType, isBase64, data] = match; | |
| // Only base64 inline data maps cleanly to A2A `bytes`; percent-encoded text | |
| // data uris are left as a uri reference. | |
| if (!isBase64) return undefined; | |
| return { mimeType: mimeType || undefined, bytes: data }; | |
| } | |
| function parseDataUri( | |
| url: string | |
| ): { mimeType?: string; bytes: string } | undefined { | |
| if (!url.startsWith('data:')) return undefined; | |
| const commaIdx = url.indexOf(','); | |
| if (commaIdx === -1) return undefined; | |
| const meta = url.substring(5, commaIdx); | |
| const parts = meta.split(';'); | |
| if (!parts.includes('base64')) return undefined; | |
| const mimeType = parts[0] && parts[0] !== 'base64' ? parts[0] : undefined; | |
| return { mimeType, bytes: url.substring(commaIdx + 1) }; | |
| } |
| export function getAgentName(agent: AgentLike): string { | ||
| const name = agent.__action?.name; |
There was a problem hiding this comment.
To prevent potential runtime crashes if agent is null or undefined, use optional chaining (agent?.__action) to safely access the action metadata.
| export function getAgentName(agent: AgentLike): string { | |
| const name = agent.__action?.name; | |
| export function getAgentName(agent: AgentLike): string { | |
| const name = agent?.__action?.name; |
| export function getAgentDescription(agent: AgentLike): string | undefined { | ||
| return agent.__action?.description; | ||
| } |
There was a problem hiding this comment.
To prevent potential runtime crashes if agent is null or undefined, use optional chaining (agent?.__action) to safely access the action metadata.
| export function getAgentDescription(agent: AgentLike): string | undefined { | |
| return agent.__action?.description; | |
| } | |
| export function getAgentDescription(agent: AgentLike): string | undefined { | |
| return agent?.__action?.description; | |
| } |
| import { | ||
| a2aMessageToResumeInput, | ||
| genkitPartToA2A, | ||
| genkitPartsToA2A, | ||
| } from './mapping.js'; |
There was a problem hiding this comment.
Import a2aMessageToGenkit directly from ./mapping.js so we can simplify the fresh message mapping and avoid the unsafe non-null assertion in a2aMessageToResumeInputFresh.
| import { | |
| a2aMessageToResumeInput, | |
| genkitPartToA2A, | |
| genkitPartsToA2A, | |
| } from './mapping.js'; | |
| import { | |
| a2aMessageToGenkit, | |
| a2aMessageToResumeInput, | |
| genkitPartToA2A, | |
| genkitPartsToA2A, | |
| } from './mapping.js'; |
| const input = a2aMessageToResumeInput(message); | ||
| // `a2aMessageToResumeInput` returns `{ message }` for a fresh turn. | ||
| return input.message!; | ||
| } | ||
|
|
There was a problem hiding this comment.
Simplify a2aMessageToResumeInputFresh to directly call a2aMessageToGenkit(message). The current implementation calls a2aMessageToResumeInput and uses a non-null assertion (!), which will crash with a TypeError if the message contains data parts that look like tool responses/restarts but isResume is false.
| const input = a2aMessageToResumeInput(message); | |
| // `a2aMessageToResumeInput` returns `{ message }` for a fresh turn. | |
| return input.message!; | |
| } | |
| function a2aMessageToResumeInputFresh(message: A2AMessage) { | |
| return a2aMessageToGenkit(message); | |
| } |
Introduce @genkit-ai/a2a, a new plugin that exposes Genkit agents over the A2A (Agent2Agent) protocol using @a2a-js/sdk.
Provides GenkitA2ARequestHandler, which runs a Genkit agent turn for each incoming A2A message, streams output back as A2A task events, and derives the agent's AgentCard automatically. The A2A contextId maps to the Genkit sessionId so server-managed agents resume sessions across tasks.
Includes README with installation, quick start, and usage details.