1
1
from __future__ import annotations
2
2
3
3
import asyncio
4
+ import contextlib
4
5
import json
5
6
from logging import getLogger
6
7
from typing import (
14
15
Protocol ,
15
16
Sequence ,
16
17
Tuple ,
18
+ cast ,
17
19
)
18
20
19
21
import aiokafka
@@ -89,7 +91,7 @@ def _extract_client_id(client: aiokafka.AIOKafkaClient) -> str:
89
91
def _extract_consumer_group (
90
92
consumer : aiokafka .AIOKafkaConsumer ,
91
93
) -> str | None :
92
- return consumer ._group_id
94
+ return consumer ._group_id # type: ignore[reportUnknownVariableType]
93
95
94
96
95
97
def _extract_argument (
@@ -139,6 +141,17 @@ def _move_headers_to_kwargs(
139
141
return args [:5 ], kwargs
140
142
141
143
144
+ def _deserialize_key (key : object | None ) -> str | None :
145
+ if key is None :
146
+ return None
147
+
148
+ if isinstance (key , bytes ):
149
+ with contextlib .suppress (UnicodeDecodeError ):
150
+ return key .decode ()
151
+
152
+ return str (key )
153
+
154
+
142
155
async def _extract_send_partition (
143
156
instance : aiokafka .AIOKafkaProducer ,
144
157
args : tuple [Any , ...],
@@ -150,17 +163,20 @@ async def _extract_send_partition(
150
163
key = _extract_send_key (args , kwargs )
151
164
value = _extract_send_value (args , kwargs )
152
165
partition = _extract_argument ("partition" , 3 , None , args , kwargs )
153
- key_bytes , value_bytes = instance ._serialize (topic , key , value )
166
+ key_bytes , value_bytes = cast (
167
+ tuple [bytes | None , bytes | None ],
168
+ instance ._serialize (topic , key , value ), # type: ignore[reportUnknownMemberType]
169
+ )
154
170
valid_types = (bytes , bytearray , memoryview , type (None ))
155
171
if (
156
172
type (key_bytes ) not in valid_types
157
173
or type (value_bytes ) not in valid_types
158
174
):
159
175
return None
160
176
161
- await instance .client ._wait_on_metadata (topic )
177
+ await instance .client ._wait_on_metadata (topic ) # type: ignore[reportUnknownMemberType]
162
178
163
- return instance ._partition (
179
+ return instance ._partition ( # type: ignore[reportUnknownMemberType]
164
180
topic , partition , key , value , key_bytes , value_bytes
165
181
)
166
182
except Exception as exception : # pylint: disable=W0703
@@ -170,26 +186,21 @@ async def _extract_send_partition(
170
186
171
187
class AIOKafkaContextGetter (textmap .Getter ["HeadersT" ]):
172
188
def get (self , carrier : HeadersT , key : str ) -> list [str ] | None :
173
- if carrier is None :
174
- return None
175
-
176
189
for item_key , value in carrier :
177
190
if item_key == key :
178
191
if value is not None :
179
192
return [value .decode ()]
180
193
return None
181
194
182
195
def keys (self , carrier : HeadersT ) -> list [str ]:
183
- if carrier is None :
184
- return []
185
- return [key for (key , value ) in carrier ]
196
+ return [key for (key , _ ) in carrier ]
186
197
187
198
188
199
class AIOKafkaContextSetter (textmap .Setter ["HeadersT" ]):
189
200
def set (
190
201
self , carrier : HeadersT , key : str | None , value : str | None
191
202
) -> None :
192
- if carrier is None or key is None :
203
+ if key is None :
193
204
return
194
205
195
206
if not isinstance (carrier , MutableSequence ):
@@ -215,7 +226,7 @@ def _enrich_base_span(
215
226
client_id : str ,
216
227
topic : str ,
217
228
partition : int | None ,
218
- key : object | None ,
229
+ key : str | None ,
219
230
) -> None :
220
231
span .set_attribute (
221
232
messaging_attributes .MESSAGING_SYSTEM ,
@@ -235,8 +246,7 @@ def _enrich_base_span(
235
246
236
247
if key is not None :
237
248
span .set_attribute (
238
- messaging_attributes .MESSAGING_KAFKA_MESSAGE_KEY ,
239
- key , # FIXME: serialize key to str?
249
+ messaging_attributes .MESSAGING_KAFKA_MESSAGE_KEY , key
240
250
)
241
251
242
252
@@ -247,7 +257,7 @@ def _enrich_send_span(
247
257
client_id : str ,
248
258
topic : str ,
249
259
partition : int | None ,
250
- key : object | None ,
260
+ key : str | None ,
251
261
) -> None :
252
262
if not span .is_recording ():
253
263
return
@@ -276,7 +286,7 @@ def _enrich_getone_span(
276
286
consumer_group : str | None ,
277
287
topic : str ,
278
288
partition : int | None ,
279
- key : object | None ,
289
+ key : str | None ,
280
290
offset : int ,
281
291
) -> None :
282
292
if not span .is_recording ():
@@ -399,7 +409,7 @@ def _get_span_name(operation: str, topic: str):
399
409
return f"{ topic } { operation } "
400
410
401
411
402
- def _wrap_send (
412
+ def _wrap_send ( # type: ignore[reportUnusedFunction]
403
413
tracer : Tracer , async_produce_hook : ProduceHookT
404
414
) -> Callable [..., Awaitable [asyncio .Future [RecordMetadata ]]]:
405
415
async def _traced_send (
@@ -417,7 +427,7 @@ async def _traced_send(
417
427
topic = _extract_send_topic (args , kwargs )
418
428
bootstrap_servers = _extract_bootstrap_servers (instance .client )
419
429
client_id = _extract_client_id (instance .client )
420
- key = _extract_send_key (args , kwargs )
430
+ key = _deserialize_key ( _extract_send_key (args , kwargs ) )
421
431
partition = await _extract_send_partition (instance , args , kwargs )
422
432
span_name = _get_span_name ("send" , topic )
423
433
with tracer .start_as_current_span (
@@ -473,7 +483,7 @@ async def _create_consumer_span(
473
483
consumer_group = consumer_group ,
474
484
topic = record .topic ,
475
485
partition = record .partition ,
476
- key = record .key ,
486
+ key = _deserialize_key ( record .key ) ,
477
487
offset = record .offset ,
478
488
)
479
489
try :
@@ -486,7 +496,7 @@ async def _create_consumer_span(
486
496
return span
487
497
488
498
489
- def _wrap_getone (
499
+ def _wrap_getone ( # type: ignore[reportUnusedFunction]
490
500
tracer : Tracer , async_consume_hook : ConsumeHookT
491
501
) -> Callable [..., Awaitable [aiokafka .ConsumerRecord [object , object ]]]:
492
502
async def _traced_getone (
@@ -521,7 +531,7 @@ async def _traced_getone(
521
531
return _traced_getone
522
532
523
533
524
- def _wrap_getmany (
534
+ def _wrap_getmany ( # type: ignore[reportUnusedFunction]
525
535
tracer : Tracer , async_consume_hook : ConsumeHookT
526
536
) -> Callable [
527
537
...,
0 commit comments