Skip to content

Commit 7f68ea5

Browse files
authored
Support resource subscriptions (#839)
Clients can now subscribe to specific resources and receive targeted notifications when those resources change. Previously, calling `notifyResourceUpdated` on the server would broadcast the notification to every connected client regardless of interest — now only sessions that have explicitly subscribed to a given resource URI receive the update, making resource change propagation both correct and efficient. The subscription lifecycle is fully handled: clients can subscribe and unsubscribe at any time, and the server cleans up subscription state when a session closes. Supersedes #838. Resolves #837, #776. --------- Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
1 parent ce2c747 commit 7f68ea5

15 files changed

+506
-60
lines changed

conformance-tests/VALIDATION_RESULTS.md

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,22 @@
22

33
## Summary
44

5-
**Server Tests:** 37/40 passed (92.5%)
5+
**Server Tests:** 40/40 passed (100%)
66
**Client Tests:** 3/4 scenarios passed (9/10 checks passed)
77
**Auth Tests:** 12/14 scenarios fully passing (178 passed, 1 failed, 1 warning, 85.7% scenarios, 98.9% checks)
88

99
## Server Test Results
1010

11-
### Passing (37/40)
11+
### Passing (40/40)
1212

1313
- **Lifecycle & Utilities (4/4):** initialize, ping, logging-set-level, completion-complete
1414
- **Tools (11/11):** All scenarios including progress notifications ✨
1515
- **Elicitation (10/10):** SEP-1034 defaults (5 checks), SEP-1330 enums (5 checks)
16-
- **Resources (4/6):** list, read-text, read-binary, templates-read
16+
- **Resources (6/6):** list, read-text, read-binary, templates-read, subscribe, unsubscribe
1717
- **Prompts (4/4):** list, simple, with-args, embedded-resource, with-image
1818
- **SSE Transport (2/2):** Multiple streams
1919
- **Security (2/2):** Localhost validation passes, DNS rebinding protection
2020

21-
### Failing (3/40)
22-
23-
1. **resources-subscribe** - Not implemented in SDK
24-
2. **resources-unsubscribe** - Not implemented in SDK
25-
2621
## Client Test Results
2722

2823
### Passing (3/4 scenarios, 9/10 checks)
@@ -68,10 +63,9 @@ Uses the `client-spring-http-client` module with Spring Security OAuth2 and the
6863

6964
## Known Limitations
7065

71-
1. **Resource Subscriptions:** SDK doesn't implement `resources/subscribe` and `resources/unsubscribe` handlers
72-
2. **Client SSE Retry:** Client doesn't parse or respect the `retry:` field, reconnects immediately, and doesn't send Last-Event-ID header
73-
3. **Auth Scope Step-Up:** Client does not fully handle scope step-up challenges where the server requests additional scopes after initial authorization
74-
4. **Auth Basic CIMD:** Minor conformance warning in the basic Client-Initiated Metadata Discovery flow
66+
1. **Client SSE Retry:** Client doesn't parse or respect the `retry:` field, reconnects immediately, and doesn't send Last-Event-ID header
67+
2. **Auth Scope Step-Up:** Client does not fully handle scope step-up challenges where the server requests additional scopes after initial authorization
68+
3. **Auth Basic CIMD:** Minor conformance warning in the basic Client-Initiated Metadata Discovery flow
7569

7670
## Running Tests
7771

@@ -118,6 +112,5 @@ npx @modelcontextprotocol/conformance@0.1.15 client \
118112

119113
### High Priority
120114
1. Fix client SSE retry field handling in `HttpClientStreamableHttpTransport`
121-
2. Implement resource subscription handlers in `McpStatelessAsyncServer`
122-
3. Implement CIMD
123-
4. Implement scope step up
115+
2. Implement CIMD
116+
3. Implement scope step up

conformance-tests/conformance-baseline.yml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,6 @@
22
# This file lists known failing scenarios that are expected to fail until fixed.
33
# See: https://github.com/modelcontextprotocol/conformance/blob/main/SDK_INTEGRATION.md
44

5-
server:
6-
# Resource subscription not implemented in SDK
7-
- resources-subscribe
8-
- resources-unsubscribe
9-
105
client:
116
# SSE retry field handling not implemented
127
# - Client does not parse or respect retry: field timing

conformance-tests/server-servlet/README.md

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ This module contains a comprehensive MCP (Model Context Protocol) server impleme
44

55
## Conformance Test Results
66

7-
**Status: 37 out of 40 tests passing (92.5%)**
7+
**Status: 40 out of 40 tests passing (100%)**
88

99
The server has been validated against the official [MCP conformance test suite](https://github.com/modelcontextprotocol/conformance). See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for detailed results.
1010

@@ -22,9 +22,8 @@ The server has been validated against the official [MCP conformance test suite](
2222
- SEP-1034: Default values for all primitive types
2323
- SEP-1330: All enum schema variants
2424

25-
**Resources** (4/6)
26-
- List, read text/binary, templates
27-
- ⚠️ Subscribe/unsubscribe (SDK limitation)
25+
**Resources** (6/6)
26+
- List, read text/binary, templates, subscribe, unsubscribe
2827

2928
**Prompts** (4/4)
3029
- Simple, parameterized, embedded resources, images
@@ -191,12 +190,7 @@ curl -X POST http://localhost:8080/mcp \
191190

192191
## Known Limitations
193192

194-
See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for details on:
195-
196-
1. **Resource Subscriptions** - Not implemented in Java SDK
197-
2. **DNS Rebinding Protection** - Missing Host/Origin validation
198-
199-
These are SDK-level limitations that require fixes in the core framework.
193+
See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for details on remaining client-side limitations.
200194

201195
## References
202196

mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 81 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
package io.modelcontextprotocol.server;
66

77
import java.time.Duration;
8+
import java.util.Collections;
89
import java.util.HashMap;
910
import java.util.List;
1011
import java.util.Map;
1112
import java.util.Optional;
13+
import java.util.Set;
1214
import java.util.UUID;
1315
import java.util.concurrent.ConcurrentHashMap;
1416
import java.util.concurrent.CopyOnWriteArrayList;
@@ -25,7 +27,6 @@
2527
import io.modelcontextprotocol.spec.McpSchema.CompleteResult.CompleteCompletion;
2628
import io.modelcontextprotocol.spec.McpSchema.ErrorCodes;
2729
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
28-
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
2930
import io.modelcontextprotocol.spec.McpSchema.PromptReference;
3031
import io.modelcontextprotocol.spec.McpSchema.ResourceReference;
3132
import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest;
@@ -111,12 +112,10 @@ public class McpAsyncServer {
111112

112113
private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();
113114

114-
// FIXME: this field is deprecated and should be remvoed together with the
115-
// broadcasting loggingNotification.
116-
private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG;
117-
118115
private final ConcurrentHashMap<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions = new ConcurrentHashMap<>();
119116

117+
private final ConcurrentHashMap<String, Set<String>> resourceSubscriptions = new ConcurrentHashMap<>();
118+
120119
private List<String> protocolVersions;
121120

122121
private McpUriTemplateManagerFactory uriTemplateManagerFactory = new DefaultMcpUriTemplateManagerFactory();
@@ -149,8 +148,11 @@ public class McpAsyncServer {
149148

150149
this.protocolVersions = mcpTransportProvider.protocolVersions();
151150

152-
mcpTransportProvider.setSessionFactory(transport -> new McpServerSession(UUID.randomUUID().toString(),
153-
requestTimeout, transport, this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
151+
mcpTransportProvider.setSessionFactory(transport -> {
152+
String sessionId = UUID.randomUUID().toString();
153+
return new McpServerSession(sessionId, requestTimeout, transport, this::asyncInitializeRequestHandler,
154+
requestHandlers, notificationHandlers, () -> this.cleanupForSession(sessionId));
155+
});
154156
}
155157

156158
McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
@@ -174,8 +176,9 @@ public class McpAsyncServer {
174176

175177
this.protocolVersions = mcpTransportProvider.protocolVersions();
176178

177-
mcpTransportProvider.setSessionFactory(new DefaultMcpStreamableServerSessionFactory(requestTimeout,
178-
this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
179+
mcpTransportProvider.setSessionFactory(
180+
new DefaultMcpStreamableServerSessionFactory(requestTimeout, this::asyncInitializeRequestHandler,
181+
requestHandlers, notificationHandlers, sessionId -> this.cleanupForSession(sessionId)));
179182
}
180183

181184
private Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServerFeatures.Async features) {
@@ -215,6 +218,10 @@ private Map<String, McpRequestHandler<?>> prepareRequestHandlers() {
215218
requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler());
216219
requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler());
217220
requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler());
221+
if (Boolean.TRUE.equals(this.serverCapabilities.resources().subscribe())) {
222+
requestHandlers.put(McpSchema.METHOD_RESOURCES_SUBSCRIBE, resourcesSubscribeRequestHandler());
223+
requestHandlers.put(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, resourcesUnsubscribeRequestHandler());
224+
}
218225
}
219226

220227
// Add prompts API handlers if provider exists
@@ -685,12 +692,73 @@ public Mono<Void> notifyResourcesListChanged() {
685692
}
686693

687694
/**
688-
* Notifies clients that the resources have updated.
689-
* @return A Mono that completes when all clients have been notified
695+
* Notifies only the sessions that have subscribed to the updated resource URI.
696+
* @param resourcesUpdatedNotification the notification containing the updated
697+
* resource URI
698+
* @return A Mono that completes when all subscribed sessions have been notified
690699
*/
691700
public Mono<Void> notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) {
692-
return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED,
693-
resourcesUpdatedNotification);
701+
return Mono.defer(() -> {
702+
String uri = resourcesUpdatedNotification.uri();
703+
Set<String> subscribedSessions = this.resourceSubscriptions.get(uri);
704+
if (subscribedSessions == null || subscribedSessions.isEmpty()) {
705+
logger.debug("No sessions subscribed to resource URI: {}", uri);
706+
return Mono.empty();
707+
}
708+
return Flux.fromIterable(subscribedSessions)
709+
.flatMap(sessionId -> this.mcpTransportProvider
710+
.notifyClient(sessionId, McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED,
711+
resourcesUpdatedNotification)
712+
.doOnError(e -> logger.error("Failed to notify session {} of resource update for {}", sessionId,
713+
uri, e))
714+
.onErrorComplete())
715+
.then();
716+
});
717+
}
718+
719+
private Mono<Void> cleanupForSession(String sessionId) {
720+
return Mono.fromRunnable(() -> {
721+
removeSessionSubscriptions(sessionId);
722+
});
723+
}
724+
725+
private void removeSessionSubscriptions(String sessionId) {
726+
this.resourceSubscriptions.forEach((uri, sessions) -> sessions.remove(sessionId));
727+
this.resourceSubscriptions.entrySet().removeIf(entry -> entry.getValue().isEmpty());
728+
}
729+
730+
private McpRequestHandler<Object> resourcesSubscribeRequestHandler() {
731+
return (exchange, params) -> Mono.defer(() -> {
732+
McpSchema.SubscribeRequest subscribeRequest = jsonMapper.convertValue(params,
733+
new TypeRef<McpSchema.SubscribeRequest>() {
734+
});
735+
String uri = subscribeRequest.uri();
736+
String sessionId = exchange.sessionId();
737+
this.resourceSubscriptions.computeIfAbsent(uri, k -> Collections.newSetFromMap(new ConcurrentHashMap<>()))
738+
.add(sessionId);
739+
logger.debug("Session {} subscribed to resource URI: {}", sessionId, uri);
740+
741+
return Mono.just(Map.of());
742+
});
743+
}
744+
745+
private McpRequestHandler<Object> resourcesUnsubscribeRequestHandler() {
746+
return (exchange, params) -> Mono.defer(() -> {
747+
McpSchema.UnsubscribeRequest unsubscribeRequest = jsonMapper.convertValue(params,
748+
new TypeRef<McpSchema.UnsubscribeRequest>() {
749+
});
750+
String uri = unsubscribeRequest.uri();
751+
String sessionId = exchange.sessionId();
752+
Set<String> sessions = this.resourceSubscriptions.get(uri);
753+
if (sessions != null) {
754+
sessions.remove(sessionId);
755+
if (sessions.isEmpty()) {
756+
this.resourceSubscriptions.remove(uri, sessions);
757+
}
758+
}
759+
logger.debug("Session {} unsubscribed from resource URI: {}", sessionId, uri);
760+
return Mono.just(Map.of());
761+
});
694762
}
695763

696764
private McpRequestHandler<McpSchema.ListResourcesResult> resourcesListRequestHandler() {
@@ -878,10 +946,6 @@ private McpRequestHandler<Object> setLoggerRequestHandler() {
878946

879947
exchange.setMinLoggingLevel(newMinLoggingLevel.level());
880948

881-
// FIXME: this field is deprecated and should be removed together
882-
// with the broadcasting loggingNotification.
883-
this.minLoggingLevel = newMinLoggingLevel.level();
884-
885949
return Mono.just(Map.of());
886950
});
887951
};

mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,25 @@ public Mono<Void> notifyClients(String method, Object params) {
228228
.then();
229229
}
230230

231+
@Override
232+
public Mono<Void> notifyClient(String sessionId, String method, Object params) {
233+
return Mono.defer(() -> {
234+
// Need to iterate in O(n) because the transport session id
235+
// is different from the server-logical session id (in streamable http this
236+
// design issue was solved)
237+
McpServerSession session = sessions.values()
238+
.stream()
239+
.filter(s -> sessionId.equals(s.getId()))
240+
.findFirst()
241+
.orElse(null);
242+
if (session == null) {
243+
logger.debug("Session {} not found", sessionId);
244+
return Mono.empty();
245+
}
246+
return session.sendNotification(method, params);
247+
});
248+
}
249+
231250
/**
232251
* Handles GET requests to establish SSE connections.
233252
* <p>

mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,18 @@ public Mono<Void> notifyClients(String method, Object params) {
206206
});
207207
}
208208

209+
@Override
210+
public Mono<Void> notifyClient(String sessionId, String method, Object params) {
211+
return Mono.defer(() -> {
212+
McpStreamableServerSession session = this.sessions.get(sessionId);
213+
if (session == null) {
214+
logger.debug("Session {} not found", sessionId);
215+
return Mono.empty();
216+
}
217+
return session.sendNotification(method, params);
218+
});
219+
}
220+
209221
/**
210222
* Initiates a graceful shutdown of the transport.
211223
* @return A Mono that completes when all cleanup operations are finished

mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,26 @@ public void setSessionFactory(McpServerSession.Factory sessionFactory) {
9898
@Override
9999
public Mono<Void> notifyClients(String method, Object params) {
100100
if (this.session == null) {
101-
return Mono.error(new IllegalStateException("No session to close"));
101+
return Mono.error(new IllegalStateException("No session to notify"));
102102
}
103103
return this.session.sendNotification(method, params)
104104
.doOnError(e -> logger.error("Failed to send notification: {}", e.getMessage()));
105105
}
106106

107+
@Override
108+
public Mono<Void> notifyClient(String sessionId, String method, Object params) {
109+
return Mono.defer(() -> {
110+
if (this.session == null) {
111+
return Mono.error(new IllegalStateException("No session to notify"));
112+
}
113+
if (!this.session.getId().equals(sessionId)) {
114+
return Mono.error(new IllegalStateException("Existing session id " + this.session.getId()
115+
+ " doesn't match the notification target: " + sessionId));
116+
}
117+
return this.session.sendNotification(method, params);
118+
});
119+
}
120+
107121
@Override
108122
public Mono<Void> closeGracefully() {
109123
if (this.session == null) {

0 commit comments

Comments
 (0)