File tree 2 files changed +11
-2
lines changed
2 files changed +11
-2
lines changed Original file line number Diff line number Diff line change @@ -193,8 +193,15 @@ def _detect_xerial_stream(payload):
193
193
"""
194
194
195
195
if len (payload ) > 16 :
196
- header = struct .unpack ('!' + _XERIAL_V1_FORMAT , bytes (payload )[:16 ])
197
- return header == _XERIAL_V1_HEADER
196
+ magic = struct .unpack ('!' + _XERIAL_V1_FORMAT [:8 ], bytes (payload )[:8 ])
197
+ version , compat = struct .unpack ('!' + _XERIAL_V1_FORMAT [8 :], bytes (payload )[8 :16 ])
198
+ # Until there is more than one way to do xerial blocking, the version + compat
199
+ # fields can be ignored. Also some producers (i.e., redpanda) are known to
200
+ # incorrectly encode these as little-endian, and that causes us to fail decoding
201
+ # when we otherwise would have succeeded.
202
+ # See https://github.com/dpkp/kafka-python/issues/2414
203
+ if magic == _XERIAL_V1_HEADER [:8 ]:
204
+ return True
198
205
return False
199
206
200
207
Original file line number Diff line number Diff line change @@ -39,12 +39,14 @@ def test_snappy_detect_xerial():
39
39
_detect_xerial_stream = kafka1 .codec ._detect_xerial_stream
40
40
41
41
header = b'\x82 SNAPPY\x00 \x00 \x00 \x00 \x01 \x00 \x00 \x00 \x01 Some extra bytes'
42
+ redpanda_header = b'\x82 SNAPPY\x00 \x01 \x00 \x00 \x00 \x01 \x00 \x00 \x00 Some extra bytes'
42
43
false_header = b'\x01 SNAPPY\x00 \x00 \x00 \x01 \x00 \x00 \x00 \x01 '
43
44
default_snappy = snappy_encode (b'foobar' * 50 )
44
45
random_snappy = snappy_encode (b'SNAPPY' * 50 , xerial_compatible = False )
45
46
short_data = b'\x01 \x02 \x03 \x04 '
46
47
47
48
assert _detect_xerial_stream (header ) is True
49
+ assert _detect_xerial_stream (redpanda_header ) is True
48
50
assert _detect_xerial_stream (b'' ) is False
49
51
assert _detect_xerial_stream (b'\x00 ' ) is False
50
52
assert _detect_xerial_stream (false_header ) is False
You can’t perform that action at this time.
0 commit comments