diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java index 241f7d8b5..1f815f7fd 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java @@ -4,19 +4,12 @@ package io.modelcontextprotocol.spec; -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.json.TypeRef; import io.modelcontextprotocol.server.McpAsyncServerExchange; import io.modelcontextprotocol.server.McpInitRequestHandler; import io.modelcontextprotocol.server.McpNotificationHandler; import io.modelcontextprotocol.server.McpRequestHandler; -import io.modelcontextprotocol.json.TypeRef; import io.modelcontextprotocol.util.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +17,13 @@ import reactor.core.publisher.MonoSink; import reactor.core.publisher.Sinks; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + /** * Represents a Model Context Protocol (MCP) session on the server side. It manages * bidirectional JSON-RPC communication with the client. @@ -36,7 +36,9 @@ public class McpServerSession implements McpLoggableSession { private final String id; - /** Duration to wait for request responses before timing out */ + /** + * Duration to wait for request responses before timing out + */ private final Duration requestTimeout; private final AtomicLong requestCounter = new AtomicLong(0); @@ -165,6 +167,8 @@ public Mono sendRequest(String method, Object requestParams, TypeRef t this.pendingResponses.remove(requestId); sink.error(error); }); + // remove pending response when sink is disposed(e.g. timeout) + sink.onDispose(() -> this.pendingResponses.remove(requestId)); }).timeout(requestTimeout).handle((jsonRpcResponse, sink) -> { if (jsonRpcResponse.error() != null) { sink.error(new McpError(jsonRpcResponse.error())); @@ -345,13 +349,15 @@ private MethodNotFoundError getMethodNotFoundError(String method) { @Override public Mono closeGracefully() { - // TODO: clear pendingResponses and emit errors? + this.pendingResponses.values().forEach(sink -> sink.error(new RuntimeException("Session closed"))); + this.pendingResponses.clear(); return this.transport.closeGracefully(); } @Override public void close() { - // TODO: clear pendingResponses and emit errors? + this.pendingResponses.values().forEach(sink -> sink.error(new RuntimeException("Session closed"))); + this.pendingResponses.clear(); this.transport.close(); }