Skip to content

Streamable HTTP - resume streams on disconnect #332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 15, 2025
111 changes: 81 additions & 30 deletions src/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { StreamableHTTPClientTransport } from "./streamableHttp.js";
import { StreamableHTTPClientTransport, StreamableHTTPReconnectionOptions } from "./streamableHttp.js";
import { JSONRPCMessage } from "../types.js";


Expand Down Expand Up @@ -164,7 +164,7 @@ describe("StreamableHTTPClientTransport", () => {
// We expect the 405 error to be caught and handled gracefully
// This should not throw an error that breaks the transport
await transport.start();
await expect(transport["_startOrAuthStandaloneSSE"]()).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
await expect(transport["_startOrAuthStandaloneSSE"]({})).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
// Check that GET was attempted
expect(global.fetch).toHaveBeenCalledWith(
expect.anything(),
Expand Down Expand Up @@ -208,7 +208,7 @@ describe("StreamableHTTPClientTransport", () => {
transport.onmessage = messageSpy;

await transport.start();
await transport["_startOrAuthStandaloneSSE"]();
await transport["_startOrAuthStandaloneSSE"]({});

// Give time for the SSE event to be processed
await new Promise(resolve => setTimeout(resolve, 50));
Expand Down Expand Up @@ -275,45 +275,62 @@ describe("StreamableHTTPClientTransport", () => {
})).toBe(true);
});

it("should include last-event-id header when resuming a broken connection", async () => {
// First make a successful connection that provides an event ID
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
const event = "id: event-123\nevent: message\ndata: {\"jsonrpc\": \"2.0\", \"method\": \"serverNotification\", \"params\": {}}\n\n";
controller.enqueue(encoder.encode(event));
controller.close();
it("should support custom reconnection options", () => {
// Create a transport with custom reconnection options
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
reconnectionOptions: {
initialReconnectionDelay: 500,
maxReconnectionDelay: 10000,
reconnectionDelayGrowFactor: 2,
maxRetries: 5,
}
});

(global.fetch as jest.Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/event-stream" }),
body: stream
});
// Verify options were set correctly (checking implementation details)
// Access private properties for testing
const transportInstance = transport as unknown as {
_reconnectionOptions: StreamableHTTPReconnectionOptions;
};
expect(transportInstance._reconnectionOptions.initialReconnectionDelay).toBe(500);
expect(transportInstance._reconnectionOptions.maxRetries).toBe(5);
});

await transport.start();
await transport["_startOrAuthStandaloneSSE"]();
await new Promise(resolve => setTimeout(resolve, 50));
it("should pass lastEventId when reconnecting", async () => {
// Create a fresh transport
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"));

// Now simulate attempting to reconnect
(global.fetch as jest.Mock).mockResolvedValueOnce({
// Mock fetch to verify headers sent
const fetchSpy = global.fetch as jest.Mock;
fetchSpy.mockReset();
fetchSpy.mockResolvedValue({
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/event-stream" }),
body: null
body: new ReadableStream()
});

await transport["_startOrAuthStandaloneSSE"]();
// Call the reconnect method directly with a lastEventId
await transport.start();
// Type assertion to access private method
const transportWithPrivateMethods = transport as unknown as {
_startOrAuthStandaloneSSE: (options: { lastEventId?: string }) => Promise<void>
};
await transportWithPrivateMethods._startOrAuthStandaloneSSE({ lastEventId: "test-event-id" });

// Check that Last-Event-ID was included
const calls = (global.fetch as jest.Mock).mock.calls;
const lastCall = calls[calls.length - 1];
expect(lastCall[1].headers.get("last-event-id")).toBe("event-123");
// Verify fetch was called with the lastEventId header
expect(fetchSpy).toHaveBeenCalled();
const fetchCall = fetchSpy.mock.calls[0];
const headers = fetchCall[1].headers;
expect(headers.get("last-event-id")).toBe("test-event-id");
});

it("should throw error when invalid content-type is received", async () => {
// Clear any previous state from other tests
jest.clearAllMocks();

// Create a fresh transport instance
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"));

const message: JSONRPCMessage = {
jsonrpc: "2.0",
method: "test",
Expand All @@ -323,7 +340,7 @@ describe("StreamableHTTPClientTransport", () => {

const stream = new ReadableStream({
start(controller) {
controller.enqueue("invalid text response");
controller.enqueue(new TextEncoder().encode("invalid text response"));
controller.close();
}
});
Expand Down Expand Up @@ -365,7 +382,7 @@ describe("StreamableHTTPClientTransport", () => {

await transport.start();

await transport["_startOrAuthStandaloneSSE"]();
await transport["_startOrAuthStandaloneSSE"]({});
expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue");

requestInit.headers["X-Custom-Header"] = "SecondCustomValue";
Expand All @@ -375,4 +392,38 @@ describe("StreamableHTTPClientTransport", () => {

expect(global.fetch).toHaveBeenCalledTimes(2);
});


it("should have exponential backoff with configurable maxRetries", () => {
// This test verifies the maxRetries and backoff calculation directly

// Create transport with specific options for testing
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
reconnectionOptions: {
initialReconnectionDelay: 100,
maxReconnectionDelay: 5000,
reconnectionDelayGrowFactor: 2,
maxRetries: 3,
}
});

// Get access to the internal implementation
const getDelay = transport["_getNextReconnectionDelay"].bind(transport);

// First retry - should use initial delay
expect(getDelay(0)).toBe(100);

// Second retry - should double (2^1 * 100 = 200)
expect(getDelay(1)).toBe(200);

// Third retry - should double again (2^2 * 100 = 400)
expect(getDelay(2)).toBe(400);

// Fourth retry - should double again (2^3 * 100 = 800)
expect(getDelay(3)).toBe(800);

// Tenth retry - should be capped at maxReconnectionDelay
expect(getDelay(10)).toBe(5000);
});

});
Loading