Skip to content

add target in case there is no host in target #2225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion elasticapm/instrumentation/packages/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def call(self, module, method, wrapped, instance, args, kwargs):
except ValueError:
port = None
else:
host, port = None, None
host, port = target, None
return grpc.intercept_channel(result, _ClientInterceptor(host, port, secure=method == "secure_channel"))


Expand Down
158 changes: 158 additions & 0 deletions tests/instrumentation/grpc_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# BSD 3-Clause License
#
# Copyright (c) 2024, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import pytest # isort:skip

grpc = pytest.importorskip("grpc") # isort:skip

import asyncio
from concurrent import futures

import elasticapm
from elasticapm.conf import constants
from elasticapm.conf.constants import TRANSACTION
from elasticapm.traces import capture_span
from elasticapm import Client
from elasticapm.contrib.grpc.client_interceptor import _ClientInterceptor
from elasticapm.contrib.grpc.server_interceptor import _ServerInterceptor
from elasticapm.contrib.grpc.async_server_interceptor import _AsyncServerInterceptor
from tests.fixtures import TempStoreClient, instrument
from tests.instrumentation.test_pb2 import UnaryUnaryRequest, UnaryUnaryResponse
from tests.instrumentation.test_pb2_grpc import TestServiceServicer, TestServiceStub, add_TestServiceServicer_to_server

pytestmark = pytest.mark.grpc


class TestService(TestServiceServicer):
def UnaryUnary(self, request, context):
return UnaryUnaryResponse(message=request.message)


@pytest.fixture
def elasticapm_client():
return TempStoreClient()


def test_grpc_client_instrumentation(instrument, elasticapm_client):
"""Test that gRPC client instrumentation adds interceptors"""
elasticapm_client.begin_transaction("test")
with capture_span("test_grpc_client", "test"):
elasticapm.instrument() # Ensure instrumentation is done before channel creation
channel = grpc.insecure_channel("localhost:50051")
elasticapm_client.end_transaction("MyView")

# Verify that the channel has the interceptor
assert hasattr(channel, "_interceptor")
assert isinstance(channel._interceptor, _ClientInterceptor)


def test_grpc_secure_channel_instrumentation(instrument, elasticapm_client):
"""Test that secure channel instrumentation adds interceptors"""
elasticapm_client.begin_transaction("test")
with capture_span("test_grpc_secure_channel", "test"):
elasticapm.instrument() # Ensure instrumentation is done before channel creation
channel = grpc.secure_channel("localhost:50051", grpc.local_channel_credentials())
elasticapm_client.end_transaction("MyView")

# Verify that the channel has the interceptor
assert hasattr(channel, "_interceptor")
assert isinstance(channel._interceptor, _ClientInterceptor)


def test_grpc_server_instrumentation(instrument, elasticapm_client):
"""Test that gRPC server instrumentation adds interceptors"""
# Create a test server
elasticapm_client.begin_transaction("test")
with capture_span("test_grpc_server", "test"):
elasticapm.instrument() # Ensure instrumentation is done before server creation
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
port = server.add_insecure_port("[::]:0") # Let the OS choose a port
servicer = TestService()
add_TestServiceServicer_to_server(servicer, server)
server.start()
elasticapm_client.end_transaction("MyView")

try:
# Make a test call to verify the interceptor is working
channel = grpc.insecure_channel(f"localhost:{port}")
stub = TestServiceStub(channel)
response = stub.UnaryUnary(UnaryUnaryRequest(message="test"))
assert response.message == "test"

# Verify that a transaction was created for the server call
assert len(elasticapm_client.events["transaction"]) == 2 # One for our test, one for the server call
transaction = elasticapm_client.events["transaction"][1] # Second is from the server interceptor
assert transaction["name"] == "/test.TestService/UnaryUnary"
assert transaction["type"] == "request"
finally:
server.stop(0)


@pytest.mark.asyncio
async def test_grpc_async_server_instrumentation(instrument, elasticapm_client):
"""Test that async server instrumentation adds interceptors"""
# Create a test async server
elasticapm_client.begin_transaction("test")
with capture_span("test_grpc_async_server", "test"):
elasticapm.instrument() # Ensure instrumentation is done before server creation
server = grpc.aio.server()
port = server.add_insecure_port("[::]:0") # Let the OS choose a port
servicer = TestService()
add_TestServiceServicer_to_server(servicer, server)
elasticapm_client.end_transaction("MyView")

await server.start()
try:
# Make a test call to verify the interceptor is working
channel = grpc.aio.insecure_channel(f"localhost:{port}")
stub = TestServiceStub(channel)
response = await stub.UnaryUnary(UnaryUnaryRequest(message="test"))
assert response.message == "test"

# Verify that a transaction was created for the server call
assert len(elasticapm_client.events["transaction"]) == 2 # One for our test, one for the server call
transaction = elasticapm_client.events["transaction"][1] # Second is from the server interceptor
assert transaction["name"] == "/test.TestService/UnaryUnary"
assert transaction["type"] == "request"
finally:
await server.stop(0)


def test_grpc_client_target_parsing(instrument, elasticapm_client):
"""Test that gRPC client target parsing works correctly"""
elasticapm_client.begin_transaction("test")
with capture_span("test_grpc_client_target", "test"):
elasticapm.instrument() # Ensure instrumentation is done before channel creation
channel = grpc.insecure_channel("localhost:50051")
elasticapm_client.end_transaction("MyView")

# Verify that the channel has the interceptor
assert hasattr(channel, "_interceptor")
assert isinstance(channel._interceptor, _ClientInterceptor)
15 changes: 15 additions & 0 deletions tests/instrumentation/test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

package test;

service TestService {
rpc UnaryUnary (UnaryUnaryRequest) returns (UnaryUnaryResponse) {}
}

message UnaryUnaryRequest {
string message = 1;
}

message UnaryUnaryResponse {
string message = 1;
}
40 changes: 40 additions & 0 deletions tests/instrumentation/test_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 97 additions & 0 deletions tests/instrumentation/test_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings

from tests.instrumentation import test_pb2 as test__pb2

GRPC_GENERATED_VERSION = '1.71.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False

try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True

if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ f' but the generated code in test_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)


class TestServiceStub(object):
"""Missing associated documentation comment in .proto file."""

def __init__(self, channel):
"""Constructor.

Args:
channel: A grpc.Channel.
"""
self.UnaryUnary = channel.unary_unary(
'/test.TestService/UnaryUnary',
request_serializer=test__pb2.UnaryUnaryRequest.SerializeToString,
response_deserializer=test__pb2.UnaryUnaryResponse.FromString,
_registered_method=True)


class TestServiceServicer(object):
"""Missing associated documentation comment in .proto file."""

def UnaryUnary(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_TestServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'UnaryUnary': grpc.unary_unary_rpc_method_handler(
servicer.UnaryUnary,
request_deserializer=test__pb2.UnaryUnaryRequest.FromString,
response_serializer=test__pb2.UnaryUnaryResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'test.TestService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('test.TestService', rpc_method_handlers)


# This class is part of an EXPERIMENTAL API.
class TestService(object):
"""Missing associated documentation comment in .proto file."""

@staticmethod
def UnaryUnary(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/test.TestService/UnaryUnary',
test__pb2.UnaryUnaryRequest.SerializeToString,
test__pb2.UnaryUnaryResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
Loading