Skip to content

Commit fe7c62e

Browse files
committed
feat: Adding support for SSE on the WebClient
Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent 454bbaf commit fe7c62e

File tree

7 files changed

+1219
-0
lines changed

7 files changed

+1219
-0
lines changed
Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
package io.vertx.ext.web.client.tests;
2+
3+
import io.vertx.core.Handler;
4+
import io.vertx.core.Vertx;
5+
import io.vertx.core.http.HttpServer;
6+
import io.vertx.ext.unit.Async;
7+
import io.vertx.ext.unit.TestContext;
8+
import io.vertx.ext.unit.junit.VertxUnitRunner;
9+
import io.vertx.ext.web.client.WebClient;
10+
import io.vertx.ext.web.client.WebClientOptions;
11+
import io.vertx.ext.web.codec.sse.SseBodyCodec;
12+
import io.vertx.ext.web.codec.sse.SseEvent;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
import org.junit.After;
17+
import org.junit.Before;
18+
import org.junit.Test;
19+
import org.junit.runner.RunWith;
20+
21+
@RunWith(VertxUnitRunner.class)
22+
public class SseClientTest {
23+
24+
private Vertx vertx;
25+
private WebClient client;
26+
private HttpServer server;
27+
28+
@Before
29+
public void setup(TestContext tc) {
30+
vertx = Vertx.vertx();
31+
client = WebClient.create(vertx, new WebClientOptions().setDefaultPort(8080).setDefaultHost("localhost"));
32+
33+
server = vertx.createHttpServer().requestHandler(req -> {
34+
String path = req.path();
35+
req.response().setChunked(true);
36+
37+
// set headers
38+
req.response().headers().add("Content-Type", "text/event-stream;charset=UTF-8");
39+
req.response().headers().add("Connection", "keep-alive");
40+
req.response().headers().add("Cache-Control", "no-cache");
41+
req.response().headers().add("Access-Control-Allow-Origin", "*");
42+
43+
if (null != path) switch (path) {
44+
case "/basic":{
45+
int count = Integer.parseInt(req.getParam("count"));
46+
vertx.setPeriodic(50, new Handler<Long>() {
47+
private int index = 0;
48+
49+
@Override
50+
public void handle(Long timerId) {
51+
if (index < count) {
52+
String event = String.format("event: event%d\ndata: data%d\nid: %d\n\n", index, index, index);
53+
index++;
54+
req.response().write(event);
55+
} else {
56+
vertx.cancelTimer(timerId);
57+
req.response().end();
58+
}
59+
}
60+
}); break;
61+
}
62+
case "/multiline-data":
63+
req.response().write("data: line1\ndata: line2\ndata: line3\n\n");
64+
req.response().end();
65+
break;
66+
case "/comments":
67+
req.response().write(": this is a comment\ndata: test data\n\n");
68+
req.response().end();
69+
break;
70+
case "/retry":
71+
req.response().write("retry: 5000\ndata: test\n\n");
72+
req.response().end();
73+
break;
74+
case "/no-event-type":
75+
req.response().write("data: message without event type\n\n");
76+
req.response().end();
77+
break;
78+
case "/burst":{
79+
// Send many events quickly to test backpressure
80+
int count = Integer.parseInt(req.getParam("count"));
81+
for (int i = 0; i < count; i++) {
82+
String event = String.format("data: burst%d\n\n", i);
83+
req.response().write(event);
84+
} req.response().end();
85+
break;
86+
}
87+
case "/slow":{
88+
// Send events slowly to test pause/resume
89+
int count = Integer.parseInt(req.getParam("count"));
90+
vertx.setPeriodic(200, new Handler<Long>() {
91+
private int index = 0;
92+
93+
@Override
94+
public void handle(Long timerId) {
95+
if (index < count) {
96+
String event = String.format("data: slow%d\n\n", index);
97+
index++;
98+
req.response().write(event);
99+
} else {
100+
vertx.cancelTimer(timerId);
101+
req.response().end();
102+
}
103+
}
104+
}); break;
105+
}
106+
case "/invalid-retry":
107+
req.response().write("retry: not-a-number\ndata: test\n\n");
108+
req.response().end();
109+
break;
110+
default:
111+
break;
112+
}
113+
});
114+
115+
server.listen(8080).onComplete(tc.asyncAssertSuccess());
116+
}
117+
118+
@Test(timeout = 10000)
119+
public void testGetSseEvents(TestContext tc) throws Exception {
120+
Async async = tc.async();
121+
final List<SseEvent> events = new ArrayList<>();
122+
123+
client.get("/basic?count=5").as(SseBodyCodec.sseStream(stream -> {
124+
stream.handler(events::add);
125+
stream.endHandler(v -> {
126+
tc.assertEquals(5, events.size());
127+
for (int i = 0; i < 5; i++) {
128+
tc.assertEquals("event" + i, events.get(i).event());
129+
tc.assertEquals("data" + i, events.get(i).data());
130+
tc.assertEquals(String.valueOf(i), events.get(i).id());
131+
}
132+
async.complete();
133+
});
134+
})).send().onFailure(tc::fail);
135+
}
136+
137+
@Test(timeout = 10000)
138+
public void testMultilineData(TestContext tc) throws Exception {
139+
Async async = tc.async();
140+
final List<SseEvent> events = new ArrayList<>();
141+
142+
client.get("/multiline-data").as(SseBodyCodec.sseStream(stream -> {
143+
stream.handler(events::add);
144+
stream.endHandler(v -> {
145+
tc.assertEquals(1, events.size());
146+
// TODO: Per SSE spec, multi-line data should be joined by newlines (i.e., "line1\nline2\nline3").
147+
// The current implementation concatenates them without a separator.
148+
// This test verifies the current behavior.
149+
tc.assertEquals("line1line2line3", events.get(0).data());
150+
async.complete();
151+
});
152+
})).send().onFailure(tc::fail);
153+
}
154+
155+
@Test(timeout = 10000)
156+
public void testComments(TestContext tc) throws Exception {
157+
Async async = tc.async();
158+
final List<SseEvent> events = new ArrayList<>();
159+
160+
client.get("/comments").as(SseBodyCodec.sseStream(stream -> {
161+
stream.handler(events::add);
162+
stream.endHandler(v -> {
163+
tc.assertEquals(1, events.size());
164+
tc.assertEquals("test data", events.get(0).data());
165+
async.complete();
166+
});
167+
})).send().onFailure(tc::fail);
168+
}
169+
170+
@Test(timeout = 10000)
171+
public void testRetryField(TestContext tc) throws Exception {
172+
Async async = tc.async();
173+
final List<SseEvent> events = new ArrayList<>();
174+
175+
client.get("/retry").as(SseBodyCodec.sseStream(stream -> {
176+
stream.handler(events::add);
177+
stream.endHandler(v -> {
178+
tc.assertEquals(1, events.size());
179+
tc.assertEquals("test", events.get(0).data());
180+
tc.assertEquals(5000, events.get(0).retry());
181+
async.complete();
182+
});
183+
})).send().onFailure(tc::fail);
184+
}
185+
186+
@Test(timeout = 10000)
187+
public void testNoEventType(TestContext tc) throws Exception {
188+
Async async = tc.async();
189+
final List<SseEvent> events = new ArrayList<>();
190+
191+
client.get("/no-event-type").as(SseBodyCodec.sseStream(stream -> {
192+
stream.handler(events::add);
193+
stream.endHandler(v -> {
194+
tc.assertEquals(1, events.size());
195+
tc.assertEquals("message without event type", events.get(0).data());
196+
// Per SSE spec, the default event type is "message".
197+
// This implementation uses null. This test verifies the implementation's behavior.
198+
tc.assertEquals("message", events.get(0).event());
199+
async.complete();
200+
});
201+
})).send().onFailure(tc::fail);
202+
}
203+
204+
@Test(timeout = 10000)
205+
public void testBurstEvents(TestContext tc) throws Exception {
206+
Async async = tc.async();
207+
final List<SseEvent> events = new ArrayList<>();
208+
209+
client.get("/burst?count=100").as(SseBodyCodec.sseStream(stream -> {
210+
stream.handler(events::add);
211+
stream.endHandler(v -> {
212+
tc.assertEquals(100, events.size());
213+
for (int i = 0; i < 100; i++) {
214+
tc.assertEquals("burst" + i, events.get(i).data());
215+
}
216+
async.complete();
217+
});
218+
})).send().onFailure(tc::fail);
219+
}
220+
221+
@Test(timeout = 10000)
222+
public void testPauseResume(TestContext tc) throws Exception {
223+
Async async = tc.async();
224+
final List<SseEvent> events = new ArrayList<>();
225+
final AtomicInteger pauseCount = new AtomicInteger(0);
226+
227+
client.get("/basic?count=10").as(SseBodyCodec.sseStream(stream -> {
228+
stream.handler(event -> {
229+
events.add(event);
230+
// Pause after every 3 events
231+
if (events.size() % 3 == 0 && pauseCount.get() < 2) {
232+
stream.pause();
233+
pauseCount.incrementAndGet();
234+
// Resume after a short delay
235+
vertx.setTimer(100, id -> stream.resume());
236+
}
237+
});
238+
stream.endHandler(v -> {
239+
tc.assertEquals(10, events.size());
240+
tc.assertTrue(pauseCount.get() >= 2, "Stream should have been paused at least twice");
241+
async.complete();
242+
});
243+
})).send().onFailure(tc::fail);
244+
}
245+
246+
@Test(timeout = 10000)
247+
public void testFetch(TestContext tc) throws Exception {
248+
Async async = tc.async();
249+
final List<SseEvent> events = new ArrayList<>();
250+
final AtomicInteger fetchCount = new AtomicInteger(0);
251+
252+
client.get("/basic?count=10").as(SseBodyCodec.sseStream(stream -> {
253+
stream.pause(); // Start paused
254+
stream.handler(event -> {
255+
events.add(event);
256+
fetchCount.incrementAndGet();
257+
// Only fetch 3 events total
258+
if (fetchCount.get() < 3) {
259+
stream.fetch(1);
260+
} else {
261+
// After receiving 3 events, complete the test
262+
vertx.setTimer(500, id -> {
263+
tc.assertEquals(3, events.size());
264+
async.complete();
265+
});
266+
}
267+
});
268+
stream.endHandler(v -> {
269+
// End handler may not be called if we don't fetch all events
270+
});
271+
// Kick off by fetching the first event
272+
stream.fetch(1);
273+
})).send().onFailure(tc::fail);
274+
}
275+
276+
@Test(timeout = 15000)
277+
public void testBackpressure(TestContext tc) throws Exception {
278+
Async async = tc.async();
279+
final List<SseEvent> events = new ArrayList<>();
280+
final List<Long> timestamps = new ArrayList<>();
281+
282+
client.get("/slow?count=5").as(SseBodyCodec.sseStream(stream -> {
283+
stream.handler(event -> {
284+
timestamps.add(System.currentTimeMillis());
285+
events.add(event);
286+
// Simulate slow processing by pausing briefly
287+
if (events.size() < 5) {
288+
stream.pause();
289+
vertx.setTimer(50, id -> stream.resume());
290+
}
291+
});
292+
stream.endHandler(v -> {
293+
tc.assertEquals(5, events.size());
294+
// Verify events were received over time (not all at once)
295+
long totalTime = timestamps.get(timestamps.size() - 1) - timestamps.get(0);
296+
tc.assertTrue(totalTime >= 750, "Events should be spread over time due to backpressure. Total time was " + totalTime);
297+
async.complete();
298+
});
299+
})).send().onFailure(tc::fail);
300+
}
301+
302+
@Test(timeout = 10000)
303+
public void testExceptionHandler(TestContext tc) throws Exception {
304+
Async async = tc.async();
305+
final List<Throwable> exceptions = new ArrayList<>();
306+
307+
client.get("/invalid-retry").as(SseBodyCodec.sseStream(stream -> {
308+
stream.handler(event -> {
309+
// This might or might not be called depending on when the parser fails
310+
});
311+
stream.exceptionHandler(exceptions::add);
312+
stream.endHandler(v -> {
313+
tc.assertEquals(1, exceptions.size(), "Expected one exception");
314+
tc.assertTrue(exceptions.get(0) instanceof RuntimeException, "Expected a RuntimeException");
315+
tc.assertTrue(exceptions.get(0).getMessage().contains("Invalid \"retry\" value"));
316+
tc.assertNotNull(exceptions.get(0).getCause(), "Expected a cause for the exception");
317+
tc.assertTrue(exceptions.get(0).getCause() instanceof NumberFormatException, "Expected cause to be a NumberFormatException");
318+
async.complete();
319+
});
320+
})).send().onFailure(tc::fail);
321+
}
322+
323+
@After
324+
public void close(TestContext tc) {
325+
if (server != null) {
326+
server.close();
327+
}
328+
if (client != null) {
329+
client.close();
330+
}
331+
if (vertx != null) {
332+
vertx.close().onComplete(tc.asyncAssertSuccess());
333+
}
334+
}
335+
336+
}

vertx-web-common/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,18 @@
1313

1414
<artifactId>vertx-web-common</artifactId>
1515

16+
<dependencies>
17+
<!-- Testing -->
18+
<dependency>
19+
<groupId>io.vertx</groupId>
20+
<artifactId>vertx-unit</artifactId>
21+
<scope>test</scope>
22+
</dependency>
23+
<dependency>
24+
<groupId>org.hamcrest</groupId>
25+
<artifactId>hamcrest-core</artifactId>
26+
<version>2.2</version>
27+
<scope>test</scope>
28+
</dependency>
29+
</dependencies>
1630
</project>

0 commit comments

Comments
 (0)