From 094088d4988d9a4106bab500679c2145df045422 Mon Sep 17 00:00:00 2001 From: Dennis Kawurek Date: Wed, 9 Apr 2025 21:36:28 +0200 Subject: [PATCH 1/4] feat: Add Pagination for requests Adds the Pagination feature as described in the specification for prompts, resources, resource templates and tools. To make this possible mainly two changes are made: 1. The logic for cursor handling is added. 2. Handling for invalid parameters (MCP error code `-32602 (Invalid params)`) is added to the `McpServerSession`. For now the cursor is the base64 encoded start index of the next page and the hash value of the collection at time of computing. The page size is set to 10. When parameters are found to be invalid the newly introduced `McpParamsValidationError` is returned to handle it properly in the `McpServerSession`. --- .../WebFluxSseIntegrationTests.java | 518 +++++++++++++++++- .../server/McpAsyncServer.java | 161 +++++- .../spec/McpParamsValidationError.java | 9 + .../spec/McpServerSession.java | 18 +- 4 files changed, 687 insertions(+), 19 deletions(-) create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/McpParamsValidationError.java diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java index 2ba047461..d0dd5a0fd 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java @@ -5,6 +5,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -13,6 +14,7 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.client.McpClient; @@ -26,10 +28,11 @@ import io.modelcontextprotocol.spec.McpError; import io.modelcontextprotocol.spec.McpSchema; import io.modelcontextprotocol.spec.McpSchema.*; -import io.modelcontextprotocol.spec.McpSchema.ServerCapabilities.CompletionCapabilities; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; @@ -40,8 +43,10 @@ import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.server.RouterFunctions; +import static io.modelcontextprotocol.spec.McpSchema.ErrorCodes.INVALID_PARAMS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.assertWith; import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.mock; @@ -629,6 +634,138 @@ void testToolListChangeHandlingSuccess(String clientType) { mcpServer.close(); } + // --------------------------------------- + // Tests for Paginated Tool List Results + // --------------------------------------- + + @ParameterizedTest(name = "{0} ({1}) : {displayName} ") + @MethodSource("providePaginationTestParams") + void testListToolsSuccess(String clientType, int availableElements) { + + var clientBuilder = clientBuilders.get(clientType); + + // Setup list of prompts + List tools = new ArrayList<>(); + + for (int i = 0; i < availableElements; i++) { + var mock = new McpSchema.Tool("test-tool-" + i, "Test Tool Description", emptyJsonSchema); + var spec = new McpServerFeatures.SyncToolSpecification(mock, null); + + tools.add(spec); + } + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tools) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + var returnedElements = new HashSet(); + + var hasEntries = true; + String nextCursor = null; + + while (hasEntries) { + var res = mcpClient.listTools(nextCursor); + + res.tools().forEach(e -> returnedElements.add(e.name())); // store unique + // attribute + + nextCursor = res.nextCursor(); + + if (nextCursor == null) { + hasEntries = false; + } + } + + assertThat(returnedElements.size()).isEqualTo(availableElements); + } + + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListToolsCursorInvalidListChanged(String clientType) { + + var clientBuilder = clientBuilders.get(clientType); + + // Setup list of prompts + var pageSize = 10; + List tools = new ArrayList<>(); + + for (int i = 0; i <= pageSize; i++) { + var mock = new McpSchema.Tool("test-tool-" + i, "Test Tool Description", emptyJsonSchema); + var spec = new McpServerFeatures.SyncToolSpecification(mock, null); + + tools.add(spec); + } + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tools) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + var res = mcpClient.listTools(null); + + // Change list + var mock = new McpSchema.Tool("test-tool-xyz", "Test Tool Description", emptyJsonSchema); + mcpServer.addTool(new McpServerFeatures.SyncToolSpecification(mock, null)); + + assertThatThrownBy(() -> mcpClient.listTools(res.nextCursor())).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); + + } + + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListToolsInvalidCursor(String clientType) { + + var clientBuilder = clientBuilders.get(clientType); + + var mock = new McpSchema.Tool("test-tool", "Test Tool Description", emptyJsonSchema); + var spec = new McpServerFeatures.SyncToolSpecification(mock, null); + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(spec) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + assertThatThrownBy(() -> mcpClient.listTools("INVALID")).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); + + } + + mcpServer.close(); + } + @ParameterizedTest(name = "{0} : {displayName} ") @ValueSource(strings = { "httpclient", "webflux" }) void testInitialize(String clientType) { @@ -803,4 +940,381 @@ void testCompletionShouldReturnExpectedSuggestions(String clientType) { mcpServer.close(); } -} \ No newline at end of file + // --------------------------------------- + // Tests for Paginated Prompt List Results + // --------------------------------------- + + @ParameterizedTest(name = "{0} ({1}) : {displayName} ") + @MethodSource("providePaginationTestParams") + void testListPromptsSuccess(String clientType, int availableElements) { + + var clientBuilder = clientBuilders.get(clientType); + + // Setup list of prompts + List prompts = new ArrayList<>(); + + for (int i = 0; i < availableElements; i++) { + var mock = new McpSchema.Prompt("test-prompt-" + i, "Test Prompt Description", + List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); + var spec = new McpServerFeatures.SyncPromptSpecification(mock, null); + + prompts.add(spec); + } + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().prompts(true).build()) + .prompts(prompts) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + var returnedElements = new HashSet(); + + var hasEntries = true; + String nextCursor = null; + + while (hasEntries) { + var res = mcpClient.listPrompts(nextCursor); + + res.prompts().forEach(e -> returnedElements.add(e.name())); // store + // unique + // attribute + + nextCursor = res.nextCursor(); + + if (nextCursor == null) { + hasEntries = false; + } + } + + assertThat(returnedElements.size()).isEqualTo(availableElements); + + } + + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListPromptsCursorInvalidListChanged(String clientType) { + + var clientBuilder = clientBuilders.get(clientType); + + // Setup list of prompts + var pageSize = 10; + List prompts = new ArrayList<>(); + + for (int i = 0; i <= pageSize; i++) { + var mock = new McpSchema.Prompt("test-prompt-" + i, "Test Prompt Description", + List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); + var spec = new McpServerFeatures.SyncPromptSpecification(mock, null); + + prompts.add(spec); + } + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().prompts(true).build()) + .prompts(prompts) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + var res = mcpClient.listPrompts(null); + + // Change list + var mock = new McpSchema.Prompt("test-prompt-xyz", "Test Prompt Description", + List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); + + mcpServer.addPrompt(new McpServerFeatures.SyncPromptSpecification(mock, null)); + + assertThatThrownBy(() -> mcpClient.listPrompts(res.nextCursor())).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); + + } + + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListPromptsInvalidCursor(String clientType) { + + var clientBuilder = clientBuilders.get(clientType); + + var mock = new McpSchema.Prompt("test-prompt", "Test Prompt Description", + List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); + + var spec = new McpServerFeatures.SyncPromptSpecification(mock, null); + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().prompts(true).build()) + .prompts(spec) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + assertThatThrownBy(() -> mcpClient.listPrompts("INVALID")).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); + + } + + mcpServer.close(); + } + + // --------------------------------------- + // Tests for Paginated Resources List Results + // --------------------------------------- + + @ParameterizedTest(name = "{0} ({1}) : {displayName} ") + @MethodSource("providePaginationTestParams") + void testListResourcesSuccess(String clientType, int availableElements) { + + var clientBuilder = clientBuilders.get(clientType); + + // Setup list of prompts + List resources = new ArrayList<>(); + + for (int i = 0; i < availableElements; i++) { + var mock = new McpSchema.Resource("file://example-" + i + ".txt", "test-resource", + "Test Resource Description", "application/octet-stream", null); + var spec = new McpServerFeatures.SyncResourceSpecification(mock, null); + + resources.add(spec); + } + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resources(resources) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + var returnedElements = new HashSet(); + + var hasEntries = true; + String nextCursor = null; + + while (hasEntries) { + var res = mcpClient.listResources(nextCursor); + + res.resources().forEach(e -> returnedElements.add(e.uri())); // store + // unique + // attribute + + nextCursor = res.nextCursor(); + + if (nextCursor == null) { + hasEntries = false; + } + } + + assertThat(returnedElements.size()).isEqualTo(availableElements); + } + + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListResourcesCursorInvalidListChanged(String clientType) { + + var clientBuilder = clientBuilders.get(clientType); + + // Setup list of prompts + var pageSize = 10; + List resources = new ArrayList<>(); + + for (int i = 0; i <= pageSize; i++) { + var mock = new McpSchema.Resource("file://example-" + i + ".txt", "test-resource", + "Test Resource Description", "application/octet-stream", null); + var spec = new McpServerFeatures.SyncResourceSpecification(mock, null); + + resources.add(spec); + } + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resources(resources) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + var res = mcpClient.listResources(null); + + // Change list + var mock = new McpSchema.Resource("file://example-xyz.txt", "test-resource", "Test Resource Description", + "application/octet-stream", null); + mcpServer.addResource(new McpServerFeatures.SyncResourceSpecification(mock, null)); + + assertThatThrownBy(() -> mcpClient.listResources(res.nextCursor())).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); + + } + + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListResourcesInvalidCursor(String clientType) { + + var clientBuilder = clientBuilders.get(clientType); + + var mock = new McpSchema.Resource("file://example.txt", "test-resource", "Test Resource Description", + "application/octet-stream", null); + var spec = new McpServerFeatures.SyncResourceSpecification(mock, null); + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resources(spec) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + assertThatThrownBy(() -> mcpClient.listResources("INVALID")).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); + + } + + mcpServer.close(); + } + + // --------------------------------------- + // Tests for Paginated Resource Templates Results + // --------------------------------------- + + @ParameterizedTest(name = "{0} ({1}) : {displayName} ") + @MethodSource("providePaginationTestParams") + void testListResourceTemplatesSuccess(String clientType, int availableElements) { + + var clientBuilder = clientBuilders.get(clientType); + + // Setup list of prompts + List resourceTemplates = new ArrayList<>(); + + for (int i = 0; i < availableElements; i++) { + resourceTemplates.add(new McpSchema.ResourceTemplate("file://{path}-" + i + ".txt", "test-resource", + "Test Resource Description", "application/octet-stream", null)); + } + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resourceTemplates(resourceTemplates) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + var returnedElements = new HashSet(); + + var hasEntries = true; + String nextCursor = null; + + while (hasEntries) { + var res = mcpClient.listResourceTemplates(nextCursor); + + res.resourceTemplates().forEach(e -> returnedElements.add(e.uriTemplate())); // store + // unique + // attribute + + nextCursor = res.nextCursor(); + + if (nextCursor == null) { + hasEntries = false; + } + } + + assertThat(returnedElements.size()).isEqualTo(availableElements); + } + + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListResourceTemplatesInvalidCursor(String clientType) { + + var clientBuilder = clientBuilders.get(clientType); + + var mock = new McpSchema.ResourceTemplate("file://{path}.txt", "test-resource", "Test Resource Description", + "application/octet-stream", null); + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resourceTemplates(mock) + .build(); + + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + assertThatThrownBy(() -> mcpClient.listResourceTemplates("INVALID")).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); + + } + + mcpServer.close(); + } + + // --------------------------------------- + // Helpers for Tests of Paginated Lists + // --------------------------------------- + + /** + * Helper function for pagination tests. This provides a stream of the following + * parameters: 1. Client type (e.g. httpclient, webflux) 2. Number of available + * elements in the list + * @return a stream of arguments with test parameters + */ + static Stream providePaginationTestParams() { + return Stream.of(Arguments.of("httpclient", 0), Arguments.of("httpclient", 1), Arguments.of("httpclient", 21), + Arguments.of("webflux", 0), Arguments.of("webflux", 1), Arguments.of("webflux", 21)); + } + +} diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 1efa13de3..ae253be56 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -6,6 +6,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -19,13 +20,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.spec.McpClientSession; import io.modelcontextprotocol.spec.McpError; +import io.modelcontextprotocol.spec.McpParamsValidationError; import io.modelcontextprotocol.spec.McpSchema; import io.modelcontextprotocol.spec.McpSchema.CallToolResult; import io.modelcontextprotocol.spec.McpSchema.LoggingLevel; import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; import io.modelcontextprotocol.spec.McpSchema.ResourceTemplate; import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest; -import io.modelcontextprotocol.spec.McpSchema.Tool; import io.modelcontextprotocol.spec.McpServerSession; import io.modelcontextprotocol.spec.McpServerTransportProvider; import io.modelcontextprotocol.util.DeafaultMcpUriTemplateManagerFactory; @@ -100,6 +101,8 @@ public class McpAsyncServer { private final ConcurrentHashMap prompts = new ConcurrentHashMap<>(); + private static final int PAGE_SIZE = 10; + // FIXME: this field is deprecated and should be remvoed together with the // broadcasting loggingNotification. private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG; @@ -340,9 +343,25 @@ public Mono notifyToolsListChanged() { private McpServerSession.RequestHandler toolsListRequestHandler() { return (exchange, params) -> { - List tools = this.tools.stream().map(McpServerFeatures.AsyncToolSpecification::tool).toList(); + McpSchema.PaginatedRequest request = objectMapper.convertValue(params, + new TypeReference() { + }); + + int mapSize = this.tools.size(); + int mapHash = this.tools.hashCode(); + + int requestedStartIndex = handleCursor(request.cursor(), mapSize, mapHash).block(); + int endIndex = Math.min(requestedStartIndex + PAGE_SIZE, mapSize); - return Mono.just(new McpSchema.ListToolsResult(tools, null)); + var nextCursor = getCursor(endIndex, mapSize, mapHash); + + var resultList = this.tools.stream() + .skip(requestedStartIndex) + .limit(endIndex - requestedStartIndex) + .map(McpServerFeatures.AsyncToolSpecification::tool) + .toList(); + + return Mono.just(new McpSchema.ListToolsResult(resultList, nextCursor)); }; } @@ -432,18 +451,49 @@ public Mono notifyResourcesListChanged() { private McpServerSession.RequestHandler resourcesListRequestHandler() { return (exchange, params) -> { - var resourceList = this.resources.values() + McpSchema.PaginatedRequest request = objectMapper.convertValue(params, + new TypeReference() { + }); + + int mapSize = this.resources.size(); + int mapHash = this.resources.hashCode(); + + int requestedStartIndex = handleCursor(request.cursor(), mapSize, mapHash).block(); + int endIndex = Math.min(requestedStartIndex + PAGE_SIZE, mapSize); + + var nextCursor = getCursor(endIndex, mapSize, mapHash); + + var resultList = this.resources.values() .stream() + .skip(requestedStartIndex) + .limit(endIndex - requestedStartIndex) .map(McpServerFeatures.AsyncResourceSpecification::resource) .toList(); - return Mono.just(new McpSchema.ListResourcesResult(resourceList, null)); + + return Mono.just(new McpSchema.ListResourcesResult(resultList, nextCursor)); }; } private McpServerSession.RequestHandler resourceTemplateListRequestHandler() { - return (exchange, params) -> Mono - .just(new McpSchema.ListResourceTemplatesResult(this.getResourceTemplates(), null)); + return (exchange, params) -> { + McpSchema.PaginatedRequest request = objectMapper.convertValue(params, + new TypeReference() { + }); + var all = this.getResourceTemplates(); + + int mapSize = all.size(); + int mapHash = all.hashCode(); + + int requestedStartIndex = handleCursor(request.cursor(), mapSize, mapHash).block(); + int endIndex = Math.min(requestedStartIndex + PAGE_SIZE, mapSize); + + var nextCursor = getCursor(endIndex, mapSize, mapHash); + + var resultList = all.stream().skip(requestedStartIndex).limit(endIndex - requestedStartIndex).toList(); + + return Mono.just(new McpSchema.ListResourceTemplatesResult(resultList, nextCursor)); + }; } private List getResourceTemplates() { @@ -559,17 +609,27 @@ public Mono notifyPromptsListChanged() { private McpServerSession.RequestHandler promptsListRequestHandler() { return (exchange, params) -> { - // TODO: Implement pagination - // McpSchema.PaginatedRequest request = objectMapper.convertValue(params, - // new TypeReference() { - // }); - var promptList = this.prompts.values() + McpSchema.PaginatedRequest request = objectMapper.convertValue(params, + new TypeReference() { + }); + + int mapSize = this.prompts.size(); + int mapHash = this.prompts.hashCode(); + + int requestedStartIndex = handleCursor(request.cursor(), mapSize, mapHash).block(); + int endIndex = Math.min(requestedStartIndex + PAGE_SIZE, mapSize); + + var nextCursor = getCursor(endIndex, mapSize, mapHash); + + var resultList = this.prompts.values() .stream() + .skip(requestedStartIndex) + .limit(endIndex - requestedStartIndex) .map(McpServerFeatures.AsyncPromptSpecification::prompt) .toList(); - return Mono.just(new McpSchema.ListPromptsResult(promptList, null)); + return Mono.just(new McpSchema.ListPromptsResult(resultList, nextCursor)); }; } @@ -738,4 +798,79 @@ void setProtocolVersions(List protocolVersions) { this.protocolVersions = protocolVersions; } + // --------------------------------------- + // Cursor Handling for paginated requests + // --------------------------------------- + + /** + * Handles the cursor by decoding, validating and reading the index of it. + * @param cursor the base64 representation of the cursor. + * @param mapSize the size of the map from which the values should be read. + * @param mapHash the hash of the map to compare the cursor value to. + * @return a {@link Mono} which contains the index to which the cursor points. + */ + private Mono handleCursor(String cursor, int mapSize, int mapHash) { + if (cursor == null) { + return Mono.just(0); + } + + var decodedCursor = decodeCursor(cursor); + + if (!isCursorValid(decodedCursor, mapSize, mapHash)) { + return Mono.error(new McpParamsValidationError("Invalid cursor")); + } + + return Mono.just(getCursorIndex(decodedCursor)); + } + + private String getCursor(int endIndex, int mapSize, int mapHash) { + if (endIndex >= mapSize) { + return null; + } + return encodeCursor(endIndex, mapHash); + } + + private int getCursorIndex(String cursor) { + return Integer.parseInt(cursor.split(":")[0]); + } + + private boolean isCursorValid(String cursor, int maxPageSize, int currentHash) { + var cursorElements = cursor.split(":"); + + if (cursorElements.length != 2) { + logger.debug("Length of elements in cursor doesn't match expected number. Cursor: {} Actual number: {}", + cursor, cursorElements.length); + return false; + } + + int index; + int hash; + + try { + index = Integer.parseInt(cursorElements[0]); + hash = Integer.parseInt(cursorElements[1]); + } + catch (NumberFormatException e) { + logger.debug("Failed to parse cursor elements."); + return false; + } + + if (index < 0 || index > maxPageSize || hash != currentHash) { + logger.debug("Cursor boundaries are invalid."); + return false; + } + + return true; + } + + private String encodeCursor(int index, int hash) { + var cursor = index + ":" + hash; + + return Base64.getEncoder().encodeToString(cursor.getBytes()); + } + + private String decodeCursor(String base64Cursor) { + return new String(Base64.getDecoder().decode(base64Cursor)); + } + } diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpParamsValidationError.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpParamsValidationError.java new file mode 100644 index 000000000..e7ecb0058 --- /dev/null +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpParamsValidationError.java @@ -0,0 +1,9 @@ +package io.modelcontextprotocol.spec; + +public class McpParamsValidationError extends McpError { + + public McpParamsValidationError(String error) { + super(error); + } + +} diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java index 86906d859..1f6feaa17 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java @@ -226,10 +226,20 @@ private Mono handleIncomingRequest(McpSchema.JSONRPCR } return resultMono .map(result -> new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), result, null)) - .onErrorResume(error -> Mono.just(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), - null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR, - error.getMessage(), null)))); // TODO: add error message - // through the data field + .onErrorResume(error -> { + + var errorCode = McpSchema.ErrorCodes.INTERNAL_ERROR; + + if (error instanceof McpParamsValidationError) { + errorCode = McpSchema.ErrorCodes.INVALID_PARAMS; + } + + // TODO: add error message through the data field + var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null, + new McpSchema.JSONRPCResponse.JSONRPCError(errorCode, error.getMessage(), null)); + + return Mono.just(errorResponse); + }); }); } From bfc5e81a5a012a80ec3a5b6ab03d5be9fe2e54a6 Mon Sep 17 00:00:00 2001 From: Dennis Kawurek Date: Tue, 29 Jul 2025 22:05:46 +0200 Subject: [PATCH 2/4] chore: Apply formatter --- .../WebFluxSseIntegrationTests.java | 538 +++++++++--------- 1 file changed, 269 insertions(+), 269 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java index cd2b7b6fa..93d0764d7 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java @@ -1523,381 +1523,381 @@ private double evaluateExpression(String expression) { }; } - // --------------------------------------- - // Tests for Paginated Prompt List Results - // --------------------------------------- + // --------------------------------------- + // Tests for Paginated Prompt List Results + // --------------------------------------- - @ParameterizedTest(name = "{0} ({1}) : {displayName} ") - @MethodSource("providePaginationTestParams") - void testListPromptsSuccess(String clientType, int availableElements) { + @ParameterizedTest(name = "{0} ({1}) : {displayName} ") + @MethodSource("providePaginationTestParams") + void testListPromptsSuccess(String clientType, int availableElements) { - var clientBuilder = clientBuilders.get(clientType); + var clientBuilder = clientBuilders.get(clientType); - // Setup list of prompts - List prompts = new ArrayList<>(); + // Setup list of prompts + List prompts = new ArrayList<>(); - for (int i = 0; i < availableElements; i++) { - var mock = new McpSchema.Prompt("test-prompt-" + i, "Test Prompt Description", - List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); - var spec = new McpServerFeatures.SyncPromptSpecification(mock, null); + for (int i = 0; i < availableElements; i++) { + var mock = new McpSchema.Prompt("test-prompt-" + i, "Test Prompt Description", + List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); + var spec = new McpServerFeatures.SyncPromptSpecification(mock, null); - prompts.add(spec); - } + prompts.add(spec); + } - var mcpServer = McpServer.sync(mcpServerTransportProvider) - .capabilities(ServerCapabilities.builder().prompts(true).build()) - .prompts(prompts) - .build(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().prompts(true).build()) + .prompts(prompts) + .build(); - try (var mcpClient = clientBuilder.build()) { + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - var returnedElements = new HashSet(); + var returnedElements = new HashSet(); - var hasEntries = true; - String nextCursor = null; + var hasEntries = true; + String nextCursor = null; - while (hasEntries) { - var res = mcpClient.listPrompts(nextCursor); + while (hasEntries) { + var res = mcpClient.listPrompts(nextCursor); - res.prompts().forEach(e -> returnedElements.add(e.name())); // store - // unique - // attribute + res.prompts().forEach(e -> returnedElements.add(e.name())); // store + // unique + // attribute - nextCursor = res.nextCursor(); + nextCursor = res.nextCursor(); - if (nextCursor == null) { - hasEntries = false; - } - } + if (nextCursor == null) { + hasEntries = false; + } + } - assertThat(returnedElements.size()).isEqualTo(availableElements); + assertThat(returnedElements.size()).isEqualTo(availableElements); - } + } - mcpServer.close(); - } + mcpServer.close(); + } - @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = { "httpclient", "webflux" }) - void testListPromptsCursorInvalidListChanged(String clientType) { + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListPromptsCursorInvalidListChanged(String clientType) { - var clientBuilder = clientBuilders.get(clientType); + var clientBuilder = clientBuilders.get(clientType); - // Setup list of prompts - var pageSize = 10; - List prompts = new ArrayList<>(); + // Setup list of prompts + var pageSize = 10; + List prompts = new ArrayList<>(); - for (int i = 0; i <= pageSize; i++) { - var mock = new McpSchema.Prompt("test-prompt-" + i, "Test Prompt Description", - List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); - var spec = new McpServerFeatures.SyncPromptSpecification(mock, null); + for (int i = 0; i <= pageSize; i++) { + var mock = new McpSchema.Prompt("test-prompt-" + i, "Test Prompt Description", + List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); + var spec = new McpServerFeatures.SyncPromptSpecification(mock, null); - prompts.add(spec); - } + prompts.add(spec); + } - var mcpServer = McpServer.sync(mcpServerTransportProvider) - .capabilities(ServerCapabilities.builder().prompts(true).build()) - .prompts(prompts) - .build(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().prompts(true).build()) + .prompts(prompts) + .build(); - try (var mcpClient = clientBuilder.build()) { + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - var res = mcpClient.listPrompts(null); + var res = mcpClient.listPrompts(null); - // Change list - var mock = new McpSchema.Prompt("test-prompt-xyz", "Test Prompt Description", - List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); + // Change list + var mock = new McpSchema.Prompt("test-prompt-xyz", "Test Prompt Description", + List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); - mcpServer.addPrompt(new McpServerFeatures.SyncPromptSpecification(mock, null)); + mcpServer.addPrompt(new McpServerFeatures.SyncPromptSpecification(mock, null)); - assertThatThrownBy(() -> mcpClient.listPrompts(res.nextCursor())).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + assertThatThrownBy(() -> mcpClient.listPrompts(res.nextCursor())).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); - } + } - mcpServer.close(); - } + mcpServer.close(); + } - @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = { "httpclient", "webflux" }) - void testListPromptsInvalidCursor(String clientType) { + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListPromptsInvalidCursor(String clientType) { - var clientBuilder = clientBuilders.get(clientType); + var clientBuilder = clientBuilders.get(clientType); - var mock = new McpSchema.Prompt("test-prompt", "Test Prompt Description", - List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); + var mock = new McpSchema.Prompt("test-prompt", "Test Prompt Description", + List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); - var spec = new McpServerFeatures.SyncPromptSpecification(mock, null); + var spec = new McpServerFeatures.SyncPromptSpecification(mock, null); - var mcpServer = McpServer.sync(mcpServerTransportProvider) - .capabilities(ServerCapabilities.builder().prompts(true).build()) - .prompts(spec) - .build(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().prompts(true).build()) + .prompts(spec) + .build(); - try (var mcpClient = clientBuilder.build()) { + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThatThrownBy(() -> mcpClient.listPrompts("INVALID")).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + assertThatThrownBy(() -> mcpClient.listPrompts("INVALID")).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); - } + } - mcpServer.close(); - } + mcpServer.close(); + } - // --------------------------------------- - // Tests for Paginated Resources List Results - // --------------------------------------- + // --------------------------------------- + // Tests for Paginated Resources List Results + // --------------------------------------- - @ParameterizedTest(name = "{0} ({1}) : {displayName} ") - @MethodSource("providePaginationTestParams") - void testListResourcesSuccess(String clientType, int availableElements) { + @ParameterizedTest(name = "{0} ({1}) : {displayName} ") + @MethodSource("providePaginationTestParams") + void testListResourcesSuccess(String clientType, int availableElements) { - var clientBuilder = clientBuilders.get(clientType); + var clientBuilder = clientBuilders.get(clientType); - // Setup list of prompts - List resources = new ArrayList<>(); + // Setup list of prompts + List resources = new ArrayList<>(); - for (int i = 0; i < availableElements; i++) { - var mock = new McpSchema.Resource("file://example-" + i + ".txt", "test-resource", - "Test Resource Description", "application/octet-stream", null); - var spec = new McpServerFeatures.SyncResourceSpecification(mock, null); + for (int i = 0; i < availableElements; i++) { + var mock = new McpSchema.Resource("file://example-" + i + ".txt", "test-resource", + "Test Resource Description", "application/octet-stream", null); + var spec = new McpServerFeatures.SyncResourceSpecification(mock, null); - resources.add(spec); - } + resources.add(spec); + } - var mcpServer = McpServer.sync(mcpServerTransportProvider) - .capabilities(ServerCapabilities.builder().resources(true, true).build()) - .resources(resources) - .build(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resources(resources) + .build(); - try (var mcpClient = clientBuilder.build()) { + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - var returnedElements = new HashSet(); + var returnedElements = new HashSet(); - var hasEntries = true; - String nextCursor = null; + var hasEntries = true; + String nextCursor = null; - while (hasEntries) { - var res = mcpClient.listResources(nextCursor); + while (hasEntries) { + var res = mcpClient.listResources(nextCursor); - res.resources().forEach(e -> returnedElements.add(e.uri())); // store - // unique - // attribute + res.resources().forEach(e -> returnedElements.add(e.uri())); // store + // unique + // attribute - nextCursor = res.nextCursor(); + nextCursor = res.nextCursor(); - if (nextCursor == null) { - hasEntries = false; - } - } + if (nextCursor == null) { + hasEntries = false; + } + } - assertThat(returnedElements.size()).isEqualTo(availableElements); - } + assertThat(returnedElements.size()).isEqualTo(availableElements); + } - mcpServer.close(); - } + mcpServer.close(); + } - @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = { "httpclient", "webflux" }) - void testListResourcesCursorInvalidListChanged(String clientType) { + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListResourcesCursorInvalidListChanged(String clientType) { - var clientBuilder = clientBuilders.get(clientType); + var clientBuilder = clientBuilders.get(clientType); - // Setup list of prompts - var pageSize = 10; - List resources = new ArrayList<>(); + // Setup list of prompts + var pageSize = 10; + List resources = new ArrayList<>(); - for (int i = 0; i <= pageSize; i++) { - var mock = new McpSchema.Resource("file://example-" + i + ".txt", "test-resource", - "Test Resource Description", "application/octet-stream", null); - var spec = new McpServerFeatures.SyncResourceSpecification(mock, null); + for (int i = 0; i <= pageSize; i++) { + var mock = new McpSchema.Resource("file://example-" + i + ".txt", "test-resource", + "Test Resource Description", "application/octet-stream", null); + var spec = new McpServerFeatures.SyncResourceSpecification(mock, null); - resources.add(spec); - } + resources.add(spec); + } - var mcpServer = McpServer.sync(mcpServerTransportProvider) - .capabilities(ServerCapabilities.builder().resources(true, true).build()) - .resources(resources) - .build(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resources(resources) + .build(); - try (var mcpClient = clientBuilder.build()) { + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - var res = mcpClient.listResources(null); + var res = mcpClient.listResources(null); - // Change list - var mock = new McpSchema.Resource("file://example-xyz.txt", "test-resource", "Test Resource Description", - "application/octet-stream", null); - mcpServer.addResource(new McpServerFeatures.SyncResourceSpecification(mock, null)); + // Change list + var mock = new McpSchema.Resource("file://example-xyz.txt", "test-resource", "Test Resource Description", + "application/octet-stream", null); + mcpServer.addResource(new McpServerFeatures.SyncResourceSpecification(mock, null)); - assertThatThrownBy(() -> mcpClient.listResources(res.nextCursor())).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + assertThatThrownBy(() -> mcpClient.listResources(res.nextCursor())).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); - } + } - mcpServer.close(); - } + mcpServer.close(); + } - @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = { "httpclient", "webflux" }) - void testListResourcesInvalidCursor(String clientType) { + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListResourcesInvalidCursor(String clientType) { - var clientBuilder = clientBuilders.get(clientType); + var clientBuilder = clientBuilders.get(clientType); - var mock = new McpSchema.Resource("file://example.txt", "test-resource", "Test Resource Description", - "application/octet-stream", null); - var spec = new McpServerFeatures.SyncResourceSpecification(mock, null); + var mock = new McpSchema.Resource("file://example.txt", "test-resource", "Test Resource Description", + "application/octet-stream", null); + var spec = new McpServerFeatures.SyncResourceSpecification(mock, null); - var mcpServer = McpServer.sync(mcpServerTransportProvider) - .capabilities(ServerCapabilities.builder().resources(true, true).build()) - .resources(spec) - .build(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resources(spec) + .build(); - try (var mcpClient = clientBuilder.build()) { + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThatThrownBy(() -> mcpClient.listResources("INVALID")).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + assertThatThrownBy(() -> mcpClient.listResources("INVALID")).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); - } + } - mcpServer.close(); - } + mcpServer.close(); + } - // --------------------------------------- - // Tests for Paginated Resource Templates Results - // --------------------------------------- + // --------------------------------------- + // Tests for Paginated Resource Templates Results + // --------------------------------------- - @ParameterizedTest(name = "{0} ({1}) : {displayName} ") - @MethodSource("providePaginationTestParams") - void testListResourceTemplatesSuccess(String clientType, int availableElements) { + @ParameterizedTest(name = "{0} ({1}) : {displayName} ") + @MethodSource("providePaginationTestParams") + void testListResourceTemplatesSuccess(String clientType, int availableElements) { - var clientBuilder = clientBuilders.get(clientType); + var clientBuilder = clientBuilders.get(clientType); - // Setup list of prompts - List resourceTemplates = new ArrayList<>(); + // Setup list of prompts + List resourceTemplates = new ArrayList<>(); - for (int i = 0; i < availableElements; i++) { - resourceTemplates.add(new McpSchema.ResourceTemplate("file://{path}-" + i + ".txt", "test-resource", - "Test Resource Description", "application/octet-stream", null)); - } + for (int i = 0; i < availableElements; i++) { + resourceTemplates.add(new McpSchema.ResourceTemplate("file://{path}-" + i + ".txt", "test-resource", + "Test Resource Description", "application/octet-stream", null)); + } - var mcpServer = McpServer.sync(mcpServerTransportProvider) - .capabilities(ServerCapabilities.builder().resources(true, true).build()) - .resourceTemplates(resourceTemplates) - .build(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resourceTemplates(resourceTemplates) + .build(); - try (var mcpClient = clientBuilder.build()) { + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - var returnedElements = new HashSet(); + var returnedElements = new HashSet(); - var hasEntries = true; - String nextCursor = null; + var hasEntries = true; + String nextCursor = null; - while (hasEntries) { - var res = mcpClient.listResourceTemplates(nextCursor); + while (hasEntries) { + var res = mcpClient.listResourceTemplates(nextCursor); - res.resourceTemplates().forEach(e -> returnedElements.add(e.uriTemplate())); // store - // unique - // attribute + res.resourceTemplates().forEach(e -> returnedElements.add(e.uriTemplate())); // store + // unique + // attribute - nextCursor = res.nextCursor(); + nextCursor = res.nextCursor(); - if (nextCursor == null) { - hasEntries = false; - } - } + if (nextCursor == null) { + hasEntries = false; + } + } - assertThat(returnedElements.size()).isEqualTo(availableElements); - } + assertThat(returnedElements.size()).isEqualTo(availableElements); + } - mcpServer.close(); - } + mcpServer.close(); + } - @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = { "httpclient", "webflux" }) - void testListResourceTemplatesInvalidCursor(String clientType) { + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testListResourceTemplatesInvalidCursor(String clientType) { - var clientBuilder = clientBuilders.get(clientType); + var clientBuilder = clientBuilders.get(clientType); - var mock = new McpSchema.ResourceTemplate("file://{path}.txt", "test-resource", "Test Resource Description", - "application/octet-stream", null); + var mock = new McpSchema.ResourceTemplate("file://{path}.txt", "test-resource", "Test Resource Description", + "application/octet-stream", null); - var mcpServer = McpServer.sync(mcpServerTransportProvider) - .capabilities(ServerCapabilities.builder().resources(true, true).build()) - .resourceTemplates(mock) - .build(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resourceTemplates(mock) + .build(); - try (var mcpClient = clientBuilder.build()) { + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThatThrownBy(() -> mcpClient.listResourceTemplates("INVALID")).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + assertThatThrownBy(() -> mcpClient.listResourceTemplates("INVALID")).isInstanceOf(McpError.class) + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); - } + } - mcpServer.close(); - } + mcpServer.close(); + } - // --------------------------------------- - // Helpers for Tests of Paginated Lists - // --------------------------------------- + // --------------------------------------- + // Helpers for Tests of Paginated Lists + // --------------------------------------- - /** - * Helper function for pagination tests. This provides a stream of the following - * parameters: 1. Client type (e.g. httpclient, webflux) 2. Number of available - * elements in the list - * @return a stream of arguments with test parameters - */ - static Stream providePaginationTestParams() { - return Stream.of(Arguments.of("httpclient", 0), Arguments.of("httpclient", 1), Arguments.of("httpclient", 21), - Arguments.of("webflux", 0), Arguments.of("webflux", 1), Arguments.of("webflux", 21)); - } + /** + * Helper function for pagination tests. This provides a stream of the following + * parameters: 1. Client type (e.g. httpclient, webflux) 2. Number of available + * elements in the list + * @return a stream of arguments with test parameters + */ + static Stream providePaginationTestParams() { + return Stream.of(Arguments.of("httpclient", 0), Arguments.of("httpclient", 1), Arguments.of("httpclient", 21), + Arguments.of("webflux", 0), Arguments.of("webflux", 1), Arguments.of("webflux", 21)); + } } From 1998f1793595130f13406ea50616891ffc561124 Mon Sep 17 00:00:00 2001 From: Dennis Kawurek Date: Tue, 19 Aug 2025 19:56:18 +0200 Subject: [PATCH 3/4] feat: Handle invalid cursor in streamable transport provider --- ...FluxStreamableServerTransportProvider.java | 21 ++++++++++++------- ...bMvcStreamableServerTransportProvider.java | 15 ++++++------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java index f3f6c2c33..785149b28 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java @@ -8,13 +8,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.server.DefaultMcpTransportContext; import io.modelcontextprotocol.server.McpTransportContextExtractor; -import io.modelcontextprotocol.spec.HttpHeaders; -import io.modelcontextprotocol.spec.McpError; -import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpStreamableServerSession; -import io.modelcontextprotocol.spec.McpStreamableServerTransport; -import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider; -import io.modelcontextprotocol.spec.ProtocolVersions; +import io.modelcontextprotocol.spec.*; import io.modelcontextprotocol.server.McpTransportContext; import io.modelcontextprotocol.util.Assert; import io.modelcontextprotocol.util.KeepAliveScheduler; @@ -278,6 +272,19 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) { WebFluxStreamableMcpSessionTransport st = new WebFluxStreamableMcpSessionTransport(sink); Mono stream = session.responseStream(jsonrpcRequest, st); Disposable streamSubscription = stream.onErrorComplete(err -> { + if (err instanceof McpParamsValidationError) { + var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), + null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INVALID_PARAMS, + err.getMessage(), null)); + + var event = ServerSentEvent.builder() + .event(MESSAGE_EVENT_TYPE) + .data(errorResponse) + .build(); + + sink.next(event); + return true; + } sink.error(err); return true; }).contextWrite(sink.contextView()).subscribe(); diff --git a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcStreamableServerTransportProvider.java b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcStreamableServerTransportProvider.java index fa51a0130..7c86436d9 100644 --- a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcStreamableServerTransportProvider.java +++ b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcStreamableServerTransportProvider.java @@ -10,6 +10,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; +import io.modelcontextprotocol.spec.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; @@ -26,13 +27,6 @@ import io.modelcontextprotocol.server.DefaultMcpTransportContext; import io.modelcontextprotocol.server.McpTransportContext; import io.modelcontextprotocol.server.McpTransportContextExtractor; -import io.modelcontextprotocol.spec.HttpHeaders; -import io.modelcontextprotocol.spec.McpError; -import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpStreamableServerSession; -import io.modelcontextprotocol.spec.McpStreamableServerTransport; -import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider; -import io.modelcontextprotocol.spec.ProtocolVersions; import io.modelcontextprotocol.util.Assert; import io.modelcontextprotocol.util.KeepAliveScheduler; import reactor.core.publisher.Flux; @@ -395,8 +389,11 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) { session.responseStream(jsonrpcRequest, sessionTransport) .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) .block(); - } - catch (Exception e) { + } catch (McpParamsValidationError e) { + var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), null, + new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INVALID_PARAMS, e.getMessage(), null)); + sessionTransport.sendMessage(errorResponse).block(); + } catch (Exception e) { logger.error("Failed to handle request stream: {}", e.getMessage()); sseBuilder.error(e); } From d366e25adb60034b8919eeb594f0d7738ea04c8b Mon Sep 17 00:00:00 2001 From: Dennis Kawurek Date: Tue, 19 Aug 2025 19:56:43 +0200 Subject: [PATCH 4/4] chore: Apply formatter --- ...FluxStreamableServerTransportProvider.java | 12 +- ...bMvcStreamableServerTransportProvider.java | 11 +- ...stractMcpClientServerIntegrationTests.java | 921 +++++++++--------- .../server/McpAsyncServer.java | 120 +-- 4 files changed, 535 insertions(+), 529 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java index 785149b28..64c20e238 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java @@ -273,14 +273,14 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) { Mono stream = session.responseStream(jsonrpcRequest, st); Disposable streamSubscription = stream.onErrorComplete(err -> { if (err instanceof McpParamsValidationError) { - var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), - null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INVALID_PARAMS, - err.getMessage(), null)); + var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, + jsonrpcRequest.id(), null, new McpSchema.JSONRPCResponse.JSONRPCError( + McpSchema.ErrorCodes.INVALID_PARAMS, err.getMessage(), null)); var event = ServerSentEvent.builder() - .event(MESSAGE_EVENT_TYPE) - .data(errorResponse) - .build(); + .event(MESSAGE_EVENT_TYPE) + .data(errorResponse) + .build(); sink.next(event); return true; diff --git a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcStreamableServerTransportProvider.java b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcStreamableServerTransportProvider.java index 7c86436d9..e28aaef4c 100644 --- a/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcStreamableServerTransportProvider.java +++ b/mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcStreamableServerTransportProvider.java @@ -389,11 +389,14 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) { session.responseStream(jsonrpcRequest, sessionTransport) .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) .block(); - } catch (McpParamsValidationError e) { - var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), null, - new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INVALID_PARAMS, e.getMessage(), null)); + } + catch (McpParamsValidationError e) { + var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, + jsonrpcRequest.id(), null, new McpSchema.JSONRPCResponse.JSONRPCError( + McpSchema.ErrorCodes.INVALID_PARAMS, e.getMessage(), null)); sessionTransport.sendMessage(errorResponse).block(); - } catch (Exception e) { + } + catch (Exception e) { logger.error("Failed to handle request stream: {}", e.getMessage()); sseBuilder.error(e); } diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java index d7b0dd594..32f209b20 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java @@ -81,14 +81,14 @@ void simple(String clientType) { var clientBuilder = clientBuilders.get(clientType); var server = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0") - .requestTimeout(Duration.ofSeconds(1000)) - .build(); + .requestTimeout(Duration.ofSeconds(1000)) + .build(); try ( // Create client without sampling capabilities var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")) - .requestTimeout(Duration.ofSeconds(1000)) - .build()) { + .requestTimeout(Duration.ofSeconds(1000)) + .build()) { assertThat(client.initialize()).isNotNull(); @@ -106,19 +106,19 @@ void testCreateMessageWithoutSamplingCapabilities(String clientType) { var clientBuilder = clientBuilders.get(clientType); McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> { - exchange.createMessage(mock(McpSchema.CreateMessageRequest.class)).block(); - return Mono.just(mock(CallToolResult.class)); - }) - .build(); + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> { + exchange.createMessage(mock(McpSchema.CreateMessageRequest.class)).block(); + return Mono.just(mock(CallToolResult.class)); + }) + .build(); var server = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0").tools(tool).build(); try ( // Create client without sampling capabilities var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")) - .build()) { + .build()) { assertThat(client.initialize()).isNotNull(); @@ -127,7 +127,7 @@ void testCreateMessageWithoutSamplingCapabilities(String clientType) { } catch (McpError e) { assertThat(e).isInstanceOf(McpError.class) - .hasMessage("Client must be configured with sampling capabilities"); + .hasMessage("Client must be configured with sampling capabilities"); } } server.closeGracefully(); @@ -153,32 +153,32 @@ void testCreateMessageSuccess(String clientType) { AtomicReference samplingResult = new AtomicReference<>(); McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> { + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> { + + var createMessageRequest = McpSchema.CreateMessageRequest.builder() + .messages(List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, + new McpSchema.TextContent("Test message")))) + .modelPreferences(ModelPreferences.builder() + .hints(List.of()) + .costPriority(1.0) + .speedPriority(1.0) + .intelligencePriority(1.0) + .build()) + .build(); - var createMessageRequest = McpSchema.CreateMessageRequest.builder() - .messages(List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, - new McpSchema.TextContent("Test message")))) - .modelPreferences(ModelPreferences.builder() - .hints(List.of()) - .costPriority(1.0) - .speedPriority(1.0) - .intelligencePriority(1.0) - .build()) - .build(); - - return exchange.createMessage(createMessageRequest) - .doOnNext(samplingResult::set) - .thenReturn(callResponse); - }) - .build(); + return exchange.createMessage(createMessageRequest) + .doOnNext(samplingResult::set) + .thenReturn(callResponse); + }) + .build(); var mcpServer = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0").tools(tool).build(); try (var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) - .capabilities(ClientCapabilities.builder().sampling().build()) - .sampling(samplingHandler) - .build()) { + .capabilities(ClientCapabilities.builder().sampling().build()) + .sampling(samplingHandler) + .build()) { InitializeResult initResult = mcpClient.initialize(); assertThat(initResult).isNotNull(); @@ -222,9 +222,9 @@ void testCreateMessageWithRequestTimeoutSuccess(String clientType) throws Interr }; var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) - .capabilities(ClientCapabilities.builder().sampling().build()) - .sampling(samplingHandler) - .build(); + .capabilities(ClientCapabilities.builder().sampling().build()) + .sampling(samplingHandler) + .build(); // Server @@ -234,30 +234,30 @@ void testCreateMessageWithRequestTimeoutSuccess(String clientType) throws Interr AtomicReference samplingResult = new AtomicReference<>(); McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> { - - var createMessageRequest = McpSchema.CreateMessageRequest.builder() - .messages(List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, - new McpSchema.TextContent("Test message")))) - .modelPreferences(ModelPreferences.builder() - .hints(List.of()) - .costPriority(1.0) - .speedPriority(1.0) - .intelligencePriority(1.0) - .build()) - .build(); + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> { + + var createMessageRequest = McpSchema.CreateMessageRequest.builder() + .messages(List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, + new McpSchema.TextContent("Test message")))) + .modelPreferences(ModelPreferences.builder() + .hints(List.of()) + .costPriority(1.0) + .speedPriority(1.0) + .intelligencePriority(1.0) + .build()) + .build(); - return exchange.createMessage(createMessageRequest) - .doOnNext(samplingResult::set) - .thenReturn(callResponse); - }) - .build(); + return exchange.createMessage(createMessageRequest) + .doOnNext(samplingResult::set) + .thenReturn(callResponse); + }) + .build(); var mcpServer = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0") - .requestTimeout(Duration.ofSeconds(4)) - .tools(tool) - .build(); + .requestTimeout(Duration.ofSeconds(4)) + .tools(tool) + .build(); InitializeResult initResult = mcpClient.initialize(); assertThat(initResult).isNotNull(); @@ -300,36 +300,36 @@ void testCreateMessageWithRequestTimeoutFail(String clientType) throws Interrupt }; var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) - .capabilities(ClientCapabilities.builder().sampling().build()) - .sampling(samplingHandler) - .build(); + .capabilities(ClientCapabilities.builder().sampling().build()) + .sampling(samplingHandler) + .build(); CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null); McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> { - - var createMessageRequest = McpSchema.CreateMessageRequest.builder() - .messages(List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, - new McpSchema.TextContent("Test message")))) - .modelPreferences(ModelPreferences.builder() - .hints(List.of()) - .costPriority(1.0) - .speedPriority(1.0) - .intelligencePriority(1.0) - .build()) - .build(); + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> { + + var createMessageRequest = McpSchema.CreateMessageRequest.builder() + .messages(List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, + new McpSchema.TextContent("Test message")))) + .modelPreferences(ModelPreferences.builder() + .hints(List.of()) + .costPriority(1.0) + .speedPriority(1.0) + .intelligencePriority(1.0) + .build()) + .build(); - return exchange.createMessage(createMessageRequest).thenReturn(callResponse); - }) - .build(); + return exchange.createMessage(createMessageRequest).thenReturn(callResponse); + }) + .build(); var mcpServer = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0") - .requestTimeout(Duration.ofSeconds(1)) - .tools(tool) - .build(); + .requestTimeout(Duration.ofSeconds(1)) + .tools(tool) + .build(); InitializeResult initResult = mcpClient.initialize(); assertThat(initResult).isNotNull(); @@ -352,10 +352,10 @@ void testCreateElicitationWithoutElicitationCapabilities(String clientType) { var clientBuilder = clientBuilders.get(clientType); McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> exchange.createElicitation(mock(ElicitRequest.class)) - .then(Mono.just(mock(CallToolResult.class)))) - .build(); + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> exchange.createElicitation(mock(ElicitRequest.class)) + .then(Mono.just(mock(CallToolResult.class)))) + .build(); var server = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0").tools(tool).build(); @@ -369,7 +369,7 @@ void testCreateElicitationWithoutElicitationCapabilities(String clientType) { } catch (McpError e) { assertThat(e).isInstanceOf(McpError.class) - .hasMessage("Client must be configured with elicitation capabilities"); + .hasMessage("Client must be configured with elicitation capabilities"); } } server.closeGracefully().block(); @@ -393,31 +393,31 @@ void testCreateElicitationSuccess(String clientType) { null); McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> { + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> { - var elicitationRequest = McpSchema.ElicitRequest.builder() - .message("Test message") - .requestedSchema( - Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string")))) - .build(); + var elicitationRequest = McpSchema.ElicitRequest.builder() + .message("Test message") + .requestedSchema( + Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string")))) + .build(); - StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> { - assertThat(result).isNotNull(); - assertThat(result.action()).isEqualTo(McpSchema.ElicitResult.Action.ACCEPT); - assertThat(result.content().get("message")).isEqualTo("Test message"); - }).verifyComplete(); + StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> { + assertThat(result).isNotNull(); + assertThat(result.action()).isEqualTo(McpSchema.ElicitResult.Action.ACCEPT); + assertThat(result.content().get("message")).isEqualTo("Test message"); + }).verifyComplete(); - return Mono.just(callResponse); - }) - .build(); + return Mono.just(callResponse); + }) + .build(); var mcpServer = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0").tools(tool).build(); try (var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) - .capabilities(ClientCapabilities.builder().elicitation().build()) - .elicitation(elicitationHandler) - .build()) { + .capabilities(ClientCapabilities.builder().elicitation().build()) + .elicitation(elicitationHandler) + .build()) { InitializeResult initResult = mcpClient.initialize(); assertThat(initResult).isNotNull(); @@ -443,9 +443,9 @@ void testCreateElicitationWithRequestTimeoutSuccess(String clientType) { }; var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) - .capabilities(ClientCapabilities.builder().elicitation().build()) - .elicitation(elicitationHandler) - .build(); + .capabilities(ClientCapabilities.builder().elicitation().build()) + .elicitation(elicitationHandler) + .build(); CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null); @@ -453,25 +453,25 @@ void testCreateElicitationWithRequestTimeoutSuccess(String clientType) { AtomicReference resultRef = new AtomicReference<>(); McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> { + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> { - var elicitationRequest = McpSchema.ElicitRequest.builder() - .message("Test message") - .requestedSchema( - Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string")))) - .build(); + var elicitationRequest = McpSchema.ElicitRequest.builder() + .message("Test message") + .requestedSchema( + Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string")))) + .build(); - return exchange.createElicitation(elicitationRequest) - .doOnNext(resultRef::set) - .then(Mono.just(callResponse)); - }) - .build(); + return exchange.createElicitation(elicitationRequest) + .doOnNext(resultRef::set) + .then(Mono.just(callResponse)); + }) + .build(); var mcpServer = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0") - .requestTimeout(Duration.ofSeconds(3)) - .tools(tool) - .build(); + .requestTimeout(Duration.ofSeconds(3)) + .tools(tool) + .build(); InitializeResult initResult = mcpClient.initialize(); assertThat(initResult).isNotNull(); @@ -514,34 +514,34 @@ void testCreateElicitationWithRequestTimeoutFail(String clientType) { }; var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) - .capabilities(ClientCapabilities.builder().elicitation().build()) - .elicitation(elicitationHandler) - .build(); + .capabilities(ClientCapabilities.builder().elicitation().build()) + .elicitation(elicitationHandler) + .build(); CallToolResult callResponse = new CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null); AtomicReference resultRef = new AtomicReference<>(); McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> { + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> { - var elicitationRequest = ElicitRequest.builder() - .message("Test message") - .requestedSchema( - Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string")))) - .build(); + var elicitationRequest = ElicitRequest.builder() + .message("Test message") + .requestedSchema( + Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string")))) + .build(); - return exchange.createElicitation(elicitationRequest) - .doOnNext(resultRef::set) - .then(Mono.just(callResponse)); - }) - .build(); + return exchange.createElicitation(elicitationRequest) + .doOnNext(resultRef::set) + .then(Mono.just(callResponse)); + }) + .build(); var mcpServer = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0") - .requestTimeout(Duration.ofSeconds(1)) // 1 second. - .tools(tool) - .build(); + .requestTimeout(Duration.ofSeconds(1)) // 1 second. + .tools(tool) + .build(); InitializeResult initResult = mcpClient.initialize(); assertThat(initResult).isNotNull(); @@ -570,12 +570,12 @@ void testRootsSuccess(String clientType) { AtomicReference> rootsRef = new AtomicReference<>(); var mcpServer = prepareSyncServerBuilder() - .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) - .build(); + .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) + .build(); try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) - .roots(roots) - .build()) { + .roots(roots) + .build()) { InitializeResult initResult = mcpClient.initialize(); assertThat(initResult).isNotNull(); @@ -614,14 +614,14 @@ void testRootsWithoutCapability(String clientType) { var clientBuilder = clientBuilders.get(clientType); McpServerFeatures.SyncToolSpecification tool = McpServerFeatures.SyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> { + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> { - exchange.listRoots(); // try to list roots + exchange.listRoots(); // try to list roots - return mock(CallToolResult.class); - }) - .build(); + return mock(CallToolResult.class); + }) + .build(); var mcpServer = prepareSyncServerBuilder().rootsChangeHandler((exchange, rootsUpdate) -> { }).tools(tool).build(); @@ -654,12 +654,12 @@ void testRootsNotificationWithEmptyRootsList(String clientType) { AtomicReference> rootsRef = new AtomicReference<>(); var mcpServer = prepareSyncServerBuilder() - .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) - .build(); + .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) + .build(); try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) - .roots(List.of()) // Empty roots list - .build()) { + .roots(List.of()) // Empty roots list + .build()) { InitializeResult initResult = mcpClient.initialize(); assertThat(initResult).isNotNull(); @@ -686,13 +686,13 @@ void testRootsWithMultipleHandlers(String clientType) { AtomicReference> rootsRef2 = new AtomicReference<>(); var mcpServer = prepareSyncServerBuilder() - .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef1.set(rootsUpdate)) - .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef2.set(rootsUpdate)) - .build(); + .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef1.set(rootsUpdate)) + .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef2.set(rootsUpdate)) + .build(); try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) - .roots(roots) - .build()) { + .roots(roots) + .build()) { assertThat(mcpClient.initialize()).isNotNull(); @@ -718,12 +718,12 @@ void testRootsServerCloseWithActiveSubscription(String clientType) { AtomicReference> rootsRef = new AtomicReference<>(); var mcpServer = prepareSyncServerBuilder() - .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) - .build(); + .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) + .build(); try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) - .roots(roots) - .build()) { + .roots(roots) + .build()) { InitializeResult initResult = mcpClient.initialize(); assertThat(initResult).isNotNull(); @@ -757,30 +757,30 @@ void testToolCallSuccess(String clientType) { var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null); McpServerFeatures.SyncToolSpecification tool1 = McpServerFeatures.SyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> { + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> { - try { - HttpResponse response = HttpClient.newHttpClient() - .send(HttpRequest.newBuilder() - .uri(URI.create( - "https://raw.githubusercontent.com/modelcontextprotocol/java-sdk/refs/heads/main/README.md")) - .GET() - .build(), HttpResponse.BodyHandlers.ofString()); - String responseBody = response.body(); - assertThat(responseBody).isNotBlank(); - } - catch (Exception e) { - e.printStackTrace(); - } - - return callResponse; - }) - .build(); + try { + HttpResponse response = HttpClient.newHttpClient() + .send(HttpRequest.newBuilder() + .uri(URI.create( + "https://raw.githubusercontent.com/modelcontextprotocol/java-sdk/refs/heads/main/README.md")) + .GET() + .build(), HttpResponse.BodyHandlers.ofString()); + String responseBody = response.body(); + assertThat(responseBody).isNotBlank(); + } + catch (Exception e) { + e.printStackTrace(); + } + + return callResponse; + }) + .build(); var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(tool1) - .build(); + .tools(tool1) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -804,20 +804,20 @@ void testThrowingToolCallIsCaughtBeforeTimeout(String clientType) { var clientBuilder = clientBuilders.get(clientType); McpSyncServer mcpServer = prepareSyncServerBuilder() - .capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(McpServerFeatures.SyncToolSpecification.builder() - .tool(Tool.builder() - .name("tool1") - .description("tool1 description") - .inputSchema(emptyJsonSchema) - .build()) - .callHandler((exchange, request) -> { - // We trigger a timeout on blocking read, raising an exception - Mono.never().block(Duration.ofSeconds(1)); - return null; - }) - .build()) - .build(); + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(McpServerFeatures.SyncToolSpecification.builder() + .tool(Tool.builder() + .name("tool1") + .description("tool1 description") + .inputSchema(emptyJsonSchema) + .build()) + .callHandler((exchange, request) -> { + // We trigger a timeout on blocking read, raising an exception + Mono.never().block(Duration.ofSeconds(1)); + return null; + }) + .build()) + .build(); try (var mcpClient = clientBuilder.requestTimeout(Duration.ofMillis(6666)).build()) { InitializeResult initResult = mcpClient.initialize(); @@ -827,8 +827,8 @@ void testThrowingToolCallIsCaughtBeforeTimeout(String clientType) { // the offending tool // instead of getting back a timeout. assertThatExceptionOfType(McpError.class) - .isThrownBy(() -> mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()))) - .withMessageContaining("Timeout on blocking read"); + .isThrownBy(() -> mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()))) + .withMessageContaining("Timeout on blocking read"); } mcpServer.close(); @@ -842,41 +842,41 @@ void testToolListChangeHandlingSuccess(String clientType) { var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null); McpServerFeatures.SyncToolSpecification tool1 = McpServerFeatures.SyncToolSpecification.builder() - .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) - .callHandler((exchange, request) -> { - // perform a blocking call to a remote service - try { - HttpResponse response = HttpClient.newHttpClient() - .send(HttpRequest.newBuilder() - .uri(URI.create( - "https://raw.githubusercontent.com/modelcontextprotocol/java-sdk/refs/heads/main/README.md")) - .GET() - .build(), HttpResponse.BodyHandlers.ofString()); - String responseBody = response.body(); - assertThat(responseBody).isNotBlank(); - } - catch (Exception e) { - e.printStackTrace(); - } - return callResponse; - }) - .build(); + .tool(Tool.builder().name("tool1").description("tool1 description").inputSchema(emptyJsonSchema).build()) + .callHandler((exchange, request) -> { + // perform a blocking call to a remote service + try { + HttpResponse response = HttpClient.newHttpClient() + .send(HttpRequest.newBuilder() + .uri(URI.create( + "https://raw.githubusercontent.com/modelcontextprotocol/java-sdk/refs/heads/main/README.md")) + .GET() + .build(), HttpResponse.BodyHandlers.ofString()); + String responseBody = response.body(); + assertThat(responseBody).isNotBlank(); + } + catch (Exception e) { + e.printStackTrace(); + } + return callResponse; + }) + .build(); AtomicReference> toolsRef = new AtomicReference<>(); var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(tool1) - .build(); + .tools(tool1) + .build(); try (var mcpClient = clientBuilder.toolsChangeConsumer(toolsUpdate -> { // perform a blocking call to a remote service try { HttpResponse response = HttpClient.newHttpClient() - .send(HttpRequest.newBuilder() - .uri(URI.create( - "https://raw.githubusercontent.com/modelcontextprotocol/java-sdk/refs/heads/main/README.md")) - .GET() - .build(), HttpResponse.BodyHandlers.ofString()); + .send(HttpRequest.newBuilder() + .uri(URI.create( + "https://raw.githubusercontent.com/modelcontextprotocol/java-sdk/refs/heads/main/README.md")) + .GET() + .build(), HttpResponse.BodyHandlers.ofString()); String responseBody = response.body(); assertThat(responseBody).isNotBlank(); toolsRef.set(toolsUpdate); @@ -908,13 +908,13 @@ void testToolListChangeHandlingSuccess(String clientType) { // Add a new tool McpServerFeatures.SyncToolSpecification tool2 = McpServerFeatures.SyncToolSpecification.builder() - .tool(Tool.builder() - .name("tool2") - .description("tool2 description") - .inputSchema(emptyJsonSchema) - .build()) - .callHandler((exchange, request) -> callResponse) - .build(); + .tool(Tool.builder() + .name("tool2") + .description("tool2 description") + .inputSchema(emptyJsonSchema) + .build()) + .callHandler((exchange, request) -> callResponse) + .build(); mcpServer.addTool(tool2); @@ -947,8 +947,8 @@ void testListToolsSuccess(String clientType, int availableElements) { } var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(tools) - .build(); + .tools(tools) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -997,8 +997,8 @@ void testListToolsCursorInvalidListChanged(String clientType) { } var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(tools) - .build(); + .tools(tools) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1012,12 +1012,12 @@ void testListToolsCursorInvalidListChanged(String clientType) { mcpServer.addTool(new McpServerFeatures.SyncToolSpecification(mock, null)); assertThatThrownBy(() -> mcpClient.listTools(res.nextCursor())).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); } @@ -1034,8 +1034,8 @@ void testListToolsInvalidCursor(String clientType) { var spec = new McpServerFeatures.SyncToolSpecification(mock, null); var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(spec) - .build(); + .tools(spec) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1043,12 +1043,12 @@ void testListToolsInvalidCursor(String clientType) { assertThat(initResult).isNotNull(); assertThatThrownBy(() -> mcpClient.listTools("INVALID")).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); } @@ -1087,16 +1087,16 @@ void testLoggingNotification(String clientType) throws InterruptedException { // Create server with a tool that sends logging notifications McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(Tool.builder() - .name("logging-test") - .description("Test logging notifications") - .inputSchema(emptyJsonSchema) - .build()) - .callHandler((exchange, request) -> { + .tool(Tool.builder() + .name("logging-test") + .description("Test logging notifications") + .inputSchema(emptyJsonSchema) + .build()) + .callHandler((exchange, request) -> { - // Create and send notifications with different levels + // Create and send notifications with different levels - //@formatter:off + //@formatter:off return exchange // This should be filtered out (DEBUG < NOTICE) .loggingNotification(McpSchema.LoggingMessageNotification.builder() .level(McpSchema.LoggingLevel.DEBUG) @@ -1129,13 +1129,13 @@ void testLoggingNotification(String clientType) throws InterruptedException { .build())) .thenReturn(new CallToolResult("Logging test completed", false)); //@formatter:on - }) - .build(); + }) + .build(); var mcpServer = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(tool) - .build(); + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tool) + .build(); try ( // Create client with logging notification handler @@ -1163,7 +1163,7 @@ void testLoggingNotification(String clientType) throws InterruptedException { assertThat(receivedNotifications).hasSize(expectedNotificationsCount); Map notificationMap = receivedNotifications.stream() - .collect(Collectors.toMap(n -> n.data(), n -> n)); + .collect(Collectors.toMap(n -> n.data(), n -> n)); // First notification should be NOTICE level assertThat(notificationMap.get("Notice message").level()).isEqualTo(McpSchema.LoggingLevel.NOTICE); @@ -1199,35 +1199,35 @@ void testProgressNotification(String clientType) throws InterruptedException { // Create server with a tool that sends logging notifications McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(McpSchema.Tool.builder() - .name("progress-test") - .description("Test progress notifications") - .inputSchema(emptyJsonSchema) - .build()) - .callHandler((exchange, request) -> { - - // Create and send notifications - var progressToken = (String) request.meta().get("progressToken"); - - return exchange - .progressNotification( - new McpSchema.ProgressNotification(progressToken, 0.0, 1.0, "Processing started")) - .then(exchange.progressNotification( - new McpSchema.ProgressNotification(progressToken, 0.5, 1.0, "Processing data"))) - .then(// Send a progress notification with another progress value - // should - exchange.progressNotification(new McpSchema.ProgressNotification("another-progress-token", - 0.0, 1.0, "Another processing started"))) - .then(exchange.progressNotification( - new McpSchema.ProgressNotification(progressToken, 1.0, 1.0, "Processing completed"))) - .thenReturn(new CallToolResult(("Progress test completed"), false)); - }) - .build(); + .tool(McpSchema.Tool.builder() + .name("progress-test") + .description("Test progress notifications") + .inputSchema(emptyJsonSchema) + .build()) + .callHandler((exchange, request) -> { + + // Create and send notifications + var progressToken = (String) request.meta().get("progressToken"); + + return exchange + .progressNotification( + new McpSchema.ProgressNotification(progressToken, 0.0, 1.0, "Processing started")) + .then(exchange.progressNotification( + new McpSchema.ProgressNotification(progressToken, 0.5, 1.0, "Processing data"))) + .then(// Send a progress notification with another progress value + // should + exchange.progressNotification(new McpSchema.ProgressNotification("another-progress-token", + 0.0, 1.0, "Another processing started"))) + .then(exchange.progressNotification( + new McpSchema.ProgressNotification(progressToken, 1.0, 1.0, "Processing completed"))) + .thenReturn(new CallToolResult(("Progress test completed"), false)); + }) + .build(); var mcpServer = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(tool) - .build(); + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tool) + .build(); try ( // Create client with progress notification handler @@ -1242,9 +1242,9 @@ void testProgressNotification(String clientType) throws InterruptedException { // Call the tool that sends progress notifications McpSchema.CallToolRequest callToolRequest = McpSchema.CallToolRequest.builder() - .name("progress-test") - .meta(Map.of("progressToken", "test-progress-token")) - .build(); + .name("progress-test") + .meta(Map.of("progressToken", "test-progress-token")) + .build(); CallToolResult result = mcpClient.callTool(callToolRequest); assertThat(result).isNotNull(); assertThat(result.content().get(0)).isInstanceOf(McpSchema.TextContent.class); @@ -1256,7 +1256,7 @@ void testProgressNotification(String clientType) throws InterruptedException { assertThat(receivedNotifications).hasSize(expectedNotificationsCount); Map notificationMap = receivedNotifications.stream() - .collect(Collectors.toMap(n -> n.message(), n -> n)); + .collect(Collectors.toMap(n -> n.message(), n -> n)); // First notification should be 0.0/1.0 progress assertThat(notificationMap.get("Processing started").progressToken()).isEqualTo("test-progress-token"); @@ -1272,11 +1272,11 @@ void testProgressNotification(String clientType) throws InterruptedException { // Third notification should be another progress token with 0.0/1.0 progress assertThat(notificationMap.get("Another processing started").progressToken()) - .isEqualTo("another-progress-token"); + .isEqualTo("another-progress-token"); assertThat(notificationMap.get("Another processing started").progress()).isEqualTo(0.0); assertThat(notificationMap.get("Another processing started").total()).isEqualTo(1.0); assertThat(notificationMap.get("Another processing started").message()) - .isEqualTo("Another processing started"); + .isEqualTo("Another processing started"); // Fourth notification should be 1.0/1.0 progress assertThat(notificationMap.get("Processing completed").progressToken()).isEqualTo("test-progress-token"); @@ -1304,19 +1304,19 @@ void testCompletionShouldReturnExpectedSuggestions(String clientType) { AtomicReference samplingRequest = new AtomicReference<>(); BiFunction completionHandler = (mcpSyncServerExchange, - request) -> { + request) -> { samplingRequest.set(request); return completionResponse; }; var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().completions().build()) - .prompts(new McpServerFeatures.SyncPromptSpecification( - new Prompt("code_review", "Code review", "this is code review prompt", - List.of(new PromptArgument("language", "Language", "string", false))), - (mcpSyncServerExchange, getPromptRequest) -> null)) - .completions(new McpServerFeatures.SyncCompletionSpecification( - new McpSchema.PromptReference("ref/prompt", "code_review", "Code review"), completionHandler)) - .build(); + .prompts(new McpServerFeatures.SyncPromptSpecification( + new Prompt("code_review", "Code review", "this is code review prompt", + List.of(new PromptArgument("language", "Language", "string", false))), + (mcpSyncServerExchange, getPromptRequest) -> null)) + .completions(new McpServerFeatures.SyncCompletionSpecification( + new McpSchema.PromptReference("ref/prompt", "code_review", "Code review"), completionHandler)) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1352,35 +1352,35 @@ void testPingSuccess(String clientType) { AtomicReference executionOrder = new AtomicReference<>(""); McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() - .tool(Tool.builder() - .name("ping-async-test") - .description("Test ping async behavior") - .inputSchema(emptyJsonSchema) - .build()) - .callHandler((exchange, request) -> { - - executionOrder.set(executionOrder.get() + "1"); - - // Test async ping behavior - return exchange.ping().doOnNext(result -> { - - assertThat(result).isNotNull(); - // Ping should return an empty object or map - assertThat(result).isInstanceOf(Map.class); - - executionOrder.set(executionOrder.get() + "2"); - assertThat(result).isNotNull(); - }).then(Mono.fromCallable(() -> { - executionOrder.set(executionOrder.get() + "3"); - return new CallToolResult("Async ping test completed", false); - })); - }) - .build(); + .tool(Tool.builder() + .name("ping-async-test") + .description("Test ping async behavior") + .inputSchema(emptyJsonSchema) + .build()) + .callHandler((exchange, request) -> { + + executionOrder.set(executionOrder.get() + "1"); + + // Test async ping behavior + return exchange.ping().doOnNext(result -> { + + assertThat(result).isNotNull(); + // Ping should return an empty object or map + assertThat(result).isInstanceOf(Map.class); + + executionOrder.set(executionOrder.get() + "2"); + assertThat(result).isNotNull(); + }).then(Mono.fromCallable(() -> { + executionOrder.set(executionOrder.get() + "3"); + return new CallToolResult("Async ping test completed", false); + })); + }) + .build(); var mcpServer = prepareAsyncServerBuilder().serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(tool) - .build(); + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tool) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1416,27 +1416,27 @@ void testStructuredOutputValidationSuccess(String clientType) { "required", List.of("result", "operation")); Tool calculatorTool = Tool.builder() - .name("calculator") - .description("Performs mathematical calculations") - .outputSchema(outputSchema) - .build(); + .name("calculator") + .description("Performs mathematical calculations") + .outputSchema(outputSchema) + .build(); McpServerFeatures.SyncToolSpecification tool = McpServerFeatures.SyncToolSpecification.builder() - .tool(calculatorTool) - .callHandler((exchange, request) -> { - String expression = (String) request.arguments().getOrDefault("expression", "2 + 3"); - double result = evaluateExpression(expression); - return CallToolResult.builder() - .structuredContent( - Map.of("result", result, "operation", expression, "timestamp", "2024-01-01T10:00:00Z")) - .build(); - }) - .build(); + .tool(calculatorTool) + .callHandler((exchange, request) -> { + String expression = (String) request.arguments().getOrDefault("expression", "2 + 3"); + double result = evaluateExpression(expression); + return CallToolResult.builder() + .structuredContent( + Map.of("result", result, "operation", expression, "timestamp", "2024-01-01T10:00:00Z")) + .build(); + }) + .build(); var mcpServer = prepareSyncServerBuilder().serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(tool) - .build(); + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tool) + .build(); try (var mcpClient = clientBuilder.build()) { InitializeResult initResult = mcpClient.initialize(); @@ -1450,7 +1450,7 @@ void testStructuredOutputValidationSuccess(String clientType) { // Call tool with valid structured output CallToolResult response = mcpClient - .callTool(new McpSchema.CallToolRequest("calculator", Map.of("expression", "2 + 3"))); + .callTool(new McpSchema.CallToolRequest("calculator", Map.of("expression", "2 + 3"))); assertThat(response).isNotNull(); assertThat(response.isError()).isFalse(); @@ -1458,8 +1458,8 @@ void testStructuredOutputValidationSuccess(String clientType) { // In WebMVC, structured content is returned properly if (response.structuredContent() != null) { assertThat(response.structuredContent()).containsEntry("result", 5.0) - .containsEntry("operation", "2 + 3") - .containsEntry("timestamp", "2024-01-01T10:00:00Z"); + .containsEntry("operation", "2 + 3") + .containsEntry("timestamp", "2024-01-01T10:00:00Z"); } else { // Fallback to checking content if structured content is not available @@ -1468,9 +1468,9 @@ void testStructuredOutputValidationSuccess(String clientType) { assertThat(response.structuredContent()).isNotNull(); assertThatJson(response.structuredContent()).when(Option.IGNORING_ARRAY_ORDER) - .when(Option.IGNORING_EXTRA_ARRAY_ITEMS) - .isObject() - .isEqualTo(json(""" + .when(Option.IGNORING_EXTRA_ARRAY_ITEMS) + .isObject() + .isEqualTo(json(""" {"result":5.0,"operation":"2 + 3","timestamp":"2024-01-01T10:00:00Z"}""")); } @@ -1489,27 +1489,27 @@ void testStructuredOutputValidationFailure(String clientType) { List.of("result", "operation")); Tool calculatorTool = Tool.builder() - .name("calculator") - .description("Performs mathematical calculations") - .outputSchema(outputSchema) - .build(); + .name("calculator") + .description("Performs mathematical calculations") + .outputSchema(outputSchema) + .build(); McpServerFeatures.SyncToolSpecification tool = McpServerFeatures.SyncToolSpecification.builder() - .tool(calculatorTool) - .callHandler((exchange, request) -> { - // Return invalid structured output. Result should be number, missing - // operation - return CallToolResult.builder() - .addTextContent("Invalid calculation") - .structuredContent(Map.of("result", "not-a-number", "extra", "field")) - .build(); - }) - .build(); + .tool(calculatorTool) + .callHandler((exchange, request) -> { + // Return invalid structured output. Result should be number, missing + // operation + return CallToolResult.builder() + .addTextContent("Invalid calculation") + .structuredContent(Map.of("result", "not-a-number", "extra", "field")) + .build(); + }) + .build(); var mcpServer = prepareSyncServerBuilder().serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(tool) - .build(); + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tool) + .build(); try (var mcpClient = clientBuilder.build()) { InitializeResult initResult = mcpClient.initialize(); @@ -1517,7 +1517,7 @@ void testStructuredOutputValidationFailure(String clientType) { // Call tool with invalid structured output CallToolResult response = mcpClient - .callTool(new McpSchema.CallToolRequest("calculator", Map.of("expression", "2 + 3"))); + .callTool(new McpSchema.CallToolRequest("calculator", Map.of("expression", "2 + 3"))); assertThat(response).isNotNull(); assertThat(response.isError()).isTrue(); @@ -1542,23 +1542,23 @@ void testStructuredOutputMissingStructuredContent(String clientType) { Map.of("result", Map.of("type", "number")), "required", List.of("result")); Tool calculatorTool = Tool.builder() - .name("calculator") - .description("Performs mathematical calculations") - .outputSchema(outputSchema) - .build(); + .name("calculator") + .description("Performs mathematical calculations") + .outputSchema(outputSchema) + .build(); McpServerFeatures.SyncToolSpecification tool = McpServerFeatures.SyncToolSpecification.builder() - .tool(calculatorTool) - .callHandler((exchange, request) -> { - // Return result without structured content but tool has output schema - return CallToolResult.builder().addTextContent("Calculation completed").build(); - }) - .build(); + .tool(calculatorTool) + .callHandler((exchange, request) -> { + // Return result without structured content but tool has output schema + return CallToolResult.builder().addTextContent("Calculation completed").build(); + }) + .build(); var mcpServer = prepareSyncServerBuilder().serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().tools(true).build()) - .tools(tool) - .build(); + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tool) + .build(); try (var mcpClient = clientBuilder.build()) { InitializeResult initResult = mcpClient.initialize(); @@ -1566,7 +1566,7 @@ void testStructuredOutputMissingStructuredContent(String clientType) { // Call tool that should return structured content but doesn't CallToolResult response = mcpClient - .callTool(new McpSchema.CallToolRequest("calculator", Map.of("expression", "2 + 3"))); + .callTool(new McpSchema.CallToolRequest("calculator", Map.of("expression", "2 + 3"))); assertThat(response).isNotNull(); assertThat(response.isError()).isTrue(); @@ -1589,8 +1589,8 @@ void testStructuredOutputRuntimeToolAddition(String clientType) { // Start server without tools var mcpServer = prepareSyncServerBuilder().serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().tools(true).build()) - .build(); + .capabilities(ServerCapabilities.builder().tools(true).build()) + .build(); try (var mcpClient = clientBuilder.build()) { InitializeResult initResult = mcpClient.initialize(); @@ -1605,21 +1605,21 @@ void testStructuredOutputRuntimeToolAddition(String clientType) { List.of("message", "count")); Tool dynamicTool = Tool.builder() - .name("dynamic-tool") - .description("Dynamically added tool") - .outputSchema(outputSchema) - .build(); + .name("dynamic-tool") + .description("Dynamically added tool") + .outputSchema(outputSchema) + .build(); McpServerFeatures.SyncToolSpecification toolSpec = McpServerFeatures.SyncToolSpecification.builder() - .tool(dynamicTool) - .callHandler((exchange, request) -> { - int count = (Integer) request.arguments().getOrDefault("count", 1); - return CallToolResult.builder() - .addTextContent("Dynamic tool executed " + count + " times") - .structuredContent(Map.of("message", "Dynamic execution", "count", count)) - .build(); - }) - .build(); + .tool(dynamicTool) + .callHandler((exchange, request) -> { + int count = (Integer) request.arguments().getOrDefault("count", 1); + return CallToolResult.builder() + .addTextContent("Dynamic tool executed " + count + " times") + .structuredContent(Map.of("message", "Dynamic execution", "count", count)) + .build(); + }) + .build(); // Add tool to server mcpServer.addTool(toolSpec); @@ -1637,7 +1637,7 @@ void testStructuredOutputRuntimeToolAddition(String clientType) { // Call dynamically added tool CallToolResult response = mcpClient - .callTool(new McpSchema.CallToolRequest("dynamic-tool", Map.of("count", 3))); + .callTool(new McpSchema.CallToolRequest("dynamic-tool", Map.of("count", 3))); assertThat(response).isNotNull(); assertThat(response.isError()).isFalse(); @@ -1645,21 +1645,19 @@ void testStructuredOutputRuntimeToolAddition(String clientType) { assertThat(response.content()).hasSize(1); assertThat(response.content().get(0)).isInstanceOf(McpSchema.TextContent.class); assertThat(((McpSchema.TextContent) response.content().get(0)).text()) - .isEqualTo("Dynamic tool executed 3 times"); + .isEqualTo("Dynamic tool executed 3 times"); assertThat(response.structuredContent()).isNotNull(); assertThatJson(response.structuredContent()).when(Option.IGNORING_ARRAY_ORDER) - .when(Option.IGNORING_EXTRA_ARRAY_ITEMS) - .isObject() - .isEqualTo(json(""" + .when(Option.IGNORING_EXTRA_ARRAY_ITEMS) + .isObject() + .isEqualTo(json(""" {"count":3,"message":"Dynamic execution"}""")); } mcpServer.close(); } - - // --------------------------------------- // Tests for Paginated Prompt List Results // --------------------------------------- @@ -1682,8 +1680,8 @@ void testListPromptsSuccess(String clientType, int availableElements) { } var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().prompts(true).build()) - .prompts(prompts) - .build(); + .prompts(prompts) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1735,8 +1733,8 @@ void testListPromptsCursorInvalidListChanged(String clientType) { } var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().prompts(true).build()) - .prompts(prompts) - .build(); + .prompts(prompts) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1752,12 +1750,12 @@ void testListPromptsCursorInvalidListChanged(String clientType) { mcpServer.addPrompt(new McpServerFeatures.SyncPromptSpecification(mock, null)); assertThatThrownBy(() -> mcpClient.listPrompts(res.nextCursor())).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); } @@ -1776,8 +1774,8 @@ void testListPromptsInvalidCursor(String clientType) { var spec = new McpServerFeatures.SyncPromptSpecification(mock, null); var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().prompts(true).build()) - .prompts(spec) - .build(); + .prompts(spec) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1785,12 +1783,12 @@ void testListPromptsInvalidCursor(String clientType) { assertThat(initResult).isNotNull(); assertThatThrownBy(() -> mcpClient.listPrompts("INVALID")).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); } @@ -1818,9 +1816,10 @@ void testListResourcesSuccess(String clientType, int availableElements) { resources.add(spec); } - var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().resources(true, true).build()) - .resources(resources) - .build(); + var mcpServer = prepareSyncServerBuilder() + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resources(resources) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1870,9 +1869,10 @@ void testListResourcesCursorInvalidListChanged(String clientType) { resources.add(spec); } - var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().resources(true, true).build()) - .resources(resources) - .build(); + var mcpServer = prepareSyncServerBuilder() + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resources(resources) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1887,12 +1887,12 @@ void testListResourcesCursorInvalidListChanged(String clientType) { mcpServer.addResource(new McpServerFeatures.SyncResourceSpecification(mock, null)); assertThatThrownBy(() -> mcpClient.listResources(res.nextCursor())).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); } @@ -1909,9 +1909,10 @@ void testListResourcesInvalidCursor(String clientType) { "application/octet-stream", null); var spec = new McpServerFeatures.SyncResourceSpecification(mock, null); - var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().resources(true, true).build()) - .resources(spec) - .build(); + var mcpServer = prepareSyncServerBuilder() + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resources(spec) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1919,12 +1920,12 @@ void testListResourcesInvalidCursor(String clientType) { assertThat(initResult).isNotNull(); assertThatThrownBy(() -> mcpClient.listResources("INVALID")).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); } @@ -1949,9 +1950,10 @@ void testListResourceTemplatesSuccess(String clientType, int availableElements) "Test Resource Description", "application/octet-stream", null)); } - var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().resources(true, true).build()) - .resourceTemplates(resourceTemplates) - .build(); + var mcpServer = prepareSyncServerBuilder() + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resourceTemplates(resourceTemplates) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -1992,9 +1994,10 @@ void testListResourceTemplatesInvalidCursor(String clientType) { var mock = new McpSchema.ResourceTemplate("file://{path}.txt", "test-resource", "Test Resource Description", "application/octet-stream", null); - var mcpServer = prepareSyncServerBuilder().capabilities(ServerCapabilities.builder().resources(true, true).build()) - .resourceTemplates(mock) - .build(); + var mcpServer = prepareSyncServerBuilder() + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .resourceTemplates(mock) + .build(); try (var mcpClient = clientBuilder.build()) { @@ -2002,12 +2005,12 @@ void testListResourceTemplatesInvalidCursor(String clientType) { assertThat(initResult).isNotNull(); assertThatThrownBy(() -> mcpClient.listResourceTemplates("INVALID")).isInstanceOf(McpError.class) - .hasMessage("Invalid cursor") - .satisfies(exception -> { - var error = (McpError) exception; - assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); - assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); - }); + .hasMessage("Invalid cursor") + .satisfies(exception -> { + var error = (McpError) exception; + assertThat(error.getJsonRpcError().code()).isEqualTo(INVALID_PARAMS); + assertThat(error.getJsonRpcError().message()).isEqualTo("Invalid cursor"); + }); } diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 392db98f8..4939192d8 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -130,8 +130,8 @@ public class McpAsyncServer { * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization */ McpAsyncServer(McpServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper, - McpServerFeatures.Async features, Duration requestTimeout, - McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) { + McpServerFeatures.Async features, Duration requestTimeout, + McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) { this.mcpTransportProvider = mcpTransportProvider; this.objectMapper = objectMapper; this.serverInfo = features.serverInfo(); @@ -155,8 +155,8 @@ public class McpAsyncServer { } McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper, - McpServerFeatures.Async features, Duration requestTimeout, - McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) { + McpServerFeatures.Async features, Duration requestTimeout, + McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) { this.mcpTransportProvider = mcpTransportProvider; this.objectMapper = objectMapper; this.serverInfo = features.serverInfo(); @@ -185,11 +185,11 @@ private Map prepareNotificationHandlers(McpServe notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty()); List, Mono>> rootsChangeConsumers = features - .rootsChangeConsumers(); + .rootsChangeConsumers(); if (Utils.isEmpty(rootsChangeConsumers)) { rootsChangeConsumers = List.of((exchange, roots) -> Mono.fromRunnable(() -> logger - .warn("Roots list changed notification, but no consumers provided. Roots list changed: {}", roots))); + .warn("Roots list changed notification, but no consumers provided. Roots list changed: {}", roots))); } notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED, @@ -302,13 +302,13 @@ public void close() { private McpNotificationHandler asyncRootsListChangedNotificationHandler( List, Mono>> rootsChangeConsumers) { return (exchange, params) -> exchange.listRoots() - .flatMap(listRootsResult -> Flux.fromIterable(rootsChangeConsumers) - .flatMap(consumer -> Mono.defer(() -> consumer.apply(exchange, listRootsResult.roots()))) - .onErrorResume(error -> { - logger.error("Error handling roots list change notification", error); - return Mono.empty(); - }) - .then()); + .flatMap(listRootsResult -> Flux.fromIterable(rootsChangeConsumers) + .flatMap(consumer -> Mono.defer(() -> consumer.apply(exchange, listRootsResult.roots()))) + .onErrorResume(error -> { + logger.error("Error handling roots list change notification", error); + return Mono.empty(); + }) + .then()); } // --------------------------------------- @@ -363,8 +363,8 @@ private static class StructuredOutputCallToolHandler private final Map outputSchema; public StructuredOutputCallToolHandler(JsonSchemaValidator jsonSchemaValidator, - Map outputSchema, - BiFunction> delegateHandler) { + Map outputSchema, + BiFunction> delegateHandler) { Assert.notNull(jsonSchemaValidator, "JsonSchemaValidator must not be null"); Assert.notNull(delegateHandler, "Delegate call tool result handler must not be null"); @@ -450,10 +450,10 @@ private static McpServerFeatures.AsyncToolSpecification withStructuredOutputHand } return McpServerFeatures.AsyncToolSpecification.builder() - .tool(toolSpecification.tool()) - .callHandler(new StructuredOutputCallToolHandler(jsonSchemaValidator, - toolSpecification.tool().outputSchema(), toolSpecification.callHandler())) - .build(); + .tool(toolSpecification.tool()) + .callHandler(new StructuredOutputCallToolHandler(jsonSchemaValidator, + toolSpecification.tool().outputSchema(), toolSpecification.callHandler())) + .build(); } /** @@ -471,7 +471,7 @@ public Mono removeTool(String toolName) { return Mono.defer(() -> { boolean removed = this.tools - .removeIf(toolSpecification -> toolSpecification.tool().name().equals(toolName)); + .removeIf(toolSpecification -> toolSpecification.tool().name().equals(toolName)); if (removed) { logger.debug("Removed tool handler: {}", toolName); if (this.serverCapabilities.tools().listChanged()) { @@ -506,10 +506,10 @@ private McpRequestHandler toolsListRequestHandler() { var nextCursor = getCursor(endIndex, mapSize, mapHash); var resultList = this.tools.stream() - .skip(requestedStartIndex) - .limit(endIndex - requestedStartIndex) - .map(McpServerFeatures.AsyncToolSpecification::tool) - .toList(); + .skip(requestedStartIndex) + .limit(endIndex - requestedStartIndex) + .map(McpServerFeatures.AsyncToolSpecification::tool) + .toList(); return Mono.just(new McpSchema.ListToolsResult(resultList, nextCursor)); }; @@ -522,15 +522,15 @@ private McpRequestHandler toolsCallRequestHandler() { }); Optional toolSpecification = this.tools.stream() - .filter(tr -> callToolRequest.name().equals(tr.tool().name())) - .findAny(); + .filter(tr -> callToolRequest.name().equals(tr.tool().name())) + .findAny(); if (toolSpecification.isEmpty()) { return Mono.error(new McpError("Tool not found: " + callToolRequest.name())); } return toolSpecification.map(tool -> Mono.defer(() -> tool.callHandler().apply(exchange, callToolRequest))) - .orElse(Mono.error(new McpError("Tool not found: " + callToolRequest.name()))); + .orElse(Mono.error(new McpError("Tool not found: " + callToolRequest.name()))); }; } @@ -623,11 +623,11 @@ private McpRequestHandler resourcesListRequestHan var nextCursor = getCursor(endIndex, mapSize, mapHash); var resultList = this.resources.values() - .stream() - .skip(requestedStartIndex) - .limit(endIndex - requestedStartIndex) - .map(McpServerFeatures.AsyncResourceSpecification::resource) - .toList(); + .stream() + .skip(requestedStartIndex) + .limit(endIndex - requestedStartIndex) + .map(McpServerFeatures.AsyncResourceSpecification::resource) + .toList(); return Mono.just(new McpSchema.ListResourcesResult(resultList, nextCursor)); }; @@ -658,15 +658,15 @@ private McpRequestHandler resourceTemplat private List getResourceTemplates() { var list = new ArrayList<>(this.resourceTemplates); List resourceTemplates = this.resources.keySet() - .stream() - .filter(uri -> uri.contains("{")) - .map(uri -> { - var resource = this.resources.get(uri).resource(); - var template = new McpSchema.ResourceTemplate(resource.uri(), resource.name(), resource.title(), - resource.description(), resource.mimeType(), resource.annotations()); - return template; - }) - .toList(); + .stream() + .filter(uri -> uri.contains("{")) + .map(uri -> { + var resource = this.resources.get(uri).resource(); + var template = new McpSchema.ResourceTemplate(resource.uri(), resource.name(), resource.title(), + resource.description(), resource.mimeType(), resource.annotations()); + return template; + }) + .toList(); list.addAll(resourceTemplates); @@ -681,12 +681,12 @@ private McpRequestHandler resourcesReadRequestHand var resourceUri = resourceRequest.uri(); McpServerFeatures.AsyncResourceSpecification specification = this.resources.values() - .stream() - .filter(resourceSpecification -> this.uriTemplateManagerFactory - .create(resourceSpecification.resource().uri()) - .matches(resourceUri)) - .findFirst() - .orElseThrow(() -> new McpError("Resource not found: " + resourceUri)); + .stream() + .filter(resourceSpecification -> this.uriTemplateManagerFactory + .create(resourceSpecification.resource().uri()) + .matches(resourceUri)) + .findFirst() + .orElseThrow(() -> new McpError("Resource not found: " + resourceUri)); return Mono.defer(() -> specification.readHandler().apply(exchange, resourceRequest)); }; @@ -711,7 +711,7 @@ public Mono addPrompt(McpServerFeatures.AsyncPromptSpecification promptSpe return Mono.defer(() -> { McpServerFeatures.AsyncPromptSpecification specification = this.prompts - .putIfAbsent(promptSpecification.prompt().name(), promptSpecification); + .putIfAbsent(promptSpecification.prompt().name(), promptSpecification); if (specification != null) { return Mono.error( new McpError("Prompt with name '" + promptSpecification.prompt().name() + "' already exists")); @@ -782,11 +782,11 @@ private McpRequestHandler promptsListRequestHandler var nextCursor = getCursor(endIndex, mapSize, mapHash); var resultList = this.prompts.values() - .stream() - .skip(requestedStartIndex) - .limit(endIndex - requestedStartIndex) - .map(McpServerFeatures.AsyncPromptSpecification::prompt) - .toList(); + .stream() + .skip(requestedStartIndex) + .limit(endIndex - requestedStartIndex) + .map(McpServerFeatures.AsyncPromptSpecification::prompt) + .toList(); return Mono.just(new McpSchema.ListPromptsResult(resultList, nextCursor)); }; @@ -880,11 +880,11 @@ private McpRequestHandler completionCompleteRequestHan return Mono.error(new McpError("Prompt not found: " + promptReference.name())); } if (!promptSpec.prompt() - .arguments() - .stream() - .filter(arg -> arg.name().equals(argumentName)) - .findFirst() - .isPresent()) { + .arguments() + .stream() + .filter(arg -> arg.name().equals(argumentName)) + .findFirst() + .isPresent()) { return Mono.error(new McpError("Argument not found: " + argumentName)); } @@ -896,8 +896,8 @@ private McpRequestHandler completionCompleteRequestHan return Mono.error(new McpError("Resource not found: " + resourceReference.uri())); } if (!uriTemplateManagerFactory.create(resourceSpec.resource().uri()) - .getVariableNames() - .contains(argumentName)) { + .getVariableNames() + .contains(argumentName)) { return Mono.error(new McpError("Argument not found: " + argumentName)); }