Skip to content

feat: support batch (getmany) in aiokafka instrumentation #3257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5a68ca6
feat: support batch (getmany) in aiokafka instrumentation
dimastbk Feb 11, 2025
29eb8d3
test: fix unclosed resources and typing
dimastbk Feb 12, 2025
ae227a1
test: add test_wrap_getmany
dimastbk Feb 12, 2025
745d7ad
fix: get unique topic list in batch
dimastbk Feb 12, 2025
774d741
fix: update typing, run pyupgrade
dimastbk Feb 12, 2025
5f75b27
fix: remove json.dumps from SERVER_ADDRESS attribute
dimastbk Feb 12, 2025
8f68412
fix pylint
dimastbk Feb 12, 2025
bf40659
fix: sync span_kind with spec
dimastbk Feb 12, 2025
54fdd95
add CHANGELOG entry
dimastbk Feb 12, 2025
d9bdae4
Merge branch 'main' into add-aiokafka-batch
dimastbk Feb 12, 2025
d7dfc07
Merge branch 'main' into add-aiokafka-batch
dimastbk Feb 12, 2025
570aecb
Merge branch 'main' into add-aiokafka-batch
dimastbk Mar 7, 2025
ebe92be
Merge branch 'main' into add-aiokafka-batch
dimastbk Mar 30, 2025
76c8faa
remove changes not from this issue
dimastbk Mar 30, 2025
4d6107f
move types under TYPE_CHECKING
dimastbk Apr 10, 2025
eed611e
Merge branch 'main' into add-aiokafka-batch
xrmx Apr 10, 2025
c7704ef
move CHANGELOG entry to unreleased
dimastbk Apr 11, 2025
0e9508c
Merge branch 'main' into add-aiokafka-batch
dimastbk Apr 11, 2025
08b698a
Merge branch 'main' into add-aiokafka-batch
dimastbk Apr 13, 2025
3a05425
Merge branch 'main' into add-aiokafka-batch
dimastbk Apr 15, 2025
0467411
enable pyright for aiokafka, fix key type
dimastbk Apr 15, 2025
1be05b4
Merge branch 'main' into add-aiokafka-batch
dimastbk Apr 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3385](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3385))
- `opentelemetry-instrumentation` Make auto instrumentation use the same dependency resolver as manual instrumentation does
([#3202](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3202))
- `opentelemetry-instrumentation-aiokafka` Add instrumentation of `consumer.getmany` (batch)
([#3257](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3257))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"opentelemetry-api ~= 1.27",
"opentelemetry-instrumentation == 0.54b0.dev",
"opentelemetry-semantic-conventions == 0.54b0.dev",
"typing_extensions ~= 4.1",
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,20 @@ async def async_consume_hook(span, record, args, kwargs):
___
"""

from __future__ import annotations

from asyncio import iscoroutinefunction
from typing import Collection
from typing import TYPE_CHECKING, Collection

import aiokafka
from wrapt import wrap_function_wrapper
from wrapt import (
wrap_function_wrapper, # type: ignore[reportUnknownVariableType]
)

from opentelemetry import trace
from opentelemetry.instrumentation.aiokafka.package import _instruments
from opentelemetry.instrumentation.aiokafka.utils import (
_wrap_getmany,
_wrap_getone,
_wrap_send,
)
Expand All @@ -86,6 +91,21 @@ async def async_consume_hook(span, record, args, kwargs):
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.schemas import Schemas

if TYPE_CHECKING:
from typing import TypedDict

from typing_extensions import Unpack

from .utils import ConsumeHookT, ProduceHookT

class InstrumentKwargs(TypedDict, total=False):
tracer_provider: trace.TracerProvider
async_produce_hook: ProduceHookT
async_consume_hook: ConsumeHookT

class UninstrumentKwargs(TypedDict, total=False):
pass


class AIOKafkaInstrumentor(BaseInstrumentor):
"""An instrumentor for kafka module
Expand All @@ -95,7 +115,7 @@ class AIOKafkaInstrumentor(BaseInstrumentor):
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
def _instrument(self, **kwargs: Unpack[InstrumentKwargs]):
"""Instruments the kafka module

Args:
Expand Down Expand Up @@ -131,7 +151,13 @@ def _instrument(self, **kwargs):
"getone",
_wrap_getone(tracer, async_consume_hook),
)
wrap_function_wrapper(
aiokafka.AIOKafkaConsumer,
"getmany",
_wrap_getmany(tracer, async_consume_hook),
)

def _uninstrument(self, **kwargs):
def _uninstrument(self, **kwargs: Unpack[UninstrumentKwargs]):
unwrap(aiokafka.AIOKafkaProducer, "send")
unwrap(aiokafka.AIOKafkaConsumer, "getone")
unwrap(aiokafka.AIOKafkaConsumer, "getmany")
Loading