Skip to content

Commit 2ee2cf3

Browse files
authored
fix asynchonous unary call traces (open-telemetry#536)
1 parent 753e228 commit 2ee2cf3

File tree

4 files changed

+76
-82
lines changed

4 files changed

+76
-82
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
([#545](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/545))
1313
- `openelemetry-sdk-extension-aws` Take a dependency on `opentelemetry-sdk`
1414
([#558](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/558))
15+
1516
### Changed
1617
- `opentelemetry-instrumentation-tornado` properly instrument work done in tornado on_finish method.
1718
([#499](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/499))
@@ -33,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3334
- Updating dependency for opentelemetry api/sdk packages to support major version instead
3435
of pinning to specific versions.
3536
([#567](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/567))
37+
- `opentelemetry-instrumentation-grpc` Fixed asynchonous unary call traces
38+
([#536](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/536))
3639

3740
### Added
3841
- `opentelemetry-instrumentation-httpx` Add `httpx` instrumentation

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py

+51-82
Original file line numberDiff line numberDiff line change
@@ -33,27 +33,6 @@
3333
from opentelemetry.trace.status import Status, StatusCode
3434

3535

36-
class _GuardedSpan:
37-
def __init__(self, span):
38-
self.span = span
39-
self.generated_span = None
40-
self._engaged = True
41-
42-
def __enter__(self):
43-
self.generated_span = self.span.__enter__()
44-
return self
45-
46-
def __exit__(self, *args, **kwargs):
47-
if self._engaged:
48-
self.generated_span = None
49-
return self.span.__exit__(*args, **kwargs)
50-
return False
51-
52-
def release(self):
53-
self._engaged = False
54-
return self.span
55-
56-
5736
class _CarrierSetter(Setter):
5837
"""We use a custom setter in order to be able to lower case
5938
keys as is required by grpc.
@@ -68,7 +47,7 @@ def set(self, carrier: MutableMapping[str, str], key: str, value: str):
6847

6948
def _make_future_done_callback(span, rpc_info):
7049
def callback(response_future):
71-
with span:
50+
with trace.use_span(span, end_on_exit=True):
7251
code = response_future.code()
7352
if code != grpc.StatusCode.OK:
7453
rpc_info.error = code
@@ -85,7 +64,7 @@ class OpenTelemetryClientInterceptor(
8564
def __init__(self, tracer):
8665
self._tracer = tracer
8766

88-
def _start_span(self, method):
67+
def _start_span(self, method, **kwargs):
8968
service, meth = method.lstrip("/").split("/", 1)
9069
attributes = {
9170
SpanAttributes.RPC_SYSTEM: "grpc",
@@ -95,16 +74,19 @@ def _start_span(self, method):
9574
}
9675

9776
return self._tracer.start_as_current_span(
98-
name=method, kind=trace.SpanKind.CLIENT, attributes=attributes
77+
name=method,
78+
kind=trace.SpanKind.CLIENT,
79+
attributes=attributes,
80+
**kwargs,
9981
)
10082

10183
# pylint:disable=no-self-use
102-
def _trace_result(self, guarded_span, rpc_info, result):
103-
# If the RPC is called asynchronously, release the guard and add a
104-
# callback so that the span can be finished once the future is done.
84+
def _trace_result(self, span, rpc_info, result):
85+
# If the RPC is called asynchronously, add a callback to end the span
86+
# when the future is done, else end the span immediately
10587
if isinstance(result, grpc.Future):
10688
result.add_done_callback(
107-
_make_future_done_callback(guarded_span.release(), rpc_info)
89+
_make_future_done_callback(span, rpc_info)
10890
)
10991
return result
11092
response = result
@@ -115,41 +97,54 @@ def _trace_result(self, guarded_span, rpc_info, result):
11597
if isinstance(result, tuple):
11698
response = result[0]
11799
rpc_info.response = response
118-
100+
span.end()
119101
return result
120102

121-
def _start_guarded_span(self, *args, **kwargs):
122-
return _GuardedSpan(self._start_span(*args, **kwargs))
123-
124-
def intercept_unary(self, request, metadata, client_info, invoker):
103+
def _intercept(self, request, metadata, client_info, invoker):
125104
if not metadata:
126105
mutable_metadata = OrderedDict()
127106
else:
128107
mutable_metadata = OrderedDict(metadata)
129-
130-
with self._start_guarded_span(client_info.full_method) as guarded_span:
131-
inject(mutable_metadata, setter=_carrier_setter)
132-
metadata = tuple(mutable_metadata.items())
133-
134-
rpc_info = RpcInfo(
135-
full_method=client_info.full_method,
136-
metadata=metadata,
137-
timeout=client_info.timeout,
138-
request=request,
139-
)
140-
108+
with self._start_span(
109+
client_info.full_method,
110+
end_on_exit=False,
111+
record_exception=False,
112+
set_status_on_exception=False,
113+
) as span:
114+
result = None
141115
try:
142-
result = invoker(request, metadata)
143-
except grpc.RpcError as err:
144-
guarded_span.generated_span.set_status(
145-
Status(StatusCode.ERROR)
116+
inject(mutable_metadata, setter=_carrier_setter)
117+
metadata = tuple(mutable_metadata.items())
118+
119+
rpc_info = RpcInfo(
120+
full_method=client_info.full_method,
121+
metadata=metadata,
122+
timeout=client_info.timeout,
123+
request=request,
146124
)
147-
guarded_span.generated_span.set_attribute(
148-
SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0]
125+
126+
result = invoker(request, metadata)
127+
except Exception as exc:
128+
if isinstance(exc, grpc.RpcError):
129+
span.set_attribute(
130+
SpanAttributes.RPC_GRPC_STATUS_CODE,
131+
exc.code().value[0],
132+
)
133+
span.set_status(
134+
Status(
135+
status_code=StatusCode.ERROR,
136+
description="{}: {}".format(type(exc).__name__, exc),
137+
)
149138
)
150-
raise err
139+
span.record_exception(exc)
140+
raise exc
141+
finally:
142+
if not result:
143+
span.end()
144+
return self._trace_result(span, rpc_info, result)
151145

152-
return self._trace_result(guarded_span, rpc_info, result)
146+
def intercept_unary(self, request, metadata, client_info, invoker):
147+
return self._intercept(request, metadata, client_info, invoker)
153148

154149
# For RPCs that stream responses, the result can be a generator. To record
155150
# the span across the generated responses and detect any errors, we wrap
@@ -194,32 +189,6 @@ def intercept_stream(
194189
request_or_iterator, metadata, client_info, invoker
195190
)
196191

197-
if not metadata:
198-
mutable_metadata = OrderedDict()
199-
else:
200-
mutable_metadata = OrderedDict(metadata)
201-
202-
with self._start_guarded_span(client_info.full_method) as guarded_span:
203-
inject(mutable_metadata, setter=_carrier_setter)
204-
metadata = tuple(mutable_metadata.items())
205-
rpc_info = RpcInfo(
206-
full_method=client_info.full_method,
207-
metadata=metadata,
208-
timeout=client_info.timeout,
209-
request=request_or_iterator,
210-
)
211-
212-
rpc_info.request = request_or_iterator
213-
214-
try:
215-
result = invoker(request_or_iterator, metadata)
216-
except grpc.RpcError as err:
217-
guarded_span.generated_span.set_status(
218-
Status(StatusCode.ERROR)
219-
)
220-
guarded_span.generated_span.set_attribute(
221-
SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0],
222-
)
223-
raise err
224-
225-
return self._trace_result(guarded_span, rpc_info, result)
192+
return self._intercept(
193+
request_or_iterator, metadata, client_info, invoker
194+
)

instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py

+7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ def simple_method(stub, error=False):
2424
stub.SimpleMethod(request)
2525

2626

27+
def simple_method_future(stub, error=False):
28+
request = Request(
29+
client_id=CLIENT_ID, request_data="error" if error else "data"
30+
)
31+
return stub.SimpleMethod.future(request)
32+
33+
2734
def client_streaming_method(stub, error=False):
2835
# create a generator
2936
def request_messages():

instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py

+15
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
client_streaming_method,
3737
server_streaming_method,
3838
simple_method,
39+
simple_method_future,
3940
)
4041
from ._server import create_test_server
4142
from .protobuf.test_server_pb2 import Request
@@ -100,6 +101,20 @@ def tearDown(self):
100101
self.server.stop(None)
101102
self.channel.close()
102103

104+
def test_unary_unary_future(self):
105+
simple_method_future(self._stub).result()
106+
spans = self.memory_exporter.get_finished_spans()
107+
self.assertEqual(len(spans), 1)
108+
span = spans[0]
109+
110+
self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod")
111+
self.assertIs(span.kind, trace.SpanKind.CLIENT)
112+
113+
# Check version and name in span's instrumentation info
114+
self.check_span_instrumentation_info(
115+
span, opentelemetry.instrumentation.grpc
116+
)
117+
103118
def test_unary_unary(self):
104119
simple_method(self._stub)
105120
spans = self.memory_exporter.get_finished_spans()

0 commit comments

Comments
 (0)