Implement BEP-009 streaming primitives #3215
Decomposes LLM streaming into three composable primitives (generic SSE connection, batched event retrieval, and provider-aware accumulator) wired together by Baml-level orchestration that shares retry/fallback/round-robin logic with non-streaming calls.

New crate modules:
- sys_native/src/sse_parser.rs: incremental W3C SSE parser
- sys_llm/src/stream_accumulator.rs: provider-aware delta extraction (OpenAI choices[0].delta.content, Anthropic content_block_delta)

New resource types: SseStream, StreamAccumulator

New sys ops: fetch_sse, sse_stream_next/close, new_stream_accumulator, add_events, content, is_done, build_request_stream, partial_parse, emit_partial, emit_tick

New Baml orchestration: stream_primitive, execute_client_stream, execute_client_once_stream, stream_llm_function

Python bridge: CallContext pyclass bundles tracing, collectors, cancellation, and streaming callbacks into a single context object passed to call_function/call_function_sync.
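As a rough illustration of what "provider-aware delta extraction" could look like, here is a minimal sketch. It assumes events have already been decoded from JSON into a hypothetical `Event` enum, and the provider names `"openai"` and `"anthropic"` are assumptions; the actual `stream_accumulator.rs` works on raw event JSON and may be structured quite differently.

```rust
/// Providers this sketch knows how to accumulate (names are assumptions).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Provider {
    OpenAi,
    Anthropic,
}

/// Hypothetical pre-decoded SSE payloads; real events arrive as JSON.
pub enum Event {
    /// OpenAI chunk: `choices[0].delta.content` and `choices[0].finish_reason`.
    OpenAiChunk { delta_content: Option<String>, finish_reason: Option<String> },
    /// OpenAI terminal `[DONE]` sentinel.
    OpenAiDone,
    /// Anthropic `content_block_delta` carrying a text delta.
    AnthropicContentBlockDelta { text: String },
    /// Anthropic `message_stop`.
    AnthropicMessageStop,
}

pub struct StreamAccumulator {
    provider: Provider,
    content: String,
    done: bool,
}

impl StreamAccumulator {
    /// Rejects unknown providers up front instead of silently ignoring their events.
    pub fn new(provider_name: &str) -> Result<Self, String> {
        let provider = match provider_name {
            "openai" => Provider::OpenAi,
            "anthropic" => Provider::Anthropic,
            other => return Err(format!("unsupported streaming provider: {}", other)),
        };
        Ok(Self { provider, content: String::new(), done: false })
    }

    /// Appends the text delta of one event; events from another provider are ignored.
    pub fn add_event(&mut self, event: &Event) {
        match (self.provider, event) {
            (Provider::OpenAi, Event::OpenAiChunk { delta_content, finish_reason }) => {
                if let Some(text) = delta_content {
                    self.content.push_str(text);
                }
                if finish_reason.is_some() {
                    self.done = true;
                }
            }
            (Provider::OpenAi, Event::OpenAiDone) => self.done = true,
            (Provider::Anthropic, Event::AnthropicContentBlockDelta { text }) => {
                self.content.push_str(text)
            }
            (Provider::Anthropic, Event::AnthropicMessageStop) => self.done = true,
            _ => {}
        }
    }

    pub fn content(&self) -> &str {
        &self.content
    }

    pub fn is_done(&self) -> bool {
        self.done
    }
}
```

Keeping the provider decision in the constructor means the per-event path never has to guess the wire format, which is one plausible reason to make the accumulator a resource created per stream.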
Walkthrough

Adds end-to-end SSE streaming for LLM calls: new streaming orchestration and runtime APIs, SSE HTTP ops and incremental parser, provider-aware stream accumulators, propagation of stream/tick callbacks through engine/VM/Python bridge, and sys-op/native plumbing for emitting partials and ticks.
Sequence Diagram(s)

sequenceDiagram
participant Client as Caller
participant Runtime as BamlRuntime
participant Engine as BexEngine
participant SysOps as SysOpLayer
participant HTTP as Native HTTP SSE
participant Acc as StreamAccumulator
participant Callback as UserCallbacks
Client->>Runtime: call_function(name,args, CallContext{stream_cb,tick_cb})
Runtime->>Engine: call_function(FunctionCallContext{stream_callback,tick_callback})
Engine->>SysOps: execute_sys_op(request with {"stream":true})
SysOps->>HTTP: send_sse_async(request)
HTTP->>HTTP: background task parses SSE bytes -> SseEvent[]
HTTP->>Acc: register new accumulator / add_events(events_json)
loop per batch
Acc->>Engine: partial_content
Engine->>Callback: stream_callback(partial) (deduped via last_emitted_partial)
Engine->>Callback: tick_callback(raw_events)
end
Acc->>Engine: is_done() -> true
Engine->>Acc: get_content() -> final_value
Engine-->>Runtime: ExecutionResult(final_value or error)
Runtime-->>Client: return final value / throw on failure
sequenceDiagram
participant Parser as SseParser
participant Buffer as SseBuffer
Parser->>Parser: new()
loop incoming bytes
Parser->>Parser: feed(chunk)
Parser->>Parser: parse lines, aggregate fields
alt blank line completes event
Parser->>Buffer: emit SseEvent{event,data,id}
end
end
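The parser diagram above can be sketched as a small state machine. This is a simplified illustration, not the code in `sse_parser.rs`: it takes `&str` chunks (so it assumes chunk boundaries never split a UTF-8 character), handles only the `event`, `data`, and `id` fields, and the struct layout is an assumption.

```rust
#[derive(Debug, Default, Clone, PartialEq)]
pub struct SseEvent {
    pub event: String,
    pub data: String,
    pub id: Option<String>,
}

#[derive(Default)]
pub struct SseParser {
    pending: String,    // text of a trailing line that is not complete yet
    event: String,      // current `event:` field
    data: Vec<String>,  // `data:` lines collected for the current event
    id: Option<String>, // last seen `id:` field
}

impl SseParser {
    pub fn new() -> Self {
        Self::default()
    }

    /// Feeds one chunk and returns every event that the chunk completed.
    pub fn feed(&mut self, chunk: &str) -> Vec<SseEvent> {
        self.pending.push_str(chunk);
        let mut out = Vec::new();
        while let Some(pos) = self.pending.find('\n') {
            // Take one full line (including its newline) out of the buffer.
            let raw: String = self.pending.drain(..=pos).collect();
            let line = raw.trim_end_matches(|c: char| c == '\n' || c == '\r');
            if line.is_empty() {
                // A blank line completes the event; events without data are dropped.
                if !self.data.is_empty() {
                    out.push(SseEvent {
                        event: std::mem::take(&mut self.event),
                        data: self.data.join("\n"),
                        id: self.id.clone(),
                    });
                    self.data.clear();
                } else {
                    self.event.clear();
                }
            } else if line.starts_with(':') {
                // Comment line (often used as a keep-alive): ignore.
            } else {
                // "field: value"; a single leading space in the value is stripped.
                let (field, value) = match line.split_once(':') {
                    Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
                    None => (line, ""),
                };
                match field {
                    "event" => self.event = value.to_string(),
                    "data" => self.data.push(value.to_string()),
                    "id" => self.id = Some(value.to_string()),
                    _ => {}
                }
            }
        }
        out
    }
}
```

Feeding `"data: hel"` and then `"lo\n\n"` yields nothing for the first call and a single event with data `hello` for the second, which is the property that makes the parser safe to drive from arbitrary network chunk sizes.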
Estimated code review effort: 4 (Complex), ~60 minutes
Pre-merge checks: 3 passed
Merging this PR will degrade performance by 33.14%
| | Mode | Benchmark | BASE | HEAD | Efficiency |
|---|---|---|---|---|---|
| ❌ | WallTime | bench_scale_100_functions | 2 ms | 2.4 ms | -18.01% |
| ❌ | WallTime | bench_single_simple_file | 996.8 µs | 1,441.1 µs | -30.83% |
| ❌ | WallTime | bench_incremental_modify_function | 171.8 µs | 220.7 µs | -22.15% |
| ❌ | WallTime | bench_scale_deep_nesting | 1.5 ms | 1.9 ms | -22.63% |
| ❌ | WallTime | bench_incremental_rename_type | 1.2 ms | 1.7 ms | -25.64% |
| ❌ | WallTime | bench_incremental_add_string_char | 985.3 µs | 1,424.2 µs | -30.82% |
| ❌ | WallTime | bench_incremental_add_user_field | 1.1 ms | 1.5 ms | -28.57% |
| ❌ | WallTime | bench_empty_project | 905.8 µs | 1,354.8 µs | -33.14% |
| ❌ | WallTime | bench_incremental_add_field | 172.2 µs | 220.2 µs | -21.79% |
| ❌ | WallTime | bench_incremental_add_new_file | 164.6 µs | 217.3 µs | -24.24% |
| ❌ | WallTime | bench_incremental_add_attribute | 985.5 µs | 1,424.2 µs | -30.8% |
| ❌ | WallTime | bench_incremental_no_change | 115.3 µs | 164.7 µs | -29.98% |
| ❌ | WallTime | bench_incremental_close_string | 990.1 µs | 1,427.6 µs | -30.65% |
Comparing bep009-streaming-implementation (34ca454) with canary (f32fa20)
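The Efficiency column in the report above looks consistent with `BASE / HEAD - 1`, expressed as a percentage. That is an inference from the reported rows, not a documented formula of the benchmarking tool, and the function name below is hypothetical.

```rust
/// Relative change in percent, assuming Efficiency = BASE / HEAD - 1.
/// Both arguments must use the same time unit.
pub fn efficiency_percent(base: f64, head: f64) -> f64 {
    (base / head - 1.0) * 100.0
}
```

For `bench_empty_project`, `efficiency_percent(905.8, 1354.8)` comes out at roughly -33.14, matching the headline figure. Rows shown with heavily rounded times (for example 2 ms vs 2.4 ms) will not reproduce exactly from the displayed values.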
Footnote: 91 benchmarks were skipped, so the baseline results were used instead.
Binary size checks passed: 7 passed
Actionable comments posted: 14
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 7c320720-0f3f-41ec-a595-a2ded4447dfd
⛔ Files ignored due to path filters (9)
- `baml_language/Cargo.lock` is excluded by `!**/*.lock`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____01_lexer__llm.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____02_parser__llm.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____03_hir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____04_5_mir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____04_tir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____06_codegen.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/tests/bytecode_format/snapshots/bytecode_format__bytecode_display_expanded.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/tests/bytecode_format/snapshots/bytecode_format__bytecode_display_expanded_unoptimized.snap` is excluded by `!**/*.snap`

📒 Files selected for processing (18)
- `baml_language/crates/baml_builtins/baml/llm.baml`
- `baml_language/crates/baml_builtins/src/lib.rs`
- `baml_language/crates/bex_engine/src/function_call_context.rs`
- `baml_language/crates/bex_engine/src/lib.rs`
- `baml_language/crates/bex_external_types/src/bex_external_value.rs`
- `baml_language/crates/bex_resource_types/src/lib.rs`
- `baml_language/crates/bridge_ctypes/src/handle_table.rs`
- `baml_language/crates/bridge_python/src/lib.rs`
- `baml_language/crates/bridge_python/src/runtime.rs`
- `baml_language/crates/sys_llm/Cargo.toml`
- `baml_language/crates/sys_llm/src/lib.rs`
- `baml_language/crates/sys_llm/src/stream_accumulator.rs`
- `baml_language/crates/sys_native/Cargo.toml`
- `baml_language/crates/sys_native/src/lib.rs`
- `baml_language/crates/sys_native/src/ops/http.rs`
- `baml_language/crates/sys_native/src/registry.rs`
- `baml_language/crates/sys_native/src/sse_parser.rs`
- `baml_language/crates/sys_types/src/lib.rs`
…return

Use type narrowing (null guard + early return) to replace `match (llm_client.retry)` patterns in build_plan_with_state, execute_client, and execute_client_stream. This reduces nesting and reads more naturally.
Actionable comments posted: 3
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 4ded66d1-bd47-4276-a5d0-1780fc24e15f
⛔ Files ignored due to path filters (5)
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____01_lexer__llm.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____02_parser__llm.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____03_hir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____04_5_mir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____06_codegen.snap` is excluded by `!**/*.snap`
📒 Files selected for processing (1)
baml_language/crates/baml_builtins/baml/llm.baml
Review fixes:
- SSE error now sets buf.done=true to prevent caller hang on retry
- stream_primitive checks accumulator.is_done() for early exit and returns failure on truncated streams
- new_accumulator rejects unsupported providers (google-ai, aws-bedrock etc.) instead of silently ignoring their events
- Poisoned mutex recovery in emit_partial deduplication
- Remove redundant event_type.clear() after mem::take in SSE parser
- Make futures/serde_json optional behind bundle-http feature gate

Refactor:
- Introduce PerCallContext struct bundling call_id, cancel, stream_callback, tick_callback; it replaces loose parameters in run_event_loop_with_epoch and execute_sys_op, removing the #[allow(clippy::too_many_arguments)] annotation
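The "poisoned mutex recovery in emit_partial deduplication" item can be pictured with the sketch below. The `PartialEmitter` type and its callback signature are hypothetical; only the `last_emitted_partial` name and the `unwrap_or_else(PoisonError::into_inner)` pattern come from this PR.

```rust
use std::sync::{Mutex, PoisonError};

/// Hypothetical holder for the last partial forwarded to the user callback.
pub struct PartialEmitter {
    last_emitted_partial: Mutex<Option<String>>,
}

impl PartialEmitter {
    pub fn new() -> Self {
        Self { last_emitted_partial: Mutex::new(None) }
    }

    /// Forwards `partial` to `callback` unless it equals the previous one.
    /// Returns true when the callback was invoked.
    pub fn emit_partial(&self, partial: &str, mut callback: impl FnMut(&str)) -> bool {
        // A poisoned lock still guards usable data here, so recover the guard
        // instead of panicking on the caller's thread.
        let mut last = self
            .last_emitted_partial
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        if last.as_deref() == Some(partial) {
            return false;
        }
        *last = Some(partial.to_string());
        callback(partial);
        true
    }
}
```

Recovering from poisoning is reasonable in this spot because the protected value is only a deduplication hint: the worst outcome of stale state is one repeated or one skipped partial.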
Actionable comments posted: 4
♻️ Duplicate comments (1)
baml_language/crates/baml_builtins/baml/llm.baml (1)
321-355: ⚠️ Potential issue | 🟠 Major

Streaming failures still escape the retry/fallback envelope.

Any throw from `fetch_sse()`/`next()`, `new_stream_accumulator()`, `add_events()`, `emit_tick()`, `emit_partial()`, `partial_parse()`, or final `parse()` bypasses `ExecutionResult { ok: false }`, so `execute_client_stream()` never gets a chance to retry or move to the next fallback client. Once the SSE resource has been opened, that same path can also skip `sse.close()`. Please wrap this body in the same internal failure envelope used by the non-streaming orchestrator, and only throw from `stream_llm_function()` after all attempts fail.
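The "failure envelope" idea in this comment can be sketched in Rust terms: each attempt's error is captured into an `ExecutionResult`-like value so the loop can move on, and an error is surfaced only once every client has failed. The function names and the `String` payloads are assumptions for illustration; the real orchestration lives in `llm.baml`.

```rust
/// Mirrors the `ExecutionResult { ok, value }` shape used for internal orchestration.
pub struct ExecutionResult {
    pub ok: bool,
    pub value: Option<String>,
}

/// Converts an attempt's outcome into the envelope instead of propagating the error.
fn enveloped(result: Result<String, String>) -> ExecutionResult {
    match result {
        Ok(v) => ExecutionResult { ok: true, value: Some(v) },
        Err(_) => ExecutionResult { ok: false, value: None },
    }
}

/// Tries each client in order; only fails after all attempts have failed.
pub fn run_with_fallback<F>(clients: &[&str], mut attempt: F) -> Result<String, String>
where
    F: FnMut(&str) -> Result<String, String>,
{
    for &client in clients {
        let result = enveloped(attempt(client));
        if result.ok {
            return Ok(result.value.unwrap_or_default());
        }
        // ok == false: fall through to the next client.
    }
    Err("all streaming attempts failed".to_string())
}
```

With clients `["primary", "backup"]` and an attempt that fails for `primary`, the call still returns the backup's value, which is the behavior the reviewer asks the streaming path to preserve.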
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: d0998c1b-3d7a-4016-8f85-44dda632801e
⛔ Files ignored due to path filters (5)
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____01_lexer__llm.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____02_parser__llm.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____03_hir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____04_5_mir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____06_codegen.snap` is excluded by `!**/*.snap`

📒 Files selected for processing (8)
- `baml_language/crates/baml_builtins/baml/llm.baml`
- `baml_language/crates/bex_engine/src/function_call_context.rs`
- `baml_language/crates/bex_engine/src/lib.rs`
- `baml_language/crates/sys_llm/src/stream_accumulator.rs`
- `baml_language/crates/sys_native/Cargo.toml`
- `baml_language/crates/sys_native/src/lib.rs`
- `baml_language/crates/sys_native/src/ops/http.rs`
- `baml_language/crates/sys_native/src/sse_parser.rs`
… poisoning

- Replace serde_json unwrap with proper error in sse_stream_next
- Return LlmOpError instead of panic in execute_partial_parse for non-string types
- Remove early loop exit on accumulator.is_done() to fully drain SSE events
- Add SseDropGuard to prevent consumer hangs on task cancellation
- Extract token usage from OpenAI and Anthropic streaming events
- Use unwrap_or_else(PoisonError::into_inner) consistently for lock acquisition
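A drop guard such as the `SseDropGuard` mentioned above typically works by marking the shared buffer as done in `Drop`, so the consumer is released no matter how the producer exits. The sketch below is an assumption about that shape (synchronous, with a hypothetical `SseBuffer`), not the PR's async implementation.

```rust
use std::sync::{Arc, Mutex, PoisonError};

/// Hypothetical shared buffer between the producer task and the consumer.
#[derive(Default)]
pub struct SseBuffer {
    pub events: Vec<String>,
    pub done: bool,
}

/// Marks the buffer as done when dropped, on any exit path.
pub struct SseDropGuard {
    buf: Arc<Mutex<SseBuffer>>,
}

impl SseDropGuard {
    pub fn new(buf: Arc<Mutex<SseBuffer>>) -> Self {
        Self { buf }
    }
}

impl Drop for SseDropGuard {
    fn drop(&mut self) {
        self.buf.lock().unwrap_or_else(PoisonError::into_inner).done = true;
    }
}

/// Producer: pushes events until the input ends or an error occurs.
pub fn producer(
    buf: Arc<Mutex<SseBuffer>>,
    chunks: Vec<Result<String, String>>,
) -> Result<(), String> {
    let _guard = SseDropGuard::new(Arc::clone(&buf));
    for chunk in chunks {
        let event = chunk?; // early return still runs the guard's Drop
        buf.lock().unwrap_or_else(PoisonError::into_inner).events.push(event);
    }
    Ok(())
}
```

The same mechanism covers cancellation in async code, since dropping a cancelled future drops its locals, including the guard.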
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
baml_language/crates/sys_types/src/lib.rs (1)
997-1009: 🧹 Nitpick | 🔵 Trivial

Consider adding unit tests for the new streaming methods.

The new `SysOpLlm` streaming methods (lines 845-915) lack direct unit test coverage. While the existing tests validate the test context setup, consider adding tests that verify error propagation for the accumulator methods, potentially by mocking `sys_llm::stream_accumulator` functions.

As per coding guidelines: "Prefer writing Rust unit tests over integration tests where possible."
baml_language/crates/sys_native/src/registry.rs (1)
326-330: ⚠️ Potential issue | 🟡 Minor

Same inconsistency in `ResourceRegistryRef::remove`.

Line 328 uses `.unwrap()` instead of the `.unwrap_or_else(std::sync::PoisonError::into_inner)` pattern used elsewhere.

🛠️ Proposed fix

     impl ResourceRegistryRef for ResourceRegistry {
         fn remove(&self, key: usize) {
    -        self.entries.write().unwrap().remove(&key);
    +        self.entries.write().unwrap_or_else(std::sync::PoisonError::into_inner).remove(&key);
         }
     }
♻️ Duplicate comments (1)
baml_language/crates/baml_builtins/baml/llm.baml (1)
332-347: ⚠️ Potential issue | 🟠 Major

Break the SSE loop as soon as `accumulator.is_done()` flips true.

The post-loop check at Lines 345-347 avoids truncated success, but the loop still waits for `sse.next() == null`. If a provider emits its terminal event and keeps the socket open, this attempt blocks longer than necessary.

Suggested fix

     while (true) {
         let events = sse.next();
         if (events == null) { break; }
         accumulator.add_events(events);
         baml.stream.emit_tick(events);
         let content = accumulator.content();
         let parsed = primitive.partial_parse(content, return_type);
         baml.stream.emit_partial(parsed);
    +
    +    if (accumulator.is_done()) {
    +        break;
    +    }
     }
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 4376c445-0dc6-40d7-b1d1-b8883a2dd324
⛔ Files ignored due to path filters (5)
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____01_lexer__llm.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____02_parser__llm.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____03_hir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____04_5_mir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____06_codegen.snap` is excluded by `!**/*.snap`

📒 Files selected for processing (6)
- `baml_language/crates/baml_builtins/baml/llm.baml`
- `baml_language/crates/sys_llm/src/lib.rs`
- `baml_language/crates/sys_llm/src/stream_accumulator.rs`
- `baml_language/crates/sys_native/src/ops/http.rs`
- `baml_language/crates/sys_native/src/registry.rs`
- `baml_language/crates/sys_types/src/lib.rs`
… final value

Use partial_parse instead of parse for the final value in stream_primitive, since the accumulator provides raw extracted content rather than the provider's JSON envelope. Add 8 streaming integration tests covering SSE primitives and full OpenAI streaming orchestration with wiremock.
Actionable comments posted: 3
♻️ Duplicate comments (4)
baml_language/crates/baml_builtins/baml/llm.baml (4)
332-347: ⚠️ Potential issue | 🟠 Major

Stop the loop on logical completion, not only on socket EOF.

This still waits for `sse.next() == null` before leaving the loop. If the accumulator reaches done before the provider closes the connection, streaming can hang until transport EOF instead of finishing immediately.

Suggested fix

     while (true) {
         let events = sse.next();
         if (events == null) { break; }
         accumulator.add_events(events);
         baml.stream.emit_tick(events);
         let content = accumulator.content();
         let parsed = primitive.partial_parse(content, return_type);
         baml.stream.emit_partial(parsed);
    +
    +    if (accumulator.is_done()) {
    +        break;
    +    }
     }
     sse.close();
     if (accumulator.is_done() == false) {
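The control flow the reviewer suggests (leave the loop on logical completion, then treat a stream that ended without a terminal event as truncated) can be sketched as below. The `Accumulator` here is a toy that treats the string `[DONE]` as the terminal event, and `drain_stream` is a hypothetical name. The commit history on this page shows the PR later chose to drain the stream fully instead, so this illustrates the suggestion rather than the final code.

```rust
#[derive(Default)]
pub struct Accumulator {
    content: String,
    done: bool,
}

impl Accumulator {
    /// Appends text events; a "[DONE]" event marks logical completion.
    pub fn add_events(&mut self, events: &[&str]) {
        for event in events {
            if *event == "[DONE]" {
                self.done = true;
            } else if !self.done {
                self.content.push_str(event);
            }
        }
    }

    pub fn is_done(&self) -> bool {
        self.done
    }
}

/// Returns the final content (or a truncation error) and the number of batches read.
pub fn drain_stream<'a>(
    batches: impl Iterator<Item = Vec<&'a str>>,
) -> (Result<String, String>, usize) {
    let mut acc = Accumulator::default();
    let mut batches_read = 0;
    for batch in batches {
        batches_read += 1;
        acc.add_events(&batch);
        if acc.is_done() {
            break; // logical completion: do not wait for transport EOF
        }
    }
    if acc.is_done() {
        (Ok(acc.content), batches_read)
    } else {
        (Err("stream truncated before completion".to_string()), batches_read)
    }
}
```

The trade-off is visible in the second return value: an early break leaves later batches unread, which is faster when a provider keeps the socket open but skips any trailing events such as usage metadata.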
328-343: ⚠️ Potential issue | 🟠 Major

Guarantee `sse.close()` on exceptional exits too.

`close()` only runs on the happy path. A throw from `sse.next()`, `add_events`, `emit_tick`, or `partial_parse` exits this function before cleanup, leaving the SSE resource alive across retries or fallback attempts.
330-352: ⚠️ Potential issue | 🟠 Major

Fail fast for return types that `partial_parse()` cannot handle.

This path now uses `partial_parse(...)` for both incremental updates and the final value, but the streaming contract in this PR limits partial parsing to string outputs. Any streamed function with a non-string return type will fail after the SSE request has already started.

Either validate `return_type` before opening the stream, or keep streaming disabled for unsupported return types.

Based on learnings: New language features for BAML require coordinated updates across Parser (parser-database), IR/validation (baml-core), Compiler (baml-compiler), and VM (baml-vm)
444-450: ⚠️ Potential issue | 🔴 Critical

Guard empty round-robin streaming clients before `% length()`.

This branch still divides by zero when `sub_clients` is empty. `build_attempt_with_state()` already treats that shape as empty on Lines 89-102; the executor should return `ok: false` here instead of throwing.

Suggested fix

     baml.llm.ClientType.RoundRobin => {
    -    let idx = baml.llm.round_robin_next(llm_client.name) % llm_client.sub_clients.length();
    -    baml.llm.execute_client_stream(
    -        llm_client.sub_clients.at(idx),
    -        context,
    -        active_delay_ms,
    -    )
    +    if (llm_client.sub_clients.length() == 0) {
    +        baml.llm.ExecutionResult { ok: false, value: null }
    +    } else {
    +        let idx = baml.llm.round_robin_next(llm_client.name) % llm_client.sub_clients.length();
    +        baml.llm.execute_client_stream(
    +            llm_client.sub_clients.at(idx),
    +            context,
    +            active_delay_ms,
    +        )
    +    }
     }
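The same guard, expressed in Rust for comparison: `usize::checked_rem` returns `None` for a zero divisor, so the empty case becomes an ordinary "no client" result instead of a division by zero. `pick_sub_client` is a hypothetical helper, not part of the PR.

```rust
/// Picks the next round-robin client, or None when there are no sub-clients.
pub fn pick_sub_client<'a>(counter: usize, sub_clients: &[&'a str]) -> Option<&'a str> {
    // checked_rem yields None when sub_clients.len() == 0.
    let idx = counter.checked_rem(sub_clients.len())?;
    Some(sub_clients[idx])
}
```

A caller would map `None` to the `ok: false` result, mirroring the `if (length() == 0)` branch in the suggested fix.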
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: cebcc780-d15b-4e84-83df-b7fb230b63c8
⛔ Files ignored due to path filters (5)
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____01_lexer__llm.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____02_parser__llm.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____03_hir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____04_5_mir.snap` is excluded by `!**/*.snap`
- `baml_language/crates/baml_tests/snapshots/__baml_std__/baml_tests____baml_std____06_codegen.snap` is excluded by `!**/*.snap`

📒 Files selected for processing (3)
- `baml_language/crates/baml_builtins/baml/llm.baml`
- `baml_language/crates/baml_tests/src/engine.rs`
- `baml_language/crates/baml_tests/tests/streaming.rs`
- Wrap partial_parse in catch so mid-stream parse failures are silently skipped, matching legacy behavior for incomplete structured content.
- Add accumulator metadata accessors: model(), finish_reason(), input_tokens(), output_tokens() as new sys ops.
- Add final_parse sys op on PrimitiveClient for strict parsing of the final accumulated content (separate from permissive partial_parse).
- Update stream_primitive to use final_parse for the completed value.
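The split between tolerant mid-stream parsing and strict final parsing can be illustrated with a toy integer parser. Everything here is an assumption for illustration: the real `partial_parse` and `final_parse` are separate sys ops that handle BAML return types, whereas this sketch reuses one strict parser and differs only in how failures are handled.

```rust
/// Toy strict parser standing in for a return-type-aware parser.
pub fn parse_int(content: &str) -> Result<i64, String> {
    content.trim().parse::<i64>().map_err(|e| e.to_string())
}

/// `snapshots` are successive values of the accumulated content.
/// Returns the partials that parsed and the strictly parsed final value.
pub fn stream_ints(snapshots: &[&str]) -> (Vec<i64>, Result<i64, String>) {
    let mut partials = Vec::new();
    for snapshot in snapshots {
        // Mid-stream failures are skipped: incomplete content is expected here.
        if let Ok(value) = parse_int(snapshot) {
            partials.push(value);
        }
    }
    // The final value is parsed strictly: a failure here is a real error.
    let final_value = match snapshots.last() {
        Some(last) => parse_int(last),
        None => Err("empty stream".to_string()),
    };
    (partials, final_value)
}
```

For the snapshots `"-"`, `"-4"`, `"-42"`, the first is skipped silently, two partials are emitted, and the final value parses to -42; a stream ending in `"4x"` emits its earlier partial but reports an error for the final value.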
Summary
- Incremental SSE parser (`sse_parser.rs`), provider-aware delta extraction for OpenAI and Anthropic (`stream_accumulator.rs`), and new resource types (`SseStream`, `StreamAccumulator`)
- Baml orchestration: `stream_primitive`, `execute_client_stream`, `stream_llm_function`
- Python bridge: `CallContext` pyclass bundles tracing, collectors, cancellation, and streaming callbacks
- Refactored `llm.baml` to use null guard + early return instead of `match` on nullable `retry` field, leveraging type narrowing

New sys ops
- `fetch_sse`
- `SseStream.next/close`
- `PrimitiveClient.new_stream_accumulator`
- `StreamAccumulator.add_events/content/is_done`
- `PrimitiveClient.build_request_stream` (adds `"stream": true` to request body)
- `PrimitiveClient.partial_parse`
- `emit_partial`/`emit_tick`

Bug found: `continue` not supported as catch arm expression

While refactoring `llm.baml` to eliminate the `ExecutionResult { ok, value }` wrapper using throw/catch, we discovered that `continue` (and likely `break`) cannot be used inside catch arms.

The intended pattern was to catch errors in retry/fallback loops and skip to the next iteration. Since catch arms require an expression and `continue` is parsed as a statement (not an expression), this pattern is rejected at parse time.

Workaround: `ExecutionResult { ok: bool, value: unknown }` is kept for internal retry/fallback orchestration where failures are expected and handled. The top-level functions (`call_llm_function`, `stream_llm_function`) use `throw` at the boundary.

Suggested fix: Treat `continue`, `break`, `return`, and `throw` uniformly as diverging expressions valid in any expression position (they already work this way for type narrowing purposes).

CodeRabbit review fixes
- SSE error sets `buf.done = true`: prevents caller hang when retrying after stream error (background task exited without marking stream done)
- `stream_primitive` checks `accumulator.is_done()`: breaks out of loop early when provider signals completion (`[DONE]`/`finish_reason`), and returns `ok: false` if stream was truncated
- `new_accumulator` rejects unsupported providers: returns error for google-ai, aws-bedrock, etc. instead of silently ignoring their events in `extract_delta`
- Poisoned mutex recovery: `emit_partial` deduplication uses `unwrap_or_else(PoisonError::into_inner)` instead of panicking
- Redundant `event_type.clear()`: removed after `mem::take`, which already leaves an empty string
- `futures`/`serde_json` optional: gated behind `bundle-http` feature to reduce compile time without SSE
- `PerCallContext` struct: replaces 4 loose parameters (`call_id`, `cancel`, `stream_callback`, `tick_callback`) in `run_event_loop_with_epoch` and `execute_sys_op`, removing the `#[allow(clippy::too_many_arguments)]`

Follow-up compiler fixes
- … `primitive.partial_parse(...)`
- `call_llm_function` and `stream_llm_function` now declare `InvalidArgument`, matching `get_jinja_template` and `get_client`
- … `Unreachable catch arm` warning and `E0096` `InvalidArgument` throws-contract errors that were being emitted from builtin `llm.baml` across diagnostics snapshots

Test plan
- … `--workspace --all-targets --all-features -- -D warnings`