Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions baml_language/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

297 changes: 240 additions & 57 deletions baml_language/crates/baml_builtins/baml/llm.baml
Original file line number Diff line number Diff line change
Expand Up @@ -120,36 +120,36 @@ function build_plan_with_state(
llm_client: baml.llm.Client,
planner_state: baml.llm.PlannerState,
) -> baml.llm.OrchestrationStep[] {
match (llm_client.retry) {
null => baml.llm.build_attempt_with_state(llm_client, planner_state),
r: baml.llm.RetryPolicy => {
let result: baml.llm.OrchestrationStep[] = [];
let current_delay = r.initial_delay_ms + 0.0;

for (let attempt = 0; attempt <= r.max_retries; attempt += 1) {
let delay = 0;
if (attempt > 0) {
delay = baml.math.trunc(current_delay);
let next = current_delay * r.multiplier;
if (next > r.max_delay_ms + 0.0) {
current_delay = r.max_delay_ms + 0.0;
} else {
current_delay = next;
}
}
let retry = llm_client.retry;
if (retry == null) {
return baml.llm.build_attempt_with_state(llm_client, planner_state);
}

let inner = baml.llm.build_attempt_with_state(llm_client, planner_state);
for (let step in inner) {
result.push(baml.llm.OrchestrationStep {
primitive_client: step.primitive_client,
delay_ms: delay,
});
}
let result: baml.llm.OrchestrationStep[] = [];
let current_delay = retry.initial_delay_ms + 0.0;

for (let attempt = 0; attempt <= retry.max_retries; attempt += 1) {
let delay = 0;
if (attempt > 0) {
delay = baml.math.trunc(current_delay);
let next = current_delay * retry.multiplier;
if (next > retry.max_delay_ms + 0.0) {
current_delay = retry.max_delay_ms + 0.0;
} else {
current_delay = next;
}
}

result
let inner = baml.llm.build_attempt_with_state(llm_client, planner_state);
for (let step in inner) {
result.push(baml.llm.OrchestrationStep {
primitive_client: step.primitive_client,
delay_ms: delay,
});
}
}

result
}

/// Execute a client with retry semantics, evaluating strategies lazily.
Expand All @@ -161,46 +161,46 @@ function execute_client(
context: baml.llm.ExecutionContext,
inherited_delay_ms: int,
) -> baml.llm.ExecutionResult {
match (llm_client.retry) {
null => baml.llm.execute_client_once(
let retry = llm_client.retry;
if (retry == null) {
return baml.llm.execute_client_once(
llm_client,
context,
inherited_delay_ms,
),
r: baml.llm.RetryPolicy => {
let current_delay = r.initial_delay_ms + 0.0;

for (let attempt = 0; attempt <= r.max_retries; attempt += 1) {
let attempt_delay = inherited_delay_ms;
if (attempt > 0) {
attempt_delay = baml.math.trunc(current_delay);
let next = current_delay * r.multiplier;
if (next > r.max_delay_ms + 0.0) {
current_delay = r.max_delay_ms + 0.0;
} else {
current_delay = next;
}
}
// Legacy behavior: the last local retry attempt carries no local
// delay (fallback to inherited delay from outer wrappers, if any).
if (attempt == r.max_retries) {
attempt_delay = inherited_delay_ms;
}
);
}

let result = baml.llm.execute_client_once(
llm_client,
context,
attempt_delay,
);
let current_delay = retry.initial_delay_ms + 0.0;

if (result.ok) {
return result;
}
for (let attempt = 0; attempt <= retry.max_retries; attempt += 1) {
let attempt_delay = inherited_delay_ms;
if (attempt > 0) {
attempt_delay = baml.math.trunc(current_delay);
let next = current_delay * retry.multiplier;
if (next > retry.max_delay_ms + 0.0) {
current_delay = retry.max_delay_ms + 0.0;
} else {
current_delay = next;
}
}
// Legacy behavior: the last local retry attempt carries no local
// delay (fallback to inherited delay from outer wrappers, if any).
if (attempt == retry.max_retries) {
attempt_delay = inherited_delay_ms;
}

baml.llm.ExecutionResult { ok: false, value: null }
let result = baml.llm.execute_client_once(
llm_client,
context,
attempt_delay,
);

if (result.ok) {
return result;
}
}

baml.llm.ExecutionResult { ok: false, value: null }
}

/// Execute a single attempt for a client (no retry expansion here).
Expand Down Expand Up @@ -250,6 +250,9 @@ function execute_client_once(
}

baml.llm.ClientType.RoundRobin => {
if (llm_client.sub_clients.length() == 0) {
return baml.llm.ExecutionResult { ok: false, value: null };
}
let idx = baml.llm.round_robin_next(llm_client.name) % llm_client.sub_clients.length();
baml.llm.execute_client(
llm_client.sub_clients.at(idx),
Expand Down Expand Up @@ -290,7 +293,7 @@ function build_request(function_name: string, args: map<string, unknown>) -> bam
/// at execution time (only when an attempt is actually reached), matching
/// legacy behavior. Nested strategies and retries are handled recursively
/// by `execute_client`.
function call_llm_function(function_name: string, args: map<string, unknown>) -> unknown throws string {
function call_llm_function(function_name: string, args: map<string, unknown>) -> unknown throws string | InvalidArgument {
let jinja_string = baml.llm.get_jinja_template(function_name);
let llm_client = baml.llm.get_client(function_name);

Expand All @@ -308,3 +311,183 @@ function call_llm_function(function_name: string, args: map<string, unknown>) ->

throw "All orchestration steps failed";
}

// ============================================================================
// Streaming Orchestration
// ============================================================================

/// Execute a streaming call against a single primitive client.
///
/// Opens an SSE connection, accumulates provider-specific events, and emits
/// partial values + raw ticks via the engine's streaming callbacks.
/// Returns the final parsed result as an `ExecutionResult`.
function stream_primitive(
    primitive: baml.llm.PrimitiveClient,
    context: baml.llm.ExecutionContext,
) -> baml.llm.ExecutionResult {
    // Render the Jinja prompt with the call arguments, then let the client
    // turn it into its provider-specific streaming HTTP request.
    let prompt = primitive.render_prompt(context.jinja_string, context.args);
    let specialized = primitive.specialize_prompt(prompt);
    let http_request = primitive.build_request_stream(specialized);
    let sse = baml.http.fetch_sse(http_request);
    let accumulator = primitive.new_stream_accumulator();
    let return_type = baml.llm.get_return_type(context.function_name);

    // Drain the SSE connection; `sse.next()` returning null signals
    // end-of-stream. Each batch of events is folded into the accumulator
    // and also forwarded verbatim as a raw tick.
    while (true) {
        let events = sse.next();
        if (events == null) { break; }
        accumulator.add_events(events);
        baml.stream.emit_tick(events);

        let content = accumulator.content();
        // Silently skip failed parses — early chunks may produce incomplete
        // content that can't parse yet. This matches legacy behavior.
        let parsed = primitive.partial_parse(content, return_type) catch (_e) {
            _ => null
        };
        if (parsed != null) {
            baml.stream.emit_partial(parsed);
        }
    }

    sse.close();

    // A stream that ended without the accumulator reaching its terminal
    // state is treated as a failed attempt (ok: false), not an error.
    if (accumulator.is_done() == false) {
        return baml.llm.ExecutionResult { ok: false, value: null };
    }

    // A null finish reason is normalized to "" before validation.
    // NOTE(review): assumes validate_finish_reason surfaces bad reasons
    // itself (its return value is discarded here) — confirm in the engine.
    let finish_reason = accumulator.finish_reason();
    let finish_reason_value = match (finish_reason) {
        null => "",
        reason: string => reason,
    };
    primitive.validate_finish_reason(finish_reason_value);
    let final_content = accumulator.content();
    let final_value = primitive.final_parse(final_content, return_type);
    // Last expression is the function's value: a successful result carrying
    // the fully parsed final content.
    baml.llm.ExecutionResult { ok: true, value: final_value }
}

/// Execute a streaming client with retry semantics, evaluating strategies lazily.
///
/// Mirrors `execute_client` but uses `stream_primitive` at the leaf level.
function execute_client_stream(
    llm_client: baml.llm.Client,
    context: baml.llm.ExecutionContext,
    inherited_delay_ms: int,
) -> baml.llm.ExecutionResult {
    // No retry policy: a single attempt, passing through the delay that any
    // outer wrapper (fallback/round-robin parent) asked us to honor.
    let retry = llm_client.retry;
    if (retry == null) {
        return baml.llm.execute_client_once_stream(
            llm_client,
            context,
            inherited_delay_ms,
        );
    }

    // Exponential backoff state, kept as a float so the multiplier can be
    // fractional; truncated to int only when handed to an attempt.
    let current_delay = retry.initial_delay_ms + 0.0;

    // max_retries + 1 total attempts (attempt 0 is the initial try).
    for (let attempt = 0; attempt <= retry.max_retries; attempt += 1) {
        let attempt_delay = inherited_delay_ms;
        if (attempt > 0) {
            // Use the current backoff for this attempt, then advance it,
            // clamping at max_delay_ms.
            attempt_delay = baml.math.trunc(current_delay);
            let next = current_delay * retry.multiplier;
            if (next > retry.max_delay_ms + 0.0) {
                current_delay = retry.max_delay_ms + 0.0;
            } else {
                current_delay = next;
            }
        }
        // Legacy behavior: the last local retry attempt carries no local
        // delay (fallback to inherited delay from outer wrappers, if any).
        if (attempt == retry.max_retries) {
            attempt_delay = inherited_delay_ms;
        }

        let result = baml.llm.execute_client_once_stream(
            llm_client,
            context,
            attempt_delay,
        );

        // First success short-circuits the retry loop.
        if (result.ok) {
            return result;
        }
    }

    // All attempts failed.
    baml.llm.ExecutionResult { ok: false, value: null }
}

/// Execute a single streaming attempt for a client (no retry expansion here).
///
/// Mirrors `execute_client_once` but calls `stream_primitive` for primitive clients.
function execute_client_once_stream(
    llm_client: baml.llm.Client,
    context: baml.llm.ExecutionContext,
    active_delay_ms: int,
) -> baml.llm.ExecutionResult {
    match (llm_client.client_type) {
        // Leaf client: resolve the concrete provider lazily (only when the
        // attempt is actually reached) and run one streaming call.
        baml.llm.ClientType.Primitive => {
            let resolve_fn = baml.llm.resolve_client(llm_client.name);
            let primitive = resolve_fn();

            let result = baml.llm.stream_primitive(primitive, context);

            if (result.ok) {
                return result;
            }

            // Sleep only on failure, after the attempt — the delay paces
            // the NEXT attempt the caller's retry loop will make.
            if (active_delay_ms > 0) {
                baml.sys.sleep(active_delay_ms);
            }

            baml.llm.ExecutionResult { ok: false, value: null }
        }

        // Fallback: try sub-clients in declaration order; first success wins.
        baml.llm.ClientType.Fallback => {
            for (let sub in llm_client.sub_clients) {
                let result = baml.llm.execute_client_stream(
                    sub,
                    context,
                    active_delay_ms,
                );
                if (result.ok) {
                    return result;
                }
            }
            baml.llm.ExecutionResult { ok: false, value: null }
        }

        // Round-robin: pick the next sub-client from shared per-name state.
        // Empty sub-client lists are rejected up front so the modulo below
        // never divides by zero.
        baml.llm.ClientType.RoundRobin => {
            if (llm_client.sub_clients.length() == 0) {
                return baml.llm.ExecutionResult { ok: false, value: null };
            }
            let idx = baml.llm.round_robin_next(llm_client.name) % llm_client.sub_clients.length();
            baml.llm.execute_client_stream(
                llm_client.sub_clients.at(idx),
                context,
                active_delay_ms,
            )
        }
    }
}

/// Stream an LLM function end-to-end with full orchestration.
///
/// The streaming counterpart of `call_llm_function`. Uses the same client tree
/// resolution and retry/fallback/round-robin logic, but opens SSE connections
/// and emits partial values via streaming callbacks.
function stream_llm_function(function_name: string, args: map<string, unknown>) -> unknown throws string | InvalidArgument {
    // Look up the function's prompt template and its configured client tree.
    // NOTE(review): presumably these getters raise InvalidArgument for an
    // unknown function_name (the declared throws type) — confirm upstream.
    let jinja_string = baml.llm.get_jinja_template(function_name);
    let llm_client = baml.llm.get_client(function_name);

    let context = baml.llm.ExecutionContext {
        jinja_string: jinja_string,
        args: args,
        function_name: function_name,
    };

    // Initial inherited delay is 0: the top-level call has no outer wrapper
    // imposing a pacing delay.
    let result = baml.llm.execute_client_stream(llm_client, context, 0);

    if (result.ok) {
        return result.value;
    }

    throw "All streaming orchestration steps failed";
}
Loading
Loading