
KIP-98: Transactional Producer #2587

Merged: 27 commits, Apr 15, 2025

Commits
b7d2f4f
Add transactional kwargs to MemoryRecordsBuilder
dpkp Apr 4, 2025
f9b7c00
DefaultRecordBatch.has_sequence / ProducerBatch.has_sequence
dpkp Apr 7, 2025
a943442
Add producer_epoch to records builder
dpkp Apr 7, 2025
a1145ac
Add __str__ to DefaultRecordBatch + Builder
dpkp Apr 7, 2025
c980f0c
TransactionManager - producer only (no consumer offsets)
dpkp Apr 5, 2025
1561d45
Drop producer TransactionState
dpkp Apr 5, 2025
e9cc202
RecordAccumulator
dpkp Apr 6, 2025
8560167
Sender
dpkp Apr 6, 2025
9d57758
KafkaProducer
dpkp Apr 6, 2025
ff27b06
fixup record_accumulator test
dpkp Apr 8, 2025
937bfa9
transaction manager comment typo
dpkp Apr 8, 2025
1f99769
Revert KAFKA-5494 / idempotent producer with multiple inflight requests
dpkp Apr 8, 2025
a586ad7
sender fixes
dpkp Apr 8, 2025
1ed9891
check is_send_to_partitions_allowed
dpkp Apr 8, 2025
5b205c3
prefix test-only methods
dpkp Apr 8, 2025
4cad6f1
test_sender updates for transaction manager
dpkp Apr 8, 2025
ab28572
Check reset_producer_id for out of order sequence number on idempoten…
dpkp Apr 8, 2025
5ef6595
fixup set_producer_id_and_epoch from sender
dpkp Apr 15, 2025
243b55b
add small timeout to producer/consumer fixture closes in test_producer
dpkp Apr 15, 2025
1321f98
Do not update reconnect backoff when closing w/o error
dpkp Apr 15, 2025
549d8cb
Move producer unit test to test/test_producer.py
dpkp Apr 15, 2025
716b912
Transactional/Idempotent producer requires 0.11+
dpkp Apr 15, 2025
318f331
Very basic idempotent producer integration test
dpkp Apr 15, 2025
f231983
Handle empty batch lists from accumulator.drain
dpkp Apr 15, 2025
852e550
more transaction manager typos
dpkp Apr 15, 2025
f67ca6a
Improve producer.init_transactions() semantics
dpkp Apr 15, 2025
97d2895
Test producer transaction
dpkp Apr 15, 2025

3 changes: 2 additions & 1 deletion kafka/conn.py
@@ -934,7 +934,8 @@ def close(self, error=None):
if self.state is ConnectionStates.DISCONNECTED:
return
log.log(logging.ERROR if error else logging.INFO, '%s: Closing connection. %s', self, error or '')
self._update_reconnect_backoff()
if error:
self._update_reconnect_backoff()
self._api_versions_future = None
self._sasl_auth_future = None
self._init_sasl_mechanism()
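
With this change, only an error close updates the reconnect backoff; a clean close (error=None) leaves it alone, so orderly shutdowns no longer delay the next reconnect. A minimal illustration of the intended behavior (the connection instance and error value here are hypothetical):

from kafka import errors as Errors

# 'conn' stands in for a connected BrokerConnection instance
conn.close()  # clean close: reconnect backoff left unchanged
conn.close(error=Errors.KafkaConnectionError('request timed out'))
# error close: _update_reconnect_backoff() runs, delaying the next reconnect
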
116 changes: 110 additions & 6 deletions kafka/producer/kafka.py
@@ -19,7 +19,7 @@
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
from kafka.producer.sender import Sender
from kafka.producer.transaction_state import TransactionState
from kafka.producer.transaction_manager import TransactionManager
from kafka.record.default_records import DefaultRecordBatchBuilder
from kafka.record.legacy_records import LegacyRecordBatchBuilder
from kafka.serializer import Serializer
@@ -318,6 +318,8 @@ class KafkaProducer(object):
'key_serializer': None,
'value_serializer': None,
'enable_idempotence': False,
'transactional_id': None,
'transaction_timeout_ms': 60000,
'acks': 1,
'bootstrap_topics_filter': set(),
'compression_type': None,
@@ -444,9 +446,30 @@ def __init__(self, **configs):
assert checker(), "Libraries for {} compression codec not found".format(ct)
self.config['compression_attrs'] = compression_attrs

self._transaction_state = None
self._metadata = client.cluster
self._transaction_manager = None
self._init_transactions_result = None
if 'enable_idempotence' in user_provided_configs and not self.config['enable_idempotence'] and self.config['transactional_id']:
raise Errors.KafkaConfigurationError("Cannot set transactional_id without enable_idempotence.")

if self.config['transactional_id']:
self.config['enable_idempotence'] = True

if self.config['enable_idempotence']:
self._transaction_state = TransactionState()
assert self.config['api_version'] >= (0, 11), "Transactional/Idempotent producer requires >= Kafka 0.11 Brokers"

self._transaction_manager = TransactionManager(
transactional_id=self.config['transactional_id'],
transaction_timeout_ms=self.config['transaction_timeout_ms'],
retry_backoff_ms=self.config['retry_backoff_ms'],
api_version=self.config['api_version'],
metadata=self._metadata,
)
if self._transaction_manager.is_transactional():
log.info("Instantiated a transactional producer.")
else:
log.info("Instantiated an idempotent producer.")

if 'retries' not in user_provided_configs:
log.info("Overriding the default 'retries' config to 3 since the idempotent producer is enabled.")
self.config['retries'] = 3
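
The block above encodes the config implications: setting transactional_id silently enables idempotence, while explicitly passing enable_idempotence=False alongside a transactional_id is a configuration error. A usage sketch of the three resulting modes (broker address and ids are hypothetical; assumes a reachable 0.11+ broker):

from kafka import KafkaProducer
from kafka.errors import KafkaConfigurationError

# Idempotent producer: producer id + per-partition sequence numbers let
# the broker deduplicate retries.
idempotent = KafkaProducer(bootstrap_servers='localhost:9092',
                           enable_idempotence=True)

# Transactional producer: transactional_id implies enable_idempotence=True.
transactional = KafkaProducer(bootstrap_servers='localhost:9092',
                              transactional_id='my-txn-id')

# Explicitly disabling idempotence while requesting transactions is rejected.
try:
    KafkaProducer(bootstrap_servers='localhost:9092',
                  transactional_id='my-txn-id',
                  enable_idempotence=False)
except KafkaConfigurationError:
    pass  # "Cannot set transactional_id without enable_idempotence."
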
@@ -470,15 +493,14 @@ def __init__(self, **configs):

message_version = self.max_usable_produce_magic(self.config['api_version'])
self._accumulator = RecordAccumulator(
transaction_state=self._transaction_state,
transaction_manager=self._transaction_manager,
message_version=message_version,
**self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
self._sender = Sender(client, self._metadata,
self._accumulator,
metrics=self._metrics,
transaction_state=self._transaction_state,
transaction_manager=self._transaction_manager,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
@@ -610,6 +632,84 @@ def _estimate_size_in_bytes(self, key, value, headers=[]):
return LegacyRecordBatchBuilder.estimate_size_in_bytes(
magic, self.config['compression_type'], key, value)

def init_transactions(self):
"""
Needs to be called before any other methods when transactional_id is set in the configuration.

This method does the following:
1. Ensures any transactions initiated by previous instances of the producer with the same
transactional_id are completed. If the previous instance had failed with a transaction in
progress, it will be aborted. If the last transaction had begun completion,
but not yet finished, this method awaits its completion.
2. Gets the internal producer id and epoch, used in all future transactional
messages issued by the producer.

Note that this method will raise KafkaTimeoutError if the transactional state cannot
be initialized before expiration of `max_block_ms`.

Retrying after a KafkaTimeoutError will continue to wait for the prior request to succeed or fail.
Retrying after any other exception will start a new initialization attempt.
Retrying after a successful initialization will do nothing.

Raises:
IllegalStateError: if no transactional_id has been configured
AuthorizationError: fatal error indicating that the configured
transactional_id is not authorized.
KafkaError: if the producer has encountered a previous fatal error or for any other unexpected error
KafkaTimeoutError: if the time taken to initialize the transaction has surpassed `max_block_ms`.
"""
if not self._transaction_manager:
raise Errors.IllegalStateError("Cannot call init_transactions without setting a transactional_id.")
if self._init_transactions_result is None:
self._init_transactions_result = self._transaction_manager.initialize_transactions()
self._sender.wakeup()

try:
if not self._init_transactions_result.wait(timeout_ms=self.config['max_block_ms']):
raise Errors.KafkaTimeoutError("Timeout expired while initializing transactional state in %s ms." % (self.config['max_block_ms'],))
finally:
if self._init_transactions_result.failed:
self._init_transactions_result = None

def begin_transaction(self):
""" Should be called before the start of each new transaction.

Note that prior to the first invocation of this method,
you must invoke `init_transactions()` exactly one time.

Raises:
ProducerFencedError: if another producer with the same
transactional_id is active.
"""
# Set the transactional bit in the producer.
if not self._transaction_manager:
raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions")
self._transaction_manager.begin_transaction()

def commit_transaction(self):
""" Commits the ongoing transaction.

Raises: ProducerFencedError if another producer with the same
transactional_id is active.
"""
if not self._transaction_manager:
raise Errors.IllegalStateError("Cannot commit transaction since transactions are not enabled")
result = self._transaction_manager.begin_commit()
self._sender.wakeup()
result.wait()

def abort_transaction(self):
""" Aborts the ongoing transaction.

Raises: ProducerFencedError if another producer with the same
transactional_id is active.
"""
if not self._transaction_manager:
raise Errors.IllegalStateError("Cannot abort transaction since transactions are not enabled.")
result = self._transaction_manager.begin_abort()
self._sender.wakeup()
result.wait()
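
Taken together, these methods mirror the Java producer's transactional API. A minimal end-to-end sketch (broker address, transactional_id, and topic are hypothetical; assumes a reachable 0.11+ broker):

from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         transactional_id='order-processor-1')

# Initialize once. Per the docstring above, a KafkaTimeoutError leaves the
# in-flight request intact, so retrying simply continues waiting, and
# retrying after success is a no-op.
while True:
    try:
        producer.init_transactions()
        break
    except KafkaTimeoutError:
        continue

try:
    producer.begin_transaction()
    producer.send('orders', b'order-1')  # first send to a partition registers it
    producer.send('orders', b'order-2')  # with the transaction coordinator
    producer.commit_transaction()        # blocks until the commit completes
except KafkaError:
    producer.abort_transaction()         # discard everything sent since begin_transaction()
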

def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.

@@ -687,6 +787,10 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):

tp = TopicPartition(topic, partition)
log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)

if self._transaction_manager and self._transaction_manager.is_transactional():
self._transaction_manager.maybe_add_partition_to_transaction(tp)

result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes, headers)
future, batch_is_full, new_batch_created = result

65 changes: 52 additions & 13 deletions kafka/producer/record_accumulator.py
@@ -56,6 +56,14 @@ def record_count(self):
def producer_id(self):
return self.records.producer_id if self.records else None

@property
def producer_epoch(self):
return self.records.producer_epoch if self.records else None

@property
def has_sequence(self):
return self.records.has_sequence if self.records else False

def try_append(self, timestamp_ms, key, value, headers, now=None):
metadata = self.records.append(timestamp_ms, key, value, headers)
if metadata is None:
@@ -170,7 +178,7 @@ class RecordAccumulator(object):
'compression_attrs': 0,
'linger_ms': 0,
'retry_backoff_ms': 100,
'transaction_state': None,
'transaction_manager': None,
'message_version': 0,
}

@@ -181,7 +189,7 @@ def __init__(self, **configs):
self.config[key] = configs.pop(key)

self._closed = False
self._transaction_state = self.config['transaction_state']
self._transaction_manager = self.config['transaction_manager']
self._flushes_in_progress = AtomicInteger()
self._appends_in_progress = AtomicInteger()
self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
@@ -244,7 +252,7 @@ def append(self, tp, timestamp_ms, key, value, headers):
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False

if self._transaction_state and self.config['message_version'] < 2:
if self._transaction_manager and self.config['message_version'] < 2:
raise Errors.UnsupportedVersionError("Attempting to use idempotence with a broker which"
" does not support the required message format (v2)."
" The broker must be version 0.11 or later.")
@@ -418,8 +426,8 @@ def ready(self, cluster, now=None):

return ready_nodes, next_ready_check, unknown_leaders_exist

def has_unsent(self):
"""Return whether there is any unsent record in the accumulator."""
def has_undrained(self):
"""Check whether there are any batches which haven't been drained"""
for tp in list(self._batches.keys()):
with self._tp_locks[tp]:
dq = self._batches[tp]
@@ -479,8 +487,10 @@ def drain(self, cluster, nodes, max_size, now=None):
break
else:
producer_id_and_epoch = None
if self._transaction_state:
producer_id_and_epoch = self._transaction_state.producer_id_and_epoch
if self._transaction_manager:
if not self._transaction_manager.is_send_to_partition_allowed(tp):
break
producer_id_and_epoch = self._transaction_manager.producer_id_and_epoch
if not producer_id_and_epoch.is_valid:
# we cannot send the batch until we have refreshed the PID
log.debug("Waiting to send ready batches because transaction producer id is not valid")
@@ -493,11 +503,16 @@
# the previous attempt may actually have been accepted, and if we change
# the pid and sequence here, this attempt will also be accepted, causing
# a duplicate.
sequence_number = self._transaction_state.sequence_number(batch.topic_partition)
sequence_number = self._transaction_manager.sequence_number(batch.topic_partition)
log.debug("Dest: %s: %s producer_id=%s epoch=%s sequence=%s",
node_id, batch.topic_partition, producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch,
sequence_number)
batch.records.set_producer_state(producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, sequence_number)
batch.records.set_producer_state(
producer_id_and_epoch.producer_id,
producer_id_and_epoch.epoch,
sequence_number,
self._transaction_manager.is_transactional()
)
batch.records.close()
size += batch.records.size_in_bytes()
ready.append(batch)
Expand Down Expand Up @@ -544,6 +559,10 @@ def await_flush_completion(self, timeout=None):
finally:
self._flushes_in_progress.decrement()

@property
def has_incomplete(self):
return bool(self._incomplete)

def abort_incomplete_batches(self):
"""
This function is only called when sender is closed forcefully. It will fail all the
Expand All @@ -553,27 +572,41 @@ def abort_incomplete_batches(self):
# 1. Avoid losing batches.
# 2. Free up memory in case appending threads are blocked on buffer full.
# This is a tight loop but should be able to get through very quickly.
error = Errors.IllegalStateError("Producer is closed forcefully.")
while True:
self._abort_batches()
self._abort_batches(error)
if not self._appends_in_progress.get():
break
# After this point, no thread will append any messages because they will see the close
# flag set. We need to do the last abort after no thread was appending in case there was a new
# batch appended by the last appending thread.
self._abort_batches()
self._abort_batches(error)
self._batches.clear()

def _abort_batches(self):
def _abort_batches(self, error):
"""Go through incomplete batches and abort them."""
error = Errors.IllegalStateError("Producer is closed forcefully.")
for batch in self._incomplete.all():
tp = batch.topic_partition
# Close the batch before aborting
with self._tp_locks[tp]:
batch.records.close()
self._batches[tp].remove(batch)
batch.done(exception=error)
self.deallocate(batch)

def abort_undrained_batches(self, error):
for batch in self._incomplete.all():
tp = batch.topic_partition
with self._tp_locks[tp]:
aborted = False
if not batch.is_done:
aborted = True
batch.records.close()
self._batches[tp].remove(batch)
if aborted:
batch.done(exception=error)
self.deallocate(batch)

def close(self):
"""Close this accumulator and force all the record buffers to be drained."""
self._closed = True
@@ -600,3 +633,9 @@ def remove(self, batch):
def all(self):
with self._lock:
return list(self._incomplete)

def __bool__(self):
return bool(self._incomplete)


__nonzero__ = __bool__
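
The two abort paths above differ in scope: abort_incomplete_batches() is the forced-close path and fails every incomplete batch with IllegalStateError, while the new abort_undrained_batches(error) fails only batches that are not yet done, using a caller-supplied error. A hedged sketch of the distinction (the accumulator instance and error message are hypothetical, and the transaction-abort usage is an assumption about how the Sender drives it):

from kafka import errors as Errors

# Forced close of the producer: fail everything still incomplete.
accumulator.abort_incomplete_batches()

# Aborting a transaction: fail only batches that were never drained,
# leaving batches that already completed untouched.
accumulator.abort_undrained_batches(
    Errors.KafkaError('transaction aborted before batch was drained'))
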