Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/lemon-ties-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@livekit/agents-plugin-bey': patch
'@livekit/agents': patch
---

Add Session Connection Options and Fix Blocking Speech from High-latency LLM Generation
1 change: 0 additions & 1 deletion agents/src/inference/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ export class LLM extends llm.LLM {
this.client = new OpenAI({
baseURL: this.opts.baseURL,
apiKey: '', // leave a temporary empty string to avoid OpenAI complain about missing key
timeout: 15000,
});
}

Expand Down
7 changes: 5 additions & 2 deletions agents/src/inference/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,17 @@ export class SynthesizeStream<TModel extends TTSModels> extends BaseSynthesizeSt

const createInputTask = async () => {
for await (const data of this.input) {
if (this.abortController.signal.aborted) break;
if (this.abortController.signal.aborted || closing) break;
if (data === SynthesizeStream.FLUSH_SENTINEL) {
sendTokenizerStream.flush();
continue;
}
sendTokenizerStream.pushText(data);
}
sendTokenizerStream.endInput();
// Only call endInput if the stream hasn't been closed by cleanup
if (!closing) {
sendTokenizerStream.endInput();
}
};

const createSentenceStreamTask = async () => {
Expand Down
4 changes: 2 additions & 2 deletions agents/src/llm/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { APIConnectionError, APIError } from '../_exceptions.js';
import { log } from '../log.js';
import type { LLMMetrics } from '../metrics/base.js';
import { recordException, traceTypes, tracer } from '../telemetry/index.js';
import type { APIConnectOptions } from '../types.js';
import { type APIConnectOptions, intervalForRetry } from '../types.js';
import { AsyncIterableQueue, delay, startSoon, toError } from '../utils.js';
import { type ChatContext, type ChatRole, type FunctionCall } from './chat_context.js';
import type { ToolChoice, ToolContext } from './tool_context.js';
Expand Down Expand Up @@ -158,7 +158,7 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
);
} catch (error) {
if (error instanceof APIError) {
const retryInterval = this._connOptions._intervalForRetry(i);
const retryInterval = intervalForRetry(this._connOptions, i);

if (this._connOptions.maxRetry === 0 || !error.retryable) {
this.emitError({ error, recoverable: false });
Expand Down
9 changes: 5 additions & 4 deletions agents/src/stt/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
import type { AudioFrame } from '@livekit/rtc-node';
import { log } from '../log.js';
import type { APIConnectOptions } from '../types.js';
import type { VAD, VADStream } from '../vad.js';
import { VADEventType } from '../vad.js';
import type { SpeechEvent } from './stt.js';
Expand All @@ -28,8 +29,8 @@ export class StreamAdapter extends STT {
return this.#stt.recognize(frame);
}

stream(): StreamAdapterWrapper {
return new StreamAdapterWrapper(this.#stt, this.#vad);
stream(options?: { connOptions?: APIConnectOptions }): StreamAdapterWrapper {
return new StreamAdapterWrapper(this.#stt, this.#vad, options?.connOptions);
}
}

Expand All @@ -38,8 +39,8 @@ export class StreamAdapterWrapper extends SpeechStream {
#vadStream: VADStream;
label: string;

constructor(stt: STT, vad: VAD) {
super(stt);
constructor(stt: STT, vad: VAD, connOptions?: APIConnectOptions) {
super(stt, undefined, connOptions);
this.#stt = stt;
this.#vadStream = vad.stream();
this.label = `stt.StreamAdapterWrapper<${this.#stt.label}>`;
Expand Down
8 changes: 5 additions & 3 deletions agents/src/stt/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { calculateAudioDurationSeconds } from '../audio.js';
import { log } from '../log.js';
import type { STTMetrics } from '../metrics/base.js';
import { DeferredReadableStream } from '../stream/deferred_stream.js';
import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS } from '../types.js';
import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS, intervalForRetry } from '../types.js';
import type { AudioBuffer } from '../utils.js';
import { AsyncIterableQueue, delay, startSoon, toError } from '../utils.js';

Expand Down Expand Up @@ -133,8 +133,10 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter<STTCal
/**
* Returns a {@link SpeechStream} that can be used to push audio frames and receive
* transcriptions
*
* @param options - Optional configuration including connection options
*/
abstract stream(): SpeechStream;
abstract stream(options?: { connOptions?: APIConnectOptions }): SpeechStream;

async close(): Promise<void> {
return;
Expand Down Expand Up @@ -196,7 +198,7 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
return await this.run();
} catch (error) {
if (error instanceof APIError) {
const retryInterval = this._connOptions._intervalForRetry(i);
const retryInterval = intervalForRetry(this._connOptions, i);

if (this._connOptions.maxRetry === 0 || !error.retryable) {
this.emitError({ error, recoverable: false });
Expand Down
9 changes: 5 additions & 4 deletions agents/src/tts/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0
import type { SentenceStream, SentenceTokenizer } from '../tokenize/index.js';
import type { APIConnectOptions } from '../types.js';
import { Task } from '../utils.js';
import type { ChunkedStream } from './tts.js';
import { SynthesizeStream, TTS } from './tts.js';
Expand All @@ -27,8 +28,8 @@ export class StreamAdapter extends TTS {
return this.#tts.synthesize(text);
}

stream(): StreamAdapterWrapper {
return new StreamAdapterWrapper(this.#tts, this.#sentenceTokenizer);
stream(options?: { connOptions?: APIConnectOptions }): StreamAdapterWrapper {
return new StreamAdapterWrapper(this.#tts, this.#sentenceTokenizer, options?.connOptions);
}
}

Expand All @@ -37,8 +38,8 @@ export class StreamAdapterWrapper extends SynthesizeStream {
#sentenceStream: SentenceStream;
label: string;

constructor(tts: TTS, sentenceTokenizer: SentenceTokenizer) {
super(tts);
constructor(tts: TTS, sentenceTokenizer: SentenceTokenizer, connOptions?: APIConnectOptions) {
super(tts, connOptions);
this.#tts = tts;
this.#sentenceStream = sentenceTokenizer.stream();
this.label = `tts.StreamAdapterWrapper<${this.#tts.label}>`;
Expand Down
10 changes: 6 additions & 4 deletions agents/src/tts/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { log } from '../log.js';
import type { TTSMetrics } from '../metrics/base.js';
import { DeferredReadableStream } from '../stream/deferred_stream.js';
import { recordException, traceTypes, tracer } from '../telemetry/index.js';
import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS } from '../types.js';
import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS, intervalForRetry } from '../types.js';
import { AsyncIterableQueue, delay, mergeFrames, startSoon, toError } from '../utils.js';

/** SynthesizedAudio is a packet of speech synthesis as returned by the TTS. */
Expand Down Expand Up @@ -94,8 +94,10 @@ export abstract class TTS extends (EventEmitter as new () => TypedEmitter<TTSCal

/**
* Returns a {@link SynthesizeStream} that can be used to push text and receive audio data
*
* @param options - Optional configuration including connection options
*/
abstract stream(): SynthesizeStream;
abstract stream(options?: { connOptions?: APIConnectOptions }): SynthesizeStream;

async close(): Promise<void> {
return;
Expand Down Expand Up @@ -186,7 +188,7 @@ export abstract class SynthesizeStream
);
} catch (error) {
if (error instanceof APIError) {
const retryInterval = this._connOptions._intervalForRetry(i);
const retryInterval = intervalForRetry(this._connOptions, i);

if (this._connOptions.maxRetry === 0 || !error.retryable) {
this.emitError({ error, recoverable: false });
Expand Down Expand Up @@ -454,7 +456,7 @@ export abstract class ChunkedStream implements AsyncIterableIterator<Synthesized
);
} catch (error) {
if (error instanceof APIError) {
const retryInterval = this._connOptions._intervalForRetry(i);
const retryInterval = intervalForRetry(this._connOptions, i);

if (this._connOptions.maxRetry === 0 || !error.retryable) {
this.emitError({ error, recoverable: false });
Expand Down
90 changes: 57 additions & 33 deletions agents/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,66 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
export class APIConnectOptions {
/** Maximum number of retries to connect to the API. */
readonly maxRetry: number;
/** Interval between retries to connect to the API in milliseconds. */
readonly retryIntervalMs: number;
/** Timeout for connecting to the API in milliseconds. */
readonly timeoutMs: number;

constructor(options: Partial<APIConnectOptions> = {}) {
this.maxRetry = options.maxRetry ?? 3;
this.retryIntervalMs = options.retryIntervalMs ?? 2000;
this.timeoutMs = options.timeoutMs ?? 10000;
/**
 * Connection options for API calls, controlling retry and timeout behavior.
 *
 * This is a plain data shape: no validation of the values happens here.
 * The defaults quoted on each field are the values held by
 * `DEFAULT_API_CONNECT_OPTIONS`.
 */
export interface APIConnectOptions {
/**
 * Maximum number of retries to connect to the API. Default: 3
 *
 * The stream retry loops treat `maxRetry === 0` as "do not retry": an
 * `APIError` is then reported as non-recoverable.
 */
maxRetry: number;
/**
 * Interval between retries to connect to the API in milliseconds. Default: 2000
 *
 * `intervalForRetry` uses this value for every retry except retry number 0.
 */
retryIntervalMs: number;
/** Timeout for connecting to the API in milliseconds. Default: 10000 */
timeoutMs: number;
}

if (this.maxRetry < 0) {
throw new Error('maxRetry must be greater than or equal to 0');
}
if (this.retryIntervalMs < 0) {
throw new Error('retryIntervalMs must be greater than or equal to 0');
}
if (this.timeoutMs < 0) {
throw new Error('timeoutMs must be greater than or equal to 0');
}
}
/**
 * Default connection options: 3 retries, 2000 ms between retries, and a
 * 10000 ms connection timeout.
 *
 * The object is frozen because it is shared by reference — for example,
 * `DEFAULT_SESSION_CONNECT_OPTIONS` uses this same object for its STT, LLM and
 * TTS entries — so a write through any one of those references would otherwise
 * change the defaults everywhere. To customize, build a new object instead:
 * `{ ...DEFAULT_API_CONNECT_OPTIONS, timeoutMs: 5000 }`.
 */
export const DEFAULT_API_CONNECT_OPTIONS: APIConnectOptions = Object.freeze({
  maxRetry: 3,
  retryIntervalMs: 2000,
  timeoutMs: 10000,
});

/** @internal */
_intervalForRetry(numRetries: number): number {
/**
* Return the interval for the given number of retries.
*
* The first retry is immediate, and then uses specified retryIntervalMs
*/
if (numRetries === 0) {
return 0.1;
}
return this.retryIntervalMs;
/**
 * Compute how long to wait before the next connection attempt.
 *
 * Retry number 0 is effectively immediate (a fixed 0.1 is returned); every
 * later retry waits `connOptions.retryIntervalMs`.
 *
 * @param connOptions - Connection options supplying `retryIntervalMs`.
 * @param numRetries - Zero-based count of retries performed so far.
 * @returns The wait before the next attempt (presumably milliseconds, matching
 * `retryIntervalMs` — confirm against the caller's delay helper).
 * @internal
 */
export function intervalForRetry(connOptions: APIConnectOptions, numRetries: number): number {
  const isFirstRetry = numRetries === 0;
  return isFirstRetry ? 0.1 : connOptions.retryIntervalMs;
}

/**
 * Connection options for the agent session, controlling retry and timeout behavior
 * for STT, LLM, and TTS connections.
 *
 * Every member is optional, and each per-service entry is a
 * `Partial<APIConnectOptions>`, so a caller can override a single field
 * (e.g. only `timeoutMs`) for one service.
 * NOTE(review): omitted values are presumably filled in from
 * `DEFAULT_SESSION_CONNECT_OPTIONS` when the session resolves these options —
 * confirm against the session implementation.
 */
export interface SessionConnectOptions {
/** Connection options for speech-to-text. */
sttConnOptions?: Partial<APIConnectOptions>;
/** Connection options for the language model. */
llmConnOptions?: Partial<APIConnectOptions>;
/** Connection options for text-to-speech. */
ttsConnOptions?: Partial<APIConnectOptions>;
/** Maximum number of consecutive unrecoverable errors from LLM or TTS before closing the session. Default: 3 */
maxUnrecoverableErrors?: number;
}

/**
 * Resolved session connect options with all values populated.
 *
 * Mirrors `SessionConnectOptions`, but every member is required and each
 * per-service entry is a complete `APIConnectOptions`.
 * @internal
 */
export interface ResolvedSessionConnectOptions {
/** Fully populated connection options for speech-to-text. */
sttConnOptions: APIConnectOptions;
/** Fully populated connection options for the language model. */
llmConnOptions: APIConnectOptions;
/** Fully populated connection options for text-to-speech. */
ttsConnOptions: APIConnectOptions;
/** Maximum number of consecutive unrecoverable errors from LLM or TTS before closing the session. */
maxUnrecoverableErrors: number;
}

export const DEFAULT_API_CONNECT_OPTIONS = new APIConnectOptions();
/**
 * Default session connect options: the default API connection options for
 * STT, LLM and TTS, and a limit of 3 unrecoverable errors.
 *
 * Note that the three per-service entries all reference the very same
 * `DEFAULT_API_CONNECT_OPTIONS` object rather than separate copies, so they
 * must be treated as read-only.
 */
export const DEFAULT_SESSION_CONNECT_OPTIONS: ResolvedSessionConnectOptions = {
sttConnOptions: DEFAULT_API_CONNECT_OPTIONS,
llmConnOptions: DEFAULT_API_CONNECT_OPTIONS,
ttsConnOptions: DEFAULT_API_CONNECT_OPTIONS,
maxUnrecoverableErrors: 3,
};
8 changes: 6 additions & 2 deletions agents/src/voice/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ export class Agent<UserData = any> {
wrapped_stt = new STTStreamAdapter(wrapped_stt, agent.vad);
}

const stream = wrapped_stt.stream();
const connOptions = activity.agentSession.connOptions.sttConnOptions;
const stream = wrapped_stt.stream({ connOptions });
stream.updateInputStream(audio);

return new ReadableStream({
Expand Down Expand Up @@ -304,11 +305,13 @@ export class Agent<UserData = any> {

// TODO(brian): make parallelToolCalls configurable
const { toolChoice } = modelSettings;
const connOptions = activity.agentSession.connOptions.llmConnOptions;

const stream = activity.llm.chat({
chatCtx,
toolCtx,
toolChoice,
connOptions,
parallelToolCalls: true,
});
return new ReadableStream({
Expand Down Expand Up @@ -340,7 +343,8 @@ export class Agent<UserData = any> {
wrapped_tts = new TTSStreamAdapter(wrapped_tts, new BasicSentenceTokenizer());
}

const stream = wrapped_tts.stream();
const connOptions = activity.agentSession.connOptions.ttsConnOptions;
const stream = wrapped_tts.stream({ connOptions });
stream.updateInputStream(text);

return new ReadableStream({
Expand Down
Loading