feat(js/plugins/twelvelabs): add genkitx-twelvelabs plugin (Pegasus models, Marengo embedders)#5604
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
Code Review
This pull request introduces a new Genkit plugin, genkitx-twelvelabs, which integrates TwelveLabs video AI capabilities, specifically Pegasus video-understanding models and Marengo multimodal embedding models. The feedback highlights three key improvements: updating the SSE event parser to correctly handle standard data: prefixes to avoid parsing errors, skipping API key resolution when no models or embedders are configured to prevent unnecessary initialization errors, and explicitly recalculating totalTokens when overriding outputTokens to ensure accurate usage statistics.
async function* readSseEvents(
  body: ReadableStream<Uint8Array>
): AsyncGenerator<SseEvent> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      let newlineIndex: number;
      while ((newlineIndex = buffer.indexOf('\n')) >= 0) {
        const line = buffer.slice(0, newlineIndex).trim();
        buffer = buffer.slice(newlineIndex + 1);
        if (line) yield JSON.parse(line) as SseEvent;
      }
    }
    const tail = buffer.trim();
    if (tail) yield JSON.parse(tail) as SseEvent;
  } finally {
    reader.cancel().catch(() => {});
  }
}
The readSseEvents function currently parses each line of the stream directly as JSON. However, standard Server-Sent Events (SSE) streams prefix each data line with data: (and may include other control lines like event: or empty lines). Parsing the raw line directly will result in a SyntaxError (e.g., Unexpected token d in JSON...) on the first event.
We should update the parser to only process lines starting with data: and strip the prefix before parsing.
async function* readSseEvents(
  body: ReadableStream<Uint8Array>
): AsyncGenerator<SseEvent> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      let newlineIndex: number;
      while ((newlineIndex = buffer.indexOf('\n')) >= 0) {
        const line = buffer.slice(0, newlineIndex).trim();
        buffer = buffer.slice(newlineIndex + 1);
        if (line.startsWith('data:')) {
          const data = line.slice(5).replace(/^\s/, '');
          if (data) {
            yield JSON.parse(data) as SseEvent;
          }
        }
      }
    }
    const tail = buffer.trim();
    if (tail.startsWith('data:')) {
      const data = tail.slice(5).replace(/^\s/, '');
      if (data) {
        yield JSON.parse(data) as SseEvent;
      }
    }
  } finally {
    reader.cancel().catch(() => {});
  }
}
Good catch — fixed in ebcf994. readSseEvents now parses proper SSE framing: it only handles data: lines (stripping the prefix), skips blank/comment/non-data lines, and stops on the [DONE] sentinel. Added a streaming test that exercises the framing end-to-end.
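The framing rules listed in this reply (only `data:` lines, skip blank/comment/non-data lines, stop on `[DONE]`) can be sketched at the line level as below. This is a minimal illustration and not the code from ebcf994: `SseLineResult`, `parseSseLine`, and `parseSseText` are hypothetical names, the payload is treated as opaque JSON, and each event is assumed to fit on a single `data:` line.

```typescript
// Hypothetical helpers for illustration; not the plugin's actual implementation.
type SseLineResult =
  | { kind: 'event'; data: unknown }
  | { kind: 'done' }
  | { kind: 'skip' };

const DONE_SENTINEL = '[DONE]';

// Classify one line of an SSE stream.
function parseSseLine(rawLine: string): SseLineResult {
  // trim() also removes a trailing '\r' from CRLF-terminated lines.
  const line = rawLine.trim();
  // Blank lines separate events; lines starting with ':' are comments.
  if (line === '' || line.startsWith(':')) return { kind: 'skip' };
  // Other fields (event:, id:, retry:) carry no JSON payload in this sketch.
  if (!line.startsWith('data:')) return { kind: 'skip' };
  // Strip the field name and at most one leading space.
  const data = line.slice('data:'.length).replace(/^ /, '');
  if (data === '') return { kind: 'skip' };
  if (data === DONE_SENTINEL) return { kind: 'done' };
  return { kind: 'event', data: JSON.parse(data) };
}

// Parse a complete SSE text and stop at the sentinel.
function parseSseText(text: string): unknown[] {
  const events: unknown[] = [];
  for (const line of text.split('\n')) {
    const result = parseSseLine(line);
    if (result.kind === 'done') break;
    if (result.kind === 'event') events.push(result.data);
  }
  return events;
}
```

Keeping the per-line classification separate from the stream reader makes the framing testable without a `ReadableStream`. A stricter parser would also join consecutive `data:` lines of one event, which this sketch does not attempt.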
function twelvelabsPlugin(params?: TwelveLabsPluginParams): GenkitPlugin {
  const baseUrl = params?.baseUrl ?? DEFAULT_BASE_URL;
  return genkitPlugin('twelvelabs', async (ai: Genkit) => {
    const apiKey = resolveApiKey(params);
    params?.models?.forEach((model) =>
      defineTwelveLabsModel(ai, { apiKey, baseUrl, model })
    );
    params?.embedders?.forEach((embedder) =>
      defineTwelveLabsEmbedder(ai, { apiKey, baseUrl, embedder })
    );
  });
}
If the plugin is registered without any models or embedders configured (e.g., twelvelabs()), it will still attempt to resolve the API key and throw an error if TWELVELABS_API_KEY is missing. Since the plugin is fully opt-in and does nothing when no models or embedders are specified, we should skip API key resolution and registration in this case to avoid unnecessary initialization errors.
function twelvelabsPlugin(params?: TwelveLabsPluginParams): GenkitPlugin {
  const baseUrl = params?.baseUrl ?? DEFAULT_BASE_URL;
  return genkitPlugin('twelvelabs', async (ai: Genkit) => {
    if (!params?.models?.length && !params?.embedders?.length) {
      return;
    }
    const apiKey = resolveApiKey(params);
    params?.models?.forEach((model) =>
      defineTwelveLabsModel(ai, { apiKey, baseUrl, model })
    );
    params?.embedders?.forEach((embedder) =>
      defineTwelveLabsEmbedder(ai, { apiKey, baseUrl, embedder })
    );
  });
}
Done in ebcf994 — the plugin now returns early when registered with no models and no embedders, so twelvelabs() no longer resolves the API key or throws.
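The early-return behaviour described here can be checked in isolation with stand-ins. The sketch below is an assumption-laden illustration, not the plugin's code: `PluginParams`, this `resolveApiKey` signature, `initPlugin`, and the `twelvelabs/<name>` naming are all hypothetical, and the environment is passed in explicitly so the example does not depend on `process.env`.

```typescript
// Stand-in types and helpers; none of these are the plugin's real definitions.
interface PluginParams {
  apiKey?: string; // assumed option name
  models?: string[];
  embedders?: string[];
}

type Env = Record<string, string | undefined>;

// Throws when no key is available, mirroring the behaviour discussed above.
function resolveApiKey(params: PluginParams | undefined, env: Env): string {
  const key = params?.apiKey ?? env['TWELVELABS_API_KEY'];
  if (!key) throw new Error('TWELVELABS_API_KEY is not set');
  return key;
}

// Returns the action names that would be registered (naming is an assumption).
function initPlugin(params: PluginParams | undefined, env: Env): string[] {
  // Nothing to register: return before touching the API key.
  if (!params?.models?.length && !params?.embedders?.length) return [];
  resolveApiKey(params, env); // throws if the key is missing
  const models = (params?.models ?? []).map((m) => `twelvelabs/${m}`);
  const embedders = (params?.embedders ?? []).map((e) => `twelvelabs/${e}`);
  return [...models, ...embedders];
}
```

With this ordering, an empty configuration never reaches `resolveApiKey`, while a configuration that lists at least one model or embedder still fails fast when the key is missing.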
return {
  message,
  finishReason: 'stop',
  usage: { ...getBasicUsageStats(input.messages, message), outputTokens },
} as GenerateResponseData;
When overriding outputTokens in the returned usage statistics, totalTokens is not automatically updated and may become inconsistent with the sum of inputTokens and outputTokens. We should explicitly recalculate totalTokens to ensure accurate usage reporting.
const basicStats = getBasicUsageStats(input.messages, message);
return {
  message,
  finishReason: 'stop',
  usage: {
    ...basicStats,
    outputTokens,
    totalTokens: (basicStats.inputTokens ?? 0) + (outputTokens ?? 0),
  },
} as GenerateResponseData;
Fixed in ebcf994 — when overriding outputTokens we now also recompute totalTokens as inputTokens + outputTokens so the usage stats stay consistent. Covered by the new streaming test.
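To see why the recomputation matters, the following sketch compares a plain spread override with one that also recomputes the total. `UsageStats` is a simplified stand-in rather than Genkit's usage type, `overrideOutputTokens` is a hypothetical helper name, and the example assumes the base stats already carry token counts.

```typescript
// Simplified stand-in for the usage fields discussed above (not Genkit's type).
interface UsageStats {
  inputTokens?: number;
  outputTokens?: number;
  totalTokens?: number;
}

// Override outputTokens and keep totalTokens equal to input + output.
function overrideOutputTokens(
  basic: UsageStats,
  outputTokens: number | undefined
): UsageStats {
  return {
    ...basic,
    outputTokens,
    totalTokens: (basic.inputTokens ?? 0) + (outputTokens ?? 0),
  };
}

// Illustrative numbers only.
const basic: UsageStats = { inputTokens: 12, outputTokens: 3, totalTokens: 15 };

// A plain spread keeps the old total, which no longer matches the parts.
const stale: UsageStats = { ...basic, outputTokens: 40 };

// The recomputed variant stays consistent.
const consistent = overrideOutputTokens(basic, 40);
```

Here `stale.totalTokens` is still 15 although the parts sum to 52, whereas `consistent.totalTokens` is 52.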
… skip empty config
- readSseEvents: parse standard `data: {json}` SSE framing, skip blank/comment/non-data lines and the [DONE] sentinel (was parsing raw lines as JSON).
- /analyze usage: recompute totalTokens when overriding outputTokens so it stays consistent with inputTokens.
- plugin: return early when registered with no models or embedders, so `twelvelabs()` no longer requires an API key.
- add a streaming model test covering SSE parsing and totalTokens.
Thanks for the review! I've addressed all three items in ebcf994 (SSE parsing, no-config early return, and totalTokens recomputation). Heads up on the
Hi! I'm Mohit, I work at TwelveLabs (@mohit-twelvelabs).
This PR adds genkitx-twelvelabs, a new opt-in plugin that brings TwelveLabs video AI to Genkit:
- … http(s) URL; TwelveLabs fetches it server-side via POST /analyze, so no prior indexing is required. Supports streaming.
- … POST /embed) that share a space with TwelveLabs video embeddings, which makes them a natural fit for video retrieval / RAG flows.
Why it helps this project: Genkit has strong text and image model coverage but no dedicated video-understanding or video-embedding provider. This fills that gap and follows the exact shape of existing plugins (e.g. genkitx-ollama): a genkitPlugin factory plus .model() / .embedder() references, defineModel / defineEmbedder, REST calls with no extra SDK dependency.
Opt-in / non-breaking: Nothing is registered unless you list models and/or embedders in the plugin config. No defaults change and no existing behavior is touched. The package is a brand-new workspace under js/plugins/twelvelabs.
How it was tested:
- … fetch), mirroring existing plugin tests: pnpm test (4/4 pass).
- … TWELVELABS_API_KEY (skipped when unset) that calls Marengo and asserts a 512-dim vector; verified passing against the real API.
- … /analyze path was smoke-tested against the live API with a public sample video and returned a correct description.
- tsc typecheck and tsup build pass; prettier --check and the repo copyright check pass.
You can grab a free API key at https://twelvelabs.io; there's a generous free tier.
Checklist:
js/index.typedoc.md)