From 43cb015844a92c6a08bb5914aee78c2615d4a9cd Mon Sep 17 00:00:00 2001 From: ihrpr Date: Sun, 13 Apr 2025 21:40:15 +0100 Subject: [PATCH 01/10] resume long running requst from new client --- src/client/streamableHttp.ts | 25 +++++++++++++++--- src/examples/client/simpleStreamableHttp.ts | 16 ++++++++++-- src/examples/server/simpleStreamableHttp.ts | 20 +++++++++------ src/server/streamableHttp.ts | 28 +++++++++------------ src/shared/protocol.ts | 18 ++++++++++--- src/shared/transport.ts | 2 +- 6 files changed, 76 insertions(+), 33 deletions(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 9451e8d9..77acc9c4 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -71,6 +71,11 @@ export type StreamableHTTPClientTransportOptions = { * Options to configure the reconnection behavior. */ reconnectionOptions?: StreamableHTTPReconnectionOptions; + /** + * Session ID for the connection. This is used to identify the session on the server. + * When not provided and connecting to a server that supports session IDs, the server will generate a new session ID. + */ + sessionId?: string; }; /** @@ -98,6 +103,7 @@ export class StreamableHTTPClientTransport implements Transport { this._requestInit = opts?.requestInit; this._authProvider = opts?.authProvider; this._reconnectionOptions = opts?.reconnectionOptions || this._defaultReconnectionOptions; + this._sessionId = opts?.sessionId; } private async _authThenStart(): Promise { @@ -240,7 +246,7 @@ export class StreamableHTTPClientTransport implements Transport { }, delay); } - private _handleSseStream(stream: ReadableStream | null): void { + private _handleSseStream(stream: ReadableStream | null, onLastEventIdUpdate?: (event: string) => void): void { if (!stream) { return; } @@ -266,6 +272,7 @@ export class StreamableHTTPClientTransport implements Transport { // Update last event ID if provided if (event.id) { lastEventId = event.id; + onLastEventIdUpdate?.(lastEventId); } if (!event.event || event.event === "message") { @@ -330,8 +337,16 @@ export class StreamableHTTPClientTransport implements Transport { this.onclose?.(); } - async send(message: JSONRPCMessage | JSONRPCMessage[]): Promise { + async send(message: JSONRPCMessage | JSONRPCMessage[], options?: { lastEventId?: string, onLastEventIdUpdate?: (event: string) => void }): Promise { try { + // If client passes in a lastEventId in the request options, we need to reconnect the SSE stream + const { lastEventId, onLastEventIdUpdate } = options ?? {}; + if (lastEventId) { + // If we have at last event ID, we need to reconnect the SSE stream + this._startOrAuthStandaloneSSE(lastEventId).catch(err => this.onerror?.(err)); + return; + } + const headers = await this._commonHeaders(); headers.set("content-type", "application/json"); headers.set("accept", "application/json, text/event-stream"); @@ -393,7 +408,7 @@ export class StreamableHTTPClientTransport implements Transport { // Handle SSE stream responses for requests // We use the same handler as standalone streams, which now supports // reconnection with the last event ID - this._handleSseStream(response.body); + this._handleSseStream(response.body, onLastEventIdUpdate); } else if (contentType?.includes("application/json")) { // For non-streaming servers, we might get direct JSON responses const data = await response.json(); @@ -416,4 +431,8 @@ export class StreamableHTTPClientTransport implements Transport { throw error; } } + + get sessionId(): string | undefined { + return this._sessionId; + } } diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index 923ffbc2..922aa0ce 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -29,6 +29,8 @@ let notificationCount = 0; let client: Client | null = null; let transport: StreamableHTTPClientTransport | null = null; let serverUrl = 'http://localhost:3000/mcp'; +let notificationsToolLastEventId: string | undefined = undefined; +let sessionId: string | undefined = undefined; async function main(): Promise { console.log('MCP Interactive Client'); @@ -186,7 +188,10 @@ async function connect(url?: string): Promise { } transport = new StreamableHTTPClientTransport( - new URL(serverUrl) + new URL(serverUrl), + { + sessionId: sessionId + } ); // Set up notification handlers @@ -218,6 +223,8 @@ async function connect(url?: string): Promise { // Connect the client await client.connect(transport); + sessionId = transport.sessionId + console.log('Transport created with session ID:', sessionId); console.log('Connected to MCP server'); } catch (error) { console.error('Failed to connect:', error); @@ -291,7 +298,12 @@ async function callTool(name: string, args: Record): Promise { + notificationsToolLastEventId = event; + }; + const result = await client.request(request, CallToolResultSchema, { + lastEventId: notificationsToolLastEventId, onLastEventIdUpdate + }); console.log('Tool result:'); result.content.forEach(item => { diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index a45b3158..dc03f1db 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -172,14 +172,18 @@ server.tool( while (count === 0 || counter < count) { counter++; - await sendNotification({ - method: "notifications/message", - params: { - level: "info", - data: `Periodic notification #${counter} at ${new Date().toISOString()}` - } - }); - + try { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Periodic notification #${counter} at ${new Date().toISOString()}` + } + }); + } + catch (error) { + console.error("Error sending notification:", error); + } // Wait for the specified interval await sleep(interval); } diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index b8be6d15..7740a08f 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -217,7 +217,6 @@ export class StreamableHTTPServerTransport implements Transport { // Assign the response to the standalone SSE stream this._streamMapping.set(this._standaloneSseStreamId, res); - // Set up close handler for client disconnects res.on("close", () => { this._streamMapping.delete(this._standaloneSseStreamId); @@ -345,8 +344,10 @@ export class StreamableHTTPServerTransport implements Transport { const isInitializationRequest = messages.some( msg => 'method' in msg && msg.method === 'initialize' ); + const mcpSessionId = req.headers["mcp-session-id"] as string | undefined; if (isInitializationRequest) { - if (this._initialized) { + // if generateSessionId is not set, the server does not support session management + if (this._initialized && this.sessionId !== undefined && mcpSessionId !== this.sessionId) { res.writeHead(400).end(JSON.stringify({ jsonrpc: "2.0", error: { @@ -368,7 +369,7 @@ export class StreamableHTTPServerTransport implements Transport { })); return; } - this.sessionId = this.sessionIdGenerator(); + this.sessionId = mcpSessionId ?? this.sessionIdGenerator(); this._initialized = true; } @@ -419,17 +420,8 @@ export class StreamableHTTPServerTransport implements Transport { this._requestToStreamMapping.set(message.id, streamId); } } - // Set up close handler for client disconnects res.on("close", () => { - // find a stream ID for this response - // Remove all entries that reference this response - for (const [id, stream] of this._requestToStreamMapping.entries()) { - if (streamId === stream) { - this._requestToStreamMapping.delete(id); - this._requestResponseMap.delete(id); - } - } this._streamMapping.delete(streamId); }); @@ -577,7 +569,7 @@ export class StreamableHTTPServerTransport implements Transport { // Get the response for this request const streamId = this._requestToStreamMapping.get(requestId); const response = this._streamMapping.get(streamId!); - if (!streamId || !response) { + if (!streamId) { throw new Error(`No connection established for request ID: ${String(requestId)}`); } @@ -588,9 +580,10 @@ export class StreamableHTTPServerTransport implements Transport { if (this._eventStore) { eventId = await this._eventStore.storeEvent(streamId, message); } - - // Write the event to the response stream - this.writeSSEEvent(response, message, eventId); + if (response) { + // Write the event to the response stream + this.writeSSEEvent(response, message, eventId); + } } if (isJSONRPCResponse(message)) { @@ -603,6 +596,9 @@ export class StreamableHTTPServerTransport implements Transport { const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id)); if (allResponsesReady) { + if (!response) { + throw new Error(`No connection established for request ID: ${String(requestId)}`); + } if (this._enableJsonResponse) { // All responses ready, send as JSON const headers: Record = { diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index 07bfc02c..6193f5f1 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -87,6 +87,18 @@ export type RequestOptions = { * May be used to indicate to the transport which incoming request to associate this outgoing request with. */ relatedRequestId?: RequestId; + + /** + * May be used to indicate to the transport which last event ID to associate this outgoing request with. + * This is used to resume a long-running requests that may have been interrupted and a new instance of a client is being created. + */ + lastEventId?: string; + + /** + * A callback that is invoked when the last event ID is updated. + * This is used to notidy the client that the last event ID has changed, so that client can update its state accordingly. + */ + onLastEventIdUpdate?: (event: string) => void; }; /** @@ -501,7 +513,7 @@ export abstract class Protocol< resultSchema: T, options?: RequestOptions, ): Promise> { - const { relatedRequestId } = options ?? {}; + const { relatedRequestId, lastEventId, onLastEventIdUpdate } = options ?? {}; return new Promise((resolve, reject) => { if (!this._transport) { @@ -543,7 +555,7 @@ export abstract class Protocol< requestId: messageId, reason: String(reason), }, - }, { relatedRequestId }) + }, { relatedRequestId, lastEventId, onLastEventIdUpdate }) .catch((error) => this._onerror(new Error(`Failed to send cancellation: ${error}`)), ); @@ -581,7 +593,7 @@ export abstract class Protocol< this._setupTimeout(messageId, timeout, options?.maxTotalTimeout, timeoutHandler, options?.resetTimeoutOnProgress ?? false); - this._transport.send(jsonrpcRequest, { relatedRequestId }).catch((error) => { + this._transport.send(jsonrpcRequest, { relatedRequestId, lastEventId, onLastEventIdUpdate }).catch((error) => { this._cleanupTimeout(messageId); reject(error); }); diff --git a/src/shared/transport.ts b/src/shared/transport.ts index e464653b..cfbe8461 100644 --- a/src/shared/transport.ts +++ b/src/shared/transport.ts @@ -18,7 +18,7 @@ export interface Transport { * * If present, `relatedRequestId` is used to indicate to the transport which incoming request to associate this outgoing message with. */ - send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise; + send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId, lastEventId?: string, onLastEventIdUpdate?: (event: string) => void }): Promise; /** * Closes the connection. From c49769b97a8420008d9f53923bd5652f602eda24 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Sun, 13 Apr 2025 23:46:38 +0100 Subject: [PATCH 02/10] integration tests --- .../taskResumability.test.ts | 329 ++++++++++++++++++ 1 file changed, 329 insertions(+) create mode 100644 src/integration-tests/taskResumability.test.ts diff --git a/src/integration-tests/taskResumability.test.ts b/src/integration-tests/taskResumability.test.ts new file mode 100644 index 00000000..9397d4a0 --- /dev/null +++ b/src/integration-tests/taskResumability.test.ts @@ -0,0 +1,329 @@ +import { createServer, type Server } from 'node:http'; +import { AddressInfo } from 'node:net'; +import { randomUUID } from 'node:crypto'; +import { Client } from '../client/index.js'; +import { StreamableHTTPClientTransport } from '../client/streamableHttp.js'; +import { McpServer } from '../server/mcp.js'; +import { EventStore, StreamableHTTPServerTransport } from '../server/streamableHttp.js'; +import { CallToolResult, CallToolResultSchema, JSONRPCMessage, LoggingMessageNotificationSchema } from '../types.js'; +import { z } from 'zod'; + +/** + * Simple in-memory event store implementation for resumability + */ +class InMemoryEventStore implements EventStore { + private events: Map = new Map(); + + generateEventId(streamId: string): string { + return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`; + } + + getStreamIdFromEventId(eventId: string): string { + const parts = eventId.split('_'); + return parts.length > 0 ? parts[0] : ''; + } + + async storeEvent(streamId: string, message: JSONRPCMessage): Promise { + const eventId = this.generateEventId(streamId); + this.events.set(eventId, { streamId, message }); + return eventId; + } + + async getEventsAfter(lastEventId: string): Promise> { + if (!lastEventId || !this.events.has(lastEventId)) { + return []; + } + + // Extract the stream ID from the event ID + const streamId = this.getStreamIdFromEventId(lastEventId); + const result: Array<{ eventId: string, message: JSONRPCMessage }> = []; + let foundLastEvent = false; + + // Sort events by eventId for chronological ordering + const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0])); + + for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) { + // Only include events from the same stream + if (eventStreamId !== streamId) { + continue; + } + + // Start collecting events after we find the lastEventId + if (eventId === lastEventId) { + foundLastEvent = true; + continue; + } + + if (foundLastEvent) { + result.push({ eventId, message }); + } + } + + return result; + } +} + + +describe('Transport resumability', () => { + let server: Server; + let mcpServer: McpServer; + let serverTransport: StreamableHTTPServerTransport; + let baseUrl: URL; + let eventStore: InMemoryEventStore; + + beforeEach(async () => { + // Create event store for resumability + eventStore = new InMemoryEventStore(); + + // Create a simple MCP server + mcpServer = new McpServer( + { name: 'test-server', version: '1.0.0' }, + { capabilities: { logging: {} } } + ); + + // Add a simple notification tool that completes quickly + mcpServer.tool( + 'send-notification', + 'Sends a single notification', + { + message: z.string().describe('Message to send').default('Test notification') + }, + async ({ message }, { sendNotification }) => { + // Send notification immediately + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: message + } + }); + + return { + content: [{ type: 'text', text: 'Notification sent' }] + }; + } + ); + + // Add a long-running tool that sends multiple notifications + mcpServer.tool( + 'run-notifications', + 'Sends multiple notifications over time', + { + count: z.number().describe('Number of notifications to send').default(10), + interval: z.number().describe('Interval between notifications in ms').default(50) + }, + async ({ count, interval }, { sendNotification }) => { + // Send notifications at specified intervals + for (let i = 0; i < count; i++) { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Notification ${i + 1} of ${count}` + } + }); + + // Wait for the specified interval before sending next notification + if (i < count - 1) { + await new Promise(resolve => setTimeout(resolve, interval)); + } + } + + return { + content: [{ type: 'text', text: `Sent ${count} notifications` }] + }; + } + ); + + // Create a transport with the event store + serverTransport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + eventStore + }); + + // Connect the transport to the MCP server + await mcpServer.connect(serverTransport); + + // Create and start an HTTP server + server = createServer(async (req, res) => { + await serverTransport.handleRequest(req, res); + }); + + // Start the server on a random port + baseUrl = await new Promise((resolve) => { + server.listen(0, '127.0.0.1', () => { + const addr = server.address() as AddressInfo; + resolve(new URL(`http://127.0.0.1:${addr.port}`)); + }); + }); + }); + + afterEach(async () => { + // Clean up resources + await mcpServer.close().catch(() => { }); + await serverTransport.close().catch(() => { }); + server.close(); + }); + + it('should store session ID when client connects', async () => { + // Create and connect a client + const client = new Client({ + name: 'test-client', + version: '1.0.0' + }); + + const transport = new StreamableHTTPClientTransport(baseUrl); + await client.connect(transport); + + // Verify session ID was generated + expect(transport.sessionId).toBeDefined(); + + // Clean up + await transport.close(); + }); + + it('should have session ID functionality', async () => { + // The ability to store a session ID when connecting + const client = new Client({ + name: 'test-client-reconnection', + version: '1.0.0' + }); + + const transport = new StreamableHTTPClientTransport(baseUrl); + + // Make sure the client can connect and get a session ID + await client.connect(transport); + expect(transport.sessionId).toBeDefined(); + + // Clean up + await transport.close(); + }); + + // This test demonstrates the capability to resume long-running tools + // across client disconnection/reconnection + it('should resume long-running notifications with lastEventId', async () => { + // Create unique client ID for this test + const clientId = 'test-client-long-running'; + const notifications: any[] = []; + let sessionId: string | undefined; + let lastEventId: string | undefined; + + // Create first client + let client1 = new Client({ + id: clientId, + name: 'test-client', + version: '1.0.0' + }); + + // Set up notification handler for first client + client1.setNotificationHandler(LoggingMessageNotificationSchema, (notification: any) => { + if (notification.method === 'notifications/message') { + notifications.push(notification.params); + } + }); + + // Connect first client + const transport1 = new StreamableHTTPClientTransport(baseUrl); + await client1.connect(transport1); + sessionId = transport1.sessionId; + expect(sessionId).toBeDefined(); + + // Start a long-running notification stream with tracking of lastEventId + const onLastEventIdUpdate = jest.fn((eventId: string) => { + lastEventId = eventId; + }); + + // Start the notification tool with event tracking using request + const toolPromise = client1.request({ + method: 'tools/call', + params: { + name: 'run-notifications', + arguments: { + count: 5, + interval: 10 + } + } + }, CallToolResultSchema, { + lastEventId, + onLastEventIdUpdate + }); + + // Wait for some notifications to arrive (not all) + await new Promise(resolve => setTimeout(resolve, 20)); + + // Verify we received some notifications and lastEventId was updated + expect(notifications.length).toBeGreaterThan(0); + expect(notifications.length).toBeLessThan(5); + expect(onLastEventIdUpdate).toHaveBeenCalled(); + expect(lastEventId).toBeDefined(); + + // Store original notification count for later comparison + const firstClientNotificationCount = notifications.length; + + // Disconnect first client without waiting for completion + // When we close the connection, it will cause a ConnectionClosed error for + // any in-progress requests, which is expected behavior + // We need to catch the error since closing the transport will + // cause the pending toolPromise to reject with a ConnectionClosed error + await transport1.close(); + + // Try to cancel the promise, but ignore errors since it's already being handled + toolPromise.catch(err => { + // This error is expected - the connection was intentionally closed + if (err?.code !== -32000) { // ConnectionClosed error code + console.error("Unexpected error type during transport close:", err); + } + }); + + + // Create second client with same client ID + const client2 = new Client({ + id: clientId, + name: 'test-client', + version: '1.0.0' + }); + + // Set up notification handler for second client + client2.setNotificationHandler(LoggingMessageNotificationSchema, (notification: any) => { + if (notification.method === 'notifications/message') { + notifications.push(notification.params); + } + }); + + // Connect second client with same session ID + const transport2 = new StreamableHTTPClientTransport(baseUrl, { + sessionId + }); + await client2.connect(transport2); + + // Resume the notification stream using lastEventId + // This is the key part - we're resuming the same long-running tool using lastEventId + const resumedToolPromise = client2.request({ + method: 'tools/call', + params: { + name: 'run-notifications', + arguments: { + count: 5, + interval: 50 + } + } + }, CallToolResultSchema, { + lastEventId, // Pass the lastEventId from the previous session + onLastEventIdUpdate + }); + + // Wait for remaining notifications + await new Promise(resolve => setTimeout(resolve, 200)); + + // Verify we eventually received at leaset a few motifications + expect(notifications.length).toBeGreaterThan(2); + + // Verify the second client received notifications that the first client didn't + expect(notifications.length).toBeGreaterThan(firstClientNotificationCount); + + // Clean up + + await transport2.close(); + + }); +}); \ No newline at end of file From c21b9a0ac2af49a353ee856fe60bc4a974cdf214 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Mon, 14 Apr 2025 09:08:01 +0100 Subject: [PATCH 03/10] fix lint --- .../taskResumability.test.ts | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/integration-tests/taskResumability.test.ts b/src/integration-tests/taskResumability.test.ts index 9397d4a0..c63d3f04 100644 --- a/src/integration-tests/taskResumability.test.ts +++ b/src/integration-tests/taskResumability.test.ts @@ -5,7 +5,7 @@ import { Client } from '../client/index.js'; import { StreamableHTTPClientTransport } from '../client/streamableHttp.js'; import { McpServer } from '../server/mcp.js'; import { EventStore, StreamableHTTPServerTransport } from '../server/streamableHttp.js'; -import { CallToolResult, CallToolResultSchema, JSONRPCMessage, LoggingMessageNotificationSchema } from '../types.js'; +import { CallToolResultSchema, JSONRPCMessage, LoggingMessageNotificationSchema } from '../types.js'; import { z } from 'zod'; /** @@ -204,19 +204,18 @@ describe('Transport resumability', () => { it('should resume long-running notifications with lastEventId', async () => { // Create unique client ID for this test const clientId = 'test-client-long-running'; - const notifications: any[] = []; - let sessionId: string | undefined; + const notifications = []; let lastEventId: string | undefined; // Create first client - let client1 = new Client({ + const client1 = new Client({ id: clientId, name: 'test-client', version: '1.0.0' }); // Set up notification handler for first client - client1.setNotificationHandler(LoggingMessageNotificationSchema, (notification: any) => { + client1.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { if (notification.method === 'notifications/message') { notifications.push(notification.params); } @@ -225,14 +224,14 @@ describe('Transport resumability', () => { // Connect first client const transport1 = new StreamableHTTPClientTransport(baseUrl); await client1.connect(transport1); - sessionId = transport1.sessionId; + const sessionId = transport1.sessionId; expect(sessionId).toBeDefined(); // Start a long-running notification stream with tracking of lastEventId const onLastEventIdUpdate = jest.fn((eventId: string) => { lastEventId = eventId; }); - + expect(lastEventId).toBeUndefined(); // Start the notification tool with event tracking using request const toolPromise = client1.request({ method: 'tools/call', @@ -284,7 +283,7 @@ describe('Transport resumability', () => { }); // Set up notification handler for second client - client2.setNotificationHandler(LoggingMessageNotificationSchema, (notification: any) => { + client2.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { if (notification.method === 'notifications/message') { notifications.push(notification.params); } @@ -298,12 +297,12 @@ describe('Transport resumability', () => { // Resume the notification stream using lastEventId // This is the key part - we're resuming the same long-running tool using lastEventId - const resumedToolPromise = client2.request({ + await client2.request({ method: 'tools/call', params: { name: 'run-notifications', arguments: { - count: 5, + count: 1, interval: 50 } } @@ -312,9 +311,6 @@ describe('Transport resumability', () => { onLastEventIdUpdate }); - // Wait for remaining notifications - await new Promise(resolve => setTimeout(resolve, 200)); - // Verify we eventually received at leaset a few motifications expect(notifications.length).toBeGreaterThan(2); From 7aeb67988a5d4d5d3d065087f0d5cefb2fdfe767 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Mon, 14 Apr 2025 09:37:19 +0100 Subject: [PATCH 04/10] improve comments --- src/client/streamableHttp.test.ts | 10 +++++----- src/client/streamableHttp.ts | 10 +++++----- src/server/streamableHttp.ts | 3 ++- src/shared/protocol.ts | 7 +++---- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index a73927ac..2dfa5869 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -164,7 +164,7 @@ describe("StreamableHTTPClientTransport", () => { // We expect the 405 error to be caught and handled gracefully // This should not throw an error that breaks the transport await transport.start(); - await expect(transport["_startOrAuthStandaloneSSE"]()).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed"); + await expect(transport["_startOrAuthSse"]()).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed"); // Check that GET was attempted expect(global.fetch).toHaveBeenCalledWith( expect.anything(), @@ -208,7 +208,7 @@ describe("StreamableHTTPClientTransport", () => { transport.onmessage = messageSpy; await transport.start(); - await transport["_startOrAuthStandaloneSSE"](); + await transport["_startOrAuthSse"](); // Give time for the SSE event to be processed await new Promise(resolve => setTimeout(resolve, 50)); @@ -313,9 +313,9 @@ describe("StreamableHTTPClientTransport", () => { await transport.start(); // Type assertion to access private method const transportWithPrivateMethods = transport as unknown as { - _startOrAuthStandaloneSSE: (lastEventId?: string) => Promise + _startOrAuthSse: (lastEventId?: string) => Promise }; - await transportWithPrivateMethods._startOrAuthStandaloneSSE("test-event-id"); + await transportWithPrivateMethods._startOrAuthSse("test-event-id"); // Verify fetch was called with the lastEventId header expect(fetchSpy).toHaveBeenCalled(); @@ -382,7 +382,7 @@ describe("StreamableHTTPClientTransport", () => { await transport.start(); - await transport["_startOrAuthStandaloneSSE"](); + await transport["_startOrAuthSse"](); expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue"); requestInit.headers["X-Custom-Header"] = "SecondCustomValue"; diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 77acc9c4..43030239 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -123,7 +123,7 @@ export class StreamableHTTPClientTransport implements Transport { throw new UnauthorizedError(); } - return await this._startOrAuthStandaloneSSE(); + return await this._startOrAuthSse(); } private async _commonHeaders(): Promise { @@ -144,7 +144,7 @@ export class StreamableHTTPClientTransport implements Transport { ); } - private async _startOrAuthStandaloneSSE(lastEventId?: string): Promise { + private async _startOrAuthSse(lastEventId?: string): Promise { try { // Try to open an initial SSE stream with GET to listen for server messages // This is optional according to the spec - server may not support it @@ -238,7 +238,7 @@ export class StreamableHTTPClientTransport implements Transport { // Schedule the reconnection setTimeout(() => { // Use the last event ID to resume where we left off - this._startOrAuthStandaloneSSE(lastEventId).catch(error => { + this._startOrAuthSse(lastEventId).catch(error => { this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); // Schedule another attempt if this one failed, incrementing the attempt counter this._scheduleReconnection(lastEventId, attemptCount + 1); @@ -343,7 +343,7 @@ export class StreamableHTTPClientTransport implements Transport { const { lastEventId, onLastEventIdUpdate } = options ?? {}; if (lastEventId) { // If we have at last event ID, we need to reconnect the SSE stream - this._startOrAuthStandaloneSSE(lastEventId).catch(err => this.onerror?.(err)); + this._startOrAuthSse(lastEventId).catch(err => this.onerror?.(err)); return; } @@ -390,7 +390,7 @@ export class StreamableHTTPClientTransport implements Transport { // if it's supported by the server if (isJSONRPCNotification(message) && message.method === "notifications/initialized") { // Start without a lastEventId since this is a fresh connection - this._startOrAuthStandaloneSSE().catch(err => this.onerror?.(err)); + this._startOrAuthSse().catch(err => this.onerror?.(err)); } return; } diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 7740a08f..0487e1af 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -346,7 +346,8 @@ export class StreamableHTTPServerTransport implements Transport { ); const mcpSessionId = req.headers["mcp-session-id"] as string | undefined; if (isInitializationRequest) { - // if generateSessionId is not set, the server does not support session management + // If it's a server with session management and the session ID is already set we should reject the request + // to avoid re-initialization. if (this._initialized && this.sessionId !== undefined && mcpSessionId !== this.sessionId) { res.writeHead(400).end(JSON.stringify({ jsonrpc: "2.0", diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index 6193f5f1..bc7882e4 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -87,16 +87,15 @@ export type RequestOptions = { * May be used to indicate to the transport which incoming request to associate this outgoing request with. */ relatedRequestId?: RequestId; - /** - * May be used to indicate to the transport which last event ID to associate this outgoing request with. - * This is used to resume a long-running requests that may have been interrupted and a new instance of a client is being created. + * The last event ID to associate with the request. + * Used to resume long-running requests that were interrupted when creating a new client instance. */ lastEventId?: string; /** * A callback that is invoked when the last event ID is updated. - * This is used to notidy the client that the last event ID has changed, so that client can update its state accordingly. + * This is used to notify the client that the last event ID has changed, so the client can update its state accordingly. */ onLastEventIdUpdate?: (event: string) => void; }; From 1359e9fe6a69913a450a5cc88ebb480af8551f51 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 15 Apr 2025 11:42:16 +0100 Subject: [PATCH 05/10] pull transport send options into own named type --- src/client/streamableHttp.ts | 6 +++-- src/examples/client/simpleStreamableHttp.ts | 2 +- .../taskResumability.test.ts | 8 +++---- src/shared/protocol.ts | 21 +++++++++-------- src/shared/transport.ts | 23 ++++++++++++++++++- 5 files changed, 43 insertions(+), 17 deletions(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 63677d55..c1c93ebe 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -88,6 +88,7 @@ export type StreamableHTTPClientTransportOptions = { * Options to configure the reconnection behavior. */ reconnectionOptions?: StreamableHTTPReconnectionOptions; + /** * Session ID for the connection. This is used to identify the session on the server. * When not provided and connecting to a server that supports session IDs, the server will generate a new session ID. @@ -345,10 +346,11 @@ export class StreamableHTTPClientTransport implements Transport { this.onclose?.(); } - async send(message: JSONRPCMessage | JSONRPCMessage[], options?: { lastEventId?: string, onLastEventIdUpdate?: (event: string) => void }): Promise { + async send(message: JSONRPCMessage | JSONRPCMessage[], options?: { resumptionToken?: string, onresumptiontoken?: (event: string) => void }): Promise { try { // If client passes in a lastEventId in the request options, we need to reconnect the SSE stream - const { lastEventId, onLastEventIdUpdate } = options ?? {}; + const lastEventId = options?.resumptionToken + const onLastEventIdUpdate = options?.onresumptiontoken; if (lastEventId) { // If we have at last event ID, we need to reconnect the SSE stream this._startOrAuthSse({ lastEventId }).catch(err => this.onerror?.(err)); diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index 922aa0ce..6cfad987 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -302,7 +302,7 @@ async function callTool(name: string, args: Record): Promise { } } }, CallToolResultSchema, { - lastEventId, - onLastEventIdUpdate + resumptionToken: lastEventId, + onresumptiontoken: onLastEventIdUpdate }); // Wait for some notifications to arrive (not all) @@ -311,8 +311,8 @@ describe('Transport resumability', () => { } } }, CallToolResultSchema, { - lastEventId, // Pass the lastEventId from the previous session - onLastEventIdUpdate + resumptionToken: lastEventId, // Pass the lastEventId from the previous session + onresumptiontoken: onLastEventIdUpdate }); // Verify we eventually received at leaset a few motifications diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index bc7882e4..3b098193 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -87,17 +87,20 @@ export type RequestOptions = { * May be used to indicate to the transport which incoming request to associate this outgoing request with. */ relatedRequestId?: RequestId; + /** - * The last event ID to associate with the request. - * Used to resume long-running requests that were interrupted when creating a new client instance. + * The resumption token used to continue long-running requests that were interrupted. + * + * This allows clients to reconnect and continue from where they left off, if supported by the transport. */ - lastEventId?: string; + resumptionToken?: string; /** - * A callback that is invoked when the last event ID is updated. - * This is used to notify the client that the last event ID has changed, so the client can update its state accordingly. + * A callback that is invoked when the resumption token changes, if supported by the transport. + * + * This allows clients to persist the latest token for potential reconnection. */ - onLastEventIdUpdate?: (event: string) => void; + onresumptiontoken?: (token: string) => void; }; /** @@ -512,7 +515,7 @@ export abstract class Protocol< resultSchema: T, options?: RequestOptions, ): Promise> { - const { relatedRequestId, lastEventId, onLastEventIdUpdate } = options ?? {}; + const { relatedRequestId, resumptionToken, onresumptiontoken } = options ?? {}; return new Promise((resolve, reject) => { if (!this._transport) { @@ -554,7 +557,7 @@ export abstract class Protocol< requestId: messageId, reason: String(reason), }, - }, { relatedRequestId, lastEventId, onLastEventIdUpdate }) + }, { relatedRequestId, resumptionToken, onresumptiontoken }) .catch((error) => this._onerror(new Error(`Failed to send cancellation: ${error}`)), ); @@ -592,7 +595,7 @@ export abstract class Protocol< this._setupTimeout(messageId, timeout, options?.maxTotalTimeout, timeoutHandler, options?.resetTimeoutOnProgress ?? false); - this._transport.send(jsonrpcRequest, { relatedRequestId, lastEventId, onLastEventIdUpdate }).catch((error) => { + this._transport.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch((error) => { this._cleanupTimeout(messageId); reject(error); }); diff --git a/src/shared/transport.ts b/src/shared/transport.ts index cfbe8461..601a86af 100644 --- a/src/shared/transport.ts +++ b/src/shared/transport.ts @@ -1,5 +1,26 @@ import { JSONRPCMessage, RequestId } from "../types.js"; +/** + * Options for sending a JSON-RPC message. + */ +export interface TransportSendOptions { + /** + * If present, `relatedRequestId` is used to indicate to the transport which incoming request to associate this outgoing message with. + */ + relatedRequestId?: RequestId; + /** + * The resumption token used to continue long-running requests that were interrupted. + * + * This allows clients to reconnect and continue from where they left off, if supported by the transport. + */ + resumptionToken?: string; + /** + * A callback that is invoked when the resumption token changes, if supported by the transport. + * + * This allows clients to persist the latest token for potential reconnection. + */ + onresumptiontoken?: (token: string) => void; +} /** * Describes the minimal contract for a MCP transport that a client or server can communicate over. */ @@ -18,7 +39,7 @@ export interface Transport { * * If present, `relatedRequestId` is used to indicate to the transport which incoming request to associate this outgoing message with. */ - send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId, lastEventId?: string, onLastEventIdUpdate?: (event: string) => void }): Promise; + send(message: JSONRPCMessage, options?: TransportSendOptions): Promise; /** * Closes the connection. From 54d6929149387f197c4554115b9242f31e6dbe7c Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 15 Apr 2025 11:58:43 +0100 Subject: [PATCH 06/10] don't need to re-initialize when we reconnect a session --- src/client/index.ts | 6 +++++- src/server/streamableHttp.ts | 5 ++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index 64ea62c4..a3edd0be 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -126,7 +126,11 @@ export class Client< override async connect(transport: Transport, options?: RequestOptions): Promise { await super.connect(transport); - + // When transport sessionId is already set this means we are trying to reconnect. + // In this case we don't need to initialize again. + if (transport.sessionId !== undefined) { + return; + } try { const result = await this.request( { diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 012458f2..fed85ad9 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -331,11 +331,10 @@ export class StreamableHTTPServerTransport implements Transport { const isInitializationRequest = messages.some( msg => 'method' in msg && msg.method === 'initialize' ); - const mcpSessionId = req.headers["mcp-session-id"] as string | undefined; if (isInitializationRequest) { // If it's a server with session management and the session ID is already set we should reject the request // to avoid re-initialization. - if (this._initialized && this.sessionId !== undefined && mcpSessionId !== this.sessionId) { + if (this._initialized) { res.writeHead(400).end(JSON.stringify({ jsonrpc: "2.0", error: { @@ -357,7 +356,7 @@ export class StreamableHTTPServerTransport implements Transport { })); return; } - this.sessionId = mcpSessionId ?? this.sessionIdGenerator(); + this.sessionId = this.sessionIdGenerator(); this._initialized = true; } From f1e6acb17d35ccaaef2c156f24c786d0cc123de5 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 15 Apr 2025 16:37:31 +0100 Subject: [PATCH 07/10] resumed tool to return value --- src/client/streamableHttp.ts | 31 +++++++++++++------ src/examples/client/simpleStreamableHttp.ts | 2 +- .../taskResumability.test.ts | 29 ++++++++--------- 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index c1c93ebe..770b5d72 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -1,5 +1,5 @@ import { Transport } from "../shared/transport.js"; -import { isJSONRPCNotification, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; +import { isJSONRPCNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js"; import { EventSourceParserStream } from "eventsource-parser/stream"; @@ -28,6 +28,14 @@ export interface StartSSEOptions { * The ID of the last received event, used for resuming a disconnected stream */ lastEventId?: string; + /** + * The callback function that is invoked when the last event ID changes + */ + onLastEventIdUpdate?: (event: string) => void + /** + * When reconnecting to a long-running SSE stream, we need to make sure that message id matches + */ + replayMessageId?: string | number; } /** @@ -200,7 +208,7 @@ export class StreamableHTTPClientTransport implements Transport { ); } - this._handleSseStream(response.body); + this._handleSseStream(response.body, options); } catch (error) { this.onerror?.(error as Error); throw error; @@ -231,7 +239,7 @@ export class StreamableHTTPClientTransport implements Transport { * @param lastEventId The ID of the last received event for resumability * @param attemptCount Current reconnection attempt count for this specific stream */ - private _scheduleReconnection(lastEventId: string, attemptCount = 0): void { + private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0): void { // Use provided options or default options const maxRetries = this._reconnectionOptions.maxRetries; @@ -247,18 +255,19 @@ export class StreamableHTTPClientTransport implements Transport { // Schedule the reconnection setTimeout(() => { // Use the last event ID to resume where we left off - this._startOrAuthSse({ lastEventId }).catch(error => { + this._startOrAuthSse(options).catch(error => { this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); // Schedule another attempt if this one failed, incrementing the attempt counter - this._scheduleReconnection(lastEventId, attemptCount + 1); + this._scheduleReconnection(options, attemptCount + 1); }); }, delay); } - private _handleSseStream(stream: ReadableStream | null, onLastEventIdUpdate?: (event: string) => void): void { + private _handleSseStream(stream: ReadableStream | null, options: StartSSEOptions): void { if (!stream) { return; } + const { onLastEventIdUpdate, replayMessageId } = options; let lastEventId: string | undefined; const processStream = async () => { @@ -287,6 +296,9 @@ export class StreamableHTTPClientTransport implements Transport { if (!event.event || event.event === "message") { try { const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); + if (replayMessageId !== undefined && isJSONRPCResponse(message)) { + message.id = replayMessageId; + } this.onmessage?.(message); } catch (error) { this.onerror?.(error as Error); @@ -302,7 +314,7 @@ export class StreamableHTTPClientTransport implements Transport { // Use the exponential backoff reconnection strategy if (lastEventId !== undefined) { try { - this._scheduleReconnection(lastEventId, 0); + this._scheduleReconnection(options, 0); } catch (error) { this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`)); @@ -352,8 +364,9 @@ export class StreamableHTTPClientTransport implements Transport { const lastEventId = options?.resumptionToken const onLastEventIdUpdate = options?.onresumptiontoken; if (lastEventId) { + // If we have at last event ID, we need to reconnect the SSE stream - this._startOrAuthSse({ lastEventId }).catch(err => this.onerror?.(err)); + this._startOrAuthSse({ lastEventId, replayMessageId: isJSONRPCRequest(message) ? message.id : undefined }).catch(err => this.onerror?.(err)); return; } @@ -418,7 +431,7 @@ export class StreamableHTTPClientTransport implements Transport { // Handle SSE stream responses for requests // We use the same handler as standalone streams, which now supports // reconnection with the last event ID - this._handleSseStream(response.body, onLastEventIdUpdate); + this._handleSseStream(response.body, { onLastEventIdUpdate }); } else if (contentType?.includes("application/json")) { // For non-streaming servers, we might get direct JSON responses const data = await response.json(); diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index 6cfad987..d0d5408b 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -111,7 +111,7 @@ function commandLoop(): void { case 'start-notifications': { const interval = args[1] ? parseInt(args[1], 10) : 2000; - const count = args[2] ? parseInt(args[2], 10) : 0; + const count = args[2] ? parseInt(args[2], 10) : 10; await startNotifications(interval, count); break; } diff --git a/src/integration-tests/taskResumability.test.ts b/src/integration-tests/taskResumability.test.ts index 7a53ee01..7101925f 100644 --- a/src/integration-tests/taskResumability.test.ts +++ b/src/integration-tests/taskResumability.test.ts @@ -242,7 +242,7 @@ describe('Transport resumability', () => { params: { name: 'run-notifications', arguments: { - count: 5, + count: 3, interval: 10 } } @@ -251,27 +251,23 @@ describe('Transport resumability', () => { onresumptiontoken: onLastEventIdUpdate }); - // Wait for some notifications to arrive (not all) + // Wait for some notifications to arrive (not all) - shorter wait time await new Promise(resolve => setTimeout(resolve, 20)); // Verify we received some notifications and lastEventId was updated expect(notifications.length).toBeGreaterThan(0); - expect(notifications.length).toBeLessThan(5); + expect(notifications.length).toBeLessThan(4); expect(onLastEventIdUpdate).toHaveBeenCalled(); expect(lastEventId).toBeDefined(); // Store original notification count for later comparison const firstClientNotificationCount = notifications.length; - // Disconnect first client without waiting for completion // When we close the connection, it will cause a ConnectionClosed error for // any in-progress requests, which is expected behavior - // We need to catch the error since closing the transport will - // cause the pending toolPromise to reject with a ConnectionClosed error await transport1.close(); - - // Try to cancel the promise, but ignore errors since it's already being handled - toolPromise.catch(err => { + // Save the promise so we can catch it after closing + const catchPromise = toolPromise.catch(err => { // This error is expected - the connection was intentionally closed if (err?.code !== -32000) { // ConnectionClosed error code console.error("Unexpected error type during transport close:", err); @@ -279,6 +275,14 @@ describe('Transport resumability', () => { }); + + // Add a short delay to ensure clean disconnect before reconnecting + await new Promise(resolve => setTimeout(resolve, 10)); + + // Wait for the rejection to be handled + await catchPromise; + + // Create second client with same client ID const client2 = new Client({ id: clientId, @@ -307,7 +311,7 @@ describe('Transport resumability', () => { name: 'run-notifications', arguments: { count: 1, - interval: 50 + interval: 5 } } }, CallToolResultSchema, { @@ -316,13 +320,10 @@ describe('Transport resumability', () => { }); // Verify we eventually received at leaset a few motifications - expect(notifications.length).toBeGreaterThan(2); + expect(notifications.length).toBeGreaterThan(1); - // Verify the second client received notifications that the first client didn't - expect(notifications.length).toBeGreaterThan(firstClientNotificationCount); // Clean up - await transport2.close(); }); From f900592a2f7634555c56fd9307f567fc10c0160d Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 15 Apr 2025 16:43:24 +0100 Subject: [PATCH 08/10] lint --- src/integration-tests/taskResumability.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/integration-tests/taskResumability.test.ts b/src/integration-tests/taskResumability.test.ts index 7101925f..3bd23498 100644 --- a/src/integration-tests/taskResumability.test.ts +++ b/src/integration-tests/taskResumability.test.ts @@ -260,8 +260,7 @@ describe('Transport resumability', () => { expect(onLastEventIdUpdate).toHaveBeenCalled(); expect(lastEventId).toBeDefined(); - // Store original notification count for later comparison - const firstClientNotificationCount = notifications.length; + // Disconnect first client without waiting for completion // When we close the connection, it will cause a ConnectionClosed error for // any in-progress requests, which is expected behavior From f1c5ef1a8d9f5cec8e465f1d8f7571b21048ed2e Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 15 Apr 2025 20:40:26 +0100 Subject: [PATCH 09/10] apply suggested changes --- src/client/streamableHttp.ts | 9 ++++++--- src/shared/protocol.ts | 23 ++--------------------- src/shared/transport.ts | 10 ++++++---- 3 files changed, 14 insertions(+), 28 deletions(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 770b5d72..44bda015 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -23,17 +23,20 @@ export class StreamableHTTPError extends Error { /** * Options for starting or authenticating an SSE connection */ -export interface StartSSEOptions { +interface StartSSEOptions { /** * The ID of the last received event, used for resuming a disconnected stream */ lastEventId?: string; + /** * The callback function that is invoked when the last event ID changes */ onLastEventIdUpdate?: (event: string) => void + /** - * When reconnecting to a long-running SSE stream, we need to make sure that message id matches + * Override Message ID to associate with the replay message + * so that response can be associate with the new resumed request. */ replayMessageId?: string | number; } @@ -358,7 +361,7 @@ export class StreamableHTTPClientTransport implements Transport { this.onclose?.(); } - async send(message: JSONRPCMessage | JSONRPCMessage[], options?: { resumptionToken?: string, onresumptiontoken?: (event: string) => void }): Promise { + async send(message: JSONRPCMessage | JSONRPCMessage[], options?: { resumptionToken?: string, onresumptiontoken?: (token: string) => void }): Promise { try { // If client passes in a lastEventId in the request options, we need to reconnect the SSE stream const lastEventId = options?.resumptionToken diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index 3b098193..289c68d3 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -22,7 +22,7 @@ import { Result, ServerCapabilities, } from "../types.js"; -import { Transport } from "./transport.js"; +import { Transport, TransportSendOptions } from "./transport.js"; /** * Callback for progress notifications. @@ -82,26 +82,7 @@ export type RequestOptions = { * If not specified, there is no maximum total timeout. */ maxTotalTimeout?: number; - - /** - * May be used to indicate to the transport which incoming request to associate this outgoing request with. - */ - relatedRequestId?: RequestId; - - /** - * The resumption token used to continue long-running requests that were interrupted. - * - * This allows clients to reconnect and continue from where they left off, if supported by the transport. - */ - resumptionToken?: string; - - /** - * A callback that is invoked when the resumption token changes, if supported by the transport. - * - * This allows clients to persist the latest token for potential reconnection. - */ - onresumptiontoken?: (token: string) => void; -}; +} & TransportSendOptions; /** * Options that can be given per notification. diff --git a/src/shared/transport.ts b/src/shared/transport.ts index 601a86af..5d559674 100644 --- a/src/shared/transport.ts +++ b/src/shared/transport.ts @@ -1,19 +1,21 @@ import { JSONRPCMessage, RequestId } from "../types.js"; /** - * Options for sending a JSON-RPC message. - */ -export interface TransportSendOptions { + * Options for sending a JSON-RPC message. + */ +export type TransportSendOptions = { /** * If present, `relatedRequestId` is used to indicate to the transport which incoming request to associate this outgoing message with. - */ + */ relatedRequestId?: RequestId; + /** * The resumption token used to continue long-running requests that were interrupted. * * This allows clients to reconnect and continue from where they left off, if supported by the transport. */ resumptionToken?: string; + /** * A callback that is invoked when the resumption token changes, if supported by the transport. * From 8bc49f9f347e99a7c2dc0b8b69b3d02982bd9481 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 15 Apr 2025 22:06:10 +0100 Subject: [PATCH 10/10] change names to resumptionToken instead of last event id --- src/client/streamableHttp.test.ts | 4 +-- src/client/streamableHttp.ts | 42 ++++++++++++++++++------------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 22958582..93e44150 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -313,9 +313,9 @@ describe("StreamableHTTPClientTransport", () => { await transport.start(); // Type assertion to access private method const transportWithPrivateMethods = transport as unknown as { - _startOrAuthSse: (options: { lastEventId?: string }) => Promise + _startOrAuthSse: (options: { resumptionToken?: string }) => Promise }; - await transportWithPrivateMethods._startOrAuthSse({ lastEventId: "test-event-id" }); + await transportWithPrivateMethods._startOrAuthSse({ resumptionToken: "test-event-id" }); // Verify fetch was called with the lastEventId header expect(fetchSpy).toHaveBeenCalled(); diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 44bda015..077b0f15 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -25,14 +25,18 @@ export class StreamableHTTPError extends Error { */ interface StartSSEOptions { /** - * The ID of the last received event, used for resuming a disconnected stream + * The resumption token used to continue long-running requests that were interrupted. + * + * This allows clients to reconnect and continue from where they left off. */ - lastEventId?: string; + resumptionToken?: string; /** - * The callback function that is invoked when the last event ID changes + * A callback that is invoked when the resumption token changes. + * + * This allows clients to persist the latest token for potential reconnection. */ - onLastEventIdUpdate?: (event: string) => void + onresumptiontoken?: (token: string) => void; /** * Override Message ID to associate with the replay message @@ -152,7 +156,7 @@ export class StreamableHTTPClientTransport implements Transport { throw new UnauthorizedError(); } - return await this._startOrAuthSse({ lastEventId: undefined }); + return await this._startOrAuthSse({ resumptionToken: undefined }); } private async _commonHeaders(): Promise { @@ -175,7 +179,7 @@ export class StreamableHTTPClientTransport implements Transport { private async _startOrAuthSse(options: StartSSEOptions): Promise { - const { lastEventId } = options; + const { resumptionToken } = options; try { // Try to open an initial SSE stream with GET to listen for server messages // This is optional according to the spec - server may not support it @@ -183,8 +187,8 @@ export class StreamableHTTPClientTransport implements Transport { headers.set("Accept", "text/event-stream"); // Include Last-Event-ID header for resumable streams if provided - if (lastEventId) { - headers.set("last-event-id", lastEventId); + if (resumptionToken) { + headers.set("last-event-id", resumptionToken); } const response = await fetch(this._url, { @@ -270,7 +274,7 @@ export class StreamableHTTPClientTransport implements Transport { if (!stream) { return; } - const { onLastEventIdUpdate, replayMessageId } = options; + const { onresumptiontoken, replayMessageId } = options; let lastEventId: string | undefined; const processStream = async () => { @@ -293,7 +297,7 @@ export class StreamableHTTPClientTransport implements Transport { // Update last event ID if provided if (event.id) { lastEventId = event.id; - onLastEventIdUpdate?.(lastEventId); + onresumptiontoken?.(event.id); } if (!event.event || event.event === "message") { @@ -317,7 +321,11 @@ export class StreamableHTTPClientTransport implements Transport { // Use the exponential backoff reconnection strategy if (lastEventId !== undefined) { try { - this._scheduleReconnection(options, 0); + this._scheduleReconnection({ + resumptionToken: lastEventId, + onresumptiontoken, + replayMessageId + }, 0); } catch (error) { this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`)); @@ -363,13 +371,11 @@ export class StreamableHTTPClientTransport implements Transport { async send(message: JSONRPCMessage | JSONRPCMessage[], options?: { resumptionToken?: string, onresumptiontoken?: (token: string) => void }): Promise { try { - // If client passes in a lastEventId in the request options, we need to reconnect the SSE stream - const lastEventId = options?.resumptionToken - const onLastEventIdUpdate = options?.onresumptiontoken; - if (lastEventId) { + const { resumptionToken, onresumptiontoken } = options || {}; + if (resumptionToken) { // If we have at last event ID, we need to reconnect the SSE stream - this._startOrAuthSse({ lastEventId, replayMessageId: isJSONRPCRequest(message) ? message.id : undefined }).catch(err => this.onerror?.(err)); + this._startOrAuthSse({ resumptionToken, replayMessageId: isJSONRPCRequest(message) ? message.id : undefined }).catch(err => this.onerror?.(err)); return; } @@ -416,7 +422,7 @@ export class StreamableHTTPClientTransport implements Transport { // if it's supported by the server if (isJSONRPCNotification(message) && message.method === "notifications/initialized") { // Start without a lastEventId since this is a fresh connection - this._startOrAuthSse({ lastEventId: undefined }).catch(err => this.onerror?.(err)); + this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err)); } return; } @@ -434,7 +440,7 @@ export class StreamableHTTPClientTransport implements Transport { // Handle SSE stream responses for requests // We use the same handler as standalone streams, which now supports // reconnection with the last event ID - this._handleSseStream(response.body, { onLastEventIdUpdate }); + this._handleSseStream(response.body, { onresumptiontoken }); } else if (contentType?.includes("application/json")) { // For non-streaming servers, we might get direct JSON responses const data = await response.json();