Skip to content

Commit 42fd22f

Browse files
committed
fix: Add an exception handling function to WebFluxSseClientTransport
- `sseErrorHandler` is used to handle errors when processing SSE events and situations where an unrecognized event type is received. - Add unit tests to verify. Signed-off-by: YunKui Lu <[email protected]>
1 parent e610d85 commit 42fd22f

File tree

2 files changed

+56
-1
lines changed

2 files changed

+56
-1
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import java.io.IOException;
77
import java.util.function.BiConsumer;
8+
import java.util.function.BiFunction;
89
import java.util.function.Function;
910

1011
import com.fasterxml.jackson.core.type.TypeReference;
@@ -123,6 +124,16 @@ public class WebFluxSseClientTransport implements McpClientTransport {
123124
*/
124125
private String sseEndpoint;
125126

127+
/**
128+
* Handle exceptions that occur during the processing of SSE events to avoid
129+
* connection interruption.
130+
*/
131+
private BiFunction<Throwable, ServerSentEvent<String>, Mono<? extends JSONRPCMessage>> sseErrorHandler = (error,
132+
event) -> {
133+
logger.warn("Failed to handle SSE event {}", event, error);
134+
return Mono.empty();
135+
};
136+
126137
/**
127138
* Constructs a new SseClientTransport with the specified WebClient builder. Uses a
128139
* default ObjectMapper instance for JSON processing.
@@ -215,12 +226,26 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
215226
else {
216227
s.error(new McpError("Received unrecognized SSE event type: " + event.event()));
217228
}
218-
}).transform(handler)).subscribe();
229+
}).onErrorResume(e -> sseErrorHandler.apply(e, event)).transform(handler)).subscribe();
219230

220231
// The connection is established once the server sends the endpoint event
221232
return messageEndpointSink.asMono().then();
222233
}
223234

235+
/**
236+
* Sets the handler for processing transport-level errors.
237+
*
238+
* <p>
239+
* The provided handler will be called when errors occur during transport operations,
240+
* such as connection failures or protocol violations.
241+
* </p>
242+
* @param errorHandler a consumer that processes error messages
243+
*/
244+
public void setSseErrorHandler(
245+
BiFunction<Throwable, ServerSentEvent<String>, Mono<? extends JSONRPCMessage>> errorHandler) {
246+
this.sseErrorHandler = errorHandler;
247+
}
248+
224249
/**
225250
* Sends a JSON-RPC message to the server using the endpoint provided during
226251
* connection.

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66

77
import java.time.Duration;
88
import java.util.Map;
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.TimeUnit;
911
import java.util.concurrent.atomic.AtomicInteger;
12+
import java.util.concurrent.atomic.AtomicReference;
1013
import java.util.function.Function;
1114

1215
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -86,6 +89,11 @@ public void simulateMessageEvent(String jsonMessage) {
8689
inboundMessageCount.incrementAndGet();
8790
}
8891

92+
public void simulateComment(String comment) {
93+
events.tryEmitNext(ServerSentEvent.<String>builder().comment(comment).build());
94+
inboundMessageCount.incrementAndGet();
95+
}
96+
8997
}
9098

9199
void startContainer() {
@@ -338,4 +346,26 @@ void testMessageOrderPreservation() {
338346
assertThat(transport.getInboundMessageCount()).isEqualTo(3);
339347
}
340348

349+
@Test
350+
void customErrorHandlerShouldReceiveErrors() throws InterruptedException {
351+
AtomicReference<ServerSentEvent<String>> receivedErrorEvent = new AtomicReference<>();
352+
AtomicReference<Throwable> handledError = new AtomicReference<>();
353+
354+
transport.setSseErrorHandler((error, event) -> {
355+
receivedErrorEvent.set(event);
356+
handledError.set(error);
357+
return Mono.empty();
358+
});
359+
360+
// Mock receive a common message `: This is a comment.\n\n`
361+
transport.simulateComment("This is a comment.");
362+
363+
assertThat(receivedErrorEvent.get().comment()).isNotNull().isEqualTo("This is a comment.");
364+
365+
// Mock receive a common message `:ping - 2025-05-06 08:42:06.508759+00:00\n\n`
366+
transport.simulateComment("ping - 2025-05-06 08:42:06.508759+00:00");
367+
368+
assertThat(receivedErrorEvent.get().comment()).isNotNull().isEqualTo("ping - 2025-05-06 08:42:06.508759+00:00");
369+
}
370+
341371
}

0 commit comments

Comments
 (0)