
Commit 98d7137

Rename CorruptRecordException -> CorruptRecordError
1 parent d2d1cdd commit 98d7137

File tree

6 files changed: +17 -17 lines changed


Diff for: kafka/consumer/fetcher.py (+3 -3)

@@ -341,7 +341,7 @@ def fetched_records(self, max_records=None, update_offsets=True):
 
         Raises:
             OffsetOutOfRangeError: if no subscription offset_reset_strategy
-            CorruptRecordException: if message crc validation fails (check_crcs
+            CorruptRecordError: if message crc validation fails (check_crcs
                 must be set to True)
            RecordTooLargeError: if a message is larger than the currently
                configured max_partition_fetch_bytes
@@ -925,7 +925,7 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
             last_batch = batch
 
             if self.check_crcs and not batch.validate_crc():
-                raise Errors.CorruptRecordException(
+                raise Errors.CorruptRecordError(
                     "Record batch for partition %s at offset %s failed crc check" % (
                         self.topic_partition, batch.base_offset))
 
@@ -963,7 +963,7 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
 
             for record in batch:
                 if self.check_crcs and not record.validate_crc():
-                    raise Errors.CorruptRecordException(
+                    raise Errors.CorruptRecordError(
                         "Record for partition %s at offset %s failed crc check" % (
                             self.topic_partition, record.offset))
                 key_size = len(record.key) if record.key is not None else -1
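The renamed error propagates to consumer applications whenever CRC validation is enabled, so callers that match on the exception type should use (or alias to) the new name. A minimal sketch of catching it at the application level; the topic name and bootstrap server are illustrative, not part of this commit:

    from kafka import KafkaConsumer
    from kafka.errors import CorruptRecordError

    # check_crcs=True enables the batch/record validation shown in
    # _unpack_records above
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             check_crcs=True)
    try:
        for message in consumer:
            print(message.offset, message.value)
    except CorruptRecordError as exc:
        # Raised when a fetched batch or record fails its crc check
        print('corrupt data from broker: %s' % exc)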

Diff for: kafka/errors.py (+2 -2)

@@ -120,14 +120,14 @@ class OffsetOutOfRangeError(BrokerResponseError):
                    ' maintained by the server for the given topic/partition.')
 
 
-class CorruptRecordException(BrokerResponseError):
+class CorruptRecordError(BrokerResponseError):
     errno = 2
     message = 'CORRUPT_MESSAGE'
     description = ('This message has failed its CRC checksum, exceeds the'
                    ' valid size, or is otherwise corrupt.')
 
 # Backward compatibility
-InvalidMessageError = CorruptRecordException
+CorruptRecordException = CorruptRecordError
 
 
 class UnknownTopicOrPartitionError(BrokerResponseError):
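Because the old name is re-bound as an alias rather than deleted, existing except clauses keep working after the rename. A quick sketch, assuming this commit is applied:

    from kafka import errors as Errors

    # Both names refer to the same class object
    assert Errors.CorruptRecordException is Errors.CorruptRecordError

    try:
        raise Errors.CorruptRecordError('crc mismatch')
    except Errors.CorruptRecordException:
        pass  # old-style handlers still catch the renamed error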

Diff for: kafka/record/default_records.py (+6 -6)

@@ -60,7 +60,7 @@
 from kafka.record.util import (
     decode_varint, encode_varint, calc_crc32c, size_of_varint
 )
-from kafka.errors import CorruptRecordException, UnsupportedCodecError
+from kafka.errors import CorruptRecordError, UnsupportedCodecError
 from kafka.codec import (
     gzip_encode, snappy_encode, lz4_encode, zstd_encode,
     gzip_decode, snappy_decode, lz4_decode, zstd_decode
@@ -288,14 +288,14 @@ def _read_msg(
 
         header_count, pos = decode_varint(buffer, pos)
         if header_count < 0:
-            raise CorruptRecordException("Found invalid number of record "
+            raise CorruptRecordError("Found invalid number of record "
                                          "headers {}".format(header_count))
         headers = []
         while header_count:
             # Header key is of type String, that can't be None
             h_key_len, pos = decode_varint(buffer, pos)
             if h_key_len < 0:
-                raise CorruptRecordException(
+                raise CorruptRecordError(
                     "Invalid negative header key size {}".format(h_key_len))
             h_key = buffer[pos: pos + h_key_len].decode("utf-8")
             pos += h_key_len
@@ -313,7 +313,7 @@ def _read_msg(
 
         # validate whether we have read all header bytes in the current record
         if pos - start_pos != length:
-            raise CorruptRecordException(
+            raise CorruptRecordError(
                 "Invalid record size: expected to read {} bytes in record "
                 "payload, but instead read {}".format(length, pos - start_pos))
         self._pos = pos
@@ -332,14 +332,14 @@ def __iter__(self):
     def __next__(self):
         if self._next_record_index >= self._num_records:
             if self._pos != len(self._buffer):
-                raise CorruptRecordException(
+                raise CorruptRecordError(
                     "{} unconsumed bytes after all records consumed".format(
                         len(self._buffer) - self._pos))
             raise StopIteration
         try:
             msg = self._read_msg()
         except (ValueError, IndexError) as err:
-            raise CorruptRecordException(
+            raise CorruptRecordError(
                 "Found invalid record structure: {!r}".format(err))
         else:
             self._next_record_index += 1
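The header-count guard above exists because record headers are prefixed with zig-zag varints, so corrupt input can decode to a negative count. A small illustration with the same helpers from kafka.record.util; treat the exact signatures as an assumption from the current source tree:

    from kafka.record.util import encode_varint, decode_varint

    buf = bytearray()
    encode_varint(-1, buf.append)    # zig-zag encoding maps -1 to the byte 0x01
    value, pos = decode_varint(buf, 0)
    print(value, pos)                # -> -1 1, i.e. a negative "header count"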

Diff for: kafka/record/legacy_records.py (+2 -2)

@@ -52,7 +52,7 @@
     gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka,
 )
 import kafka.codec as codecs
-from kafka.errors import CorruptRecordException, UnsupportedCodecError
+from kafka.errors import CorruptRecordError, UnsupportedCodecError
 
 
 class LegacyRecordBase(object):
@@ -191,7 +191,7 @@ def _decompress(self, key_offset):
         value_size = struct.unpack_from(">i", self._buffer, pos)[0]
         pos += self.VALUE_LENGTH
         if value_size == -1:
-            raise CorruptRecordException("Value of compressed message is None")
+            raise CorruptRecordError("Value of compressed message is None")
         else:
             data = self._buffer[pos:pos + value_size]

Diff for: kafka/record/memory_records.py (+2 -2)

@@ -22,7 +22,7 @@
 
 import struct
 
-from kafka.errors import CorruptRecordException, IllegalStateError, UnsupportedVersionError
+from kafka.errors import CorruptRecordError, IllegalStateError, UnsupportedVersionError
 from kafka.record.abc import ABCRecords
 from kafka.record.legacy_records import LegacyRecordBatch, LegacyRecordBatchBuilder
 from kafka.record.default_records import DefaultRecordBatch, DefaultRecordBatchBuilder
@@ -99,7 +99,7 @@ def next_batch(self, _min_slice=MIN_SLICE,
         if next_slice is None:
             return None
         if len(next_slice) < _min_slice:
-            raise CorruptRecordException(
+            raise CorruptRecordError(
                 "Record size is less than the minimum record overhead "
                 "({})".format(_min_slice - self.LOG_OVERHEAD))
         self._cache_next()
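next_batch() is the entry point for scanning raw fetch data, so this is the other place the new name surfaces for callers. A sketch of defensive iteration over possibly truncated bytes, assuming the MemoryRecords.has_next()/next_batch() API in this module:

    from kafka.record import MemoryRecords
    from kafka.errors import CorruptRecordError

    def iter_records(raw_bytes):
        records = MemoryRecords(raw_bytes)
        while records.has_next():
            try:
                batch = records.next_batch()
            except CorruptRecordError as exc:
                # e.g. a partial batch at the end of a fetch response
                print('corrupt/truncated tail: %s' % exc)
                return
            for record in batch:
                yield record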

Diff for: test/record/test_records.py (+2 -2)

@@ -2,7 +2,7 @@
 from __future__ import unicode_literals
 import pytest
 from kafka.record import MemoryRecords, MemoryRecordsBuilder
-from kafka.errors import CorruptRecordException
+from kafka.errors import CorruptRecordError
 
 from test.testutil import maybe_skip_unsupported_compression
 
@@ -174,7 +174,7 @@ def test_memory_records_corrupt():
         b"\x00\x00\x00\x03"  # Length=3
         b"\xfe\xb0\x1d",  # Some random bytes
     )
-    with pytest.raises(CorruptRecordException):
+    with pytest.raises(CorruptRecordError):
         records.next_batch()
