From 4e13fe21c5570f3bba4c189a6452fbffc0b2a3f8 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sat, 17 Jan 2026 13:09:53 -0600 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=A4=96=20feat:=20add=20profiling=20Vi?= =?UTF-8?q?te=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vite.config.ts | 237 ++++++++++++++++++++++++++----------------------- 1 file changed, 124 insertions(+), 113 deletions(-) diff --git a/vite.config.ts b/vite.config.ts index 84e75902d4..d4348f9326 100644 --- a/vite.config.ts +++ b/vite.config.ts @@ -100,125 +100,136 @@ const basePlugins = [ tailwindcss(), ]; -export default defineConfig(({ mode }) => ({ - // This prevents mermaid initialization errors in production while allowing dev to work - plugins: mode === "development" ? [...basePlugins, topLevelAwait()] : basePlugins, - resolve: { - alias, - }, - base: "./", - build: { - outDir: "dist", - assetsDir: ".", - emptyOutDir: false, - sourcemap: true, - minify: "esbuild", - rollupOptions: { - input: { - main: path.resolve(__dirname, "index.html"), - terminal: path.resolve(__dirname, "terminal.html"), - }, - output: { - format: "es", - inlineDynamicImports: false, - sourcemapExcludeSources: false, - manualChunks(id) { - const normalizedId = id.split(path.sep).join("/"); - if (normalizedId.includes("node_modules/ai-tokenizer/encoding/")) { - const chunkName = path.basename(id, path.extname(id)); - return `tokenizer-encoding-${chunkName}`; - } - if (normalizedId.includes("node_modules/ai-tokenizer/")) { - return "tokenizer-base"; - } - return undefined; +export default defineConfig(({ mode }) => { + const isProfiling = mode === "profiling"; + const aliasMap: Record = { ...alias }; + + if (isProfiling) { + aliasMap["react-dom$"] = "react-dom/profiling"; + aliasMap["scheduler/tracing"] = "scheduler/tracing-profiling"; + } + + return { + // This prevents mermaid initialization errors in production while allowing dev to work + plugins: mode === "development" ? [...basePlugins, topLevelAwait()] : basePlugins, + resolve: { + alias: aliasMap, + }, + ...(isProfiling ? { define: { __PROFILE__: "true" } } : {}), + base: "./", + build: { + outDir: "dist", + assetsDir: ".", + emptyOutDir: false, + sourcemap: true, + minify: "esbuild", + rollupOptions: { + input: { + main: path.resolve(__dirname, "index.html"), + terminal: path.resolve(__dirname, "terminal.html"), + }, + output: { + format: "es", + inlineDynamicImports: false, + sourcemapExcludeSources: false, + manualChunks(id) { + const normalizedId = id.split(path.sep).join("/"); + if (normalizedId.includes("node_modules/ai-tokenizer/encoding/")) { + const chunkName = path.basename(id, path.extname(id)); + return `tokenizer-encoding-${chunkName}`; + } + if (normalizedId.includes("node_modules/ai-tokenizer/")) { + return "tokenizer-base"; + } + return undefined; + }, }, }, + chunkSizeWarningLimit: 2000, + target: "esnext", }, - chunkSizeWarningLimit: 2000, - target: "esnext", - }, - worker: { - format: "es", - plugins: () => [topLevelAwait()], - }, - server: { - host: devServerHost, // Configurable via MUX_VITE_HOST (defaults to 127.0.0.1 for security) - port: devServerPort, - strictPort: true, - allowedHosts: devServerAllowedHosts, - - proxy: { - "/orpc": { - target: backendProxyTarget, - changeOrigin: true, - ws: true, - }, - "/api": { - target: backendProxyTarget, - changeOrigin: true, - }, - "/auth": { - target: backendProxyTarget, - // Preserve the original Host so mux can generate OAuth redirect URLs that - // point back to the public dev-server origin (not 127.0.0.1:3000). - changeOrigin: false, - }, - "/health": { - target: backendProxyTarget, - changeOrigin: true, - }, - "/version": { - target: backendProxyTarget, - changeOrigin: true, - }, + worker: { + format: "es", + plugins: () => [topLevelAwait()], }, - sourcemapIgnoreList: () => false, // Show all sources in DevTools - - watch: { - // Ignore node_modules to drastically reduce file handle usage - ignored: ["**/node_modules/**", "**/dist/**", "**/.git/**"], - - // Use polling on Windows to avoid file handle exhaustion - // This is slightly less efficient but much more stable - usePolling: process.platform === "win32", - - // If using polling, set a reasonable interval (in milliseconds) - interval: 1000, - - // Additional options for Windows specifically - ...(process.platform === "win32" && { - // Increase the binary interval for better Windows performance - binaryInterval: 1000, - // Use a more conservative approach to watching - awaitWriteFinish: { - stabilityThreshold: 500, - pollInterval: 100, + server: { + host: devServerHost, // Configurable via MUX_VITE_HOST (defaults to 127.0.0.1 for security) + port: devServerPort, + strictPort: true, + allowedHosts: devServerAllowedHosts, + + proxy: { + "/orpc": { + target: backendProxyTarget, + changeOrigin: true, + ws: true, }, - }), - }, + "/api": { + target: backendProxyTarget, + changeOrigin: true, + }, + "/auth": { + target: backendProxyTarget, + // Preserve the original Host so mux can generate OAuth redirect URLs that + // point back to the public dev-server origin (not 127.0.0.1:3000). + changeOrigin: false, + }, + "/health": { + target: backendProxyTarget, + changeOrigin: true, + }, + "/version": { + target: backendProxyTarget, + changeOrigin: true, + }, + }, + sourcemapIgnoreList: () => false, // Show all sources in DevTools + + watch: { + // Ignore node_modules to drastically reduce file handle usage + ignored: ["**/node_modules/**", "**/dist/**", "**/.git/**"], + + // Use polling on Windows to avoid file handle exhaustion + // This is slightly less efficient but much more stable + usePolling: process.platform === "win32", + + // If using polling, set a reasonable interval (in milliseconds) + interval: 1000, + + // Additional options for Windows specifically + ...(process.platform === "win32" && { + // Increase the binary interval for better Windows performance + binaryInterval: 1000, + // Use a more conservative approach to watching + awaitWriteFinish: { + stabilityThreshold: 500, + pollInterval: 100, + }, + }), + }, - // Note: leave `server.hmr` unset so Vite derives the websocket URL from the - // served script URL (works when accessed via reverse proxy / custom domain). - }, - preview: { - host: "127.0.0.1", - port: previewPort, - strictPort: true, - allowedHosts: ["localhost", "127.0.0.1"], - }, - optimizeDeps: { - esbuildOptions: { - target: "esnext", + // Note: leave `server.hmr` unset so Vite derives the websocket URL from the + // served script URL (works when accessed via reverse proxy / custom domain). + }, + preview: { + host: "127.0.0.1", + port: previewPort, + strictPort: true, + allowedHosts: ["localhost", "127.0.0.1"], }, + optimizeDeps: { + esbuildOptions: { + target: "esnext", + }, - // Limit dependency pre-bundling scans to the renderer entrypoints. - // Scanning all of src/ includes backend-only code (src/node, src/cli), which can - // pull in Node-only deps and break Vite's dep-scan (notably on Windows). - entries: ["index.html", "terminal.html"], + // Limit dependency pre-bundling scans to the renderer entrypoints. + // Scanning all of src/ includes backend-only code (src/node, src/cli), which can + // pull in Node-only deps and break Vite's dep-scan (notably on Windows). + entries: ["index.html", "terminal.html"], - // Force re-optimize dependencies - force: false, - }, - assetsInclude: ["**/*.wasm"], -})); + // Force re-optimize dependencies + force: false, + }, + assetsInclude: ["**/*.wasm"], + }; +}); From 8dd2300cbaa8457bc04898a4578dc7cc137cc6d5 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sat, 17 Jan 2026 13:47:40 -0600 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=A4=96=20perf:=20stabilize=20displaye?= =?UTF-8?q?d=20message=20references?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../messages/StreamingMessageAggregator.ts | 531 ++++++++++-------- 1 file changed, 294 insertions(+), 237 deletions(-) diff --git a/src/browser/utils/messages/StreamingMessageAggregator.ts b/src/browser/utils/messages/StreamingMessageAggregator.ts index dad92ab698..5fdcfaed6c 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.ts @@ -164,6 +164,11 @@ export class StreamingMessageAggregator { // Derived value cache - invalidated as a unit on every mutation. // Adding a new cached value? Add it here and it will auto-invalidate. + private displayedMessageCache = new Map< + string, + { version: number; messages: DisplayedMessage[] } + >(); + private messageVersions = new Map(); private cache: { allMessages?: MuxMessage[]; displayedMessages?: DisplayedMessage[]; @@ -370,6 +375,25 @@ export class StreamingMessageAggregator { return serverTimestamp + context.clockOffsetMs; } + + private bumpMessageVersion(messageId: string): void { + const current = this.messageVersions.get(messageId) ?? 0; + this.messageVersions.set(messageId, current + 1); + } + + private markMessageDirty(messageId: string): void { + this.bumpMessageVersion(messageId); + this.invalidateCache(); + } + + private deleteMessage(messageId: string): boolean { + const didDelete = this.messages.delete(messageId); + if (didDelete) { + this.displayedMessageCache.delete(messageId); + this.messageVersions.delete(messageId); + } + return didDelete; + } private invalidateCache(): void { this.cache = {}; this.updateRecency(); @@ -588,7 +612,7 @@ export class StreamingMessageAggregator { // Just store the message - backend assigns historySequence this.messages.set(message.id, message); - this.invalidateCache(); + this.markMessageDirty(message.id); } /** @@ -597,7 +621,7 @@ export class StreamingMessageAggregator { * Rebuilds detected links to remove any that only existed in the removed message. */ removeMessage(messageId: string): void { - if (this.messages.delete(messageId)) { + if (this.deleteMessage(messageId)) { this.invalidateCache(); } } @@ -610,6 +634,8 @@ export class StreamingMessageAggregator { * @param hasActiveStream - Whether there's an active stream in buffered events (for reconnection scenario) */ loadHistoricalMessages(messages: MuxMessage[], hasActiveStream = false): void { + this.displayedMessageCache.clear(); + this.messageVersions.clear(); // First, add all messages to the map for (const message of messages) { this.messages.set(message.id, message); @@ -955,12 +981,22 @@ export class StreamingMessageAggregator { } clearActiveStreams(): void { + const activeMessageIds = Array.from(this.activeStreams.keys()); this.activeStreams.clear(); + + if (activeMessageIds.length > 0) { + for (const messageId of activeMessageIds) { + this.bumpMessageVersion(messageId); + } + this.invalidateCache(); + } } clear(): void { this.messages.clear(); this.activeStreams.clear(); + this.displayedMessageCache.clear(); + this.messageVersions.clear(); this.invalidateCache(); } @@ -975,7 +1011,7 @@ export class StreamingMessageAggregator { for (const [messageId, message] of this.messages.entries()) { const historySeq = message.metadata?.historySequence; if (historySeq !== undefined && sequencesToDelete.has(historySeq)) { - this.messages.delete(messageId); + this.deleteMessage(messageId); } } @@ -1049,7 +1085,7 @@ export class StreamingMessageAggregator { }); this.messages.set(data.messageId, streamingMessage); - this.invalidateCache(); + this.markMessageDirty(data.messageId); } handleStreamDelta(data: StreamDeltaEvent): void { @@ -1076,7 +1112,7 @@ export class StreamingMessageAggregator { // Track delta for token counting and TPS calculation this.trackDelta(data.messageId, data.tokens, data.timestamp, "text"); - this.invalidateCache(); + this.markMessageDirty(data.messageId); } handleStreamEnd(data: StreamEndEvent): void { @@ -1148,7 +1184,7 @@ export class StreamingMessageAggregator { this.cleanupStreamState(data.messageId); } // Assistant message is now stable (completed or reconnected) - invalidate all caches. - this.invalidateCache(); + this.markMessageDirty(data.messageId); } handleStreamAbort(data: StreamAbortEvent): void { @@ -1184,7 +1220,7 @@ export class StreamingMessageAggregator { // Clean up stream-scoped state (active stream tracking, TODOs) this.cleanupStreamState(data.messageId); // Assistant message is now stable (aborted) - invalidate all caches. - this.invalidateCache(); + this.markMessageDirty(data.messageId); } } @@ -1214,7 +1250,7 @@ export class StreamingMessageAggregator { // Clean up stream-scoped state (active stream tracking, TODOs) this.cleanupStreamState(data.messageId); // Assistant message is now stable (errored) - invalidate all caches. - this.invalidateCache(); + this.markMessageDirty(data.messageId); } else { // Pre-stream error (e.g., API key not configured before streaming starts) // Create a synthetic error message since there's no active stream to attach to @@ -1236,7 +1272,7 @@ export class StreamingMessageAggregator { }, }; this.messages.set(data.messageId, errorMessage); - this.invalidateCache(); + this.markMessageDirty(data.messageId); } } @@ -1260,7 +1296,7 @@ export class StreamingMessageAggregator { input: data.args, timestamp: data.timestamp, }); - this.invalidateCache(); + this.markMessageDirty(data.messageId); return; } } @@ -1297,7 +1333,7 @@ export class StreamingMessageAggregator { // Track tokens for tool input this.trackDelta(data.messageId, data.tokens, data.timestamp, "tool-args"); - this.invalidateCache(); + this.markMessageDirty(data.messageId); } handleToolCallDelta(data: ToolCallDeltaEvent): void { @@ -1430,7 +1466,7 @@ export class StreamingMessageAggregator { : nc ); message.parts[parentIndex] = { ...parentPart, nestedCalls: updatedNestedCalls }; - this.invalidateCache(); + this.markMessageDirty(data.messageId); return; } } @@ -1452,10 +1488,10 @@ export class StreamingMessageAggregator { this.processToolResult(data.toolName, toolPart.input, data.result, "streaming"); // Tool output is now stable - invalidate all caches. - this.invalidateCache(); + this.markMessageDirty(data.messageId); } else { // Tool part not found (shouldn't happen normally) - still invalidate display cache. - this.invalidateCache(); + this.markMessageDirty(data.messageId); } } } @@ -1484,7 +1520,7 @@ export class StreamingMessageAggregator { // Track delta for token counting and TPS calculation this.trackDelta(data.messageId, data.tokens, data.timestamp, "reasoning"); - this.invalidateCache(); + this.markMessageDirty(data.messageId); } handleReasoningEnd(_data: ReasoningEndEvent): void { @@ -1585,7 +1621,7 @@ export class StreamingMessageAggregator { } } for (const removeId of messagesToRemove) { - this.messages.delete(removeId); + this.deleteMessage(removeId); } break; // Found and handled the conflict } @@ -1625,6 +1661,237 @@ export class StreamingMessageAggregator { } } + private buildDisplayedMessagesForMessage(message: MuxMessage): DisplayedMessage[] { + const displayedMessages: DisplayedMessage[] = []; + const baseTimestamp = message.metadata?.timestamp; + const historySequence = message.metadata?.historySequence ?? 0; + + // Check for plan-display messages (ephemeral /plan output) + const muxMeta = message.metadata?.muxMetadata; + if (muxMeta?.type === "plan-display") { + const content = message.parts + .filter((p) => p.type === "text") + .map((p) => p.text) + .join(""); + displayedMessages.push({ + type: "plan-display", + id: message.id, + historyId: message.id, + content, + path: muxMeta.path, + historySequence, + }); + return displayedMessages; + } + + if (message.role === "user") { + // User messages: combine all text parts into single block, extract images + const content = message.parts + .filter((p) => p.type === "text") + .map((p) => p.text) + .join(""); + + const imageParts = message.parts + .filter((p): p is MuxImagePart => { + // Accept both new "file" type and legacy "image" type (from before PR #308) + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access + return p.type === "file" || (p as any).type === "image"; + }) + .map((p) => ({ + url: typeof p.url === "string" ? p.url : "", + mediaType: p.mediaType, + })); + + // Check if this is a compaction request message + const compactionRequest = + muxMeta?.type === "compaction-request" + ? { + rawCommand: muxMeta.rawCommand, + parsed: { + model: muxMeta.parsed.model, + maxOutputTokens: muxMeta.parsed.maxOutputTokens, + continueMessage: muxMeta.parsed.continueMessage, + } satisfies CompactionRequestData, + } + : undefined; + + // Extract reviews from muxMetadata for rich UI display (orthogonal to message type) + const reviews = muxMeta?.reviews; + + displayedMessages.push({ + type: "user", + id: message.id, + historyId: message.id, + content: compactionRequest ? compactionRequest.rawCommand : content, + imageParts: imageParts.length > 0 ? imageParts : undefined, + historySequence, + isSynthetic: message.metadata?.synthetic === true ? true : undefined, + timestamp: baseTimestamp, + compactionRequest, + reviews, + }); + return displayedMessages; + } + + if (message.role === "assistant") { + // Assistant messages: each part becomes a separate DisplayedMessage + // Use streamSequence to order parts within this message + let streamSeq = 0; + + // Check if this message has an active stream (for inferring streaming status) + // Direct Map.has() check - O(1) instead of O(n) iteration + const hasActiveStream = this.activeStreams.has(message.id); + + // isPartial from metadata (set by stream-abort event) + const isPartial = message.metadata?.partial === true; + + // Merge adjacent text/reasoning parts for display + const mergedParts = mergeAdjacentParts(message.parts); + + // Find the last part that will produce a DisplayedMessage + // (reasoning, text parts with content, OR tool parts) + let lastPartIndex = -1; + for (let i = mergedParts.length - 1; i >= 0; i--) { + const part = mergedParts[i]; + if ( + part.type === "reasoning" || + (part.type === "text" && part.text) || + isDynamicToolPart(part) + ) { + lastPartIndex = i; + break; + } + } + + mergedParts.forEach((part, partIndex) => { + const isLastPart = partIndex === lastPartIndex; + // Part is streaming if: active stream exists AND this is the last part + const isStreaming = hasActiveStream && isLastPart; + + if (part.type === "reasoning") { + // Reasoning part - shows thinking/reasoning content + displayedMessages.push({ + type: "reasoning", + id: `${message.id}-${partIndex}`, + historyId: message.id, + content: part.text, + historySequence, + streamSequence: streamSeq++, + isStreaming, + isPartial, + isLastPartOfMessage: isLastPart, + timestamp: part.timestamp ?? baseTimestamp, + }); + } else if (part.type === "text" && part.text) { + // Skip empty text parts + displayedMessages.push({ + type: "assistant", + id: `${message.id}-${partIndex}`, + historyId: message.id, + content: part.text, + historySequence, + streamSequence: streamSeq++, + isStreaming, + isPartial, + isLastPartOfMessage: isLastPart, + // Support both new enum ("user"|"idle") and legacy boolean (true) + isCompacted: !!message.metadata?.compacted, + isIdleCompacted: message.metadata?.compacted === "idle", + model: message.metadata?.model, + mode: message.metadata?.mode, + timestamp: part.timestamp ?? baseTimestamp, + }); + } else if (isDynamicToolPart(part)) { + // Determine status based on part state and result + let status: "pending" | "executing" | "completed" | "failed" | "interrupted"; + if (part.state === "output-available") { + // Check if result indicates failure (for tools that return { success: boolean }) + status = hasFailureResult(part.output) ? "failed" : "completed"; + } else if (part.state === "input-available") { + // Most unfinished tool calls in partial messages represent an interruption. + // ask_user_question is different: it's intentionally waiting on user input, + // so after restart we should keep it answerable ("executing") instead of + // showing retry/auto-resume UX. + if (part.toolName === "ask_user_question") { + status = "executing"; + } else if (isPartial) { + status = "interrupted"; + } else { + status = "executing"; + } + } else { + status = "pending"; + } + + // For code_execution, use streaming nestedCalls if present, or reconstruct from result + let nestedCalls = part.nestedCalls; + if ( + !nestedCalls && + part.toolName === "code_execution" && + part.state === "output-available" + ) { + // Reconstruct nestedCalls from result.toolCalls (for historical replay) + const result = part.output as + | { + toolCalls?: Array<{ + toolName: string; + args: unknown; + result?: unknown; + error?: string; + duration_ms: number; + }>; + } + | undefined; + if (result?.toolCalls) { + nestedCalls = result.toolCalls.map((tc, idx) => ({ + toolCallId: `${part.toolCallId}-nested-${idx}`, + toolName: tc.toolName, + input: tc.args, + output: tc.result ?? (tc.error ? { error: tc.error } : undefined), + state: "output-available" as const, + timestamp: part.timestamp, + })); + } + } + + displayedMessages.push({ + type: "tool", + id: `${message.id}-${partIndex}`, + historyId: message.id, + toolCallId: part.toolCallId, + toolName: part.toolName, + args: part.input, + result: part.state === "output-available" ? part.output : undefined, + status, + isPartial, + historySequence, + streamSequence: streamSeq++, + isLastPartOfMessage: isLastPart, + timestamp: part.timestamp ?? baseTimestamp, + nestedCalls, + }); + } + }); + + // Create stream-error DisplayedMessage if message has error metadata + // This happens after all parts are displayed, so error appears at the end + if (message.metadata?.error) { + displayedMessages.push({ + type: "stream-error", + id: `${message.id}-error`, + historyId: message.id, + error: message.metadata.error, + errorType: message.metadata.errorType ?? "unknown", + historySequence, + model: message.metadata.model, + timestamp: baseTimestamp, + }); + } + } + + return displayedMessages; + } + /** * Transform MuxMessages into DisplayedMessages for UI consumption * This splits complex messages with multiple parts into separate UI blocks @@ -1646,229 +1913,19 @@ export class StreamingMessageAggregator { continue; } - const baseTimestamp = message.metadata?.timestamp; - // Get historySequence from backend (required field) - const historySequence = message.metadata?.historySequence ?? 0; - - // Check for plan-display messages (ephemeral /plan output) - const muxMeta = message.metadata?.muxMetadata; - if (muxMeta?.type === "plan-display") { - const content = message.parts - .filter((p) => p.type === "text") - .map((p) => p.text) - .join(""); - displayedMessages.push({ - type: "plan-display", - id: message.id, - historyId: message.id, - content, - path: muxMeta.path, - historySequence, - }); - continue; - } - - if (message.role === "user") { - // User messages: combine all text parts into single block, extract images - const content = message.parts - .filter((p) => p.type === "text") - .map((p) => p.text) - .join(""); - - const imageParts = message.parts - .filter((p): p is MuxImagePart => { - // Accept both new "file" type and legacy "image" type (from before PR #308) - // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access - return p.type === "file" || (p as any).type === "image"; - }) - .map((p) => ({ - url: typeof p.url === "string" ? p.url : "", - mediaType: p.mediaType, - })); - - // Check if this is a compaction request message - const muxMeta = message.metadata?.muxMetadata; - const compactionRequest = - muxMeta?.type === "compaction-request" - ? { - rawCommand: muxMeta.rawCommand, - parsed: { - model: muxMeta.parsed.model, - maxOutputTokens: muxMeta.parsed.maxOutputTokens, - continueMessage: muxMeta.parsed.continueMessage, - } satisfies CompactionRequestData, - } - : undefined; - - // Extract reviews from muxMetadata for rich UI display (orthogonal to message type) - const reviews = muxMeta?.reviews; + const version = this.messageVersions.get(message.id) ?? 0; + const cached = this.displayedMessageCache.get(message.id); + const messageDisplay = + cached?.version === version + ? cached.messages + : this.buildDisplayedMessagesForMessage(message); - displayedMessages.push({ - type: "user", - id: message.id, - historyId: message.id, - content: compactionRequest ? compactionRequest.rawCommand : content, - imageParts: imageParts.length > 0 ? imageParts : undefined, - historySequence, - isSynthetic: isSynthetic ? true : undefined, - timestamp: baseTimestamp, - compactionRequest, - reviews, - }); - } else if (message.role === "assistant") { - // Assistant messages: each part becomes a separate DisplayedMessage - // Use streamSequence to order parts within this message - let streamSeq = 0; - - // Check if this message has an active stream (for inferring streaming status) - // Direct Map.has() check - O(1) instead of O(n) iteration - const hasActiveStream = this.activeStreams.has(message.id); - - // isPartial from metadata (set by stream-abort event) - const isPartial = message.metadata?.partial === true; - - // Merge adjacent text/reasoning parts for display - const mergedParts = mergeAdjacentParts(message.parts); - - // Find the last part that will produce a DisplayedMessage - // (reasoning, text parts with content, OR tool parts) - let lastPartIndex = -1; - for (let i = mergedParts.length - 1; i >= 0; i--) { - const part = mergedParts[i]; - if ( - part.type === "reasoning" || - (part.type === "text" && part.text) || - isDynamicToolPart(part) - ) { - lastPartIndex = i; - break; - } - } - - mergedParts.forEach((part, partIndex) => { - const isLastPart = partIndex === lastPartIndex; - // Part is streaming if: active stream exists AND this is the last part - const isStreaming = hasActiveStream && isLastPart; - - if (part.type === "reasoning") { - // Reasoning part - shows thinking/reasoning content - displayedMessages.push({ - type: "reasoning", - id: `${message.id}-${partIndex}`, - historyId: message.id, - content: part.text, - historySequence, - streamSequence: streamSeq++, - isStreaming, - isPartial, - isLastPartOfMessage: isLastPart, - timestamp: part.timestamp ?? baseTimestamp, - }); - } else if (part.type === "text" && part.text) { - // Skip empty text parts - displayedMessages.push({ - type: "assistant", - id: `${message.id}-${partIndex}`, - historyId: message.id, - content: part.text, - historySequence, - streamSequence: streamSeq++, - isStreaming, - isPartial, - isLastPartOfMessage: isLastPart, - // Support both new enum ("user"|"idle") and legacy boolean (true) - isCompacted: !!message.metadata?.compacted, - isIdleCompacted: message.metadata?.compacted === "idle", - model: message.metadata?.model, - mode: message.metadata?.mode, - timestamp: part.timestamp ?? baseTimestamp, - }); - } else if (isDynamicToolPart(part)) { - // Determine status based on part state and result - let status: "pending" | "executing" | "completed" | "failed" | "interrupted"; - if (part.state === "output-available") { - // Check if result indicates failure (for tools that return { success: boolean }) - status = hasFailureResult(part.output) ? "failed" : "completed"; - } else if (part.state === "input-available") { - // Most unfinished tool calls in partial messages represent an interruption. - // ask_user_question is different: it's intentionally waiting on user input, - // so after restart we should keep it answerable ("executing") instead of - // showing retry/auto-resume UX. - if (part.toolName === "ask_user_question") { - status = "executing"; - } else if (isPartial) { - status = "interrupted"; - } else { - status = "executing"; - } - } else { - status = "pending"; - } - - // For code_execution, use streaming nestedCalls if present, or reconstruct from result - let nestedCalls = part.nestedCalls; - if ( - !nestedCalls && - part.toolName === "code_execution" && - part.state === "output-available" - ) { - // Reconstruct nestedCalls from result.toolCalls (for historical replay) - const result = part.output as - | { - toolCalls?: Array<{ - toolName: string; - args: unknown; - result?: unknown; - error?: string; - duration_ms: number; - }>; - } - | undefined; - if (result?.toolCalls) { - nestedCalls = result.toolCalls.map((tc, idx) => ({ - toolCallId: `${part.toolCallId}-nested-${idx}`, - toolName: tc.toolName, - input: tc.args, - output: tc.result ?? (tc.error ? { error: tc.error } : undefined), - state: "output-available" as const, - timestamp: part.timestamp, - })); - } - } - - displayedMessages.push({ - type: "tool", - id: `${message.id}-${partIndex}`, - historyId: message.id, - toolCallId: part.toolCallId, - toolName: part.toolName, - args: part.input, - result: part.state === "output-available" ? part.output : undefined, - status, - isPartial, - historySequence, - streamSequence: streamSeq++, - isLastPartOfMessage: isLastPart, - timestamp: part.timestamp ?? baseTimestamp, - nestedCalls, - }); - } - }); + if (cached?.version !== version) { + this.displayedMessageCache.set(message.id, { version, messages: messageDisplay }); + } - // Create stream-error DisplayedMessage if message has error metadata - // This happens after all parts are displayed, so error appears at the end - if (message.metadata?.error) { - displayedMessages.push({ - type: "stream-error", - id: `${message.id}-error`, - historyId: message.id, - error: message.metadata.error, - errorType: message.metadata.errorType ?? "unknown", - historySequence, - model: message.metadata.model, - timestamp: baseTimestamp, - }); - } + if (messageDisplay.length > 0) { + displayedMessages.push(...messageDisplay); } }