Skip to content

Commit ffecc2c

Browse files
Fixes and options for gRPC instrumentation (#13443)
1 parent 7dc73f0 commit ffecc2c

File tree

9 files changed

+183
-75
lines changed

9 files changed

+183
-75
lines changed

instrumentation/armeria/armeria-grpc-1.14/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/armeria/grpc/v1_14/ArmeriaGrpcTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ void grpcInstrumentation() {
9898
.hasAttributesSatisfyingExactly(
9999
equalTo(
100100
MessageIncubatingAttributes.MESSAGE_TYPE, "RECEIVED"),
101-
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 2L))),
101+
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 1L))),
102102
span ->
103103
span.hasName("example.Greeter/SayHello")
104104
.hasKind(SpanKind.SERVER)
@@ -123,6 +123,6 @@ void grpcInstrumentation() {
123123
.hasName("message")
124124
.hasAttributesSatisfyingExactly(
125125
equalTo(MessageIncubatingAttributes.MESSAGE_TYPE, "SENT"),
126-
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 2L)))));
126+
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 1L)))));
127127
}
128128
}

instrumentation/grpc-1.6/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
| System property | Type | Default | Description |
44
|-------------------------------------------------------------|---------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|
5+
| `otel.instrumentation.grpc.emit-message-events` | Boolean | `true` | Determines whether to emit span event for each individual message received and sent. |
56
| `otel.instrumentation.grpc.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |
67
| `otel.instrumentation.grpc.capture-metadata.client.request` | String | | A comma-separated list of request metadata keys. gRPC client instrumentation will capture metadata values corresponding to configured keys as span attributes. |
78
| `otel.instrumentation.grpc.capture-metadata.server.request` | String | | A comma-separated list of request metadata keys. gRPC server instrumentation will capture metadata values corresponding to configured keys as span attributes. |

instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcSingletons.java

+5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ public final class GrpcSingletons {
2727
private static final AtomicReference<Context.Storage> STORAGE_REFERENCE = new AtomicReference<>();
2828

2929
static {
30+
boolean emitMessageEvents =
31+
AgentInstrumentationConfig.get()
32+
.getBoolean("otel.instrumentation.grpc.emit-message-events", true);
33+
3034
boolean experimentalSpanAttributes =
3135
AgentInstrumentationConfig.get()
3236
.getBoolean("otel.instrumentation.grpc.experimental-span-attributes", false);
@@ -40,6 +44,7 @@ public final class GrpcSingletons {
4044

4145
GrpcTelemetry telemetry =
4246
GrpcTelemetry.builder(GlobalOpenTelemetry.get())
47+
.setEmitMessageEvents(emitMessageEvents)
4348
.setCaptureExperimentalSpanAttributes(experimentalSpanAttributes)
4449
.setCapturedClientRequestMetadata(clientRequestMetadata)
4550
.setCapturedServerRequestMetadata(serverRequestMetadata)

instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTelemetry.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -29,31 +29,36 @@ public static GrpcTelemetryBuilder builder(OpenTelemetry openTelemetry) {
2929
private final Instrumenter<GrpcRequest, Status> clientInstrumenter;
3030
private final ContextPropagators propagators;
3131
private final boolean captureExperimentalSpanAttributes;
32+
private final boolean emitMessageEvents;
3233

3334
GrpcTelemetry(
3435
Instrumenter<GrpcRequest, Status> serverInstrumenter,
3536
Instrumenter<GrpcRequest, Status> clientInstrumenter,
3637
ContextPropagators propagators,
37-
boolean captureExperimentalSpanAttributes) {
38+
boolean captureExperimentalSpanAttributes,
39+
boolean emitMessageEvents) {
3840
this.serverInstrumenter = serverInstrumenter;
3941
this.clientInstrumenter = clientInstrumenter;
4042
this.propagators = propagators;
4143
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
44+
this.emitMessageEvents = emitMessageEvents;
4245
}
4346

4447
/**
4548
* Returns a new {@link ClientInterceptor} for use with methods like {@link
4649
* io.grpc.ManagedChannelBuilder#intercept(ClientInterceptor...)}.
4750
*/
4851
public ClientInterceptor newClientInterceptor() {
49-
return new TracingClientInterceptor(clientInstrumenter, propagators);
52+
return new TracingClientInterceptor(
53+
clientInstrumenter, propagators, captureExperimentalSpanAttributes, emitMessageEvents);
5054
}
5155

5256
/**
5357
* Returns a new {@link ServerInterceptor} for use with methods like {@link
5458
* io.grpc.ServerBuilder#intercept(ServerInterceptor)}.
5559
*/
5660
public ServerInterceptor newServerInterceptor() {
57-
return new TracingServerInterceptor(serverInstrumenter, captureExperimentalSpanAttributes);
61+
return new TracingServerInterceptor(
62+
serverInstrumenter, captureExperimentalSpanAttributes, emitMessageEvents);
5863
}
5964
}

instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTelemetryBuilder.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public final class GrpcTelemetryBuilder {
4949
additionalServerExtractors = new ArrayList<>();
5050

5151
private boolean captureExperimentalSpanAttributes;
52+
private boolean emitMessageEvents = true;
5253
private List<String> capturedClientRequestMetadata = Collections.emptyList();
5354
private List<String> capturedServerRequestMetadata = Collections.emptyList();
5455

@@ -130,6 +131,16 @@ public GrpcTelemetryBuilder setPeerService(String peerService) {
130131
return this;
131132
}
132133

134+
/**
135+
* Determines whether to add span event for each individual message received and sent. The default
136+
* is true. Set this to false in case of streaming large volumes of messages.
137+
*/
138+
@CanIgnoreReturnValue
139+
public GrpcTelemetryBuilder setEmitMessageEvents(boolean emitMessageEvents) {
140+
this.emitMessageEvents = emitMessageEvents;
141+
return this;
142+
}
143+
133144
/**
134145
* Sets whether experimental attributes should be set to spans. These attributes may be changed or
135146
* removed in the future, so only enable this if you know you do not require attributes filled by
@@ -211,6 +222,7 @@ public GrpcTelemetry build() {
211222
// So we go ahead and inject manually in this instrumentation.
212223
clientInstrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysClient()),
213224
openTelemetry.getPropagators(),
214-
captureExperimentalSpanAttributes);
225+
captureExperimentalSpanAttributes,
226+
emitMessageEvents);
215227
}
216228
}

instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java

+41-17
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626

2727
final class TracingClientInterceptor implements ClientInterceptor {
2828

29+
private static final AttributeKey<Long> GRPC_RECEIVED_MESSAGE_COUNT =
30+
AttributeKey.longKey("grpc.received.message_count");
31+
private static final AttributeKey<Long> GRPC_SENT_MESSAGE_COUNT =
32+
AttributeKey.longKey("grpc.sent.message_count");
2933
// copied from MessageIncubatingAttributes
3034
private static final AttributeKey<Long> MESSAGE_ID = AttributeKey.longKey("message.id");
3135
private static final AttributeKey<String> MESSAGE_TYPE = AttributeKey.stringKey("message.type");
@@ -34,16 +38,27 @@ final class TracingClientInterceptor implements ClientInterceptor {
3438
private static final String RECEIVED = "RECEIVED";
3539

3640
@SuppressWarnings("rawtypes")
37-
private static final AtomicLongFieldUpdater<TracingClientCall> MESSAGE_ID_UPDATER =
38-
AtomicLongFieldUpdater.newUpdater(TracingClientCall.class, "messageId");
41+
private static final AtomicLongFieldUpdater<TracingClientCall> SENT_MESSAGE_ID_UPDATER =
42+
AtomicLongFieldUpdater.newUpdater(TracingClientCall.class, "sentMessageId");
43+
44+
@SuppressWarnings("rawtypes")
45+
private static final AtomicLongFieldUpdater<TracingClientCall> RECEIVED_MESSAGE_ID_UPDATER =
46+
AtomicLongFieldUpdater.newUpdater(TracingClientCall.class, "receivedMessageId");
3947

4048
private final Instrumenter<GrpcRequest, Status> instrumenter;
4149
private final ContextPropagators propagators;
50+
private final boolean captureExperimentalSpanAttributes;
51+
private final boolean emitMessageEvents;
4252

4353
TracingClientInterceptor(
44-
Instrumenter<GrpcRequest, Status> instrumenter, ContextPropagators propagators) {
54+
Instrumenter<GrpcRequest, Status> instrumenter,
55+
ContextPropagators propagators,
56+
boolean captureExperimentalSpanAttributes,
57+
boolean emitMessageEvents) {
4558
this.instrumenter = instrumenter;
4659
this.propagators = propagators;
60+
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
61+
this.emitMessageEvents = emitMessageEvents;
4762
}
4863

4964
@Override
@@ -75,9 +90,13 @@ final class TracingClientCall<REQUEST, RESPONSE>
7590
private final Context context;
7691
private final GrpcRequest request;
7792

78-
// Used by MESSAGE_ID_UPDATER
93+
// Used by SENT_MESSAGE_ID_UPDATER
94+
@SuppressWarnings("UnusedVariable")
95+
volatile long sentMessageId;
96+
97+
// Used by RECEIVED_MESSAGE_ID_UPDATER
7998
@SuppressWarnings("UnusedVariable")
80-
volatile long messageId;
99+
volatile long receivedMessageId;
81100

82101
TracingClientCall(
83102
ClientCall<REQUEST, RESPONSE> delegate,
@@ -113,10 +132,11 @@ public void sendMessage(REQUEST message) {
113132
instrumenter.end(context, request, Status.UNKNOWN, e);
114133
throw e;
115134
}
116-
Span span = Span.fromContext(context);
117-
Attributes attributes =
118-
Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, MESSAGE_ID_UPDATER.incrementAndGet(this));
119-
span.addEvent("message", attributes);
135+
long messageId = SENT_MESSAGE_ID_UPDATER.incrementAndGet(this);
136+
if (emitMessageEvents) {
137+
Attributes attributes = Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, messageId);
138+
Span.fromContext(context).addEvent("message", attributes);
139+
}
120140
}
121141

122142
final class TracingClientCallListener
@@ -139,14 +159,11 @@ final class TracingClientCallListener
139159

140160
@Override
141161
public void onMessage(RESPONSE message) {
142-
Span span = Span.fromContext(context);
143-
Attributes attributes =
144-
Attributes.of(
145-
MESSAGE_TYPE,
146-
RECEIVED,
147-
MESSAGE_ID,
148-
MESSAGE_ID_UPDATER.incrementAndGet(TracingClientCall.this));
149-
span.addEvent("message", attributes);
162+
long messageId = RECEIVED_MESSAGE_ID_UPDATER.incrementAndGet(TracingClientCall.this);
163+
if (emitMessageEvents) {
164+
Attributes attributes = Attributes.of(MESSAGE_TYPE, RECEIVED, MESSAGE_ID, messageId);
165+
Span.fromContext(context).addEvent("message", attributes);
166+
}
150167
try (Scope ignored = context.makeCurrent()) {
151168
delegate().onMessage(message);
152169
}
@@ -155,6 +172,13 @@ public void onMessage(RESPONSE message) {
155172
@Override
156173
public void onClose(Status status, Metadata trailers) {
157174
request.setPeerSocketAddress(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
175+
if (captureExperimentalSpanAttributes) {
176+
Span span = Span.fromContext(context);
177+
span.setAttribute(
178+
GRPC_RECEIVED_MESSAGE_COUNT, RECEIVED_MESSAGE_ID_UPDATER.get(TracingClientCall.this));
179+
span.setAttribute(
180+
GRPC_SENT_MESSAGE_COUNT, SENT_MESSAGE_ID_UPDATER.get(TracingClientCall.this));
181+
}
158182
instrumenter.end(context, request, status, status.getCause());
159183
try (Scope ignored = parentContext.makeCurrent()) {
160184
delegate().onClose(status, trailers);

instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingServerInterceptor.java

+52-24
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@
2525

2626
final class TracingServerInterceptor implements ServerInterceptor {
2727

28+
private static final AttributeKey<Boolean> GRPC_CANCELED =
29+
AttributeKey.booleanKey("grpc.canceled");
30+
private static final AttributeKey<Long> GRPC_RECEIVED_MESSAGE_COUNT =
31+
AttributeKey.longKey("grpc.received.message_count");
32+
private static final AttributeKey<Long> GRPC_SENT_MESSAGE_COUNT =
33+
AttributeKey.longKey("grpc.sent.message_count");
2834
// copied from MessageIncubatingAttributes
2935
private static final AttributeKey<Long> MESSAGE_ID = AttributeKey.longKey("message.id");
3036
private static final AttributeKey<String> MESSAGE_TYPE = AttributeKey.stringKey("message.type");
@@ -33,19 +39,27 @@ final class TracingServerInterceptor implements ServerInterceptor {
3339
private static final String RECEIVED = "RECEIVED";
3440

3541
@SuppressWarnings("rawtypes")
36-
private static final AtomicLongFieldUpdater<TracingServerCall> MESSAGE_ID_UPDATER =
37-
AtomicLongFieldUpdater.newUpdater(TracingServerCall.class, "messageId");
42+
private static final AtomicLongFieldUpdater<TracingServerCall> SENT_MESSAGE_ID_UPDATER =
43+
AtomicLongFieldUpdater.newUpdater(TracingServerCall.class, "sentMessageId");
44+
45+
@SuppressWarnings("rawtypes")
46+
private static final AtomicLongFieldUpdater<TracingServerCall> RECEIVED_MESSAGE_ID_UPDATER =
47+
AtomicLongFieldUpdater.newUpdater(TracingServerCall.class, "receivedMessageId");
3848

3949
private static final VirtualField<ServerCall<?, ?>, String> AUTHORITY_FIELD =
4050
VirtualField.find(ServerCall.class, String.class);
4151

4252
private final Instrumenter<GrpcRequest, Status> instrumenter;
4353
private final boolean captureExperimentalSpanAttributes;
54+
private final boolean emitMessageEvents;
4455

4556
TracingServerInterceptor(
46-
Instrumenter<GrpcRequest, Status> instrumenter, boolean captureExperimentalSpanAttributes) {
57+
Instrumenter<GrpcRequest, Status> instrumenter,
58+
boolean captureExperimentalSpanAttributes,
59+
boolean emitMessageEvents) {
4760
this.instrumenter = instrumenter;
4861
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
62+
this.emitMessageEvents = emitMessageEvents;
4963
}
5064

5165
@Override
@@ -87,9 +101,13 @@ final class TracingServerCall<REQUEST, RESPONSE>
87101
private final GrpcRequest request;
88102
private Status status;
89103

90-
// Used by MESSAGE_ID_UPDATER
104+
// Used by SENT_MESSAGE_ID_UPDATER
105+
@SuppressWarnings("UnusedVariable")
106+
volatile long sentMessageId;
107+
108+
// Used by RECEIVED_MESSAGE_ID_UPDATER
91109
@SuppressWarnings("UnusedVariable")
92-
volatile long messageId;
110+
volatile long receivedMessageId;
93111

94112
TracingServerCall(
95113
ServerCall<REQUEST, RESPONSE> delegate, Context context, GrpcRequest request) {
@@ -108,10 +126,11 @@ public void sendMessage(RESPONSE message) {
108126
try (Scope ignored = context.makeCurrent()) {
109127
super.sendMessage(message);
110128
}
111-
Span span = Span.fromContext(context);
112-
Attributes attributes =
113-
Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, MESSAGE_ID_UPDATER.incrementAndGet(this));
114-
span.addEvent("message", attributes);
129+
long messageId = SENT_MESSAGE_ID_UPDATER.incrementAndGet(this);
130+
if (emitMessageEvents) {
131+
Attributes attributes = Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, messageId);
132+
Span.fromContext(context).addEvent("message", attributes);
133+
}
115134
}
116135

117136
@Override
@@ -136,15 +155,27 @@ final class TracingServerCallListener
136155
this.request = request;
137156
}
138157

158+
private void end(Context context, GrpcRequest request, Status response, Throwable error) {
159+
if (captureExperimentalSpanAttributes) {
160+
Span span = Span.fromContext(context);
161+
span.setAttribute(
162+
GRPC_RECEIVED_MESSAGE_COUNT, RECEIVED_MESSAGE_ID_UPDATER.get(TracingServerCall.this));
163+
span.setAttribute(
164+
GRPC_SENT_MESSAGE_COUNT, SENT_MESSAGE_ID_UPDATER.get(TracingServerCall.this));
165+
if (Status.CANCELLED.equals(status)) {
166+
span.setAttribute(GRPC_CANCELED, true);
167+
}
168+
}
169+
instrumenter.end(context, request, response, error);
170+
}
171+
139172
@Override
140173
public void onMessage(REQUEST message) {
141-
Attributes attributes =
142-
Attributes.of(
143-
MESSAGE_TYPE,
144-
RECEIVED,
145-
MESSAGE_ID,
146-
MESSAGE_ID_UPDATER.incrementAndGet(TracingServerCall.this));
147-
Span.fromContext(context).addEvent("message", attributes);
174+
long messageId = RECEIVED_MESSAGE_ID_UPDATER.incrementAndGet(TracingServerCall.this);
175+
if (emitMessageEvents) {
176+
Attributes attributes = Attributes.of(MESSAGE_TYPE, RECEIVED, MESSAGE_ID, messageId);
177+
Span.fromContext(context).addEvent("message", attributes);
178+
}
148179
delegate().onMessage(message);
149180
}
150181

@@ -162,36 +193,33 @@ public void onHalfClose() {
162193
public void onCancel() {
163194
try {
164195
delegate().onCancel();
165-
if (captureExperimentalSpanAttributes) {
166-
Span.fromContext(context).setAttribute("grpc.canceled", true);
167-
}
168196
} catch (Throwable e) {
169-
instrumenter.end(context, request, Status.UNKNOWN, e);
197+
end(context, request, Status.UNKNOWN, e);
170198
throw e;
171199
}
172-
instrumenter.end(context, request, Status.CANCELLED, null);
200+
end(context, request, Status.CANCELLED, null);
173201
}
174202

175203
@Override
176204
public void onComplete() {
177205
try {
178206
delegate().onComplete();
179207
} catch (Throwable e) {
180-
instrumenter.end(context, request, Status.UNKNOWN, e);
208+
end(context, request, Status.UNKNOWN, e);
181209
throw e;
182210
}
183211
if (status == null) {
184212
status = Status.UNKNOWN;
185213
}
186-
instrumenter.end(context, request, status, status.getCause());
214+
end(context, request, status, status.getCause());
187215
}
188216

189217
@Override
190218
public void onReady() {
191219
try {
192220
delegate().onReady();
193221
} catch (Throwable e) {
194-
instrumenter.end(context, request, Status.UNKNOWN, e);
222+
end(context, request, Status.UNKNOWN, e);
195223
throw e;
196224
}
197225
}

0 commit comments

Comments
 (0)