Skip to content

pika: added instrumentation for pika.connection.Connection and pika.c… #3584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- `opentelemetry-instrumentation-pika` Added instrumentation for All `SelectConnection` adapters
([#3584](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3584))

## Version 1.34.0/0.55b0 (2025-06-04)

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# pylint: disable=unnecessary-dunder-call

from logging import getLogger
from typing import Any, Collection, Dict, Optional
from typing import Any, Collection, Dict, Optional, Union

import pika
import wrapt
Expand All @@ -24,6 +24,8 @@
BlockingChannel,
_QueueConsumerGeneratorInfo,
)
from pika.channel import Channel
from pika.connection import Connection

from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
Expand Down Expand Up @@ -53,12 +55,16 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore

# pylint: disable=attribute-defined-outside-init
@staticmethod
def _instrument_blocking_channel_consumers(
channel: BlockingChannel,
def _instrument_channel_consumers(
channel: Union[BlockingChannel, Channel],
tracer: Tracer,
consume_hook: utils.HookT = utils.dummy_callback,
) -> Any:
for consumer_tag, consumer_info in channel._consumer_infos.items():
if isinstance(channel, BlockingChannel):
consumer_infos = channel._consumer_infos
elif isinstance(channel, Channel):
consumer_infos = channel._consumers
for consumer_tag, consumer_info in consumer_infos.items():
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
consumer_callback = getattr(consumer_info, callback_attr, None)
if consumer_callback is None:
Expand All @@ -79,7 +85,7 @@ def _instrument_blocking_channel_consumers(

@staticmethod
def _instrument_basic_publish(
channel: BlockingChannel,
channel: Union[BlockingChannel, Channel],
tracer: Tracer,
publish_hook: utils.HookT = utils.dummy_callback,
) -> None:
Expand All @@ -93,7 +99,7 @@ def _instrument_basic_publish(

@staticmethod
def _instrument_channel_functions(
channel: BlockingChannel,
channel: Union[BlockingChannel, Channel],
tracer: Tracer,
publish_hook: utils.HookT = utils.dummy_callback,
) -> None:
Expand All @@ -103,7 +109,9 @@ def _instrument_channel_functions(
)

@staticmethod
def _uninstrument_channel_functions(channel: BlockingChannel) -> None:
def _uninstrument_channel_functions(
channel: Union[BlockingChannel, Channel],
) -> None:
for function_name in _FUNCTIONS_TO_UNINSTRUMENT:
if not hasattr(channel, function_name):
continue
Expand All @@ -115,7 +123,7 @@ def _uninstrument_channel_functions(channel: BlockingChannel) -> None:
@staticmethod
# Make sure that the spans are created inside hash them set as parent and not as brothers
def instrument_channel(
channel: BlockingChannel,
channel: Union[BlockingChannel, Channel],
tracer_provider: Optional[TracerProvider] = None,
publish_hook: utils.HookT = utils.dummy_callback,
consume_hook: utils.HookT = utils.dummy_callback,
Expand All @@ -133,7 +141,7 @@ def instrument_channel(
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
PikaInstrumentor._instrument_blocking_channel_consumers(
PikaInstrumentor._instrument_channel_consumers(
channel, tracer, consume_hook
)
PikaInstrumentor._decorate_basic_consume(channel, tracer, consume_hook)
Expand Down Expand Up @@ -178,16 +186,17 @@ def wrapper(wrapped, instance, args, kwargs):
return channel

wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper)
wrapt.wrap_function_wrapper(Connection, "channel", wrapper)

@staticmethod
def _decorate_basic_consume(
channel: BlockingChannel,
channel: Union[BlockingChannel, Channel],
tracer: Optional[Tracer],
consume_hook: utils.HookT = utils.dummy_callback,
) -> None:
def wrapper(wrapped, instance, args, kwargs):
return_value = wrapped(*args, **kwargs)
PikaInstrumentor._instrument_blocking_channel_consumers(
PikaInstrumentor._instrument_channel_consumers(
channel, tracer, consume_hook
)
return return_value
Expand Down Expand Up @@ -236,6 +245,7 @@ def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
if hasattr(self, "__opentelemetry_tracer_provider"):
delattr(self, "__opentelemetry_tracer_provider")
unwrap(BlockingConnection, "channel")
unwrap(Connection, "channel")
unwrap(_QueueConsumerGeneratorInfo, "__init__")

def instrumentation_dependencies(self) -> Collection[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
# limitations under the License.
from unittest import TestCase, mock

from pika.adapters import BlockingConnection
from pika.adapters.blocking_connection import _QueueConsumerGeneratorInfo
from pika.adapters import BaseConnection, BlockingConnection
from pika.adapters.blocking_connection import (
BlockingChannel,
_QueueConsumerGeneratorInfo,
)
from pika.channel import Channel
from pika.connection import Connection
from wrapt import BoundFunctionWrapper

from opentelemetry.instrumentation.pika import PikaInstrumentor
Expand All @@ -31,11 +35,13 @@

class TestPika(TestCase):
def setUp(self) -> None:
self.blocking_channel = mock.MagicMock(spec=BlockingChannel)
self.channel = mock.MagicMock(spec=Channel)
consumer_info = mock.MagicMock()
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
setattr(consumer_info, callback_attr, mock.MagicMock())
self.channel._consumer_infos = {"consumer-tag": consumer_info}
self.blocking_channel._consumer_infos = {"consumer-tag": consumer_info}
self.channel._consumers = {"consumer-tag": consumer_info}
self.mock_callback = mock.MagicMock()

def test_instrument_api(self) -> None:
Expand All @@ -44,6 +50,10 @@ def test_instrument_api(self) -> None:
self.assertTrue(
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
)
self.assertTrue(isinstance(Connection.channel, BoundFunctionWrapper))
self.assertTrue(
isinstance(BaseConnection.channel, BoundFunctionWrapper)
)
self.assertTrue(
isinstance(
_QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper
Expand All @@ -56,6 +66,10 @@ def test_instrument_api(self) -> None:
self.assertFalse(
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
)
self.assertFalse(isinstance(Connection.channel, BoundFunctionWrapper))
self.assertFalse(
isinstance(BaseConnection.channel, BoundFunctionWrapper)
)
self.assertFalse(
isinstance(
_QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper
Expand All @@ -69,24 +83,47 @@ def test_instrument_api(self) -> None:
"opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_basic_consume"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_blocking_channel_consumers"
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_consumers"
)
def test_instrument_blocking_channel(
self,
instrument_channel_consumers: mock.MagicMock,
instrument_basic_consume: mock.MagicMock,
instrument_channel_functions: mock.MagicMock,
):
PikaInstrumentor.instrument_channel(channel=self.blocking_channel)
assert hasattr(
self.blocking_channel, "_is_instrumented_by_opentelemetry"
), "channel is not marked as instrumented!"
instrument_channel_consumers.assert_called_once()
instrument_basic_consume.assert_called_once()
instrument_channel_functions.assert_called_once()

@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_basic_consume"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_consumers"
)
def test_instrument_channel(
self,
instrument_blocking_channel_consumers: mock.MagicMock,
instrument_channel_consumers: mock.MagicMock,
instrument_basic_consume: mock.MagicMock,
instrument_channel_functions: mock.MagicMock,
):
PikaInstrumentor.instrument_channel(channel=self.channel)
assert hasattr(
self.channel, "_is_instrumented_by_opentelemetry"
), "channel is not marked as instrumented!"
instrument_blocking_channel_consumers.assert_called_once()
instrument_channel_consumers.assert_called_once()
instrument_basic_consume.assert_called_once()
instrument_channel_functions.assert_called_once()

@mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback")
def test_instrument_consumers(
def test_instrument_consumers_on_blocking_channel(
self, decorate_callback: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
Expand All @@ -95,23 +132,63 @@ def test_instrument_consumers(
mock.call(
getattr(value, callback_attr), tracer, key, dummy_callback
)
for key, value in self.channel._consumer_infos.items()
for key, value in self.blocking_channel._consumer_infos.items()
]
PikaInstrumentor._instrument_blocking_channel_consumers(
self.channel, tracer
PikaInstrumentor._instrument_channel_consumers(
self.blocking_channel, tracer
)
decorate_callback.assert_has_calls(
calls=expected_decoration_calls, any_order=True
)
assert all(
hasattr(callback, "_original_callback")
for callback in self.channel._consumer_infos.values()
for callback in self.blocking_channel._consumer_infos.values()
)

@mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback")
def test_instrument_consumers_on_channel(
self, decorate_callback: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
expected_decoration_calls = [
mock.call(
getattr(value, callback_attr), tracer, key, dummy_callback
)
for key, value in self.channel._consumers.items()
]
PikaInstrumentor._instrument_channel_consumers(self.channel, tracer)
decorate_callback.assert_has_calls(
calls=expected_decoration_calls, any_order=True
)
assert all(
hasattr(callback, "_original_callback")
for callback in self.channel._consumers.values()
)

@mock.patch(
"opentelemetry.instrumentation.pika.utils._decorate_basic_publish"
)
def test_instrument_basic_publish(
def test_instrument_basic_publish_on_blocking_channel(
self, decorate_basic_publish: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
original_function = self.blocking_channel.basic_publish
PikaInstrumentor._instrument_basic_publish(
self.blocking_channel, tracer
)
decorate_basic_publish.assert_called_once_with(
original_function, self.blocking_channel, tracer, dummy_callback
)
self.assertEqual(
self.blocking_channel.basic_publish,
decorate_basic_publish.return_value,
)

@mock.patch(
"opentelemetry.instrumentation.pika.utils._decorate_basic_publish"
)
def test_instrument_basic_publish_on_channel(
self, decorate_basic_publish: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
Expand Down Expand Up @@ -141,6 +218,17 @@ def test_instrument_queue_consumer_generator(self) -> None:
isinstance(generator_info.pending_events, ReadyMessagesDequeProxy)
)

def test_uninstrument_blocking_channel_functions(self) -> None:
original_function = self.blocking_channel.basic_publish
self.blocking_channel.basic_publish = mock.MagicMock()
self.blocking_channel.basic_publish._original_function = (
original_function
)
PikaInstrumentor._uninstrument_channel_functions(self.blocking_channel)
self.assertEqual(
self.blocking_channel.basic_publish, original_function
)

def test_uninstrument_channel_functions(self) -> None:
original_function = self.channel.basic_publish
self.channel.basic_publish = mock.MagicMock()
Expand Down