diff --git a/vertx-web-client/src/main/asciidoc/index.adoc b/vertx-web-client/src/main/asciidoc/index.adoc index 39e679b63b..31b3a5eb75 100644 --- a/vertx-web-client/src/main/asciidoc/index.adoc +++ b/vertx-web-client/src/main/asciidoc/index.adoc @@ -371,6 +371,13 @@ that decode the response to a specific type WARNING: this is only valid for the response decoded as a buffer. +=== Server-Sent Events + +Using a specific body codec: {@link io.vertx.ext.web.codec.impl.SseBodyCodec}, you can decode a Server-Sent Events stream into a list of events: +[source,$lang] +---- +{@link examples.WebClientExamples#receiveResponseAsServerSentEvents} +---- [[http-response-expectations]] === Response expectations diff --git a/vertx-web-client/src/main/java/examples/WebClientExamples.java b/vertx-web-client/src/main/java/examples/WebClientExamples.java index 4c62cd7765..3e561ba116 100644 --- a/vertx-web-client/src/main/java/examples/WebClientExamples.java +++ b/vertx-web-client/src/main/java/examples/WebClientExamples.java @@ -727,4 +727,16 @@ public void testSocketAddress(WebClient client) { .onFailure(err -> System.out.println("Something went wrong " + err.getMessage())); } + + public static void receiveResponseAsServerSentEvents(Vertx vertx, int servicePort) { + WebClient client = WebClient.create(vertx, new WebClientOptions().setDefaultPort(servicePort).setDefaultHost("localhost")); + client.get("/basic?count=5").as(BodyCodec.sseStream(stream -> { + stream.handler(v -> System.out.println("Event received " + v)); + stream.endHandler(v -> System.out.println("End of stream " + v)); + })).send().expecting(HttpResponseExpectation.SC_OK) + .onSuccess(res -> + System.out.println("Received response with status code" + res.statusCode())) + .onFailure(err -> + System.out.println("Something went wrong " + err.getMessage())); + } } diff --git a/vertx-web-client/src/test/java/io/vertx/ext/web/client/SseClientTest.java b/vertx-web-client/src/test/java/io/vertx/ext/web/client/SseClientTest.java new file mode 100644 index 0000000000..7ad8fe581d --- /dev/null +++ b/vertx-web-client/src/test/java/io/vertx/ext/web/client/SseClientTest.java @@ -0,0 +1,334 @@ +package io.vertx.ext.web.client; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.ext.web.codec.BodyCodec; +import io.vertx.ext.web.codec.SseEvent; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(VertxUnitRunner.class) +public class SseClientTest { + + private Vertx vertx; + private WebClient client; + private HttpServer server; + + @Before + public void setup(TestContext tc) { + vertx = Vertx.vertx(); + client = WebClient.create(vertx, new WebClientOptions().setDefaultPort(8080).setDefaultHost("localhost")); + + server = vertx.createHttpServer().requestHandler(req -> { + String path = req.path(); + req.response().setChunked(true); + + // set headers + req.response().headers().add("Content-Type", "text/event-stream;charset=UTF-8"); + req.response().headers().add("Connection", "keep-alive"); + req.response().headers().add("Cache-Control", "no-cache"); + req.response().headers().add("Access-Control-Allow-Origin", "*"); + + if (null != path) switch (path) { + case "/basic": + int count = Integer.parseInt(req.getParam("count")); + vertx.setPeriodic(50, new Handler() { + private int index = 0; + + @Override + public void handle(Long timerId) { + if (index < count) { + String event = String.format("event: event%d\ndata: data%d\nid: %d\n\n", index, index, index); + index++; + req.response().write(event); + } else { + vertx.cancelTimer(timerId); + req.response().end(); + } + } + }); + break; + case "/multiline-data": + req.response().write("data: line1\ndata: line2\ndata: line3\n\n"); + req.response().end(); + break; + case "/comments": + req.response().write(": this is a comment\ndata: test data\n\n"); + req.response().end(); + break; + case "/retry": + req.response().write("retry: 5000\ndata: test\n\n"); + req.response().end(); + break; + case "/no-event-type": + req.response().write("data: message without event type\n\n"); + req.response().end(); + break; + case "/burst": + // Send many events quickly to test backpressure + count = Integer.parseInt(req.getParam("count")); + for (int i = 0; i < count; i++) { + String event = String.format("data: burst%d\n\n", i); + req.response().write(event); + } + req.response().end(); + break; + case "/slow":{ + // Send events slowly to test pause/resume + count = Integer.parseInt(req.getParam("count")); + vertx.setPeriodic(200, new Handler() { + private int index = 0; + + @Override + public void handle(Long timerId) { + if (index < count) { + String event = String.format("data: slow%d\n\n", index); + index++; + req.response().write(event); + } else { + vertx.cancelTimer(timerId); + req.response().end(); + } + } + }); break; + } + case "/invalid-retry": + req.response().write("retry: not-a-number\ndata: test\n\n"); + req.response().end(); + break; + default: + break; + } + }); + + server.listen(8080).onComplete(tc.asyncAssertSuccess()); + } + + @Test(timeout = 10000) + public void testGetSseEvents(TestContext tc) throws Exception { + Async async = tc.async(); + final List events = new ArrayList<>(); + + client.get("/basic?count=5").as(BodyCodec.sseStream(stream -> { + stream.handler(events::add); + stream.endHandler(v -> { + tc.assertEquals(5, events.size()); + for (int i = 0; i < 5; i++) { + tc.assertEquals("event" + i, events.get(i).event()); + tc.assertEquals("data" + i, events.get(i).data()); + tc.assertEquals(String.valueOf(i), events.get(i).id()); + } + async.complete(); + }); + })).send().onFailure(tc::fail); + } + + @Test(timeout = 10000) + public void testMultilineData(TestContext tc) throws Exception { + Async async = tc.async(); + final List events = new ArrayList<>(); + + client.get("/multiline-data").as(BodyCodec.sseStream(stream -> { + stream.handler(events::add); + stream.endHandler(v -> { + tc.assertEquals(1, events.size()); + // Per SSE spec, multi-line data should be joined by newlines + tc.assertEquals("line1\nline2\nline3", events.get(0).data()); + async.complete(); + }); + })).send().onFailure(tc::fail); + } + + @Test(timeout = 10000) + public void testComments(TestContext tc) throws Exception { + Async async = tc.async(); + final List events = new ArrayList<>(); + + client.get("/comments").as(BodyCodec.sseStream(stream -> { + stream.handler(events::add); + stream.endHandler(v -> { + tc.assertEquals(1, events.size()); + tc.assertEquals("test data", events.get(0).data()); + async.complete(); + }); + })).send().onFailure(tc::fail); + } + + @Test(timeout = 10000) + public void testRetryField(TestContext tc) throws Exception { + Async async = tc.async(); + final List events = new ArrayList<>(); + + client.get("/retry").as(BodyCodec.sseStream(stream -> { + stream.handler(events::add); + stream.endHandler(v -> { + tc.assertEquals(1, events.size()); + tc.assertEquals("test", events.get(0).data()); + tc.assertEquals(5000, events.get(0).retry()); + async.complete(); + }); + })).send().onFailure(tc::fail); + } + + @Test(timeout = 10000) + public void testNoEventType(TestContext tc) throws Exception { + Async async = tc.async(); + final List events = new ArrayList<>(); + + client.get("/no-event-type").as(BodyCodec.sseStream(stream -> { + stream.handler(events::add); + stream.endHandler(v -> { + tc.assertEquals(1, events.size()); + tc.assertEquals("message without event type", events.get(0).data()); + // Per SSE spec, the default event type is "message". + // This implementation uses null. This test verifies the implementation's behavior. + tc.assertEquals("message", events.get(0).event()); + async.complete(); + }); + })).send().onFailure(tc::fail); + } + + @Test(timeout = 10000) + public void testBurstEvents(TestContext tc) throws Exception { + Async async = tc.async(); + final List events = new ArrayList<>(); + + client.get("/burst?count=100").as(BodyCodec.sseStream(stream -> { + stream.handler(events::add); + stream.endHandler(v -> { + tc.assertEquals(100, events.size()); + for (int i = 0; i < 100; i++) { + tc.assertEquals("burst" + i, events.get(i).data()); + } + async.complete(); + }); + })).send().onFailure(tc::fail); + } + + @Test(timeout = 10000) + public void testPauseResume(TestContext tc) throws Exception { + Async async = tc.async(); + final List events = new ArrayList<>(); + final AtomicInteger pauseCount = new AtomicInteger(0); + + client.get("/basic?count=10").as(BodyCodec.sseStream(stream -> { + stream.handler(event -> { + events.add(event); + // Pause after every 3 events + if (events.size() % 3 == 0 && pauseCount.get() < 2) { + stream.pause(); + pauseCount.incrementAndGet(); + // Resume after a short delay + vertx.setTimer(100, id -> stream.resume()); + } + }); + stream.endHandler(v -> { + tc.assertEquals(10, events.size()); + tc.assertTrue(pauseCount.get() >= 2, "Stream should have been paused at least twice"); + async.complete(); + }); + })).send().onFailure(tc::fail); + } + + @Test(timeout = 10000) + public void testFetch(TestContext tc) throws Exception { + Async async = tc.async(); + final List events = new ArrayList<>(); + final AtomicInteger fetchCount = new AtomicInteger(0); + + client.get("/basic?count=10").as(BodyCodec.sseStream(stream -> { + stream.pause(); // Start paused + stream.handler(event -> { + events.add(event); + fetchCount.incrementAndGet(); + // Only fetch 3 events total + if (fetchCount.get() < 3) { + stream.fetch(1); + } else { + // After receiving 3 events, complete the test + vertx.setTimer(500, id -> { + tc.assertEquals(3, events.size()); + async.complete(); + }); + } + }); + stream.endHandler(v -> { + // End handler may not be called if we don't fetch all events + }); + // Kick off by fetching the first event + stream.fetch(1); + })).send().onFailure(tc::fail); + } + + @Test(timeout = 15000) + public void testBackpressure(TestContext tc) throws Exception { + Async async = tc.async(); + final List events = new ArrayList<>(); + final List timestamps = new ArrayList<>(); + + client.get("/slow?count=5").as(BodyCodec.sseStream(stream -> { + stream.handler(event -> { + timestamps.add(System.currentTimeMillis()); + events.add(event); + // Simulate slow processing by pausing briefly + if (events.size() < 5) { + stream.pause(); + vertx.setTimer(50, id -> stream.resume()); + } + }); + stream.endHandler(v -> { + tc.assertEquals(5, events.size()); + // Verify events were received over time (not all at once) + long totalTime = timestamps.get(timestamps.size() - 1) - timestamps.get(0); + tc.assertTrue(totalTime >= 750, "Events should be spread over time due to backpressure. Total time was " + totalTime); + async.complete(); + }); + })).send().onFailure(tc::fail); + } + + @Test(timeout = 10000) + public void testExceptionHandler(TestContext tc) throws Exception { + Async async = tc.async(); + final List exceptions = new ArrayList<>(); + + client.get("/invalid-retry").as(BodyCodec.sseStream(stream -> { + stream.handler(event -> { + // This might or might not be called depending on when the parser fails + }); + stream.exceptionHandler(exceptions::add); + stream.endHandler(v -> { + tc.assertEquals(1, exceptions.size(), "Expected one exception"); + tc.assertTrue(exceptions.get(0) instanceof RuntimeException, "Expected a RuntimeException"); + tc.assertTrue(exceptions.get(0).getMessage().contains("Invalid \"retry\" value")); + tc.assertNotNull(exceptions.get(0).getCause(), "Expected a cause for the exception"); + tc.assertTrue(exceptions.get(0).getCause() instanceof NumberFormatException, "Expected cause to be a NumberFormatException"); + async.complete(); + }); + })).send().onFailure(tc::fail); + } + + @After + public void close(TestContext tc) { + if (server != null) { + server.close(); + } + if (client != null) { + client.close(); + } + if (vertx != null) { + vertx.close().onComplete(tc.asyncAssertSuccess()); + } + } + +} diff --git a/vertx-web-common/pom.xml b/vertx-web-common/pom.xml index b98dbb2847..3d6a58df86 100644 --- a/vertx-web-common/pom.xml +++ b/vertx-web-common/pom.xml @@ -14,4 +14,18 @@ vertx-web-common Vert.x Web Common + + + + io.vertx + vertx-unit + test + + + org.hamcrest + hamcrest-core + 2.2 + test + + diff --git a/vertx-web-common/src/main/java/io/vertx/ext/web/codec/BodyCodec.java b/vertx-web-common/src/main/java/io/vertx/ext/web/codec/BodyCodec.java index de728fd08d..15ca926124 100644 --- a/vertx-web-common/src/main/java/io/vertx/ext/web/codec/BodyCodec.java +++ b/vertx-web-common/src/main/java/io/vertx/ext/web/codec/BodyCodec.java @@ -16,16 +16,20 @@ package io.vertx.ext.web.codec; import io.vertx.codegen.annotations.GenIgnore; +import io.vertx.codegen.annotations.Unstable; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.AsyncResult; + import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.parsetools.JsonParser; +import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.WriteStream; import io.vertx.ext.web.codec.impl.BodyCodecImpl; import io.vertx.ext.web.codec.impl.JsonStreamBodyCodec; +import io.vertx.ext.web.codec.impl.SseBodyCodec; import io.vertx.ext.web.codec.impl.StreamingBodyCodec; import io.vertx.ext.web.codec.spi.BodyStream; @@ -139,6 +143,17 @@ static BodyCodec jsonStream(JsonParser parser) { return new JsonStreamBodyCodec(parser); } + /** + * A body codec that parse the response as a Server-SentEvent stream. + * + * @param handler the non-null {@code handler} for the stream of Server-Sent Events. + * @return the body codec for a write stream + */ + @Unstable + static BodyCodec sseStream(Handler> handler) { + return new SseBodyCodec(handler); + } + /** * Create the {@link BodyStream}. *

diff --git a/vertx-web-common/src/main/java/io/vertx/ext/web/codec/SseEvent.java b/vertx-web-common/src/main/java/io/vertx/ext/web/codec/SseEvent.java new file mode 100644 index 0000000000..8c7b072124 --- /dev/null +++ b/vertx-web-common/src/main/java/io/vertx/ext/web/codec/SseEvent.java @@ -0,0 +1,131 @@ +package io.vertx.ext.web.codec; + +import io.vertx.codegen.annotations.DataObject; +import io.vertx.codegen.annotations.Unstable; +import io.vertx.core.json.JsonObject; + +/** + * This represents a Server-Sent Event. + * + * @see Server-sent events + */ +@Unstable +@DataObject +public class SseEvent { + + private String id; + private String event; + private String data; + private int retry; + + public SseEvent(){ + + } + + public SseEvent(String id, String event, String data, int retry) { + this.id = id; + this.event = event; + this.data = data; + this.retry = retry; + } + + public SseEvent(JsonObject json) { + for (java.util.Map.Entry member : json) { + switch (member.getKey()) { + case "id": + if (member.getValue() instanceof String) { + this.id = (String) member.getValue(); + } + break; + case "event": + if (member.getValue() instanceof String) { + this.event = (String) member.getValue(); + } + break; + case "data": + if (member.getValue() instanceof String) { + this.data = (String) member.getValue(); + } + break; + case "retry": + if (member.getValue() instanceof Number) { + this.retry = ((Number)member.getValue()).intValue(); + } + break; + } + } + } + + public SseEvent(SseEvent other) { + this.id = other.id(); + this.event = other.event(); + this.data = other.data(); + this.retry = other.retry(); + } + + /** + * Returns the event id. + * + * @return the event id. + */ + public String id() { + return id; + } + + /** + * Returns the type of the event. + * + * @return the type of the event. + */ + public String event() { + return event; + } + + /** + * Returns the payload of the event. + * + * @return the payload of the event. + */ + public String data() { + return data; + } + + /** + * Returns the reconnection time. + * + * @return the reconnection time. + */ + public int retry() { + return retry; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SseEvent sseEvent = (SseEvent) o; + return retry == sseEvent.retry + && java.util.Objects.equals(id, sseEvent.id) + && java.util.Objects.equals(event, sseEvent.event) + && java.util.Objects.equals(data, sseEvent.data); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(id, event, data, retry); + } + + @Override + public String toString() { + return "id: " + id + '\n' + + "event: " + event + '\n' + + "data: '" + data + '\n' + + "retry: " + retry + '\n' + + '\n'; + } + +} diff --git a/vertx-web-common/src/main/java/io/vertx/ext/web/codec/impl/SseBodyCodec.java b/vertx-web-common/src/main/java/io/vertx/ext/web/codec/impl/SseBodyCodec.java new file mode 100644 index 0000000000..e9cf24161d --- /dev/null +++ b/vertx-web-common/src/main/java/io/vertx/ext/web/codec/impl/SseBodyCodec.java @@ -0,0 +1,311 @@ +package io.vertx.ext.web.codec.impl; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; +import io.vertx.ext.web.codec.BodyCodec; +import io.vertx.ext.web.codec.SseEvent; +import io.vertx.ext.web.codec.spi.BodyStream; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A codec for processing and decoding Server Sent Event from streaming HTTP body content . + */ +public class SseBodyCodec implements BodyCodec { + + private final Handler> handler; + + public SseBodyCodec(Handler> handler) { + this.handler = handler; + } + + @Override + public void create(Handler>> completionHandler) { + SseBodyStream stream = new SseBodyStream(); + handler.handle(stream); + completionHandler.handle(Future.succeededFuture(stream)); + } + + static class SseBodyStream implements BodyStream, ReadStream { + + private static final int LOW_WATERMARK = 1024; + private static final int HIGH_WATERMARK = 4 * 1024; + + private Handler handler; + private Handler endHandler; + private final AtomicLong demand = new AtomicLong(Long.MAX_VALUE); + private Buffer content = Buffer.buffer(); + private volatile boolean ended; + private Handler drainHandler; + private Handler errorHandler; + private volatile boolean writeQueueFull; + private volatile boolean failed; + private final Object lock = new Object(); + private final Promise promise = Promise.promise(); + + @Override + public ReadStream handler(@Nullable Handler handler) { + this.handler = handler; + return this; + } + + @Override + public ReadStream pause() { + demand.set(0L); + return this; + } + + @Override + public ReadStream resume() { + demand.set(Long.MAX_VALUE); + check(); + return this; + } + + @Override + public ReadStream fetch(long l) { + if (l <= 0) { + return this; + } + demand.getAndAdd(l); + check(); + return this; + } + + @Override + public ReadStream endHandler(@Nullable Handler handler) { + this.endHandler = handler; + return this; + } + + SseEvent nextSseEvent() { + SseEventBuilder eventBuilder = new SseEventBuilder(); + int lineStart = 0; + byte[] bytes = content.getBytes(); + + for (int i = 0; i < bytes.length; i++) { + byte b = bytes[i]; + if (b == '\n' || b == '\r') { + // Extract the line without the newline character + String line = content.getString(lineStart, i, "UTF-8"); + + if (line.isEmpty()) { + // Empty line dispatches the event + content = content.getBuffer(i + 1, content.length()); + return eventBuilder.build(); + } else { + eventBuilder.parseLine(line); + } + + lineStart = i + 1; + } + } + return null; + } + + void check() { + if (failed) { + return; + } + while (true) { + if (demand.get() == 0L) { + break; + } + SseEvent event; + try { + synchronized (lock) { + event = nextSseEvent(); + writeQueueFull |= writeQueueFull(); + } + } catch (Exception e) { + failed = true; + handle(e); + handleEnd(); + return; + } + if (event == null) { + if (ended) { + handleEnd(); + } + break; + } + demand.updateAndGet(d -> d == Long.MAX_VALUE ? d : d - 1); + Handler h = handler; + if (h != null) { + h.handle(event); + } + } + Handler h = null; + synchronized (lock) { + if (content.length() < LOW_WATERMARK && writeQueueFull) { + writeQueueFull = false; + h = drainHandler; + } + } + if (h != null) { + h.handle(null); + } + } + + private void handleEnd() { + Handler h = endHandler; + if (h != null) { + h.handle(null); + } + } + + @Override + public void write(Buffer buffer, Handler> handler) { + synchronized (lock) { + content.appendBuffer(buffer); + } + check(); + if (handler != null) { + handler.handle(Future.succeededFuture()); + } + } + + @Override + public Future write(Buffer buffer) { + Promise promise = Promise.promise(); + write(buffer, promise); + return promise.future(); + } + + @Override + public boolean writeQueueFull() { + return content.length() >= HIGH_WATERMARK; + } + + @Override + public WriteStream drainHandler(@Nullable Handler handler) { + drainHandler = handler; + return this; + } + + @Override + public void end(Handler> handler) { + ended = true; + check(); + promise.tryComplete(); + if (handler != null) { + handler.handle(Future.succeededFuture()); + } + } + + @Override + public Future result() { + return promise.future(); + } + + @Override + public SseBodyStream exceptionHandler(@Nullable Handler handler) { + this.errorHandler = handler; + return this; + } + + @Override + public void handle(Throwable throwable) { + Handler h = errorHandler; + if (h != null) { + h.handle(throwable); + } + } + + @Override + public WriteStream setWriteQueueMaxSize(int i) { + return this; + } + } + + private static class SseEventBuilder { + + private String id; + private String event = "message"; + private StringBuilder data = new StringBuilder(); + private int retry; + + SseEventBuilder id(String id) { + this.id = id; + return this; + } + + SseEventBuilder event(String event) { + this.event = event; + return this; + } + + SseEventBuilder data(String data) { + if (this.data.length() > 0) { + this.data.append('\n'); + } + this.data.append(data); + return this; + } + + SseEventBuilder retry(int retry) { + this.retry = retry; + return this; + } + + void parseLine(String line) { + int colonIndex = line.indexOf(':'); + if (colonIndex == 0) { + return; + } + if (colonIndex == -1) { + processField(line, ""); + return; + } + String field = line.substring(0, colonIndex); + String value = line.substring(colonIndex + 1); + // Remove leading space from value if present (SSE spec) + if (value.startsWith(" ")) { + value = value.substring(1); + } + processField(field, value); + } + + private void processField(String field, String value) { + // Field names must be compared literally, with no case folding performed. + switch (field) { + case "event": + event(value); + break; + case "data": + data(value); + break; + case "id": + id(value); + break; + case "retry": + // If the field value consists of only ASCII digits, then interpret the field value as an + // integer in base ten, and set the event stream's reconnection time to that integer. + // Otherwise, ignore the field. + try { + retry(Integer.parseInt(value)); + } catch (NumberFormatException ex) { + throw new RuntimeException("Invalid \"retry\" value:" + value, ex); + } + break; + default: + // Ignore unknown fields as per SSE spec + break; + } + } + + public SseEvent build() { + String dataStr = this.data.toString(); + // Remove trailing LF if present (SSE spec requirement) + if (dataStr.endsWith("\n")) { + dataStr = dataStr.substring(0, dataStr.length() - 1); + } + return new SseEvent(this.id, this.event, dataStr, this.retry); + } + } +} diff --git a/vertx-web-common/src/test/java/io/vertx/ext/web/codec/tests/sse/SseBodyCodecTest.java b/vertx-web-common/src/test/java/io/vertx/ext/web/codec/tests/sse/SseBodyCodecTest.java new file mode 100644 index 0000000000..1cf5b6516c --- /dev/null +++ b/vertx-web-common/src/test/java/io/vertx/ext/web/codec/tests/sse/SseBodyCodecTest.java @@ -0,0 +1,651 @@ +package io.vertx.ext.web.codec.tests.sse; + +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.codec.BodyCodec; +import io.vertx.ext.web.codec.SseEvent; +import io.vertx.ext.web.codec.spi.BodyStream; +import io.vertx.test.core.VertxTestBase; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +public class SseBodyCodecTest extends VertxTestBase { + + @Test + public void testBasicEventParsing() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("event: test\ndata: hello world\nid: 1\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + + SseEvent event = events.get(0); + assertEquals("test", event.event()); + assertEquals("hello world", event.data()); + assertEquals("1", event.id()); + assertEquals(0, event.retry()); + + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testMultipleEvents() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("event: first\ndata: data1\n\nevent: second\ndata: data2\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(2, events.size()); + + assertEquals("first", events.get(0).event()); + assertEquals("data1", events.get(0).data()); + assertEquals("second", events.get(1).event()); + assertEquals("data2", events.get(1).data()); + + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testRetryField() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("retry: 5000\ndata: test\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + assertEquals(5000, events.get(0).retry()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testInvalidRetryField() { + AtomicReference caught = new AtomicReference<>(); + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(evt -> fail("Should not receive event")); + stream.exceptionHandler(err -> { + caught.set(err); + testComplete(); + }); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + Buffer data = Buffer.buffer("retry: invalid\ndata: test\n\n"); + stream.write(data); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + assertNotNull(caught.get()); + assertTrue(caught.get().getMessage().contains("Invalid \"retry\" value")); + } + + @Test + public void testCommentLines() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer(": this is a comment\ndata: actual data\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + assertEquals("actual data", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testFieldWithoutColon() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("data\nevent: test\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + assertEquals("test", events.get(0).event()); + assertEquals("", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testFieldWithLeadingSpace() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("data: value with space\nevent: test\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + assertEquals("value with space", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testMultipleDataFields() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("data: line1\ndata: line2\ndata: line3\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + // Per SSE spec: multiple data fields should be concatenated with newlines + assertEquals("line1\nline2\nline3", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testUnknownFieldsIgnored() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("unknown: value\ndata: test\ncustom: ignored\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + assertEquals("test", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testBackpressure() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + // Write enough data to trigger backpressure (HIGH_WATERMARK = 4096) + StringBuilder largeData = new StringBuilder(); + for (int i = 0; i < 5000; i++) { + largeData.append('x'); + } + + Buffer data = Buffer.buffer("data: " + largeData); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertTrue(stream.writeQueueFull()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testStreamEnded() { + List events = new ArrayList<>(); + AtomicReference endCalled = new AtomicReference<>(false); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + stream.endHandler(v -> endCalled.set(true)); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("data: test\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + + stream.end().onComplete(endAr -> { + assertTrue(endAr.succeeded()); + assertTrue(endCalled.get()); + testComplete(); + }); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testEndWithPendingData() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("data: incomplete"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(0, events.size()); // No complete event yet + + stream.end().onComplete(endAr -> { + assertTrue(endAr.succeeded()); + // Incomplete data should not be dispatched + assertEquals(0, events.size()); + testComplete(); + }); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testCarriageReturnLineSeparator() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("data: test\r\r"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + assertEquals("test", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testWriteQueueMethods() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + stream.setWriteQueueMaxSize(100); + assertFalse(stream.writeQueueFull()); + + AtomicReference drainCalled = new AtomicReference<>(); + stream.drainHandler(v -> drainCalled.set(v)); + + testComplete(); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testExceptionHandler() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + AtomicReference exception = new AtomicReference<>(); + stream.exceptionHandler(exception::set); + + // The exceptionHandler returns null in the current implementation + // This test just verifies it doesn't crash + testComplete(); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testResultFuture() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Future result = stream.result(); + assertFalse(result.isComplete()); // Not complete until end() is called + + stream.end(endAr -> { + assertTrue(endAr.succeeded()); + assertTrue(result.isComplete()); // Now complete after end() + assertTrue(result.succeeded()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testPauseResume() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.pause(); + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("data: test1\n\ndata: test2\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + // Events should not be delivered while paused + assertEquals(0, events.size()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testFetch() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.pause(); + stream.handler(events::add); + stream.fetch(1); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + Buffer data = Buffer.buffer("data: test1\n\ndata: test2\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + // Only one event should be delivered + assertEquals(1, events.size()); + assertEquals("test1", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testMultipleDataFieldsWithTrailingNewline() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + // According to SSE spec, when concatenating multiple data fields: + // 1. Append each field value with a newline + // 2. Remove the final trailing newline before dispatching + Buffer data = Buffer.buffer("data: first\ndata: second\ndata: third\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + // The trailing newline after "third" should be stripped + assertEquals("first\nsecond\nthird", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testSingleDataFieldNoTrailingNewline() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + // Single data field - no trailing newline to strip + Buffer data = Buffer.buffer("data: single line\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + assertEquals("single line", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testMultipleDataFieldsWithEmptyLines() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + // Test with empty data fields (should still add newlines) + Buffer data = Buffer.buffer("data: line1\ndata:\ndata: line3\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + // Empty data field still contributes a newline + assertEquals("line1\n\nline3", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } + + @Test + public void testDataFieldWithActualNewlines() { + List events = new ArrayList<>(); + + BodyCodec codec = BodyCodec.sseStream(stream -> { + stream.handler(events::add); + }); + + codec.create(ar -> { + if (ar.succeeded()) { + BodyStream stream = ar.result(); + + // SSE spec allows multiline data by using multiple data: fields + Buffer data = Buffer.buffer("data: This is\ndata: a multiline\ndata: message\n\n"); + stream.write(data).onComplete(writeAr -> { + assertTrue(writeAr.succeeded()); + assertEquals(1, events.size()); + assertEquals("This is\na multiline\nmessage", events.get(0).data()); + testComplete(); + }); + } else { + fail(ar.cause().getMessage()); + } + }); + + await(); + } +} diff --git a/vertx-web/src/test/java/io/vertx/ext/web/handler/AuthWithSessionsTest.java b/vertx-web/src/test/java/io/vertx/ext/web/handler/AuthWithSessionsTest.java index 556d4d35ae..62e32651e4 100644 --- a/vertx-web/src/test/java/io/vertx/ext/web/handler/AuthWithSessionsTest.java +++ b/vertx-web/src/test/java/io/vertx/ext/web/handler/AuthWithSessionsTest.java @@ -29,6 +29,4 @@ public void testAuthWithSessions() throws Exception { } - - } diff --git a/vertx-web/src/test/java/io/vertx/ext/web/handler/sockjs/SockJSProtocolTest.java b/vertx-web/src/test/java/io/vertx/ext/web/handler/sockjs/SockJSProtocolTest.java index df1e2c8f4b..8a0d81eeef 100644 --- a/vertx-web/src/test/java/io/vertx/ext/web/handler/sockjs/SockJSProtocolTest.java +++ b/vertx-web/src/test/java/io/vertx/ext/web/handler/sockjs/SockJSProtocolTest.java @@ -49,6 +49,7 @@ public class SockJSProtocolTest { private static Vertx vertx; private static HttpServer server; + @BeforeClass public static void before() throws Exception { vertx = Vertx.vertx();