Skip to content

Commit fdd4b43

Browse files
committed
feat: Adding support for SSE on the WebClient
Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent 92276ec commit fdd4b43

File tree

6 files changed

+816
-0
lines changed

6 files changed

+816
-0
lines changed

vertx-web-common/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,18 @@
1313

1414
<artifactId>vertx-web-common</artifactId>
1515

16+
<dependencies>
17+
<!-- Testing -->
18+
<dependency>
19+
<groupId>io.vertx</groupId>
20+
<artifactId>vertx-unit</artifactId>
21+
<scope>test</scope>
22+
</dependency>
23+
<dependency>
24+
<groupId>org.hamcrest</groupId>
25+
<artifactId>hamcrest-core</artifactId>
26+
<version>2.2</version>
27+
<scope>test</scope>
28+
</dependency>
29+
</dependencies>
1630
</project>
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package io.vertx.ext.web.codec.sse;
2+
3+
import io.vertx.core.Future;
4+
import io.vertx.core.Handler;
5+
import io.vertx.core.buffer.Buffer;
6+
import io.vertx.core.streams.WriteStream;
7+
import io.vertx.ext.web.codec.BodyCodec;
8+
import io.vertx.ext.web.codec.spi.BodyStream;
9+
import java.nio.charset.StandardCharsets;
10+
11+
public class SseBodyCodec implements BodyCodec<Void> {
12+
13+
private final WriteStream<SseEvent> eventHandler;
14+
15+
private SseBodyCodec(WriteStream<SseEvent> eventHandler) {
16+
this.eventHandler = eventHandler;
17+
}
18+
19+
private static final int MAX_BUFFER = 8000000;
20+
21+
@Override
22+
public BodyStream<Void> stream() throws Exception {
23+
return new BodyStream<Void>() {
24+
private Buffer lineBuffer = Buffer.buffer();
25+
private SseEvent.Builder eventBuilder = SseEvent.builder();
26+
private Handler<Throwable> exceptionHandler;
27+
private Handler<Void> drainHandler;
28+
private int maxQueueSize = 8192;
29+
private boolean ended = false;
30+
31+
@Override
32+
public Future<Void> result() {
33+
return Future.succeededFuture();
34+
}
35+
36+
@Override
37+
public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
38+
this.exceptionHandler = handler;
39+
return this;
40+
}
41+
42+
@Override
43+
public Future<Void> write(Buffer data) {
44+
if (ended) {
45+
return Future.failedFuture("Stream is ended");
46+
}
47+
48+
try {
49+
for (byte b : data.getBytes()) {
50+
if (b == '\n' || b == '\r') {
51+
// Process the current line (could be empty)
52+
String line = lineBuffer.toString(StandardCharsets.UTF_8);
53+
lineBuffer = Buffer.buffer();
54+
55+
if (line.isEmpty()) {
56+
// Empty line dispatches the event
57+
if (eventHandler != null) {
58+
eventHandler.write(eventBuilder.build());
59+
}
60+
eventBuilder = SseEvent.builder();
61+
} else {
62+
parseLine(line, eventBuilder);
63+
}
64+
} else {
65+
if (lineBuffer.length() > MAX_BUFFER) {
66+
return Future.failedFuture("Data is too big");
67+
}
68+
lineBuffer.appendByte(b);
69+
}
70+
}
71+
return Future.succeededFuture();
72+
} catch (Exception e) {
73+
if (exceptionHandler != null) {
74+
exceptionHandler.handle(e);
75+
}
76+
return Future.failedFuture(e);
77+
}
78+
}
79+
80+
@Override
81+
public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
82+
this.maxQueueSize = maxSize;
83+
return this;
84+
}
85+
86+
@Override
87+
public boolean writeQueueFull() {
88+
return lineBuffer.length() >= maxQueueSize;
89+
}
90+
91+
@Override
92+
public WriteStream<Buffer> drainHandler(Handler<Void> handler) {
93+
this.drainHandler = handler;
94+
return this;
95+
}
96+
97+
@Override
98+
public void handle(Throwable event) {
99+
if (exceptionHandler != null) {
100+
exceptionHandler.handle(event);
101+
}
102+
}
103+
104+
@Override
105+
public Future<Void> end() {
106+
ended = true;
107+
// Process any remaining data in buffer
108+
if (lineBuffer.length() > 0) {
109+
String line = lineBuffer.toString(StandardCharsets.UTF_8);
110+
if (!line.isEmpty()) {
111+
parseLine(line, eventBuilder);
112+
}
113+
}
114+
// Dispatch final event if there's data
115+
if (eventHandler != null && (eventBuilder.toString().length() > 0)) {
116+
eventHandler.end(eventBuilder.build());
117+
}
118+
return Future.succeededFuture();
119+
}
120+
};
121+
}
122+
123+
public void parseLine(String line, SseEvent.Builder builder) {
124+
int colonIndex = line.indexOf(':');
125+
if (colonIndex == 0) {
126+
return;
127+
}
128+
if (colonIndex == -1) {
129+
processField(builder, line, "");
130+
return;
131+
}
132+
String field = line.substring(0, colonIndex);
133+
String value = line.substring(colonIndex + 1);
134+
// Remove leading space from value if present (SSE spec)
135+
if (value.startsWith(" ")) {
136+
value = value.substring(1);
137+
}
138+
processField(builder, field, value);
139+
}
140+
141+
public void processField(SseEvent.Builder builder, String field, String value) {
142+
// Field names must be compared literally, with no case folding performed.
143+
switch (field) {
144+
case "event":
145+
builder.event(value);
146+
break;
147+
case "data":
148+
builder.data(value);
149+
break;
150+
case "id":
151+
builder.id(value);
152+
break;
153+
case "retry":
154+
// If the field value consists of only ASCII digits, then interpret the field value as an
155+
// integer in base ten, and set the event stream's reconnection time to that integer.
156+
// Otherwise, ignore the field.
157+
try {
158+
builder.retry(Integer.parseInt(value));
159+
} catch (NumberFormatException ex) {
160+
throw new RuntimeException("Invalid \"retry\" value:" + value);
161+
}
162+
break;
163+
default:
164+
// Ignore unknown fields as per SSE spec
165+
break;
166+
}
167+
}
168+
169+
public static BodyCodec<Void> sseStream(WriteStream<SseEvent> handler) {
170+
return new SseBodyCodec(handler);
171+
}
172+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package io.vertx.ext.web.codec.sse;
2+
3+
public class SseEvent {
4+
5+
private final String id;
6+
private final String event;
7+
private final String data;
8+
private final int retry;
9+
10+
public SseEvent(String id, String event, String data, int retry) {
11+
this.id = id;
12+
this.event = event;
13+
this.data = data;
14+
this.retry = retry;
15+
}
16+
17+
public String id() {
18+
return id;
19+
}
20+
21+
public String event() {
22+
return event;
23+
}
24+
25+
public String data() {
26+
return data;
27+
}
28+
29+
public int retry() {
30+
return retry;
31+
}
32+
33+
@Override
34+
public boolean equals(Object o) {
35+
if (this == o) {
36+
return true;
37+
}
38+
if (o == null || getClass() != o.getClass()) {
39+
return false;
40+
}
41+
SseEvent sseEvent = (SseEvent) o;
42+
return retry == sseEvent.retry
43+
&& java.util.Objects.equals(id, sseEvent.id)
44+
&& java.util.Objects.equals(event, sseEvent.event)
45+
&& java.util.Objects.equals(data, sseEvent.data);
46+
}
47+
48+
@Override
49+
public int hashCode() {
50+
return java.util.Objects.hash(id, event, data, retry);
51+
}
52+
53+
@Override
54+
public String toString() {
55+
return "SseEvent{"
56+
+ "id='" + id + '\''
57+
+ ", event='" + event + '\''
58+
+ ", data='" + data + '\''
59+
+ ", retry=" + retry
60+
+ '}';
61+
}
62+
63+
public static Builder builder() {
64+
return new Builder();
65+
}
66+
67+
public static class Builder {
68+
69+
private String id;
70+
private String event;
71+
private StringBuilder data = new StringBuilder();
72+
private int retry;
73+
74+
public Builder id(String id) {
75+
this.id = id;
76+
return this;
77+
}
78+
79+
public Builder event(String event) {
80+
this.event = event;
81+
return this;
82+
}
83+
84+
public Builder data(String data) {
85+
this.data.append(data);
86+
return this;
87+
}
88+
89+
public Builder retry(int retry) {
90+
this.retry = retry;
91+
return this;
92+
}
93+
94+
public SseEvent build() {
95+
return new SseEvent(this.id, this.event, this.data.toString(), this.retry);
96+
}
97+
}
98+
}

vertx-web-common/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
exports io.vertx.ext.web.common;
2525
exports io.vertx.ext.web.common.template;
2626
exports io.vertx.ext.web.codec;
27+
exports io.vertx.ext.web.codec.sse;
2728
exports io.vertx.ext.web.codec.spi;
2829
exports io.vertx.ext.web.multipart;
2930

0 commit comments

Comments
 (0)