
Commit c2fe7c3

KIP-98: Add Consumer support for READ_COMMITTED (#2582)
1 parent 103025f commit c2fe7c3

8 files changed: +162 -24 lines changed

kafka/consumer/fetcher.py (+81 -17)

@@ -12,7 +12,7 @@
 import kafka.errors as Errors
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
-from kafka.protocol.fetch import FetchRequest
+from kafka.protocol.fetch import FetchRequest, AbortedTransaction
 from kafka.protocol.list_offsets import (
     ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET
 )
@@ -28,6 +28,11 @@
 READ_UNCOMMITTED = 0
 READ_COMMITTED = 1

+ISOLATION_LEVEL_CONFIG = {
+    'read_uncommitted': READ_UNCOMMITTED,
+    'read_committed': READ_COMMITTED,
+}
+
 ConsumerRecord = collections.namedtuple("ConsumerRecord",
     ["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type",
      "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"])
@@ -60,6 +65,7 @@ class Fetcher(six.Iterator):
         'metric_group_prefix': 'consumer',
         'retry_backoff_ms': 100,
         'enable_incremental_fetch_sessions': True,
+        'isolation_level': 'read_uncommitted',
     }

     def __init__(self, client, subscriptions, **configs):
@@ -100,12 +106,18 @@ def __init__(self, client, subscriptions, **configs):
                 consumed. This ensures no on-the-wire or on-disk corruption to
                 the messages occurred. This check adds some overhead, so it may
                 be disabled in cases seeking extreme performance. Default: True
+            isolation_level (str): Configure KIP-98 transactional consumer by
+                setting to 'read_committed'. This will cause the consumer to
+                skip records from aborted transactions. Default: 'read_uncommitted'
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
                 self.config[key] = configs[key]

+        if self.config['isolation_level'] not in ISOLATION_LEVEL_CONFIG:
+            raise Errors.KafkaConfigurationError('Unrecognized isolation_level')
+
         self._client = client
         self._subscriptions = subscriptions
         self._completed_fetches = collections.deque()  # Unparsed responses
@@ -116,7 +128,7 @@ def __init__(self, client, subscriptions, **configs):
             self._sensors = FetchManagerMetrics(self.config['metrics'], self.config['metric_group_prefix'])
         else:
             self._sensors = None
-        self._isolation_level = READ_UNCOMMITTED
+        self._isolation_level = ISOLATION_LEVEL_CONFIG[self.config['isolation_level']]
         self._session_handlers = {}
         self._nodes_with_pending_fetch_requests = set()

@@ -244,7 +256,7 @@ def _reset_offset(self, partition, timeout_ms=None):
         else:
             raise NoOffsetForPartitionError(partition)

-        log.debug("Resetting offset for partition %s to %s offset.",
+        log.debug("Resetting offset for partition %s to offset %s.",
                   partition, strategy)
         offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms=timeout_ms)

@@ -765,14 +777,21 @@ def _parse_fetched_data(self, completed_fetch):
                     return None

                 records = MemoryRecords(completed_fetch.partition_data[-1])
+                aborted_transactions = None
+                if completed_fetch.response_version >= 11:
+                    aborted_transactions = completed_fetch.partition_data[-3]
+                elif completed_fetch.response_version >= 4:
+                    aborted_transactions = completed_fetch.partition_data[-2]
                 log.debug("Preparing to read %s bytes of data for partition %s with offset %d",
                           records.size_in_bytes(), tp, fetch_offset)
                 parsed_records = self.PartitionRecords(fetch_offset, tp, records,
-                                                       self.config['key_deserializer'],
-                                                       self.config['value_deserializer'],
-                                                       self.config['check_crcs'],
-                                                       completed_fetch.metric_aggregator,
-                                                       self._on_partition_records_drain)
+                                                       key_deserializer=self.config['key_deserializer'],
+                                                       value_deserializer=self.config['value_deserializer'],
+                                                       check_crcs=self.config['check_crcs'],
+                                                       isolation_level=self._isolation_level,
+                                                       aborted_transactions=aborted_transactions,
+                                                       metric_aggregator=completed_fetch.metric_aggregator,
+                                                       on_drain=self._on_partition_records_drain)
                 if not records.has_next() and records.size_in_bytes() > 0:
                     if completed_fetch.response_version < 3:
                         # Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
@@ -845,13 +864,23 @@ def close(self):
             self._next_partition_records.drain()

     class PartitionRecords(object):
-        def __init__(self, fetch_offset, tp, records, key_deserializer, value_deserializer, check_crcs, metric_aggregator, on_drain):
+        def __init__(self, fetch_offset, tp, records,
+                     key_deserializer=None, value_deserializer=None,
+                     check_crcs=True, isolation_level=READ_UNCOMMITTED,
+                     aborted_transactions=None,  # raw data from response / list of (producer_id, first_offset) tuples
+                     metric_aggregator=None, on_drain=lambda x: None):
             self.fetch_offset = fetch_offset
             self.topic_partition = tp
             self.leader_epoch = -1
             self.next_fetch_offset = fetch_offset
             self.bytes_read = 0
             self.records_read = 0
+            self.isolation_level = isolation_level
+            self.aborted_producer_ids = set()
+            self.aborted_transactions = collections.deque(
+                sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [],
+                       key=lambda txn: txn.first_offset)
+            )
             self.metric_aggregator = metric_aggregator
             self.check_crcs = check_crcs
             self.record_iterator = itertools.dropwhile(
@@ -900,18 +929,35 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
                             "Record batch for partition %s at offset %s failed crc check" % (
                                 self.topic_partition, batch.base_offset))

+
                     # Try DefaultsRecordBatch / message log format v2
-                    # base_offset, last_offset_delta, and control batches
+                    # base_offset, last_offset_delta, aborted transactions, and control batches
                     if batch.magic == 2:
                         self.leader_epoch = batch.leader_epoch
+                        if self.isolation_level == READ_COMMITTED and batch.has_producer_id():
+                            # remove from the aborted transaction queue all aborted transactions which have begun
+                            # before the current batch's last offset and add the associated producerIds to the
+                            # aborted producer set
+                            self._consume_aborted_transactions_up_to(batch.last_offset)
+
+                            producer_id = batch.producer_id
+                            if self._contains_abort_marker(batch):
+                                try:
+                                    self.aborted_producer_ids.remove(producer_id)
+                                except KeyError:
+                                    pass
+                            elif self._is_batch_aborted(batch):
+                                log.debug("Skipping aborted record batch from partition %s with producer_id %s and"
+                                          " offsets %s to %s",
+                                          self.topic_partition, producer_id, batch.base_offset, batch.last_offset)
+                                self.next_fetch_offset = batch.next_offset
+                                batch = records.next_batch()
+                                continue
+
                         # Control batches have a single record indicating whether a transaction
-                        # was aborted or committed.
-                        # When isolation_level is READ_COMMITTED (currently unsupported)
-                        # we should also skip all messages from aborted transactions
-                        # For now we only support READ_UNCOMMITTED and so we ignore the
-                        # abort/commit signal.
+                        # was aborted or committed. These are not returned to the consumer.
                         if batch.is_control_batch:
-                            self.next_fetch_offset = next(batch).offset + 1
+                            self.next_fetch_offset = batch.next_offset
                             batch = records.next_batch()
                             continue

@@ -944,7 +990,7 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
                 # unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
                 # fetching the same batch repeatedly).
                 if last_batch and last_batch.magic == 2:
-                    self.next_fetch_offset = last_batch.base_offset + last_batch.last_offset_delta + 1
+                    self.next_fetch_offset = last_batch.next_offset
                 self.drain()

             # If unpacking raises StopIteration, it is erroneously
@@ -961,6 +1007,24 @@ def _deserialize(self, f, topic, bytes_):
                 return f.deserialize(topic, bytes_)
             return f(bytes_)

+        def _consume_aborted_transactions_up_to(self, offset):
+            if not self.aborted_transactions:
+                return
+
+            while self.aborted_transactions and self.aborted_transactions[0].first_offset <= offset:
+                self.aborted_producer_ids.add(self.aborted_transactions.popleft().producer_id)
+
+        def _is_batch_aborted(self, batch):
+            return batch.is_transactional and batch.producer_id in self.aborted_producer_ids
+
+        def _contains_abort_marker(self, batch):
+            if not batch.is_control_batch:
+                return False
+            record = next(batch)
+            if not record:
+                return False
+            return record.abort
+

 class FetchSessionHandler(object):
     """

kafka/consumer/group.py (+4)

@@ -121,6 +121,9 @@ class KafkaConsumer(six.Iterator):
             consumed. This ensures no on-the-wire or on-disk corruption to
             the messages occurred. This check adds some overhead, so it may
             be disabled in cases seeking extreme performance. Default: True
+        isolation_level (str): Configure KIP-98 transactional consumer by
+            setting to 'read_committed'. This will cause the consumer to
+            skip records from aborted transactions. Default: 'read_uncommitted'
         allow_auto_create_topics (bool): Enable/disable auto topic creation
             on metadata request. Only available with api_version >= (0, 11).
             Default: True
@@ -290,6 +293,7 @@ class KafkaConsumer(six.Iterator):
         'auto_commit_interval_ms': 5000,
         'default_offset_commit_callback': lambda offsets, response: True,
         'check_crcs': True,
+        'isolation_level': 'read_uncommitted',
         'allow_auto_create_topics': True,
         'metadata_max_age_ms': 5 * 60 * 1000,
         'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
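
With the new option, enabling read-committed consumption is a one-line configuration change. A minimal sketch, assuming a kafka-python build that includes this commit plus a placeholder broker address and topic name; per the Fetcher change above, any value other than 'read_uncommitted' or 'read_committed' raises KafkaConfigurationError.

from kafka import KafkaConsumer

# 'txn-topic', the bootstrap address, and the group id are placeholder values.
consumer = KafkaConsumer(
    'txn-topic',
    bootstrap_servers='localhost:9092',
    group_id='example-group',
    isolation_level='read_committed',  # default is 'read_uncommitted'
)

for message in consumer:
    # Records from aborted transactions are filtered out before they get here.
    print(message.topic, message.partition, message.offset, message.value)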

kafka/consumer/subscription_state.py (+1 -1)

@@ -1,7 +1,7 @@
 from __future__ import absolute_import

 import abc
-from collections import defaultdict, OrderedDict
+from collections import OrderedDict
 try:
     from collections.abc import Sequence
 except ImportError:

kafka/protocol/fetch.py (+6)

@@ -1,9 +1,15 @@
 from __future__ import absolute_import

+import collections
+
 from kafka.protocol.api import Request, Response
 from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String, Bytes


+AbortedTransaction = collections.namedtuple("AbortedTransaction",
+    ["producer_id", "first_offset"])
+
+
 class FetchResponse_v0(Response):
     API_KEY = 1
     API_VERSION = 0
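
The AbortedTransaction namedtuple simply names the (producer_id, first_offset) pairs carried per partition in FetchResponse v4 and above. A small sketch of how the fetcher turns that raw data into an offset-ordered queue; the values are made up, and the import assumes a kafka-python version containing this commit.

import collections

from kafka.protocol.fetch import AbortedTransaction

# Raw aborted_transactions data as pulled out of a fetch response partition
# entry: (producer_id, first_offset) pairs (values here are made up).
raw = [(9, 42), (7, 0)]

# Same ordering PartitionRecords uses: the transaction that began earliest comes first.
aborted = collections.deque(
    sorted([AbortedTransaction(*data) for data in raw], key=lambda txn: txn.first_offset))

assert aborted[0] == AbortedTransaction(producer_id=7, first_offset=0)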

kafka/record/abc.py (+10)

@@ -110,6 +110,16 @@ def __iter__(self):
         if needed.
         """

+    @abc.abstractproperty
+    def base_offset(self):
+        """ Return base offset for batch
+        """
+
+    @abc.abstractproperty
+    def size_in_bytes(self):
+        """ Return size of batch in bytes (includes header overhead)
+        """
+
     @abc.abstractproperty
     def magic(self):
         """ Return magic value (0, 1, 2) for batch.

kafka/record/default_records.py (+45)

@@ -104,6 +104,9 @@ class DefaultRecordBase(object):

     LOG_APPEND_TIME = 1
     CREATE_TIME = 0
+    NO_PRODUCER_ID = -1
+    NO_SEQUENCE = -1
+    MAX_INT = 2147483647

     def _assert_has_codec(self, compression_type):
         if compression_type == self.CODEC_GZIP:
@@ -136,6 +139,10 @@ def __init__(self, buffer):
     def base_offset(self):
         return self._header_data[0]

+    @property
+    def size_in_bytes(self):
+        return self._header_data[1] + self.AFTER_LEN_OFFSET
+
     @property
     def leader_epoch(self):
         return self._header_data[2]
@@ -156,6 +163,14 @@ def attributes(self):
     def last_offset_delta(self):
         return self._header_data[6]

+    @property
+    def last_offset(self):
+        return self.base_offset + self.last_offset_delta
+
+    @property
+    def next_offset(self):
+        return self.last_offset + 1
+
     @property
     def compression_type(self):
         return self.attributes & self.CODEC_MASK
@@ -180,6 +195,36 @@ def first_timestamp(self):
     def max_timestamp(self):
         return self._header_data[8]

+    @property
+    def producer_id(self):
+        return self._header_data[9]
+
+    def has_producer_id(self):
+        return self.producer_id > self.NO_PRODUCER_ID
+
+    @property
+    def producer_epoch(self):
+        return self._header_data[10]
+
+    @property
+    def base_sequence(self):
+        return self._header_data[11]
+
+    @property
+    def last_sequence(self):
+        if self.base_sequence == self.NO_SEQUENCE:
+            return self.NO_SEQUENCE
+        return self._increment_sequence(self.base_sequence, self.last_offset_delta)
+
+    def _increment_sequence(self, base, increment):
+        if base > (self.MAX_INT - increment):
+            return increment - (self.MAX_INT - base) - 1
+        return base + increment
+
+    @property
+    def records_count(self):
+        return self._header_data[12]
+
     def _maybe_uncompress(self):
         if not self._decompressed:
             compression_type = self.compression_type
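
The new last_offset/next_offset properties and the sequence helpers are pure arithmetic over the v2 batch header, so they can be sanity-checked standalone. The sketch below re-implements that arithmetic outside the class (increment_sequence is a local copy for illustration, not the library API) to show the wraparound at MAX_INT.

MAX_INT = 2147483647  # same constant added to DefaultRecordBase above


def increment_sequence(base, increment):
    # Local copy of the _increment_sequence logic: wrap past MAX_INT back through 0.
    if base > (MAX_INT - increment):
        return increment - (MAX_INT - base) - 1
    return base + increment


# Offset arithmetic from the new properties: a batch starting at offset 100
# holding 5 records (last_offset_delta == 4).
base_offset, last_offset_delta = 100, 4
last_offset = base_offset + last_offset_delta   # 104
next_offset = last_offset + 1                   # 105, where the next fetch resumes

# Sequence wraparound: a 10-record batch whose base_sequence sits near the top
# of the 32-bit range continues counting from 0.
assert increment_sequence(5, 9) == 14
assert increment_sequence(MAX_INT - 3, 9) == 5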

kafka/record/legacy_records.py (+10 -1)

@@ -129,7 +129,7 @@ def _assert_has_codec(self, compression_type):

 class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):

-    __slots__ = ("_buffer", "_magic", "_offset", "_crc", "_timestamp",
+    __slots__ = ("_buffer", "_magic", "_offset", "_length", "_crc", "_timestamp",
                  "_attributes", "_decompressed")

     def __init__(self, buffer, magic):
@@ -141,11 +141,20 @@ def __init__(self, buffer, magic):
         assert magic == magic_

         self._offset = offset
+        self._length = length
         self._crc = crc
         self._timestamp = timestamp
         self._attributes = attrs
         self._decompressed = False

+    @property
+    def base_offset(self):
+        return self._offset
+
+    @property
+    def size_in_bytes(self):
+        return self._length + self.LOG_OVERHEAD
+
     @property
     def timestamp_type(self):
         """0 for CreateTime; 1 for LogAppendTime; None if unsupported.
