Skip to content

Commit f37e4fa

Browse files
author
希铭
committed
add it for aliyun-mns-sdk
1 parent 7b39cd5 commit f37e4fa

File tree

13 files changed

+1071
-7
lines changed

13 files changed

+1071
-7
lines changed

messaging-wrappers/aliyun-mns-sdk/build.gradle.kts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,22 @@ dependencies {
1212

1313
compileOnly("com.aliyun.mns:aliyun-sdk-mns:1.3.0")
1414

15+
testImplementation("com.aliyun.mns:aliyun-sdk-mns:1.3.0")
1516
testImplementation(project(":messaging-wrappers:testing"))
16-
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
17-
testImplementation("io.opentelemetry:opentelemetry-sdk-trace")
18-
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
19-
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-incubator")
2017

21-
testImplementation("com.aliyun.mns:aliyun-sdk-mns:1.3.0")
22-
testImplementation("org.springframework.boot:spring-boot-starter-web:3.0.0")
18+
testImplementation("org.springframework.boot:spring-boot-starter-web:2.7.18")
19+
testImplementation("org.springframework.boot:spring-boot-starter-test:2.7.18")
20+
}
21+
22+
tasks {
23+
withType<Test>().configureEach {
24+
jvmArgs("-Dotel.java.global-autoconfigure.enabled=true")
25+
// TODO: According to https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#message-creation-context-as-parent-of-process-span,
26+
// process span should be the child of receive span. However, we couldn't access the trace context with receive span
27+
// in wrappers, unless we add a generic accessor for that.
28+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
29+
jvmArgs("-Dotel.traces.exporter=logging")
30+
jvmArgs("-Dotel.metrics.exporter=logging")
31+
jvmArgs("-Dotel.logs.exporter=logging")
32+
}
2333
}

messaging-wrappers/aliyun-mns-sdk/src/main/java/io/opentelemetry/contrib/messaging/wrappers/mns/semconv/MnsProcessRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public Long getMessageBodySize() {
6868
@Nullable
6969
@Override
7070
public Long getMessageEnvelopeSize() {
71-
return null;
71+
return (long) message.getMessageBodyAsRawBytes().length;
7272
}
7373

7474
@Nullable
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.messaging.wrappers.mns;
7+
8+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
9+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
10+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
11+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
12+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
13+
import static org.assertj.core.api.Assertions.assertThat;
14+
15+
import com.aliyun.mns.client.CloudAccount;
16+
import com.aliyun.mns.client.CloudQueue;
17+
import com.aliyun.mns.client.MNSClient;
18+
import com.aliyun.mns.common.ServiceHandlingRequiredException;
19+
import com.aliyun.mns.common.http.ClientConfiguration;
20+
import com.aliyun.mns.model.Message;
21+
import io.opentelemetry.api.GlobalOpenTelemetry;
22+
import io.opentelemetry.api.OpenTelemetry;
23+
import io.opentelemetry.api.trace.Span;
24+
import io.opentelemetry.api.trace.SpanKind;
25+
import io.opentelemetry.api.trace.Tracer;
26+
import io.opentelemetry.context.Context;
27+
import io.opentelemetry.context.Scope;
28+
import io.opentelemetry.contrib.messaging.wrappers.MessagingProcessWrapper;
29+
import io.opentelemetry.contrib.messaging.wrappers.mns.broker.SmqMockedBroker;
30+
import io.opentelemetry.contrib.messaging.wrappers.mns.semconv.MnsProcessRequest;
31+
import io.opentelemetry.contrib.messaging.wrappers.testing.AbstractBaseTest;
32+
import java.nio.charset.StandardCharsets;
33+
import org.junit.jupiter.api.BeforeAll;
34+
import org.junit.jupiter.api.Test;
35+
import org.junit.jupiter.api.TestInstance;
36+
import org.springframework.boot.test.context.SpringBootTest;
37+
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
38+
import org.springframework.boot.test.web.server.LocalServerPort;
39+
40+
@SuppressWarnings("OtelInternalJavadoc")
41+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
42+
@SpringBootTest(
43+
classes = {SmqMockedBroker.class},
44+
webEnvironment = WebEnvironment.RANDOM_PORT)
45+
public class AliyunMnsSdkTest extends AbstractBaseTest {
46+
47+
private static final String TEST_ENDPOINT = "http://test.mns.cn-hangzhou.aliyuncs.com";
48+
49+
private static final String QUEUE = "TEST_QUEUE";
50+
51+
private static final String MESSAGE_BODY = "Hello OpenTelemetry";
52+
53+
@LocalServerPort private int testApplicationPort; // port at which the spring app is running
54+
55+
private MNSClient mnsClient;
56+
57+
private CloudQueue queue;
58+
59+
private OpenTelemetry otel;
60+
61+
private Tracer tracer;
62+
63+
private MessagingProcessWrapper<MnsProcessRequest> wrapper;
64+
65+
@BeforeAll
66+
void setupClass() {
67+
otel = GlobalOpenTelemetry.get();
68+
tracer = otel.getTracer("test-tracer", "1.0.0");
69+
wrapper = MnsHelper.processWrapperBuilder().openTelemetry(otel).build();
70+
71+
ClientConfiguration configuration = new ClientConfiguration();
72+
configuration.setProxyHost("127.0.0.1");
73+
configuration.setProxyPort(testApplicationPort);
74+
75+
CloudAccount account = new CloudAccount("test-ak", "test-sk", TEST_ENDPOINT, configuration);
76+
77+
mnsClient = account.getMNSClient();
78+
queue = mnsClient.getQueueRef(QUEUE);
79+
}
80+
81+
@Test
82+
void testSendAndConsume() throws ServiceHandlingRequiredException {
83+
sendWithParent();
84+
85+
consumeWithChild();
86+
87+
assertTraces();
88+
}
89+
90+
public void sendWithParent() {
91+
// mock a send span
92+
Span parent = tracer.spanBuilder("publish " + QUEUE).setSpanKind(SpanKind.PRODUCER).startSpan();
93+
94+
try (Scope scope = parent.makeCurrent()) {
95+
Message message = new Message(MESSAGE_BODY);
96+
otel.getPropagators()
97+
.getTextMapPropagator()
98+
.inject(Context.current(), message, MnsTextMapSetter.INSTANCE);
99+
queue.putMessage(message);
100+
}
101+
102+
parent.end();
103+
}
104+
105+
public void consumeWithChild() throws ServiceHandlingRequiredException {
106+
// check that the message was received
107+
Message message = null;
108+
for (int i = 0; i < 3; i++) {
109+
message = queue.popMessage(3);
110+
if (message != null) {
111+
break;
112+
}
113+
}
114+
115+
assertThat(message).isNotNull();
116+
117+
wrapper.doProcess(
118+
MnsProcessRequest.of(message, QUEUE),
119+
() -> {
120+
tracer.spanBuilder("process child").startSpan().end();
121+
});
122+
}
123+
124+
/**
125+
* Copied from <a
126+
* href=https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/testing-common>testing-common</a>.
127+
*/
128+
@SuppressWarnings("deprecation") // using deprecated semconv
129+
public void assertTraces() {
130+
waitAndAssertTraces(
131+
sortByRootSpanName("parent", "producer callback"),
132+
trace ->
133+
trace.hasSpansSatisfyingExactly(
134+
span ->
135+
// No need to verify the attribute here because it is generated by
136+
// instrumentation library.
137+
span.hasName("publish " + QUEUE).hasKind(SpanKind.PRODUCER).hasNoParent(),
138+
span ->
139+
span.hasName("process " + QUEUE)
140+
.hasKind(SpanKind.CONSUMER)
141+
.hasParent(trace.getSpan(0))
142+
.hasAttributesSatisfyingExactly(
143+
equalTo(MESSAGING_SYSTEM, "guava-eventbus"),
144+
equalTo(MESSAGING_DESTINATION_NAME, QUEUE),
145+
equalTo(
146+
MESSAGING_MESSAGE_BODY_SIZE,
147+
MESSAGE_BODY.getBytes(StandardCharsets.UTF_8).length),
148+
equalTo(MESSAGING_OPERATION, "process")),
149+
span ->
150+
span.hasName("process child")
151+
.hasKind(SpanKind.INTERNAL)
152+
.hasParent(trace.getSpan(1))));
153+
}
154+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.messaging.wrappers.mns;
7+
8+
import com.aliyun.mns.model.Message;
9+
import com.aliyun.mns.model.MessagePropertyValue;
10+
import com.aliyun.mns.model.MessageSystemPropertyName;
11+
import com.aliyun.mns.model.MessageSystemPropertyValue;
12+
import com.aliyun.mns.model.SystemPropertyType;
13+
import io.opentelemetry.context.propagation.TextMapSetter;
14+
import javax.annotation.Nullable;
15+
16+
public enum MnsTextMapSetter implements TextMapSetter<Message> {
17+
INSTANCE;
18+
19+
/**
20+
* MNS message trails currently only support the W3C protocol; other protocol headers should be
21+
* injected into userProperties.
22+
*/
23+
@Override
24+
public void set(@Nullable Message message, String key, String value) {
25+
if (message == null) {
26+
return;
27+
}
28+
MessageSystemPropertyName systemPropertyName = MnsHelper.convert2SystemPropertyName(key);
29+
if (systemPropertyName != null) {
30+
message.putSystemProperty(
31+
systemPropertyName, new MessageSystemPropertyValue(SystemPropertyType.STRING, value));
32+
} else {
33+
message.getUserProperties().put(key, new MessagePropertyValue(value));
34+
}
35+
}
36+
}

0 commit comments

Comments
 (0)