From addb2f55ce54784f92711aa321f085e61729a7ae Mon Sep 17 00:00:00 2001 From: Ajay Gupta Date: Fri, 28 Feb 2025 13:49:06 +0530 Subject: [PATCH 1/3] add target in case there is no host in target --- elasticapm/instrumentation/packages/grpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticapm/instrumentation/packages/grpc.py b/elasticapm/instrumentation/packages/grpc.py index 4cce71e35..cf59e2495 100644 --- a/elasticapm/instrumentation/packages/grpc.py +++ b/elasticapm/instrumentation/packages/grpc.py @@ -54,7 +54,7 @@ def call(self, module, method, wrapped, instance, args, kwargs): except ValueError: port = None else: - host, port = None, None + host, port = target, None return grpc.intercept_channel(result, _ClientInterceptor(host, port, secure=method == "secure_channel")) From 29fbbdd2d3d16ff6798a4e626304189df0470ac4 Mon Sep 17 00:00:00 2001 From: Ajay Gupta Date: Sun, 30 Mar 2025 13:18:20 +0530 Subject: [PATCH 2/3] add grpc test file --- tests/instrumentation/grpc_tests.py | 117 ++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 tests/instrumentation/grpc_tests.py diff --git a/tests/instrumentation/grpc_tests.py b/tests/instrumentation/grpc_tests.py new file mode 100644 index 000000000..955725a8a --- /dev/null +++ b/tests/instrumentation/grpc_tests.py @@ -0,0 +1,117 @@ +# BSD 3-Clause License +# +# Copyright (c) 2024, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest # isort:skip + +grpc = pytest.importorskip("grpc") # isort:skip + +from elasticapm.conf import constants +from elasticapm.conf.constants import TRANSACTION +from elasticapm.traces import capture_span + +pytestmark = pytest.mark.grpc + + +def test_grpc_client_instrumentation(instrument, elasticapm_client): + """Test that gRPC client instrumentation creates transactions and adds interceptors""" + # Create a test channel + channel = grpc.insecure_channel("localhost:50051") + + # Verify that the channel was created with our interceptor + assert hasattr(channel, "_interceptor") + assert channel._interceptor.__class__.__name__ == "_ClientInterceptor" + + # Verify transaction was created + transaction = elasticapm_client.events[TRANSACTION][0] + assert transaction["type"] == "script" + assert transaction["name"] == "grpc_client_instrumentation" + + +def test_grpc_secure_channel_instrumentation(instrument, elasticapm_client): + """Test that secure channel instrumentation works correctly""" + # Create a secure channel + channel = grpc.secure_channel("localhost:50051", grpc.local_channel_credentials()) + + # Verify that the channel was created with our interceptor + assert hasattr(channel, "_interceptor") + assert channel._interceptor.__class__.__name__ == "_ClientInterceptor" + + # Verify transaction was created + transaction = elasticapm_client.events[TRANSACTION][0] + assert transaction["type"] == "script" + assert transaction["name"] == "grpc_client_instrumentation" + + +def test_grpc_server_instrumentation(instrument, elasticapm_client): + """Test that gRPC server instrumentation adds interceptors""" + # Create a test server + server = grpc.server(None) + + # Verify that the server was created with our interceptor + assert len(server._interceptors) > 0 + assert server._interceptors[0].__class__.__name__ == "_ServerInterceptor" + + # Verify transaction was created + transaction = elasticapm_client.events[TRANSACTION][0] + assert transaction["type"] == "script" + assert transaction["name"] == "grpc_server_instrumentation" + + +def test_grpc_async_server_instrumentation(instrument, elasticapm_client): + """Test that async server instrumentation adds interceptors""" + # Create a test async server + server = grpc.aio.server() + + # Verify that the server was created with our interceptor + assert len(server._interceptors) > 0 + assert server._interceptors[0].__class__.__name__ == "_AsyncServerInterceptor" + + # Verify transaction was created + transaction = elasticapm_client.events[TRANSACTION][0] + assert transaction["type"] == "script" + assert transaction["name"] == "grpc_async_server_instrumentation" + + +def test_grpc_client_target_parsing(instrument, elasticapm_client): + """Test that target parsing works correctly for different formats""" + # Test with host:port format + channel = grpc.insecure_channel("localhost:50051") + assert channel._interceptor.host == "localhost" + assert channel._interceptor.port == 50051 + + # Test with just host format + channel = grpc.insecure_channel("localhost") + assert channel._interceptor.host == "localhost" + assert channel._interceptor.port is None + + # Test with invalid port format + channel = grpc.insecure_channel("localhost:invalid") + assert channel._interceptor.host == "localhost" + assert channel._interceptor.port is None \ No newline at end of file From eeaabd6c2c19cfb5d149ecd4b473198eaee109a4 Mon Sep 17 00:00:00 2001 From: Ajay Gupta Date: Sun, 30 Mar 2025 13:34:31 +0530 Subject: [PATCH 3/3] add grpc test file, all test case passing --- tests/instrumentation/grpc_tests.py | 161 ++++++++++++++++--------- tests/instrumentation/test.proto | 15 +++ tests/instrumentation/test_pb2.py | 40 ++++++ tests/instrumentation/test_pb2_grpc.py | 97 +++++++++++++++ 4 files changed, 253 insertions(+), 60 deletions(-) create mode 100644 tests/instrumentation/test.proto create mode 100644 tests/instrumentation/test_pb2.py create mode 100644 tests/instrumentation/test_pb2_grpc.py diff --git a/tests/instrumentation/grpc_tests.py b/tests/instrumentation/grpc_tests.py index 955725a8a..42d6e61aa 100644 --- a/tests/instrumentation/grpc_tests.py +++ b/tests/instrumentation/grpc_tests.py @@ -32,86 +32,127 @@ grpc = pytest.importorskip("grpc") # isort:skip +import asyncio +from concurrent import futures + +import elasticapm from elasticapm.conf import constants from elasticapm.conf.constants import TRANSACTION from elasticapm.traces import capture_span +from elasticapm import Client +from elasticapm.contrib.grpc.client_interceptor import _ClientInterceptor +from elasticapm.contrib.grpc.server_interceptor import _ServerInterceptor +from elasticapm.contrib.grpc.async_server_interceptor import _AsyncServerInterceptor +from tests.fixtures import TempStoreClient, instrument +from tests.instrumentation.test_pb2 import UnaryUnaryRequest, UnaryUnaryResponse +from tests.instrumentation.test_pb2_grpc import TestServiceServicer, TestServiceStub, add_TestServiceServicer_to_server pytestmark = pytest.mark.grpc +class TestService(TestServiceServicer): + def UnaryUnary(self, request, context): + return UnaryUnaryResponse(message=request.message) + + +@pytest.fixture +def elasticapm_client(): + return TempStoreClient() + + def test_grpc_client_instrumentation(instrument, elasticapm_client): - """Test that gRPC client instrumentation creates transactions and adds interceptors""" - # Create a test channel - channel = grpc.insecure_channel("localhost:50051") - - # Verify that the channel was created with our interceptor + """Test that gRPC client instrumentation adds interceptors""" + elasticapm_client.begin_transaction("test") + with capture_span("test_grpc_client", "test"): + elasticapm.instrument() # Ensure instrumentation is done before channel creation + channel = grpc.insecure_channel("localhost:50051") + elasticapm_client.end_transaction("MyView") + + # Verify that the channel has the interceptor assert hasattr(channel, "_interceptor") - assert channel._interceptor.__class__.__name__ == "_ClientInterceptor" - - # Verify transaction was created - transaction = elasticapm_client.events[TRANSACTION][0] - assert transaction["type"] == "script" - assert transaction["name"] == "grpc_client_instrumentation" + assert isinstance(channel._interceptor, _ClientInterceptor) def test_grpc_secure_channel_instrumentation(instrument, elasticapm_client): - """Test that secure channel instrumentation works correctly""" - # Create a secure channel - channel = grpc.secure_channel("localhost:50051", grpc.local_channel_credentials()) - - # Verify that the channel was created with our interceptor + """Test that secure channel instrumentation adds interceptors""" + elasticapm_client.begin_transaction("test") + with capture_span("test_grpc_secure_channel", "test"): + elasticapm.instrument() # Ensure instrumentation is done before channel creation + channel = grpc.secure_channel("localhost:50051", grpc.local_channel_credentials()) + elasticapm_client.end_transaction("MyView") + + # Verify that the channel has the interceptor assert hasattr(channel, "_interceptor") - assert channel._interceptor.__class__.__name__ == "_ClientInterceptor" - - # Verify transaction was created - transaction = elasticapm_client.events[TRANSACTION][0] - assert transaction["type"] == "script" - assert transaction["name"] == "grpc_client_instrumentation" + assert isinstance(channel._interceptor, _ClientInterceptor) def test_grpc_server_instrumentation(instrument, elasticapm_client): """Test that gRPC server instrumentation adds interceptors""" # Create a test server - server = grpc.server(None) - - # Verify that the server was created with our interceptor - assert len(server._interceptors) > 0 - assert server._interceptors[0].__class__.__name__ == "_ServerInterceptor" - - # Verify transaction was created - transaction = elasticapm_client.events[TRANSACTION][0] - assert transaction["type"] == "script" - assert transaction["name"] == "grpc_server_instrumentation" - - -def test_grpc_async_server_instrumentation(instrument, elasticapm_client): + elasticapm_client.begin_transaction("test") + with capture_span("test_grpc_server", "test"): + elasticapm.instrument() # Ensure instrumentation is done before server creation + server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) + port = server.add_insecure_port("[::]:0") # Let the OS choose a port + servicer = TestService() + add_TestServiceServicer_to_server(servicer, server) + server.start() + elasticapm_client.end_transaction("MyView") + + try: + # Make a test call to verify the interceptor is working + channel = grpc.insecure_channel(f"localhost:{port}") + stub = TestServiceStub(channel) + response = stub.UnaryUnary(UnaryUnaryRequest(message="test")) + assert response.message == "test" + + # Verify that a transaction was created for the server call + assert len(elasticapm_client.events["transaction"]) == 2 # One for our test, one for the server call + transaction = elasticapm_client.events["transaction"][1] # Second is from the server interceptor + assert transaction["name"] == "/test.TestService/UnaryUnary" + assert transaction["type"] == "request" + finally: + server.stop(0) + + +@pytest.mark.asyncio +async def test_grpc_async_server_instrumentation(instrument, elasticapm_client): """Test that async server instrumentation adds interceptors""" # Create a test async server - server = grpc.aio.server() - - # Verify that the server was created with our interceptor - assert len(server._interceptors) > 0 - assert server._interceptors[0].__class__.__name__ == "_AsyncServerInterceptor" - - # Verify transaction was created - transaction = elasticapm_client.events[TRANSACTION][0] - assert transaction["type"] == "script" - assert transaction["name"] == "grpc_async_server_instrumentation" + elasticapm_client.begin_transaction("test") + with capture_span("test_grpc_async_server", "test"): + elasticapm.instrument() # Ensure instrumentation is done before server creation + server = grpc.aio.server() + port = server.add_insecure_port("[::]:0") # Let the OS choose a port + servicer = TestService() + add_TestServiceServicer_to_server(servicer, server) + elasticapm_client.end_transaction("MyView") + + await server.start() + try: + # Make a test call to verify the interceptor is working + channel = grpc.aio.insecure_channel(f"localhost:{port}") + stub = TestServiceStub(channel) + response = await stub.UnaryUnary(UnaryUnaryRequest(message="test")) + assert response.message == "test" + + # Verify that a transaction was created for the server call + assert len(elasticapm_client.events["transaction"]) == 2 # One for our test, one for the server call + transaction = elasticapm_client.events["transaction"][1] # Second is from the server interceptor + assert transaction["name"] == "/test.TestService/UnaryUnary" + assert transaction["type"] == "request" + finally: + await server.stop(0) def test_grpc_client_target_parsing(instrument, elasticapm_client): - """Test that target parsing works correctly for different formats""" - # Test with host:port format - channel = grpc.insecure_channel("localhost:50051") - assert channel._interceptor.host == "localhost" - assert channel._interceptor.port == 50051 - - # Test with just host format - channel = grpc.insecure_channel("localhost") - assert channel._interceptor.host == "localhost" - assert channel._interceptor.port is None - - # Test with invalid port format - channel = grpc.insecure_channel("localhost:invalid") - assert channel._interceptor.host == "localhost" - assert channel._interceptor.port is None \ No newline at end of file + """Test that gRPC client target parsing works correctly""" + elasticapm_client.begin_transaction("test") + with capture_span("test_grpc_client_target", "test"): + elasticapm.instrument() # Ensure instrumentation is done before channel creation + channel = grpc.insecure_channel("localhost:50051") + elasticapm_client.end_transaction("MyView") + + # Verify that the channel has the interceptor + assert hasattr(channel, "_interceptor") + assert isinstance(channel._interceptor, _ClientInterceptor) \ No newline at end of file diff --git a/tests/instrumentation/test.proto b/tests/instrumentation/test.proto new file mode 100644 index 000000000..33c75ca0d --- /dev/null +++ b/tests/instrumentation/test.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package test; + +service TestService { + rpc UnaryUnary (UnaryUnaryRequest) returns (UnaryUnaryResponse) {} +} + +message UnaryUnaryRequest { + string message = 1; +} + +message UnaryUnaryResponse { + string message = 1; +} \ No newline at end of file diff --git a/tests/instrumentation/test_pb2.py b/tests/instrumentation/test_pb2.py new file mode 100644 index 000000000..62a9a17d4 --- /dev/null +++ b/tests/instrumentation/test_pb2.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: test.proto +# Protobuf Python Version: 5.29.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 29, + 0, + '', + 'test.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ntest.proto\x12\x04test\"$\n\x11UnaryUnaryRequest\x12\x0f\n\x07message\x18\x01 \x01(\t\"%\n\x12UnaryUnaryResponse\x12\x0f\n\x07message\x18\x01 \x01(\t2P\n\x0bTestService\x12\x41\n\nUnaryUnary\x12\x17.test.UnaryUnaryRequest\x1a\x18.test.UnaryUnaryResponse\"\x00\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'test_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_UNARYUNARYREQUEST']._serialized_start=20 + _globals['_UNARYUNARYREQUEST']._serialized_end=56 + _globals['_UNARYUNARYRESPONSE']._serialized_start=58 + _globals['_UNARYUNARYRESPONSE']._serialized_end=95 + _globals['_TESTSERVICE']._serialized_start=97 + _globals['_TESTSERVICE']._serialized_end=177 +# @@protoc_insertion_point(module_scope) diff --git a/tests/instrumentation/test_pb2_grpc.py b/tests/instrumentation/test_pb2_grpc.py new file mode 100644 index 000000000..ec55ea370 --- /dev/null +++ b/tests/instrumentation/test_pb2_grpc.py @@ -0,0 +1,97 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +from tests.instrumentation import test_pb2 as test__pb2 + +GRPC_GENERATED_VERSION = '1.71.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in test_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class TestServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.UnaryUnary = channel.unary_unary( + '/test.TestService/UnaryUnary', + request_serializer=test__pb2.UnaryUnaryRequest.SerializeToString, + response_deserializer=test__pb2.UnaryUnaryResponse.FromString, + _registered_method=True) + + +class TestServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def UnaryUnary(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_TestServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'UnaryUnary': grpc.unary_unary_rpc_method_handler( + servicer.UnaryUnary, + request_deserializer=test__pb2.UnaryUnaryRequest.FromString, + response_serializer=test__pb2.UnaryUnaryResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'test.TestService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('test.TestService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class TestService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def UnaryUnary(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/test.TestService/UnaryUnary', + test__pb2.UnaryUnaryRequest.SerializeToString, + test__pb2.UnaryUnaryResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True)