Skip to content

Commit f459be7

Browse files
committed
feat: Adding support for SSE on the WebClient
Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent 6cadf00 commit f459be7

File tree

11 files changed

+1447
-11
lines changed

11 files changed

+1447
-11
lines changed

vertx-web-client/src/main/asciidoc/index.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,13 @@ that decode the response to a specific type
372372

373373
WARNING: this is only valid for the response decoded as a buffer.
374374

375+
=== Server-Sent Events
376+
377+
Using a specific body codec: {@link io.vertx.ext.web.codec.impl.SseBodyCodec}, you can decode a Server-Sent Events stream into a list of events:
378+
[source,$lang]
379+
----
380+
{@link examples.WebClientExamples#receiveResponseAsServerSentEvents}
381+
----
375382
[[http-response-expectations]]
376383
=== Response expectations
377384

vertx-web-client/src/main/java/examples/WebClientExamples.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import io.vertx.uritemplate.UriTemplate;
4343

4444
import java.util.HashMap;
45-
import java.util.List;
4645
import java.util.Map;
4746

4847
/**
@@ -761,4 +760,16 @@ public static void clientSideConsistentHashing(Vertx vertx, int servicePort) {
761760

762761
server.listen(servicePort);
763762
}
763+
764+
public static void receiveResponseAsServerSentEvents(Vertx vertx, int servicePort) {
765+
WebClient client = WebClient.create(vertx, new WebClientOptions().setDefaultPort(servicePort).setDefaultHost("localhost"));
766+
client.get("/basic?count=5").as(BodyCodec.sseStream(stream -> {
767+
stream.handler(v -> System.out.println("Event received " + v));
768+
stream.endHandler(v -> System.out.println("End of stream " + v));
769+
})).send().expecting(HttpResponseExpectation.SC_OK)
770+
.onSuccess(res ->
771+
System.out.println("Received response with status code" + res.statusCode()))
772+
.onFailure(err ->
773+
System.out.println("Something went wrong " + err.getMessage()));
774+
}
764775
}
Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
package io.vertx.ext.web.client.tests;
2+
3+
import io.vertx.core.Handler;
4+
import io.vertx.core.Vertx;
5+
import io.vertx.core.http.HttpServer;
6+
import io.vertx.ext.unit.Async;
7+
import io.vertx.ext.unit.TestContext;
8+
import io.vertx.ext.unit.junit.VertxUnitRunner;
9+
import io.vertx.ext.web.client.WebClient;
10+
import io.vertx.ext.web.client.WebClientOptions;
11+
import io.vertx.ext.web.codec.BodyCodec;
12+
import io.vertx.ext.web.codec.SseEvent;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
import org.junit.After;
17+
import org.junit.Before;
18+
import org.junit.Test;
19+
import org.junit.runner.RunWith;
20+
21+
@RunWith(VertxUnitRunner.class)
22+
public class SseClientTest {
23+
24+
private Vertx vertx;
25+
private WebClient client;
26+
private HttpServer server;
27+
28+
@Before
29+
public void setup(TestContext tc) {
30+
vertx = Vertx.vertx();
31+
client = WebClient.create(vertx, new WebClientOptions().setDefaultPort(8080).setDefaultHost("localhost"));
32+
33+
server = vertx.createHttpServer().requestHandler(req -> {
34+
String path = req.path();
35+
req.response().setChunked(true);
36+
37+
// set headers
38+
req.response().headers().add("Content-Type", "text/event-stream;charset=UTF-8");
39+
req.response().headers().add("Connection", "keep-alive");
40+
req.response().headers().add("Cache-Control", "no-cache");
41+
req.response().headers().add("Access-Control-Allow-Origin", "*");
42+
43+
if (null != path) switch (path) {
44+
case "/basic":
45+
int count = Integer.parseInt(req.getParam("count"));
46+
vertx.setPeriodic(50, new Handler<Long>() {
47+
private int index = 0;
48+
49+
@Override
50+
public void handle(Long timerId) {
51+
if (index < count) {
52+
String event = String.format("event: event%d\ndata: data%d\nid: %d\n\n", index, index, index);
53+
index++;
54+
req.response().write(event);
55+
} else {
56+
vertx.cancelTimer(timerId);
57+
req.response().end();
58+
}
59+
}
60+
});
61+
break;
62+
case "/multiline-data":
63+
req.response().write("data: line1\ndata: line2\ndata: line3\n\n");
64+
req.response().end();
65+
break;
66+
case "/comments":
67+
req.response().write(": this is a comment\ndata: test data\n\n");
68+
req.response().end();
69+
break;
70+
case "/retry":
71+
req.response().write("retry: 5000\ndata: test\n\n");
72+
req.response().end();
73+
break;
74+
case "/no-event-type":
75+
req.response().write("data: message without event type\n\n");
76+
req.response().end();
77+
break;
78+
case "/burst":
79+
// Send many events quickly to test backpressure
80+
count = Integer.parseInt(req.getParam("count"));
81+
for (int i = 0; i < count; i++) {
82+
String event = String.format("data: burst%d\n\n", i);
83+
req.response().write(event);
84+
}
85+
req.response().end();
86+
break;
87+
case "/slow":{
88+
// Send events slowly to test pause/resume
89+
count = Integer.parseInt(req.getParam("count"));
90+
vertx.setPeriodic(200, new Handler<Long>() {
91+
private int index = 0;
92+
93+
@Override
94+
public void handle(Long timerId) {
95+
if (index < count) {
96+
String event = String.format("data: slow%d\n\n", index);
97+
index++;
98+
req.response().write(event);
99+
} else {
100+
vertx.cancelTimer(timerId);
101+
req.response().end();
102+
}
103+
}
104+
}); break;
105+
}
106+
case "/invalid-retry":
107+
req.response().write("retry: not-a-number\ndata: test\n\n");
108+
req.response().end();
109+
break;
110+
default:
111+
break;
112+
}
113+
});
114+
115+
server.listen(8080).onComplete(tc.asyncAssertSuccess());
116+
}
117+
118+
@Test(timeout = 10000)
119+
public void testGetSseEvents(TestContext tc) throws Exception {
120+
Async async = tc.async();
121+
final List<SseEvent> events = new ArrayList<>();
122+
123+
client.get("/basic?count=5").as(BodyCodec.sseStream(stream -> {
124+
stream.handler(events::add);
125+
stream.endHandler(v -> {
126+
tc.assertEquals(5, events.size());
127+
for (int i = 0; i < 5; i++) {
128+
tc.assertEquals("event" + i, events.get(i).event());
129+
tc.assertEquals("data" + i, events.get(i).data());
130+
tc.assertEquals(String.valueOf(i), events.get(i).id());
131+
}
132+
async.complete();
133+
});
134+
})).send().onFailure(tc::fail);
135+
}
136+
137+
@Test(timeout = 10000)
138+
public void testMultilineData(TestContext tc) throws Exception {
139+
Async async = tc.async();
140+
final List<SseEvent> events = new ArrayList<>();
141+
142+
client.get("/multiline-data").as(BodyCodec.sseStream(stream -> {
143+
stream.handler(events::add);
144+
stream.endHandler(v -> {
145+
tc.assertEquals(1, events.size());
146+
// Per SSE spec, multi-line data should be joined by newlines
147+
tc.assertEquals("line1\nline2\nline3", events.get(0).data());
148+
async.complete();
149+
});
150+
})).send().onFailure(tc::fail);
151+
}
152+
153+
@Test(timeout = 10000)
154+
public void testComments(TestContext tc) throws Exception {
155+
Async async = tc.async();
156+
final List<SseEvent> events = new ArrayList<>();
157+
158+
client.get("/comments").as(BodyCodec.sseStream(stream -> {
159+
stream.handler(events::add);
160+
stream.endHandler(v -> {
161+
tc.assertEquals(1, events.size());
162+
tc.assertEquals("test data", events.get(0).data());
163+
async.complete();
164+
});
165+
})).send().onFailure(tc::fail);
166+
}
167+
168+
@Test(timeout = 10000)
169+
public void testRetryField(TestContext tc) throws Exception {
170+
Async async = tc.async();
171+
final List<SseEvent> events = new ArrayList<>();
172+
173+
client.get("/retry").as(BodyCodec.sseStream(stream -> {
174+
stream.handler(events::add);
175+
stream.endHandler(v -> {
176+
tc.assertEquals(1, events.size());
177+
tc.assertEquals("test", events.get(0).data());
178+
tc.assertEquals(5000, events.get(0).retry());
179+
async.complete();
180+
});
181+
})).send().onFailure(tc::fail);
182+
}
183+
184+
@Test(timeout = 10000)
185+
public void testNoEventType(TestContext tc) throws Exception {
186+
Async async = tc.async();
187+
final List<SseEvent> events = new ArrayList<>();
188+
189+
client.get("/no-event-type").as(BodyCodec.sseStream(stream -> {
190+
stream.handler(events::add);
191+
stream.endHandler(v -> {
192+
tc.assertEquals(1, events.size());
193+
tc.assertEquals("message without event type", events.get(0).data());
194+
// Per SSE spec, the default event type is "message".
195+
// This implementation uses null. This test verifies the implementation's behavior.
196+
tc.assertEquals("message", events.get(0).event());
197+
async.complete();
198+
});
199+
})).send().onFailure(tc::fail);
200+
}
201+
202+
@Test(timeout = 10000)
203+
public void testBurstEvents(TestContext tc) throws Exception {
204+
Async async = tc.async();
205+
final List<SseEvent> events = new ArrayList<>();
206+
207+
client.get("/burst?count=100").as(BodyCodec.sseStream(stream -> {
208+
stream.handler(events::add);
209+
stream.endHandler(v -> {
210+
tc.assertEquals(100, events.size());
211+
for (int i = 0; i < 100; i++) {
212+
tc.assertEquals("burst" + i, events.get(i).data());
213+
}
214+
async.complete();
215+
});
216+
})).send().onFailure(tc::fail);
217+
}
218+
219+
@Test(timeout = 10000)
220+
public void testPauseResume(TestContext tc) throws Exception {
221+
Async async = tc.async();
222+
final List<SseEvent> events = new ArrayList<>();
223+
final AtomicInteger pauseCount = new AtomicInteger(0);
224+
225+
client.get("/basic?count=10").as(BodyCodec.sseStream(stream -> {
226+
stream.handler(event -> {
227+
events.add(event);
228+
// Pause after every 3 events
229+
if (events.size() % 3 == 0 && pauseCount.get() < 2) {
230+
stream.pause();
231+
pauseCount.incrementAndGet();
232+
// Resume after a short delay
233+
vertx.setTimer(100, id -> stream.resume());
234+
}
235+
});
236+
stream.endHandler(v -> {
237+
tc.assertEquals(10, events.size());
238+
tc.assertTrue(pauseCount.get() >= 2, "Stream should have been paused at least twice");
239+
async.complete();
240+
});
241+
})).send().onFailure(tc::fail);
242+
}
243+
244+
@Test(timeout = 10000)
245+
public void testFetch(TestContext tc) throws Exception {
246+
Async async = tc.async();
247+
final List<SseEvent> events = new ArrayList<>();
248+
final AtomicInteger fetchCount = new AtomicInteger(0);
249+
250+
client.get("/basic?count=10").as(BodyCodec.sseStream(stream -> {
251+
stream.pause(); // Start paused
252+
stream.handler(event -> {
253+
events.add(event);
254+
fetchCount.incrementAndGet();
255+
// Only fetch 3 events total
256+
if (fetchCount.get() < 3) {
257+
stream.fetch(1);
258+
} else {
259+
// After receiving 3 events, complete the test
260+
vertx.setTimer(500, id -> {
261+
tc.assertEquals(3, events.size());
262+
async.complete();
263+
});
264+
}
265+
});
266+
stream.endHandler(v -> {
267+
// End handler may not be called if we don't fetch all events
268+
});
269+
// Kick off by fetching the first event
270+
stream.fetch(1);
271+
})).send().onFailure(tc::fail);
272+
}
273+
274+
@Test(timeout = 15000)
275+
public void testBackpressure(TestContext tc) throws Exception {
276+
Async async = tc.async();
277+
final List<SseEvent> events = new ArrayList<>();
278+
final List<Long> timestamps = new ArrayList<>();
279+
280+
client.get("/slow?count=5").as(BodyCodec.sseStream(stream -> {
281+
stream.handler(event -> {
282+
timestamps.add(System.currentTimeMillis());
283+
events.add(event);
284+
// Simulate slow processing by pausing briefly
285+
if (events.size() < 5) {
286+
stream.pause();
287+
vertx.setTimer(50, id -> stream.resume());
288+
}
289+
});
290+
stream.endHandler(v -> {
291+
tc.assertEquals(5, events.size());
292+
// Verify events were received over time (not all at once)
293+
long totalTime = timestamps.get(timestamps.size() - 1) - timestamps.get(0);
294+
tc.assertTrue(totalTime >= 750, "Events should be spread over time due to backpressure. Total time was " + totalTime);
295+
async.complete();
296+
});
297+
})).send().onFailure(tc::fail);
298+
}
299+
300+
@Test(timeout = 10000)
301+
public void testExceptionHandler(TestContext tc) throws Exception {
302+
Async async = tc.async();
303+
final List<Throwable> exceptions = new ArrayList<>();
304+
305+
client.get("/invalid-retry").as(BodyCodec.sseStream(stream -> {
306+
stream.handler(event -> {
307+
// This might or might not be called depending on when the parser fails
308+
});
309+
stream.exceptionHandler(exceptions::add);
310+
stream.endHandler(v -> {
311+
tc.assertEquals(1, exceptions.size(), "Expected one exception");
312+
tc.assertTrue(exceptions.get(0) instanceof RuntimeException, "Expected a RuntimeException");
313+
tc.assertTrue(exceptions.get(0).getMessage().contains("Invalid \"retry\" value"));
314+
tc.assertNotNull(exceptions.get(0).getCause(), "Expected a cause for the exception");
315+
tc.assertTrue(exceptions.get(0).getCause() instanceof NumberFormatException, "Expected cause to be a NumberFormatException");
316+
async.complete();
317+
});
318+
})).send().onFailure(tc::fail);
319+
}
320+
321+
@After
322+
public void close(TestContext tc) {
323+
if (server != null) {
324+
server.close();
325+
}
326+
if (client != null) {
327+
client.close();
328+
}
329+
if (vertx != null) {
330+
vertx.close().onComplete(tc.asyncAssertSuccess());
331+
}
332+
}
333+
334+
}

vertx-web-common/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,18 @@
1313

1414
<artifactId>vertx-web-common</artifactId>
1515

16+
<dependencies>
17+
<!-- Testing -->
18+
<dependency>
19+
<groupId>io.vertx</groupId>
20+
<artifactId>vertx-unit</artifactId>
21+
<scope>test</scope>
22+
</dependency>
23+
<dependency>
24+
<groupId>org.hamcrest</groupId>
25+
<artifactId>hamcrest-core</artifactId>
26+
<version>2.2</version>
27+
<scope>test</scope>
28+
</dependency>
29+
</dependencies>
1630
</project>

0 commit comments

Comments
 (0)