Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support batch (getmany) in aiokafka instrumentation #3257

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added
- `opentelemetry-instrumentation-aiokafka` Add instrumentation of `consumer.getmany` (batch)
([#3257](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3257))

### Fixed
- `opentelemetry-instrumentation-redis` Add missing entry in doc string for `def _instrument`
([#3247](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3247))
Original file line number Diff line number Diff line change
@@ -78,6 +78,7 @@ async def async_consume_hook(span, record, args, kwargs):
from opentelemetry import trace
from opentelemetry.instrumentation.aiokafka.package import _instruments
from opentelemetry.instrumentation.aiokafka.utils import (
_wrap_getmany,
_wrap_getone,
_wrap_send,
)
@@ -131,7 +132,13 @@ def _instrument(self, **kwargs):
"getone",
_wrap_getone(tracer, async_consume_hook),
)
wrap_function_wrapper(
aiokafka.AIOKafkaConsumer,
"getmany",
_wrap_getmany(tracer, async_consume_hook),
)

def _uninstrument(self, **kwargs):
unwrap(aiokafka.AIOKafkaProducer, "send")
unwrap(aiokafka.AIOKafkaConsumer, "getone")
unwrap(aiokafka.AIOKafkaConsumer, "getmany")
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
import json
from __future__ import annotations

import asyncio
from logging import getLogger
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
MutableSequence,
Optional,
Protocol,
Sequence,
Tuple,
)

import aiokafka
from aiokafka import ConsumerRecord

from opentelemetry import context, propagate, trace
from opentelemetry.context import Context
@@ -13,12 +25,60 @@
from opentelemetry.trace import Tracer
from opentelemetry.trace.span import Span

if TYPE_CHECKING:
from aiokafka.structs import RecordMetadata

class AIOKafkaGetOneProto(Protocol):
    """Typing-only protocol matching ``AIOKafkaConsumer.getone``."""

    async def __call__(
        self, *partitions: aiokafka.TopicPartition
    ) -> aiokafka.ConsumerRecord[object, object]: ...

class AIOKafkaGetManyProto(Protocol):
    """Typing-only protocol matching ``AIOKafkaConsumer.getmany``.

    Returns a mapping of partition to the list of records fetched for it.
    """

    async def __call__(
        self,
        *partitions: aiokafka.TopicPartition,
        timeout_ms: int = 0,
        max_records: int | None = None,
    ) -> dict[
        aiokafka.TopicPartition,
        list[aiokafka.ConsumerRecord[object, object]],
    ]: ...

class AIOKafkaSendProto(Protocol):
    """Typing-only protocol matching ``AIOKafkaProducer.send``."""

    async def __call__(
        self,
        topic: str,
        value: object | None = None,
        key: object | None = None,
        partition: int | None = None,
        timestamp_ms: int | None = None,
        headers: HeadersT | None = None,
    ) -> asyncio.Future[RecordMetadata]: ...


# Optional async hook called after a producer span is created:
# (span, call args, call kwargs) -> None.
ProduceHookT = Optional[
    Callable[[Span, Tuple[Any, ...], Dict[str, Any]], Awaitable[None]]
]
# Optional async hook called for each consumed record:
# (span, record, call args, call kwargs) -> None.
ConsumeHookT = Optional[
    Callable[
        [
            Span,
            aiokafka.ConsumerRecord[object, object],
            Tuple[aiokafka.TopicPartition, ...],
            Dict[str, Any],
        ],
        Awaitable[None],
    ]
]

# Kafka message headers: sequence of (name, value-bytes-or-None) pairs.
HeadersT = Sequence[Tuple[str, Optional[bytes]]]

_LOG = getLogger(__name__)


def _extract_bootstrap_servers(
client: aiokafka.AIOKafkaClient,
) -> Union[str, List[str]]:
) -> str | list[str]:
return client._bootstrap_servers


@@ -28,51 +88,53 @@ def _extract_client_id(client: aiokafka.AIOKafkaClient) -> str:

def _extract_consumer_group(
consumer: aiokafka.AIOKafkaConsumer,
) -> Optional[str]:
) -> str | None:
return consumer._group_id


def _extract_argument(
key: str,
position: int,
default_value: Any,
args: Tuple[Any],
kwargs: Dict[str, Any],
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> Any:
if len(args) > position:
return args[position]
return kwargs.get(key, default_value)


def _extract_send_topic(args: Tuple[Any], kwargs: Dict[str, Any]) -> str:
def _extract_send_topic(args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
"""extract topic from `send` method arguments in AIOKafkaProducer class"""
return _extract_argument("topic", 0, "unknown", args, kwargs)


def _extract_send_value(
args: Tuple[Any], kwargs: Dict[str, Any]
) -> Optional[Any]:
args: tuple[Any, ...], kwargs: dict[str, Any]
) -> object | None:
"""extract value from `send` method arguments in AIOKafkaProducer class"""
return _extract_argument("value", 1, None, args, kwargs)


def _extract_send_key(
args: Tuple[Any], kwargs: Dict[str, Any]
) -> Optional[Any]:
args: tuple[Any, ...], kwargs: dict[str, Any]
) -> object | None:
"""extract key from `send` method arguments in AIOKafkaProducer class"""
return _extract_argument("key", 2, None, args, kwargs)


def _extract_send_headers(args: Tuple[Any], kwargs: Dict[str, Any]):
def _extract_send_headers(
args: tuple[Any, ...], kwargs: dict[str, Any]
) -> HeadersT | None:
"""extract headers from `send` method arguments in AIOKafkaProducer class"""
return _extract_argument("headers", 5, None, args, kwargs)


async def _extract_send_partition(
instance: aiokafka.AIOKafkaProducer,
args: Tuple[Any],
kwargs: Dict[str, Any],
) -> Optional[int]:
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> int | None:
"""extract partition `send` method arguments, using the `_partition` method in AIOKafkaProducer class"""
try:
topic = _extract_send_topic(args, kwargs)
@@ -97,16 +159,8 @@ async def _extract_send_partition(
return None


ProduceHookT = Optional[Callable[[Span, Tuple, Dict], Awaitable[None]]]
ConsumeHookT = Optional[
Callable[[Span, ConsumerRecord, Tuple, Dict], Awaitable[None]]
]

HeadersT = List[Tuple[str, Optional[bytes]]]


class AIOKafkaContextGetter(textmap.Getter[HeadersT]):
def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]:
def get(self, carrier: HeadersT, key: str) -> list[str] | None:
if carrier is None:
return None

@@ -116,19 +170,25 @@ def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]:
return [value.decode()]
return None

def keys(self, carrier: HeadersT) -> List[str]:
def keys(self, carrier: HeadersT) -> list[str]:
if carrier is None:
return []
return [key for (key, value) in carrier]


class AIOKafkaContextSetter(textmap.Setter[HeadersT]):
def set(
self, carrier: HeadersT, key: Optional[str], value: Optional[str]
self, carrier: HeadersT, key: str | None, value: str | None
) -> None:
if carrier is None or key is None:
return

if not isinstance(carrier, MutableSequence):
_LOG.warning(
"Unable to set context in headers. Headers is immutable"
)
return

if value is not None:
carrier.append((key, value.encode()))
else:
@@ -142,19 +202,17 @@ def set(
def _enrich_base_span(
span: Span,
*,
bootstrap_servers: Union[str, List[str]],
bootstrap_servers: str | list[str],
client_id: str,
topic: str,
partition: Optional[int],
key: Optional[Any],
partition: int | None,
key: object | None,
) -> None:
span.set_attribute(
messaging_attributes.MESSAGING_SYSTEM,
messaging_attributes.MessagingSystemValues.KAFKA.value,
)
span.set_attribute(
server_attributes.SERVER_ADDRESS, json.dumps(bootstrap_servers)
)
span.set_attribute(server_attributes.SERVER_ADDRESS, bootstrap_servers)
span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id)
span.set_attribute(messaging_attributes.MESSAGING_DESTINATION_NAME, topic)

@@ -166,18 +224,19 @@ def _enrich_base_span(

if key is not None:
span.set_attribute(
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY,
key, # FIXME: serialize key to str?
)


def _enrich_send_span(
span: Span,
*,
bootstrap_servers: Union[str, List[str]],
bootstrap_servers: str | list[str],
client_id: str,
topic: str,
partition: Optional[int],
key: Optional[str],
partition: int | None,
key: object | None,
) -> None:
if not span.is_recording():
return
@@ -194,19 +253,19 @@ def _enrich_send_span(
span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "send")
span.set_attribute(
messaging_attributes.MESSAGING_OPERATION_TYPE,
messaging_attributes.MessagingOperationTypeValues.PUBLISH.value,
messaging_attributes.MessagingOperationTypeValues.SEND.value,
)


def _enrich_anext_span(
def _enrich_getone_span(
span: Span,
*,
bootstrap_servers: Union[str, List[str]],
bootstrap_servers: str | list[str],
client_id: str,
consumer_group: Optional[str],
consumer_group: str | None,
topic: str,
partition: Optional[int],
key: Optional[str],
partition: int | None,
key: object | None,
offset: int,
) -> None:
if not span.is_recording():
@@ -247,20 +306,92 @@ def _enrich_anext_span(
)


def _enrich_getmany_poll_span(
    span: Span,
    *,
    bootstrap_servers: str | list[str],
    client_id: str,
    consumer_group: str | None,
    message_count: int,
) -> None:
    """Attach messaging attributes to the aggregated ``getmany`` poll span.

    Does nothing when the span is not recording (e.g. sampled out).
    """
    if not span.is_recording():
        return

    # Collect all attributes first, then set them in one pass.
    attributes: dict[str, object] = {
        messaging_attributes.MESSAGING_SYSTEM: (
            messaging_attributes.MessagingSystemValues.KAFKA.value
        ),
        server_attributes.SERVER_ADDRESS: bootstrap_servers,
        messaging_attributes.MESSAGING_CLIENT_ID: client_id,
        messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: message_count,
        messaging_attributes.MESSAGING_OPERATION_NAME: "poll",
        messaging_attributes.MESSAGING_OPERATION_TYPE: (
            messaging_attributes.MessagingOperationTypeValues.RECEIVE.value
        ),
    }
    # Consumer group is only known for group-managed consumers.
    if consumer_group is not None:
        attributes[messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME] = (
            consumer_group
        )

    for attribute_name, attribute_value in attributes.items():
        span.set_attribute(attribute_name, attribute_value)


def _enrich_getmany_topic_span(
    span: Span,
    *,
    bootstrap_servers: str | list[str],
    client_id: str,
    consumer_group: str | None,
    topic: str,
    partition: int,
    message_count: int,
) -> None:
    """Attach messaging attributes to one per-topic ``getmany`` poll span.

    Shared kafka/server attributes are delegated to ``_enrich_base_span``;
    does nothing when the span is not recording.
    """
    if not span.is_recording():
        return

    _enrich_base_span(
        span,
        bootstrap_servers=bootstrap_servers,
        client_id=client_id,
        topic=topic,
        partition=partition,
        key=None,
    )

    # Batch-specific attributes on top of the shared base ones.
    batch_attributes: list[tuple[str, object]] = [
        (messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT, message_count),
        (messaging_attributes.MESSAGING_OPERATION_NAME, "poll"),
        (
            messaging_attributes.MESSAGING_OPERATION_TYPE,
            messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
        ),
    ]
    # Consumer group is only known for group-managed consumers.
    if consumer_group is not None:
        batch_attributes.append(
            (
                messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME,
                consumer_group,
            )
        )

    for attribute_name, attribute_value in batch_attributes:
        span.set_attribute(attribute_name, attribute_value)


def _get_span_name(operation: str, topic: str):
return f"{topic} {operation}"


def _wrap_send(
tracer: Tracer, async_produce_hook: ProduceHookT
) -> Callable[..., Awaitable[None]]:
) -> Callable[..., Awaitable[asyncio.Future[RecordMetadata]]]:
async def _traced_send(
func: Callable[..., Awaitable[None]],
func: AIOKafkaSendProto,
instance: aiokafka.AIOKafkaProducer,
args: Tuple[Any],
kwargs: Dict[str, Any],
) -> None:
headers = _extract_send_headers(args, kwargs)
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> asyncio.Future[RecordMetadata]:
headers: HeadersT | None = _extract_send_headers(args, kwargs)
if headers is None:
headers = []
kwargs["headers"] = headers
@@ -301,23 +432,23 @@ async def _traced_send(
async def _create_consumer_span(
tracer: Tracer,
async_consume_hook: ConsumeHookT,
record: ConsumerRecord,
record: aiokafka.ConsumerRecord[object, object],
extracted_context: Context,
bootstrap_servers: Union[str, List[str]],
bootstrap_servers: str | list[str],
client_id: str,
consumer_group: Optional[str],
args: Tuple[Any],
kwargs: Dict[str, Any],
):
consumer_group: str | None,
args: tuple[aiokafka.TopicPartition, ...],
kwargs: dict[str, Any],
) -> trace.Span:
span_name = _get_span_name("receive", record.topic)
with tracer.start_as_current_span(
span_name,
context=extracted_context,
kind=trace.SpanKind.CONSUMER,
kind=trace.SpanKind.CLIENT,
) as span:
new_context = trace.set_span_in_context(span, extracted_context)
token = context.attach(new_context)
_enrich_anext_span(
_enrich_getone_span(
span,
bootstrap_servers=bootstrap_servers,
client_id=client_id,
@@ -334,16 +465,18 @@ async def _create_consumer_span(
_LOG.exception(hook_exception)
context.detach(token)

return span


def _wrap_getone(
tracer: Tracer, async_consume_hook: ConsumeHookT
) -> Callable[..., Awaitable[aiokafka.ConsumerRecord]]:
async def _traced_next(
func: Callable[..., Awaitable[aiokafka.ConsumerRecord]],
) -> Callable[..., Awaitable[aiokafka.ConsumerRecord[object, object]]]:
async def _traced_getone(
func: AIOKafkaGetOneProto,
instance: aiokafka.AIOKafkaConsumer,
args: Tuple[Any],
kwargs: Dict[str, Any],
) -> aiokafka.ConsumerRecord:
args: tuple[aiokafka.TopicPartition, ...],
kwargs: dict[str, Any],
) -> aiokafka.ConsumerRecord[object, object]:
record = await func(*args, **kwargs)

if record:
@@ -367,4 +500,81 @@ async def _traced_next(
)
return record

return _traced_next
return _traced_getone


def _wrap_getmany(
    tracer: Tracer, async_consume_hook: ConsumeHookT
) -> Callable[
    ...,
    Awaitable[
        dict[
            aiokafka.TopicPartition,
            list[aiokafka.ConsumerRecord[object, object]],
        ]
    ],
]:
    """Build a wrapt-style wrapper for ``AIOKafkaConsumer.getmany``.

    Emits one aggregated "poll" span for the whole batch, one "poll" span
    per topic/partition, and one "receive" span per record (via
    ``_create_consumer_span``). Each record span is linked to its
    topic-level poll span.
    """

    async def _traced_getmany(
        func: AIOKafkaGetManyProto,
        instance: aiokafka.AIOKafkaConsumer,
        args: tuple[aiokafka.TopicPartition, ...],
        kwargs: dict[str, Any],
    ) -> dict[
        aiokafka.TopicPartition, list[aiokafka.ConsumerRecord[object, object]]
    ]:
        records = await func(*args, **kwargs)

        # Only create spans when the poll actually returned records.
        if records:
            bootstrap_servers = _extract_bootstrap_servers(instance._client)
            client_id = _extract_client_id(instance._client)
            consumer_group = _extract_consumer_group(instance)

            # Aggregated span name lists every polled topic, de-duplicated
            # (a topic can appear on several partitions) and sorted so the
            # name is deterministic.
            span_name = _get_span_name(
                "poll",
                ", ".join(sorted({topic.topic for topic in records.keys()})),
            )
            with tracer.start_as_current_span(
                span_name, kind=trace.SpanKind.CLIENT
            ) as poll_span:
                _enrich_getmany_poll_span(
                    poll_span,
                    bootstrap_servers=bootstrap_servers,
                    client_id=client_id,
                    consumer_group=consumer_group,
                    message_count=sum(len(r) for r in records.values()),
                )

                for topic, topic_records in records.items():
                    span_name = _get_span_name("poll", topic.topic)
                    with tracer.start_as_current_span(
                        span_name, kind=trace.SpanKind.CLIENT
                    ) as topic_span:
                        _enrich_getmany_topic_span(
                            topic_span,
                            bootstrap_servers=bootstrap_servers,
                            client_id=client_id,
                            consumer_group=consumer_group,
                            topic=topic.topic,
                            partition=topic.partition,
                            message_count=len(topic_records),
                        )

                        for record in topic_records:
                            # Continue the trace the producer propagated
                            # through the record headers.
                            extracted_context = propagate.extract(
                                record.headers, getter=_aiokafka_getter
                            )
                            record_span = await _create_consumer_span(
                                tracer,
                                async_consume_hook,
                                record,
                                extracted_context,
                                bootstrap_servers,
                                client_id,
                                consumer_group,
                                args,
                                kwargs,
                            )
                            # Link the record span to the topic span: the
                            # record span lives in its own extracted
                            # context, so parentage alone cannot relate it
                            # to this batch.
                            topic_span.add_link(record_span.get_span_context())
        return records

    return _traced_getmany
Original file line number Diff line number Diff line change
@@ -12,10 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import uuid
from typing import Any, List, Sequence, Tuple
from typing import Any, Sequence, cast
from unittest import IsolatedAsyncioTestCase, TestCase, mock

import aiokafka
from aiokafka import (
AIOKafkaConsumer,
AIOKafkaProducer,
@@ -44,6 +47,9 @@ def test_instrument_api(self) -> None:
self.assertTrue(
isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper)
)
self.assertTrue(
isinstance(AIOKafkaConsumer.getmany, BoundFunctionWrapper)
)

instrumentation.uninstrument()
self.assertFalse(
@@ -52,12 +58,15 @@ def test_instrument_api(self) -> None:
self.assertFalse(
isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper)
)
self.assertFalse(
isinstance(AIOKafkaConsumer.getmany, BoundFunctionWrapper)
)


class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
@staticmethod
def consumer_record_factory(
number: int, headers: Tuple[Tuple[str, bytes], ...]
number: int, headers: tuple[tuple[str, bytes], ...]
) -> ConsumerRecord:
return ConsumerRecord(
f"topic_{number}",
@@ -73,6 +82,34 @@ def consumer_record_factory(
headers=headers,
)

@staticmethod
def consumer_batch_factory(
    *headers: tuple[tuple[str, bytes], ...],
) -> dict[aiokafka.TopicPartition, list[aiokafka.ConsumerRecord]]:
    """Build a ``getmany``-style result: one single-record partition per
    headers tuple, numbered from 1 (``topic_1``/partition 1, ...)."""
    return {
        aiokafka.TopicPartition(topic=f"topic_{idx}", partition=idx): [
            ConsumerRecord(
                f"topic_{idx}",
                idx,
                idx,
                idx,
                idx,
                f"key_{idx}".encode(),
                f"value_{idx}".encode(),
                None,
                idx,
                idx,
                headers=record_headers,
            )
        ]
        for idx, record_headers in enumerate(headers, start=1)
    }

@staticmethod
async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer:
consumer = AIOKafkaConsumer(**consumer_kwargs)
@@ -83,6 +120,7 @@ async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer:
await consumer.start()

consumer._fetcher.next_record = mock.AsyncMock()
consumer._fetcher.fetched_records = mock.AsyncMock()

return consumer

@@ -100,24 +138,30 @@ async def producer_factory() -> AIOKafkaProducer:

return producer

async def test_getone(self) -> None:
AIOKafkaInstrumentor().uninstrument()
def setUp(self):
    """Instrument aiokafka against the test tracer provider before each test."""
    super().setUp()
    AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)

def tearDown(self):
    """Remove the aiokafka instrumentation installed by setUp."""
    super().tearDown()
    AIOKafkaInstrumentor().uninstrument()

async def test_getone(self) -> None:
client_id = str(uuid.uuid4())
group_id = str(uuid.uuid4())
consumer = await self.consumer_factory(
client_id=client_id, group_id=group_id
)
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record
self.addAsyncCleanup(consumer.stop)
next_record_mock = cast(mock.AsyncMock, consumer._fetcher.next_record)

expected_spans = [
{
"name": "topic_1 receive",
"kind": SpanKind.CONSUMER,
"kind": SpanKind.CLIENT,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
server_attributes.SERVER_ADDRESS: "localhost",
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1",
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1",
@@ -131,10 +175,10 @@ async def test_getone(self) -> None:
},
{
"name": "topic_2 receive",
"kind": SpanKind.CONSUMER,
"kind": SpanKind.CLIENT,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
server_attributes.SERVER_ADDRESS: "localhost",
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2",
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2",
@@ -191,7 +235,8 @@ async def async_consume_hook(span, *_) -> None:
)

consumer = await self.consumer_factory()
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record
self.addAsyncCleanup(consumer.stop)
next_record_mock = cast(mock.AsyncMock, consumer._fetcher.next_record)

self.memory_exporter.clear()

@@ -223,7 +268,8 @@ async def test_getone_consume_hook(self) -> None:
)

consumer = await self.consumer_factory()
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record
self.addAsyncCleanup(consumer.stop)
next_record_mock = cast(mock.AsyncMock, consumer._fetcher.next_record)

next_record_mock.side_effect = [
self.consumer_record_factory(1, headers=())
@@ -233,13 +279,121 @@ async def test_getone_consume_hook(self) -> None:

async_consume_hook_mock.assert_awaited_once()

async def test_send(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)
async def test_getmany(self) -> None:
    """End-to-end getmany instrumentation: expects one "receive" span per
    record, one "poll" span per topic, and one aggregated "poll" span."""
    client_id = str(uuid.uuid4())
    group_id = str(uuid.uuid4())
    consumer = await self.consumer_factory(
        client_id=client_id, group_id=group_id
    )
    self.addAsyncCleanup(consumer.stop)
    fetched_records_mock = cast(
        mock.AsyncMock, consumer._fetcher.fetched_records
    )

    # Spans listed in expected finish order: each receive span ends before
    # its topic poll span, which ends before the aggregated poll span.
    expected_spans = [
        {
            "name": "topic_1 receive",
            "kind": SpanKind.CLIENT,
            "attributes": {
                messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
                server_attributes.SERVER_ADDRESS: "localhost",
                messaging_attributes.MESSAGING_CLIENT_ID: client_id,
                messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1",
                messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1",
                messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_1",
                messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
                messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
                messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
                messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 1,
                messaging_attributes.MESSAGING_MESSAGE_ID: "topic_1.1.1",
            },
        },
        {
            "name": "topic_1 poll",
            "kind": SpanKind.CLIENT,
            "attributes": {
                messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
                server_attributes.SERVER_ADDRESS: "localhost",
                messaging_attributes.MESSAGING_CLIENT_ID: client_id,
                messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1",
                messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1",
                messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
                messaging_attributes.MESSAGING_OPERATION_NAME: "poll",
                messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
                messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 1,
            },
        },
        {
            "name": "topic_2 receive",
            "kind": SpanKind.CLIENT,
            "attributes": {
                messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
                server_attributes.SERVER_ADDRESS: "localhost",
                messaging_attributes.MESSAGING_CLIENT_ID: client_id,
                messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2",
                messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2",
                messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_2",
                messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
                messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
                messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
                messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 2,
                messaging_attributes.MESSAGING_MESSAGE_ID: "topic_2.2.2",
            },
        },
        {
            "name": "topic_2 poll",
            "kind": SpanKind.CLIENT,
            "attributes": {
                messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
                server_attributes.SERVER_ADDRESS: "localhost",
                messaging_attributes.MESSAGING_CLIENT_ID: client_id,
                messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2",
                messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2",
                messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
                messaging_attributes.MESSAGING_OPERATION_NAME: "poll",
                messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
                messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 1,
            },
        },
        {
            "name": "topic_1, topic_2 poll",
            "kind": SpanKind.CLIENT,
            "attributes": {
                messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
                server_attributes.SERVER_ADDRESS: "localhost",
                messaging_attributes.MESSAGING_CLIENT_ID: client_id,
                messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
                messaging_attributes.MESSAGING_OPERATION_NAME: "poll",
                messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
                messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 2,
            },
        },
    ]
    self.memory_exporter.clear()

    # One batch of two topics: topic_1's record carries a traceparent
    # header, topic_2's record has no headers.
    fetched_records_mock.side_effect = [
        self.consumer_batch_factory(
            (
                (
                    "traceparent",
                    b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01",
                ),
            ),
            (),
        ),
    ]

    await consumer.getmany()
    fetched_records_mock.assert_awaited_with((), 0.0, max_records=None)

    span_list = self.memory_exporter.get_finished_spans()
    self._compare_spans(span_list, expected_spans)

async def test_send(self) -> None:
producer = await self.producer_factory()
add_message_mock: mock.AsyncMock = (
producer._message_accumulator.add_message
self.addAsyncCleanup(producer.stop)
add_message_mock = cast(
mock.AsyncMock, producer._message_accumulator.add_message
)

tracer = self.tracer_provider.get_tracer(__name__)
@@ -269,12 +423,10 @@ async def test_send(self) -> None:
)

async def test_send_baggage(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)

producer = await self.producer_factory()
add_message_mock: mock.AsyncMock = (
producer._message_accumulator.add_message
self.addAsyncCleanup(producer.stop)
add_message_mock = cast(
mock.AsyncMock, producer._message_accumulator.add_message
)

tracer = self.tracer_provider.get_tracer(__name__)
@@ -303,18 +455,21 @@ async def test_send_produce_hook(self) -> None:
)

producer = await self.producer_factory()
self.addAsyncCleanup(producer.stop)

await producer.send("topic_1", b"value_1")

async_produce_hook_mock.assert_awaited_once()

def _compare_spans(
self, spans: Sequence[ReadableSpan], expected_spans: List[dict]
self, spans: Sequence[ReadableSpan], expected_spans: list[dict]
) -> None:
self.assertEqual(len(spans), len(expected_spans))
for span, expected_span in zip(spans, expected_spans):
self.assertEqual(expected_span["name"], span.name)
self.assertEqual(expected_span["kind"], span.kind)
self.assertEqual(expected_span["name"], span.name, msg=span.name)
self.assertEqual(expected_span["kind"], span.kind, msg=span.name)
self.assertEqual(
expected_span["attributes"], dict(span.attributes)
expected_span["attributes"],
dict(span.attributes),
msg=span.name,
)
Original file line number Diff line number Diff line change
@@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=unnecessary-dunder-call
from __future__ import annotations

from unittest import IsolatedAsyncioTestCase, mock

import aiokafka

from opentelemetry.instrumentation.aiokafka.utils import (
AIOKafkaContextGetter,
AIOKafkaContextSetter,
@@ -23,6 +26,7 @@
_create_consumer_span,
_extract_send_partition,
_get_span_name,
_wrap_getmany,
_wrap_getone,
_wrap_send,
)
@@ -42,7 +46,7 @@ def test_context_setter(self) -> None:

carrier_list = [("key1", b"val1")]
context_setter.set(carrier_list, "key2", "val2")
self.assertTrue(("key2", "val2".encode()) in carrier_list)
self.assertTrue(("key2", b"val2") in carrier_list)

def test_context_getter(self) -> None:
context_setter = AIOKafkaContextSetter()
@@ -174,7 +178,7 @@ async def wrap_send_helper(
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group"
)
async def test_wrap_next(
async def test_wrap_getone(
self,
extract_consumer_group: mock.MagicMock,
extract_client_id: mock.MagicMock,
@@ -184,12 +188,12 @@ async def test_wrap_next(
) -> None:
tracer = mock.MagicMock()
consume_hook = mock.AsyncMock()
original_next_callback = mock.AsyncMock()
original_getone_callback = mock.AsyncMock()
kafka_consumer = mock.MagicMock()

wrapped_next = _wrap_getone(tracer, consume_hook)
record = await wrapped_next(
original_next_callback, kafka_consumer, self.args, self.kwargs
wrapped_getone = _wrap_getone(tracer, consume_hook)
record = await wrapped_getone(
original_getone_callback, kafka_consumer, self.args, self.kwargs
)

extract_bootstrap_servers.assert_called_once_with(
@@ -203,10 +207,10 @@ async def test_wrap_next(
extract_consumer_group.assert_called_once_with(kafka_consumer)
consumer_group = extract_consumer_group.return_value

original_next_callback.assert_awaited_once_with(
original_getone_callback.assert_awaited_once_with(
*self.args, **self.kwargs
)
self.assertEqual(record, original_next_callback.return_value)
self.assertEqual(record, original_getone_callback.return_value)

extract.assert_called_once_with(
record.headers, getter=_aiokafka_getter
@@ -225,10 +229,90 @@ async def test_wrap_next(
self.kwargs,
)

@mock.patch("opentelemetry.propagate.extract")
@mock.patch(
    "opentelemetry.instrumentation.aiokafka.utils._create_consumer_span"
)
@mock.patch(
    "opentelemetry.instrumentation.aiokafka.utils._enrich_getmany_topic_span"
)
@mock.patch(
    "opentelemetry.instrumentation.aiokafka.utils._enrich_getmany_poll_span"
)
@mock.patch(
    "opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
)
@mock.patch(
    "opentelemetry.instrumentation.aiokafka.utils._extract_client_id"
)
@mock.patch(
    "opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group"
)
# pylint: disable=too-many-locals
async def test_wrap_getmany(
    self,
    extract_consumer_group: mock.MagicMock,
    extract_client_id: mock.MagicMock,
    extract_bootstrap_servers: mock.MagicMock,
    _enrich_getmany_poll_span: mock.MagicMock,
    _enrich_getmany_topic_span: mock.MagicMock,
    _create_consumer_span: mock.MagicMock,
    extract: mock.MagicMock,
) -> None:
    """Unit test of ``_wrap_getmany``: verifies metadata extraction, the
    pass-through of the wrapped call's result, header context extraction,
    and the per-record ``_create_consumer_span`` invocation."""
    tracer = mock.MagicMock()
    consume_hook = mock.AsyncMock()
    record_mock = mock.MagicMock()
    # Wrapped getmany returns a single record on topic_1/partition 0.
    original_getmany_callback = mock.AsyncMock(
        return_value={
            aiokafka.TopicPartition(topic="topic_1", partition=0): [
                record_mock
            ]
        }
    )
    kafka_consumer = mock.MagicMock()

    wrapped_getmany = _wrap_getmany(tracer, consume_hook)
    records = await wrapped_getmany(
        original_getmany_callback, kafka_consumer, self.args, self.kwargs
    )

    extract_bootstrap_servers.assert_called_once_with(
        kafka_consumer._client
    )
    bootstrap_servers = extract_bootstrap_servers.return_value

    extract_client_id.assert_called_once_with(kafka_consumer._client)
    client_id = extract_client_id.return_value

    extract_consumer_group.assert_called_once_with(kafka_consumer)
    consumer_group = extract_consumer_group.return_value

    # The wrapper must forward args/kwargs and return the original result.
    original_getmany_callback.assert_awaited_once_with(
        *self.args, **self.kwargs
    )
    self.assertEqual(records, original_getmany_callback.return_value)

    # Context is extracted from the record's headers.
    extract.assert_called_once_with(
        record_mock.headers, getter=_aiokafka_getter
    )
    context = extract.return_value

    _create_consumer_span.assert_called_once_with(
        tracer,
        consume_hook,
        record_mock,
        context,
        bootstrap_servers,
        client_id,
        consumer_group,
        self.args,
        self.kwargs,
    )

@mock.patch("opentelemetry.trace.set_span_in_context")
@mock.patch("opentelemetry.context.attach")
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._enrich_anext_span"
"opentelemetry.instrumentation.aiokafka.utils._enrich_getone_span"
)
@mock.patch("opentelemetry.context.detach")
async def test_create_consumer_span(
@@ -263,7 +347,7 @@ async def test_create_consumer_span(
tracer.start_as_current_span.assert_called_once_with(
expected_span_name,
context=extracted_context,
kind=SpanKind.CONSUMER,
kind=SpanKind.CLIENT,
)
span = tracer.start_as_current_span.return_value.__enter__()
set_span_in_context.assert_called_once_with(span, extracted_context)