Skip to content

Commit b232c03

Browse files
committed
feat: Adding support for SSE on the WebClient
Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent 1faaac8 commit b232c03

File tree

7 files changed

+1188
-0
lines changed

7 files changed

+1188
-0
lines changed
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
package io.vertx.ext.web.client.tests;
2+
3+
import io.vertx.core.Handler;
4+
import io.vertx.core.Vertx;
5+
import io.vertx.core.http.HttpServer;
6+
import io.vertx.ext.unit.Async;
7+
import io.vertx.ext.unit.TestContext;
8+
import io.vertx.ext.unit.junit.VertxUnitRunner;
9+
import io.vertx.ext.web.client.WebClient;
10+
import io.vertx.ext.web.client.WebClientOptions;
11+
import io.vertx.ext.web.codec.sse.SseBodyCodec;
12+
import io.vertx.ext.web.codec.sse.SseEvent;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
import org.junit.After;
17+
import org.junit.Before;
18+
import org.junit.Test;
19+
import org.junit.runner.RunWith;
20+
21+
@RunWith(VertxUnitRunner.class)
22+
public class SseClientTest {
23+
24+
private Vertx vertx;
25+
private WebClient client;
26+
private HttpServer server;
27+
28+
@Before
29+
public void setup(TestContext tc) {
30+
vertx = Vertx.vertx();
31+
client = WebClient.create(vertx, new WebClientOptions().setDefaultPort(8080).setDefaultHost("localhost"));
32+
33+
server = vertx.createHttpServer().requestHandler(req -> {
34+
String path = req.path();
35+
req.response().setChunked(true);
36+
37+
// set headers
38+
req.response().headers().add("Content-Type", "text/event-stream;charset=UTF-8");
39+
req.response().headers().add("Connection", "keep-alive");
40+
req.response().headers().add("Cache-Control", "no-cache");
41+
req.response().headers().add("Access-Control-Allow-Origin", "*");
42+
43+
if ("/basic".equals(path)) {
44+
int count = Integer.parseInt(req.getParam("count"));
45+
vertx.setPeriodic(50, new Handler<Long>() {
46+
private int index = 0;
47+
48+
@Override
49+
public void handle(Long timerId) {
50+
if (index < count) {
51+
String event = String.format("event: event%d\ndata: data%d\nid: %d\n\n", index, index, index);
52+
index++;
53+
req.response().write(event);
54+
} else {
55+
vertx.cancelTimer(timerId);
56+
req.response().end();
57+
}
58+
}
59+
});
60+
} else if ("/multiline-data".equals(path)) {
61+
req.response().write("data: line1\ndata: line2\ndata: line3\n\n");
62+
req.response().end();
63+
} else if ("/comments".equals(path)) {
64+
req.response().write(": this is a comment\ndata: test data\n\n");
65+
req.response().end();
66+
} else if ("/retry".equals(path)) {
67+
req.response().write("retry: 5000\ndata: test\n\n");
68+
req.response().end();
69+
} else if ("/no-event-type".equals(path)) {
70+
req.response().write("data: message without event type\n\n");
71+
req.response().end();
72+
} else if ("/burst".equals(path)) {
73+
// Send many events quickly to test backpressure
74+
int count = Integer.parseInt(req.getParam("count"));
75+
for (int i = 0; i < count; i++) {
76+
String event = String.format("data: burst%d\n\n", i);
77+
req.response().write(event);
78+
}
79+
req.response().end();
80+
} else if ("/slow".equals(path)) {
81+
// Send events slowly to test pause/resume
82+
int count = Integer.parseInt(req.getParam("count"));
83+
vertx.setPeriodic(200, new Handler<Long>() {
84+
private int index = 0;
85+
86+
@Override
87+
public void handle(Long timerId) {
88+
if (index < count) {
89+
String event = String.format("data: slow%d\n\n", index);
90+
index++;
91+
req.response().write(event);
92+
} else {
93+
vertx.cancelTimer(timerId);
94+
req.response().end();
95+
}
96+
}
97+
});
98+
} else if ("/invalid-retry".equals(path)) {
99+
req.response().write("retry: not-a-number\ndata: test\n\n");
100+
req.response().end();
101+
}
102+
});
103+
104+
server.listen(8080).onComplete(tc.asyncAssertSuccess());
105+
}
106+
107+
@Test(timeout = 10000)
108+
public void testGetSseEvents(TestContext tc) throws Exception {
109+
Async async = tc.async();
110+
final List<SseEvent> events = new ArrayList<>();
111+
112+
client.get("/basic?count=5").as(SseBodyCodec.sseStream(stream -> {
113+
stream.handler(events::add);
114+
stream.endHandler(v -> {
115+
tc.assertEquals(5, events.size());
116+
for (int i = 0; i < 5; i++) {
117+
tc.assertEquals("event" + i, events.get(i).event());
118+
tc.assertEquals("data" + i, events.get(i).data());
119+
tc.assertEquals(String.valueOf(i), events.get(i).id());
120+
}
121+
async.complete();
122+
});
123+
})).send().onFailure(tc::fail);
124+
}
125+
126+
@Test(timeout = 10000)
127+
public void testMultilineData(TestContext tc) throws Exception {
128+
Async async = tc.async();
129+
final List<SseEvent> events = new ArrayList<>();
130+
131+
client.get("/multiline-data").as(SseBodyCodec.sseStream(stream -> {
132+
stream.handler(events::add);
133+
stream.endHandler(v -> {
134+
tc.assertEquals(1, events.size());
135+
// TODO: Per SSE spec, multi-line data should be joined by newlines (i.e., "line1\nline2\nline3").
136+
// The current implementation concatenates them without a separator.
137+
// This test verifies the current behavior.
138+
tc.assertEquals("line1line2line3", events.get(0).data());
139+
async.complete();
140+
});
141+
})).send().onFailure(tc::fail);
142+
}
143+
144+
@Test(timeout = 10000)
145+
public void testComments(TestContext tc) throws Exception {
146+
Async async = tc.async();
147+
final List<SseEvent> events = new ArrayList<>();
148+
149+
client.get("/comments").as(SseBodyCodec.sseStream(stream -> {
150+
stream.handler(events::add);
151+
stream.endHandler(v -> {
152+
tc.assertEquals(1, events.size());
153+
tc.assertEquals("test data", events.get(0).data());
154+
async.complete();
155+
});
156+
})).send().onFailure(tc::fail);
157+
}
158+
159+
@Test(timeout = 10000)
160+
public void testRetryField(TestContext tc) throws Exception {
161+
Async async = tc.async();
162+
final List<SseEvent> events = new ArrayList<>();
163+
164+
client.get("/retry").as(SseBodyCodec.sseStream(stream -> {
165+
stream.handler(events::add);
166+
stream.endHandler(v -> {
167+
tc.assertEquals(1, events.size());
168+
tc.assertEquals("test", events.get(0).data());
169+
tc.assertEquals(5000, events.get(0).retry());
170+
async.complete();
171+
});
172+
})).send().onFailure(tc::fail);
173+
}
174+
175+
@Test(timeout = 10000)
176+
public void testNoEventType(TestContext tc) throws Exception {
177+
Async async = tc.async();
178+
final List<SseEvent> events = new ArrayList<>();
179+
180+
client.get("/no-event-type").as(SseBodyCodec.sseStream(stream -> {
181+
stream.handler(events::add);
182+
stream.endHandler(v -> {
183+
tc.assertEquals(1, events.size());
184+
tc.assertEquals("message without event type", events.get(0).data());
185+
// Per SSE spec, the default event type is "message".
186+
// This implementation uses null. This test verifies the implementation's behavior.
187+
tc.assertNull(events.get(0).event());
188+
async.complete();
189+
});
190+
})).send().onFailure(tc::fail);
191+
}
192+
193+
@Test(timeout = 10000)
194+
public void testBurstEvents(TestContext tc) throws Exception {
195+
Async async = tc.async();
196+
final List<SseEvent> events = new ArrayList<>();
197+
198+
client.get("/burst?count=100").as(SseBodyCodec.sseStream(stream -> {
199+
stream.handler(events::add);
200+
stream.endHandler(v -> {
201+
tc.assertEquals(100, events.size());
202+
for (int i = 0; i < 100; i++) {
203+
tc.assertEquals("burst" + i, events.get(i).data());
204+
}
205+
async.complete();
206+
});
207+
})).send().onFailure(tc::fail);
208+
}
209+
210+
@Test(timeout = 10000)
211+
public void testPauseResume(TestContext tc) throws Exception {
212+
Async async = tc.async();
213+
final List<SseEvent> events = new ArrayList<>();
214+
final AtomicInteger pauseCount = new AtomicInteger(0);
215+
216+
client.get("/basic?count=10").as(SseBodyCodec.sseStream(stream -> {
217+
stream.handler(event -> {
218+
events.add(event);
219+
// Pause after every 3 events
220+
if (events.size() % 3 == 0 && pauseCount.get() < 2) {
221+
stream.pause();
222+
pauseCount.incrementAndGet();
223+
// Resume after a short delay
224+
vertx.setTimer(100, id -> stream.resume());
225+
}
226+
});
227+
stream.endHandler(v -> {
228+
tc.assertEquals(10, events.size());
229+
tc.assertTrue(pauseCount.get() >= 2, "Stream should have been paused at least twice");
230+
async.complete();
231+
});
232+
})).send().onFailure(tc::fail);
233+
}
234+
235+
@Test(timeout = 10000)
236+
public void testFetch(TestContext tc) throws Exception {
237+
Async async = tc.async();
238+
final List<SseEvent> events = new ArrayList<>();
239+
final AtomicInteger fetchCount = new AtomicInteger(0);
240+
241+
client.get("/basic?count=10").as(SseBodyCodec.sseStream(stream -> {
242+
stream.pause(); // Start paused
243+
stream.handler(event -> {
244+
events.add(event);
245+
fetchCount.incrementAndGet();
246+
// Only fetch 3 events total
247+
if (fetchCount.get() < 3) {
248+
stream.fetch(1);
249+
} else {
250+
// After receiving 3 events, complete the test
251+
vertx.setTimer(500, id -> {
252+
tc.assertEquals(3, events.size());
253+
async.complete();
254+
});
255+
}
256+
});
257+
stream.endHandler(v -> {
258+
// End handler may not be called if we don't fetch all events
259+
});
260+
// Kick off by fetching the first event
261+
stream.fetch(1);
262+
})).send().onFailure(tc::fail);
263+
}
264+
265+
@Test(timeout = 15000)
266+
public void testBackpressure(TestContext tc) throws Exception {
267+
Async async = tc.async();
268+
final List<SseEvent> events = new ArrayList<>();
269+
final List<Long> timestamps = new ArrayList<>();
270+
271+
client.get("/slow?count=5").as(SseBodyCodec.sseStream(stream -> {
272+
stream.handler(event -> {
273+
timestamps.add(System.currentTimeMillis());
274+
events.add(event);
275+
// Simulate slow processing by pausing briefly
276+
if (events.size() < 5) {
277+
stream.pause();
278+
vertx.setTimer(50, id -> stream.resume());
279+
}
280+
});
281+
stream.endHandler(v -> {
282+
tc.assertEquals(5, events.size());
283+
// Verify events were received over time (not all at once)
284+
long totalTime = timestamps.get(timestamps.size() - 1) - timestamps.get(0);
285+
tc.assertTrue(totalTime >= 750, "Events should be spread over time due to backpressure. Total time was " + totalTime);
286+
async.complete();
287+
});
288+
})).send().onFailure(tc::fail);
289+
}
290+
291+
@Test(timeout = 10000)
292+
public void testExceptionHandler(TestContext tc) throws Exception {
293+
Async async = tc.async();
294+
final List<Throwable> exceptions = new ArrayList<>();
295+
296+
client.get("/invalid-retry").as(SseBodyCodec.sseStream(stream -> {
297+
stream.handler(event -> {
298+
// This might or might not be called depending on when the parser fails
299+
});
300+
stream.exceptionHandler(exceptions::add);
301+
stream.endHandler(v -> {
302+
tc.assertEquals(1, exceptions.size(), "Expected one exception");
303+
tc.assertTrue(exceptions.get(0) instanceof NumberFormatException, "Expected a NumberFormatException");
304+
async.complete();
305+
});
306+
})).send().onFailure(tc::fail);
307+
}
308+
309+
@After
310+
public void close(TestContext tc) {
311+
if (server != null) {
312+
server.close();
313+
}
314+
if (client != null) {
315+
client.close();
316+
}
317+
if (vertx != null) {
318+
vertx.close().onComplete(tc.asyncAssertSuccess());
319+
}
320+
}
321+
322+
}

vertx-web-common/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,18 @@
1313

1414
<artifactId>vertx-web-common</artifactId>
1515

16+
<dependencies>
17+
<!-- Testing -->
18+
<dependency>
19+
<groupId>io.vertx</groupId>
20+
<artifactId>vertx-unit</artifactId>
21+
<scope>test</scope>
22+
</dependency>
23+
<dependency>
24+
<groupId>org.hamcrest</groupId>
25+
<artifactId>hamcrest-core</artifactId>
26+
<version>2.2</version>
27+
<scope>test</scope>
28+
</dependency>
29+
</dependencies>
1630
</project>

0 commit comments

Comments
 (0)