From e5e7617e119e4025d6f198602ca4b78c2a7a8d8c Mon Sep 17 00:00:00 2001 From: Nauxie Date: Wed, 14 Jan 2026 11:25:45 -0800 Subject: [PATCH 1/3] Handle chunk SSE events for token-by-token streaming --- src/hooks/useChat.ts | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/hooks/useChat.ts b/src/hooks/useChat.ts index 98f6a64..1126fc9 100644 --- a/src/hooks/useChat.ts +++ b/src/hooks/useChat.ts @@ -115,7 +115,7 @@ export function useChat(options: UseChatOptions): UseChatReturn { break; } case 'message': { - // Agent message + // Agent message (complete message, e.g., from say()) const data = event.data as { content: string; role?: string }; if (data.content) { setMessages((prev) => @@ -128,6 +128,20 @@ export function useChat(options: UseChatOptions): UseChatReturn { } break; } + case 'chunk': { + // Streaming chunk - append to existing message content + const data = event.data as { content: string }; + if (data.content) { + setMessages((prev) => + prev.map((m) => + m.id === messageId + ? { ...m, content: m.content + data.content, status: 'streaming' as const } + : m + ) + ); + } + break; + } case 'tool_call': { // Tool/function call const data = event.data as { From 6e398ad62406531f2f537f2fbce02cae228aacb5 Mon Sep 17 00:00:00 2001 From: Nauxie Date: Wed, 14 Jan 2026 11:34:00 -0800 Subject: [PATCH 2/3] typing --- src/types/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/types/index.ts b/src/types/index.ts index b30e56f..9e979be 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -36,6 +36,7 @@ export interface ToolCall { export type SSEEventType = | 'session' // Session info (session_id, is_new) | 'message' // Agent text response (content, role) + | 'chunk' // Streaming chunk (token-by-token content) | 'tool_call' // Tool/function execution | 'waiting' // Agent is processing | 'done' // Stream complete From 585603a2ab5f17ef9263bf420a51e77f473ea17d Mon Sep 17 00:00:00 2001 From: Nauxie Date: Wed, 14 Jan 2026 12:34:47 -0800 Subject: [PATCH 3/3] streaming full support --- src/api/client.ts | 4 +- src/hooks/useChat.ts | 104 ++++++++++++++++++++++++++----------------- 2 files changed, 65 insertions(+), 43 deletions(-) diff --git a/src/api/client.ts b/src/api/client.ts index 3d136a7..ab993ff 100644 --- a/src/api/client.ts +++ b/src/api/client.ts @@ -4,8 +4,8 @@ import type { SendMessageParams, } from '../types'; -export const DEFAULT_ENGINE_URL = 'https://chat-embed-deployment.onrender.com'; -// export const DEFAULT_ENGINE_URL = 'http://localhost:8003'; +//export const DEFAULT_ENGINE_URL = 'https://chat-embed-deployment.onrender.com'; +export const DEFAULT_ENGINE_URL = 'http://localhost:8003'; export function createAPIClient( engineBaseUrl: string = DEFAULT_ENGINE_URL diff --git a/src/hooks/useChat.ts b/src/hooks/useChat.ts index 1126fc9..5082cc7 100644 --- a/src/hooks/useChat.ts +++ b/src/hooks/useChat.ts @@ -55,6 +55,7 @@ export function useChat(options: UseChatOptions): UseChatReturn { const sessionStartTime = useRef(0); const isInitialized = useRef(false); + const streamBuffers = useRef>({}); // Initialize or restore session useEffect(() => { @@ -99,6 +100,31 @@ export function useChat(options: UseChatOptions): UseChatReturn { return ''; }, [config.embedId]); + const upsertAssistantMessage = useCallback((messageId: string, content: string) => { + setMessages((prev) => { + let found = false; + const next = prev.map((m) => { + if (m.id === messageId) { + found = true; + return { ...m, content, status: 'streaming' as const }; + } + return m; + }); + + if (!found) { + next.push({ + id: messageId, + role: 'assistant', + content, + timestamp: Date.now(), + status: 'streaming', + }); + } + + return next; + }); + }, []); + const handleSSEEvent = useCallback( (event: SSEEvent, messageId: string, updateSessionId: (id: string) => void) => { switch (event.type) { @@ -118,13 +144,8 @@ export function useChat(options: UseChatOptions): UseChatReturn { // Agent message (complete message, e.g., from say()) const data = event.data as { content: string; role?: string }; if (data.content) { - setMessages((prev) => - prev.map((m) => - m.id === messageId - ? { ...m, content: data.content, status: 'streaming' as const } - : m - ) - ); + streamBuffers.current[messageId] = data.content; + upsertAssistantMessage(messageId, data.content); } break; } @@ -132,13 +153,10 @@ export function useChat(options: UseChatOptions): UseChatReturn { // Streaming chunk - append to existing message content const data = event.data as { content: string }; if (data.content) { - setMessages((prev) => - prev.map((m) => - m.id === messageId - ? { ...m, content: m.content + data.content, status: 'streaming' as const } - : m - ) - ); + const current = streamBuffers.current[messageId] ?? ''; + const next = current + data.content; + streamBuffers.current[messageId] = next; + upsertAssistantMessage(messageId, next); } break; } @@ -185,13 +203,9 @@ export function useChat(options: UseChatOptions): UseChatReturn { // If tool call has content, also update the message if (data.content) { - setMessages((prev) => - prev.map((m) => - m.id === messageId - ? { ...m, content: data.content as string, status: 'streaming' as const } - : m - ) - ); + const content = data.content as string; + streamBuffers.current[messageId] = content; + upsertAssistantMessage(messageId, content); } break; } @@ -204,26 +218,31 @@ export function useChat(options: UseChatOptions): UseChatReturn { setMessages((prev) => prev.map((m) => (m.id === messageId ? { ...m, status: 'sent' as const } : m)) ); + delete streamBuffers.current[messageId]; break; } case 'completed': { // Conversation ended by agent - setMessages((prev) => - prev.map((m) => (m.id === messageId ? { ...m, status: 'sent' as const } : m)) - ); - // Mark session as completed - if (sessionId) { - storeSession(config.embedId, { - sessionId, - deploymentId: config.deploymentId, - workerId: config.workerId, - flowId: config.flowId, - startTime: sessionStartTime.current, - messages, - toolCalls, - status: 'completed', - }); - } + setMessages((prev) => { + const updatedMessages = prev.map((m) => + m.id === messageId ? { ...m, status: 'sent' as const } : m + ); + // Mark session as completed with current messages + if (sessionId) { + storeSession(config.embedId, { + sessionId, + deploymentId: config.deploymentId, + workerId: config.workerId, + flowId: config.flowId, + startTime: sessionStartTime.current, + messages: updatedMessages, + toolCalls: [], // Tool calls stored separately + status: 'completed', + }); + } + return updatedMessages; + }); + delete streamBuffers.current[messageId]; break; } case 'error': { @@ -243,7 +262,7 @@ export function useChat(options: UseChatOptions): UseChatReturn { } } }, - [config, sessionId, messages, toolCalls, onSessionStart] + [config, sessionId, onSessionStart, upsertAssistantMessage] ); const processSSEStream = useCallback( @@ -260,9 +279,12 @@ export function useChat(options: UseChatOptions): UseChatReturn { buffer += decoder.decode(value, { stream: true }); // SSE messages are separated by double newlines - while (buffer.includes('\n\n')) { - const [message, rest] = buffer.split('\n\n', 2); - buffer = rest; + // Note: We use indexOf instead of split with limit because split(str, 2) + // only returns 2 elements and discards the rest, causing dropped chunks + let idx: number; + while ((idx = buffer.indexOf('\n\n')) !== -1) { + const message = buffer.slice(0, idx); + buffer = buffer.slice(idx + 2); // Parse SSE data lines for (const line of message.split('\n')) {