Skip to content

Commit c110062

Browse files
committed
Dispatch vert.x event bus consumer events in correct context
Fixes quarkusio#38061
1 parent a98fb44 commit c110062

File tree

10 files changed

+380
-83
lines changed

10 files changed

+380
-83
lines changed

extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.quarkus.vertx.runtime;
22

3-
import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;
43
import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setCurrentContextSafe;
54
import static io.smallrye.common.expression.Expression.Flag.LENIENT_SYNTAX;
65
import static io.smallrye.common.expression.Expression.Flag.NO_TRIM;
@@ -32,9 +31,7 @@
3231
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
3332
import io.smallrye.common.expression.Expression;
3433
import io.smallrye.common.expression.ResolveContext;
35-
import io.smallrye.common.vertx.VertxContext;
3634
import io.vertx.core.AsyncResult;
37-
import io.vertx.core.Context;
3835
import io.vertx.core.Future;
3936
import io.vertx.core.Handler;
4037
import io.vertx.core.Vertx;
@@ -123,35 +120,30 @@ public void handle(Void x) {
123120
consumer.handler(new Handler<Message<Object>>() {
124121
@Override
125122
public void handle(Message<Object> m) {
123+
// Will run on the context used for the consumer registration.
124+
// It's a duplicated context, but we need to mark it as safe.
125+
// The safety comes from the fact that it's instantiated by Vert.x for every
126+
// message.
127+
setCurrentContextSafe(true);
126128
if (blocking) {
127-
// We need to create a duplicated context from the "context"
128-
Context dup = VertxContext.getOrCreateDuplicatedContext(context);
129-
setContextSafe(dup, true);
130-
131129
if (runOnVirtualThread) {
132-
// Switch to a Vert.x context to capture it and use it during the invocation.
133-
dup.runOnContext(new Handler<Void>() {
130+
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
134131
@Override
135-
public void handle(Void event) {
136-
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
137-
@Override
138-
public void run() {
139-
try {
140-
invoker.invoke(m);
141-
} catch (Exception e) {
142-
if (m.replyAddress() == null) {
143-
// No reply handler
144-
throw wrapIfNecessary(e);
145-
} else {
146-
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
147-
}
148-
}
132+
public void run() {
133+
try {
134+
invoker.invoke(m);
135+
} catch (Exception e) {
136+
if (m.replyAddress() == null) {
137+
// No reply handler
138+
throw wrapIfNecessary(e);
139+
} else {
140+
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
149141
}
150-
});
142+
}
151143
}
152144
});
153145
} else {
154-
Future<Void> future = dup.executeBlocking(new Callable<Void>() {
146+
Future<Void> future = Vertx.currentContext().executeBlocking(new Callable<Void>() {
155147
@Override
156148
public Void call() {
157149
try {
@@ -170,11 +162,6 @@ public Void call() {
170162
future.onFailure(context::reportException);
171163
}
172164
} else {
173-
// Will run on the context used for the consumer registration.
174-
// It's a duplicated context, but we need to mark it as safe.
175-
// The safety comes from the fact that it's instantiated by Vert.x for every
176-
// message.
177-
setCurrentContextSafe(true);
178165
try {
179166
invoker.invoke(m);
180167
} catch (Exception e) {

integration-tests/opentelemetry-vertx/pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
<groupId>io.quarkus</groupId>
2323
<artifactId>quarkus-vertx-http</artifactId>
2424
</dependency>
25+
<dependency>
26+
<groupId>io.quarkus</groupId>
27+
<artifactId>quarkus-rest-jackson</artifactId>
28+
</dependency>
2529
<dependency>
2630
<groupId>io.quarkus</groupId>
2731
<artifactId>quarkus-micrometer</artifactId>
@@ -72,6 +76,19 @@
7276
</exclusion>
7377
</exclusions>
7478
</dependency>
79+
<dependency>
80+
<groupId>io.quarkus</groupId>
81+
<artifactId>quarkus-rest-jackson-deployment</artifactId>
82+
<version>${project.version}</version>
83+
<type>pom</type>
84+
<scope>test</scope>
85+
<exclusions>
86+
<exclusion>
87+
<groupId>*</groupId>
88+
<artifactId>*</artifactId>
89+
</exclusion>
90+
</exclusions>
91+
</dependency>
7592
<dependency>
7693
<groupId>io.quarkus</groupId>
7794
<artifactId>quarkus-vertx-http-deployment</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.quarkus.it.opentelemetry.vertx;
2+
3+
import jakarta.enterprise.context.ApplicationScoped;
4+
5+
import io.opentelemetry.api.trace.Span;
6+
import io.opentelemetry.instrumentation.annotations.WithSpan;
7+
import io.quarkus.logging.Log;
8+
import io.quarkus.vertx.ConsumeEvent;
9+
import io.smallrye.common.annotation.Blocking;
10+
import io.smallrye.common.annotation.RunOnVirtualThread;
11+
import io.vertx.core.MultiMap;
12+
13+
@ApplicationScoped
14+
public class EventBusConsumer {
15+
16+
@ConsumeEvent("pets")
17+
// non-blocking
18+
public String sayHi(Pet pet) {
19+
Log.infov("Received a pet: {0} {1}", pet, Span.current());
20+
process();
21+
return "Hello " + pet.getName() + " (" + pet.getKind() + ")";
22+
}
23+
24+
@ConsumeEvent("persons")
25+
@Blocking
26+
public String name(String name) {
27+
Log.infov("Received a pet: {0} {1}", name, Span.current());
28+
process();
29+
return "Hello " + name;
30+
}
31+
32+
@ConsumeEvent("person-headers")
33+
@RunOnVirtualThread
34+
public String personWithHeader(MultiMap headers, Person person) {
35+
Log.infov("Received a person: {0} {1}", person, Span.current());
36+
process();
37+
String s = "Hello " + person.getFirstName() + " " + person.getLastName() + ", " + headers;
38+
return s;
39+
}
40+
41+
@WithSpan
42+
public void process() {
43+
44+
}
45+
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.quarkus.it.opentelemetry.vertx;
2+
3+
import jakarta.inject.Inject;
4+
import jakarta.ws.rs.POST;
5+
import jakarta.ws.rs.Path;
6+
import jakarta.ws.rs.Produces;
7+
8+
import io.smallrye.mutiny.Uni;
9+
import io.vertx.core.eventbus.DeliveryOptions;
10+
import io.vertx.core.json.JsonObject;
11+
import io.vertx.mutiny.core.eventbus.EventBus;
12+
import io.vertx.mutiny.core.eventbus.Message;
13+
14+
@Path("/event-bus")
15+
public class EventBusSender {
16+
17+
@Inject
18+
EventBus bus;
19+
20+
@POST
21+
@Path("/person")
22+
public Uni<String> helloToPerson(JsonObject json) {
23+
return bus.<String> request("persons", json.getString("name"))
24+
.onItem().transform(Message::body);
25+
}
26+
27+
@POST
28+
@Path("/person2")
29+
@Produces("text/plain")
30+
public Uni<String> helloToPersonWithHeaders(JsonObject json) {
31+
return bus.<String> request(
32+
"person-headers",
33+
new Person(json.getString("firstName"), json.getString("lastName")),
34+
new DeliveryOptions().addHeader("header", "headerValue"))
35+
.onItem().transform(Message::body);
36+
}
37+
38+
@POST
39+
@Path("/pet")
40+
public Uni<String> helloToPet(JsonObject json) {
41+
return bus.<String> request("pets", new Pet(json.getString("name"), json.getString("kind")))
42+
.onItem().transform(Message::body);
43+
}
44+
45+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.quarkus.it.opentelemetry.vertx;
2+
3+
import io.quarkus.runtime.annotations.RegisterForReflection;
4+
5+
@RegisterForReflection
6+
public class Person {
7+
8+
private String firstName;
9+
private String lastName;
10+
11+
public Person(String firstName, String lastName) {
12+
this.firstName = firstName;
13+
this.lastName = lastName;
14+
}
15+
16+
public Person() {
17+
// Used by reflection.
18+
}
19+
20+
public String getFirstName() {
21+
return firstName;
22+
}
23+
24+
public Person setFirstName(String firstName) {
25+
this.firstName = firstName;
26+
return this;
27+
}
28+
29+
public String getLastName() {
30+
return lastName;
31+
}
32+
33+
public Person setLastName(String lastName) {
34+
this.lastName = lastName;
35+
return this;
36+
}
37+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.quarkus.it.opentelemetry.vertx;
2+
3+
/**
4+
* Simple pojo.
5+
* The test using this pojo will use the generic codec facility.
6+
*/
7+
public class Pet {
8+
9+
private final String name;
10+
11+
private final String kind;
12+
13+
public Pet(String name, String kind) {
14+
this.name = name;
15+
this.kind = kind;
16+
}
17+
18+
public String getName() {
19+
return name;
20+
}
21+
22+
public String getKind() {
23+
return kind;
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.quarkus.it.opentelemetry.vertx;
2+
3+
import io.quarkus.test.junit.QuarkusIntegrationTest;
4+
5+
@QuarkusIntegrationTest
6+
public class EventBusIT extends EventBusTest {
7+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package io.quarkus.it.opentelemetry.vertx;
2+
3+
import static io.restassured.RestAssured.given;
4+
import static java.net.HttpURLConnection.HTTP_OK;
5+
import static org.awaitility.Awaitility.await;
6+
import static org.hamcrest.CoreMatchers.equalTo;
7+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
8+
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.concurrent.TimeUnit;
12+
13+
import org.hamcrest.CoreMatchers;
14+
import org.junit.jupiter.api.BeforeEach;
15+
import org.junit.jupiter.api.Test;
16+
17+
import io.opentelemetry.api.trace.SpanId;
18+
import io.opentelemetry.api.trace.SpanKind;
19+
import io.quarkus.test.junit.QuarkusTest;
20+
import io.restassured.http.ContentType;
21+
import io.vertx.core.json.JsonObject;
22+
23+
@QuarkusTest
24+
public class EventBusTest extends SpanExporterBaseTest {
25+
26+
@BeforeEach
27+
void reset() {
28+
given().get("/reset").then().statusCode(HTTP_OK);
29+
await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().isEmpty());
30+
}
31+
32+
@Test
33+
public void testEventBusWithString() {
34+
String body = new JsonObject().put("name", "Bob Morane").toString();
35+
given().contentType(ContentType.JSON).body(body)
36+
.post("/event-bus/person")
37+
.then().statusCode(200).body(equalTo("Hello Bob Morane"));
38+
39+
await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() >= 3);
40+
List<Map<String, Object>> spans = getSpans();
41+
42+
Map<String, Object> serverCall = getSpanByKindAndParentId(spans, SpanKind.SERVER, SpanId.getInvalid());
43+
String spanId = getSpanId(serverCall);
44+
assertNotEquals(SpanId.getInvalid(), spanId);
45+
46+
Map<String, Object> producerSpan = getSpanByKindAndParentId(spans, SpanKind.PRODUCER, spanId);
47+
String producerSpanId = getSpanId(producerSpan);
48+
assertNotEquals(SpanId.getInvalid(), producerSpanId);
49+
50+
Map<String, Object> consumerSpan = getSpanByKindAndParentId(spans, SpanKind.CONSUMER, producerSpanId);
51+
String consumerSpanId = getSpanId(consumerSpan);
52+
assertNotEquals(SpanId.getInvalid(), consumerSpanId);
53+
54+
Map<String, Object> methodCallSpan = getSpanByKindAndParentId(spans, SpanKind.INTERNAL, consumerSpanId);
55+
String methodCallSpanId = getSpanId(methodCallSpan);
56+
assertNotEquals(SpanId.getInvalid(), methodCallSpanId);
57+
}
58+
59+
@Test
60+
public void testEventBusWithObjectAndHeader() {
61+
String body = new JsonObject()
62+
.put("firstName", "Bob")
63+
.put("lastName", "Morane")
64+
.toString();
65+
given().contentType(ContentType.JSON).body(body)
66+
.post("/event-bus/person2")
67+
.then().statusCode(200)
68+
// For some reason Multimap.toString() has \n at the end.
69+
.body(CoreMatchers.startsWith("Hello Bob Morane, header=headerValue\n"));
70+
71+
await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() >= 3);
72+
List<Map<String, Object>> spans = getSpans();
73+
74+
Map<String, Object> serverCall = getSpanByKindAndParentId(spans, SpanKind.SERVER, SpanId.getInvalid());
75+
String spanId = getSpanId(serverCall);
76+
assertNotEquals(SpanId.getInvalid(), spanId);
77+
78+
Map<String, Object> producerSpan = getSpanByKindAndParentId(spans, SpanKind.PRODUCER, spanId);
79+
String producerSpanId = getSpanId(producerSpan);
80+
assertNotEquals(SpanId.getInvalid(), producerSpanId);
81+
82+
Map<String, Object> consumerSpan = getSpanByKindAndParentId(spans, SpanKind.CONSUMER, producerSpanId);
83+
String consumerSpanId = getSpanId(consumerSpan);
84+
assertNotEquals(SpanId.getInvalid(), consumerSpanId);
85+
86+
Map<String, Object> methodCallSpan = getSpanByKindAndParentId(spans, SpanKind.INTERNAL, consumerSpanId);
87+
String methodCallSpanId = getSpanId(methodCallSpan);
88+
assertNotEquals(SpanId.getInvalid(), methodCallSpanId);
89+
}
90+
91+
@Test
92+
public void testEventBusWithPet() {
93+
String body = new JsonObject().put("name", "Neo").put("kind", "rabbit").toString();
94+
given().contentType(ContentType.JSON).body(body)
95+
.post("/event-bus/pet")
96+
.then().statusCode(200).body(equalTo("Hello Neo (rabbit)"));
97+
98+
await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() >= 3);
99+
List<Map<String, Object>> spans = getSpans();
100+
101+
Map<String, Object> serverCall = getSpanByKindAndParentId(spans, SpanKind.SERVER, SpanId.getInvalid());
102+
String spanId = getSpanId(serverCall);
103+
assertNotEquals(SpanId.getInvalid(), spanId);
104+
105+
Map<String, Object> producerSpan = getSpanByKindAndParentId(spans, SpanKind.PRODUCER, spanId);
106+
String producerSpanId = getSpanId(producerSpan);
107+
assertNotEquals(SpanId.getInvalid(), producerSpanId);
108+
109+
Map<String, Object> consumerSpan = getSpanByKindAndParentId(spans, SpanKind.CONSUMER, producerSpanId);
110+
String consumerSpanId = getSpanId(consumerSpan);
111+
assertNotEquals(SpanId.getInvalid(), consumerSpanId);
112+
113+
Map<String, Object> methodCallSpan = getSpanByKindAndParentId(spans, SpanKind.INTERNAL, consumerSpanId);
114+
String methodCallSpanId = getSpanId(methodCallSpan);
115+
assertNotEquals(SpanId.getInvalid(), methodCallSpanId);
116+
}
117+
}

0 commit comments

Comments
 (0)