Skip to content

Commit 9ed2254

Browse files
authored
Merge pull request #332 from modelcontextprotocol/resumability
Streamable HTTP - resume streams on disconnect
2 parents 2f5fc17 + b062d75 commit 9ed2254

File tree

6 files changed

+1037
-183
lines changed

6 files changed

+1037
-183
lines changed

src/client/streamableHttp.test.ts

+81-30
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { StreamableHTTPClientTransport } from "./streamableHttp.js";
1+
import { StreamableHTTPClientTransport, StreamableHTTPReconnectionOptions } from "./streamableHttp.js";
22
import { JSONRPCMessage } from "../types.js";
33

44

@@ -164,7 +164,7 @@ describe("StreamableHTTPClientTransport", () => {
164164
// We expect the 405 error to be caught and handled gracefully
165165
// This should not throw an error that breaks the transport
166166
await transport.start();
167-
await expect(transport["_startOrAuthStandaloneSSE"]()).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
167+
await expect(transport["_startOrAuthStandaloneSSE"]({})).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
168168
// Check that GET was attempted
169169
expect(global.fetch).toHaveBeenCalledWith(
170170
expect.anything(),
@@ -208,7 +208,7 @@ describe("StreamableHTTPClientTransport", () => {
208208
transport.onmessage = messageSpy;
209209

210210
await transport.start();
211-
await transport["_startOrAuthStandaloneSSE"]();
211+
await transport["_startOrAuthStandaloneSSE"]({});
212212

213213
// Give time for the SSE event to be processed
214214
await new Promise(resolve => setTimeout(resolve, 50));
@@ -275,45 +275,62 @@ describe("StreamableHTTPClientTransport", () => {
275275
})).toBe(true);
276276
});
277277

278-
it("should include last-event-id header when resuming a broken connection", async () => {
279-
// First make a successful connection that provides an event ID
280-
const encoder = new TextEncoder();
281-
const stream = new ReadableStream({
282-
start(controller) {
283-
const event = "id: event-123\nevent: message\ndata: {\"jsonrpc\": \"2.0\", \"method\": \"serverNotification\", \"params\": {}}\n\n";
284-
controller.enqueue(encoder.encode(event));
285-
controller.close();
278+
it("should support custom reconnection options", () => {
279+
// Create a transport with custom reconnection options
280+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
281+
reconnectionOptions: {
282+
initialReconnectionDelay: 500,
283+
maxReconnectionDelay: 10000,
284+
reconnectionDelayGrowFactor: 2,
285+
maxRetries: 5,
286286
}
287287
});
288288

289-
(global.fetch as jest.Mock).mockResolvedValueOnce({
290-
ok: true,
291-
status: 200,
292-
headers: new Headers({ "content-type": "text/event-stream" }),
293-
body: stream
294-
});
289+
// Verify options were set correctly (checking implementation details)
290+
// Access private properties for testing
291+
const transportInstance = transport as unknown as {
292+
_reconnectionOptions: StreamableHTTPReconnectionOptions;
293+
};
294+
expect(transportInstance._reconnectionOptions.initialReconnectionDelay).toBe(500);
295+
expect(transportInstance._reconnectionOptions.maxRetries).toBe(5);
296+
});
295297

296-
await transport.start();
297-
await transport["_startOrAuthStandaloneSSE"]();
298-
await new Promise(resolve => setTimeout(resolve, 50));
298+
it("should pass lastEventId when reconnecting", async () => {
299+
// Create a fresh transport
300+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"));
299301

300-
// Now simulate attempting to reconnect
301-
(global.fetch as jest.Mock).mockResolvedValueOnce({
302+
// Mock fetch to verify headers sent
303+
const fetchSpy = global.fetch as jest.Mock;
304+
fetchSpy.mockReset();
305+
fetchSpy.mockResolvedValue({
302306
ok: true,
303307
status: 200,
304308
headers: new Headers({ "content-type": "text/event-stream" }),
305-
body: null
309+
body: new ReadableStream()
306310
});
307311

308-
await transport["_startOrAuthStandaloneSSE"]();
312+
// Call the reconnect method directly with a lastEventId
313+
await transport.start();
314+
// Type assertion to access private method
315+
const transportWithPrivateMethods = transport as unknown as {
316+
_startOrAuthStandaloneSSE: (options: { lastEventId?: string }) => Promise<void>
317+
};
318+
await transportWithPrivateMethods._startOrAuthStandaloneSSE({ lastEventId: "test-event-id" });
309319

310-
// Check that Last-Event-ID was included
311-
const calls = (global.fetch as jest.Mock).mock.calls;
312-
const lastCall = calls[calls.length - 1];
313-
expect(lastCall[1].headers.get("last-event-id")).toBe("event-123");
320+
// Verify fetch was called with the lastEventId header
321+
expect(fetchSpy).toHaveBeenCalled();
322+
const fetchCall = fetchSpy.mock.calls[0];
323+
const headers = fetchCall[1].headers;
324+
expect(headers.get("last-event-id")).toBe("test-event-id");
314325
});
315326

316327
it("should throw error when invalid content-type is received", async () => {
328+
// Clear any previous state from other tests
329+
jest.clearAllMocks();
330+
331+
// Create a fresh transport instance
332+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"));
333+
317334
const message: JSONRPCMessage = {
318335
jsonrpc: "2.0",
319336
method: "test",
@@ -323,7 +340,7 @@ describe("StreamableHTTPClientTransport", () => {
323340

324341
const stream = new ReadableStream({
325342
start(controller) {
326-
controller.enqueue("invalid text response");
343+
controller.enqueue(new TextEncoder().encode("invalid text response"));
327344
controller.close();
328345
}
329346
});
@@ -365,7 +382,7 @@ describe("StreamableHTTPClientTransport", () => {
365382

366383
await transport.start();
367384

368-
await transport["_startOrAuthStandaloneSSE"]();
385+
await transport["_startOrAuthStandaloneSSE"]({});
369386
expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue");
370387

371388
requestInit.headers["X-Custom-Header"] = "SecondCustomValue";
@@ -375,4 +392,38 @@ describe("StreamableHTTPClientTransport", () => {
375392

376393
expect(global.fetch).toHaveBeenCalledTimes(2);
377394
});
395+
396+
397+
it("should have exponential backoff with configurable maxRetries", () => {
398+
// This test verifies the maxRetries and backoff calculation directly
399+
400+
// Create transport with specific options for testing
401+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
402+
reconnectionOptions: {
403+
initialReconnectionDelay: 100,
404+
maxReconnectionDelay: 5000,
405+
reconnectionDelayGrowFactor: 2,
406+
maxRetries: 3,
407+
}
408+
});
409+
410+
// Get access to the internal implementation
411+
const getDelay = transport["_getNextReconnectionDelay"].bind(transport);
412+
413+
// First retry - should use initial delay
414+
expect(getDelay(0)).toBe(100);
415+
416+
// Second retry - should double (2^1 * 100 = 200)
417+
expect(getDelay(1)).toBe(200);
418+
419+
// Third retry - should double again (2^2 * 100 = 400)
420+
expect(getDelay(2)).toBe(400);
421+
422+
// Fourth retry - should double again (2^3 * 100 = 800)
423+
expect(getDelay(3)).toBe(800);
424+
425+
// Tenth retry - should be capped at maxReconnectionDelay
426+
expect(getDelay(10)).toBe(5000);
427+
});
428+
378429
});

0 commit comments

Comments
 (0)