gRPC AIO Client Instrumentation Unary-Unary Response Hook Bad Arguments #3490

Open
@agcom

Describe your environment

OS: Ubuntu 24.04.2
Python version: Python 3.12.3
Package version: 0.54b0

What happened?

For unary-unary calls, the gRPC AIO client instrumentation passes the span and the call's details (a str) to the response hook. It should pass the span and the response message, as the non-AIO instrumentation does.

Steps to Reproduce

requirements.txt:

grpcio~=1.71
protobuf~=5.29
grpcio-tools~=1.71

opentelemetry-instrumentation-grpc~=0.54b0
opentelemetry-api~=1.33
opentelemetry-sdk~=1.33

pip install -r ./requirements.txt

greeter.proto:

syntax = "proto3";

service Greeter {
  rpc SayHello(HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

python -m grpc_tools.protoc -I ./ --python_out ./ --grpc_python_out ./ --pyi_out ./ ./greeter.proto

async.py:

import asyncio

import grpc.aio
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorClient
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

from greeter_pb2 import HelloReply, HelloRequest
from greeter_pb2_grpc import GreeterServicer, add_GreeterServicer_to_server, GreeterStub


class GreeterServicerImpl(GreeterServicer):
	async def SayHello(self, request, context):
		return HelloReply(message=f"Hello, {request.name}!")


tracer_provider = TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(tracer_provider)


def request_hook(span, request):
	print(f"Request hook called with a {type(span)} and a {type(request)}.")


def response_hook(span, response):
	print(f"Response hook called with a {type(span)} and a {type(response)}.")


GrpcAioInstrumentorClient().instrument(request_hook=request_hook, response_hook=response_hook)


async def amain():
	server = grpc.aio.server()
	add_GreeterServicer_to_server(GreeterServicerImpl(), server)
	server.add_insecure_port('localhost:50051')
	await server.start()
	
	async with grpc.aio.insecure_channel('localhost:50051') as channel:
		stub = GreeterStub(channel)
		print((await stub.SayHello(HelloRequest(name='Alireza'))).message)
	
	await server.stop(None)

asyncio.run(amain())

sync.py:

import concurrent.futures

import grpc
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

from greeter_pb2 import HelloReply, HelloRequest
from greeter_pb2_grpc import GreeterServicer, add_GreeterServicer_to_server, GreeterStub


class GreeterServicerImpl(GreeterServicer):
	def SayHello(self, request, context):
		return HelloReply(message=f"Hello, {request.name}!")


tracer_provider = TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(tracer_provider)


def request_hook(span, request):
	print(f"Request hook called with a {type(span)} and a {type(request)}.")


def response_hook(span, response):
	print(f"Response hook called with a {type(span)} and a {type(response)}.")


GrpcInstrumentorClient().instrument(request_hook=request_hook, response_hook=response_hook)

server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=1))
add_GreeterServicer_to_server(GreeterServicerImpl(), server)
server.add_insecure_port('localhost:50051')
server.start()

with grpc.insecure_channel('localhost:50051') as channel:
	stub = GreeterStub(channel)
	print(stub.SayHello(HelloRequest(name='Alireza')).message)

server.stop(None)

Expected Result

async.py:

Request hook called with a <class 'opentelemetry.sdk.trace._Span'> and a <class 'greeter_pb2.HelloRequest'>.
Response hook called with a <class 'opentelemetry.sdk.trace._Span'> and a <class 'greeter_pb2.HelloReply'>.
...

sync.py:

Request hook called with a <class 'opentelemetry.sdk.trace._Span'> and a <class 'greeter_pb2.HelloRequest'>.
Response hook called with a <class 'opentelemetry.sdk.trace._Span'> and a <class 'greeter_pb2.HelloReply'>.
...

Actual Result

async.py:

Request hook called with a <class 'opentelemetry.sdk.trace._Span'> and a <class 'greeter_pb2.HelloRequest'>.
Response hook called with a <class 'opentelemetry.sdk.trace._Span'> and a <class 'str'>.
...

sync.py:

Request hook called with a <class 'opentelemetry.sdk.trace._Span'> and a <class 'greeter_pb2.HelloRequest'>.
Response hook called with a <class 'opentelemetry.sdk.trace._Span'> and a <class 'greeter_pb2.HelloReply'>.
...
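
Until this is fixed, a hook shared between the sync and AIO instrumentors could guard on the argument type. The following is only a sketch of that idea: `make_guarded_response_hook` and `FakeReply` are hypothetical names invented for illustration and are not part of the instrumentation package or the generated code.

```python
# Hypothetical guard, not part of opentelemetry-instrumentation-grpc.
def make_guarded_response_hook(inner_hook):
    """Wrap a response hook so it is skipped when handed a plain string."""

    def guarded(span, response):
        # In the AIO unary-unary case reported above, `response` is the
        # call's details string rather than the reply message.
        if isinstance(response, str):
            return None
        return inner_hook(span, response)

    return guarded


class FakeReply:
    """Stand-in for a generated protobuf message such as HelloReply."""

    def __init__(self, message):
        self.message = message


seen = []
hook = make_guarded_response_hook(
    lambda span, response: seen.append(response.message)
)
hook(None, "")                            # details string: ignored
hook(None, FakeReply("Hello, Alireza!"))  # real reply: forwarded
print(seen)
```

This only hides the symptom: with the guard in place the AIO unary-unary hook never sees the reply, so it is a stopgap rather than a fix.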

Additional context

These are the solutions that came into my mind.

First Solution: Quick One-liner Fix

One way to fix this is to change the line in the done callback that invokes the response hook

into

response_hook(span, call._call_response.result())

However, this accesses a protected member whose value would normally be obtained by awaiting the call in an async function. Also, since this runs in a done callback, the result (the .result() call) is probably already available, but there are no documented guarantees of that.
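
The reasoning about the done callback can be illustrated with a plain asyncio.Future. This is an analogy under stated assumptions, not a demonstration of grpc.aio internals: whether call._call_response behaves the same way inside the instrumentation's callback is exactly the undocumented part.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    seen = []

    def on_done(done_fut):
        # A done callback is a plain function, so it cannot await.
        # The future is already finished here, so result() returns at once.
        seen.append(done_fut.result())

    fut.add_done_callback(on_done)
    fut.set_result("reply")
    # Done callbacks are scheduled on the loop, so yield once to let it run.
    await asyncio.sleep(0)
    return seen


result = asyncio.run(main())
print(result)
```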

Second Solution: Avoid Done Callback

Another possible fix that came to mind is to change these lines

    async def _wrap_unary_response(self, continuation, span):
        try:
            call = await continuation()
            # code and details are both coroutines that need to be await-ed,
            # the callbacks added with add_done_callback do not allow async
            # code so we need to get the code and details here then pass them
            # to the callback.
            code = await call.code()
            details = await call.details()
            call.add_done_callback(
                _unary_done_callback(
                    span, code, details, self._call_response_hook
                )
            )
            return call
        except grpc.aio.AioRpcError as exc:
            self.add_error_details_to_span(span, exc)
            raise exc

into

    async def _wrap_unary_response(self, continuation, span):
        try:
            call = await continuation()

            code = await call.code()
            details = await call.details()

            span.set_attribute(SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0])
            if code != grpc.StatusCode.OK:
                span.set_status(Status(status_code=StatusCode.ERROR, description=details))

            response = await call
            self._call_response_hook(span, response)

            return call
        except grpc.aio.AioRpcError as exc:
            self.add_error_details_to_span(span, exc)
            raise exc

The same idea is used for the stream response:

    async def _wrap_stream_response(self, span, call):
        try:
            async for response in call:
                if self._response_hook:
                    self._call_response_hook(span, response)
                yield response
        except Exception as exc:
            self.add_error_details_to_span(span, exc)
            raise exc
        finally:
            span.end()

I am not an expert in how gRPC interceptors work, and I am worried that this might mess up the interceptor stack. Still, it seems promising, since the same approach is used for the stream response case. Also, the span.end() call in the done callback is omitted here, but there is already one when the upstream context manager exits.
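
The control flow of the second solution can be exercised without a server by using stand-ins. In this sketch, `FakeUnaryCall`, `wrap_unary_response`, and the dict used as a span are all hypothetical and exist only to show the ordering: await the call, then hand the response to the hook. The fake can be awaited more than once; whether the real call object returned by the continuation tolerates that in every interceptor arrangement is an assumption that would need checking.

```python
import asyncio


class FakeUnaryCall:
    """Hypothetical stand-in for an awaitable unary-unary call object."""

    def __init__(self, response, code="OK", details=""):
        self._response = response
        self._code = code
        self._details = details

    async def code(self):
        return self._code

    async def details(self):
        return self._details

    def __await__(self):
        async def _get():
            return self._response

        return _get().__await__()


async def wrap_unary_response(continuation, span, response_hook):
    # Mirrors the proposed change: no done callback, just awaits.
    call = await continuation()
    span["rpc.grpc.status_code"] = await call.code()
    response = await call
    response_hook(span, response)
    return call


async def main():
    received = []

    async def continuation():
        return FakeUnaryCall(response={"message": "Hello, Alireza!"})

    span = {}  # a dict standing in for a real span
    call = await wrap_unary_response(
        continuation, span, lambda s, r: received.append(r)
    )
    # The caller still awaits the returned call to get the response.
    return received, await call, span


received, final, span = asyncio.run(main())
print(received, final, span)
```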

I just found out about the end_on_exit argument, which is used across the package and needs to be watched for when deciding where the span ends.

Would you like to implement a fix?

Yes
