Skip to content

Commit 7bc9703

Browse files
committed
KafkaProducer
1 parent c600344 commit 7bc9703

File tree

1 file changed

+103
-6
lines changed

1 file changed

+103
-6
lines changed

kafka/producer/kafka.py

+103-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
2020
from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
2121
from kafka.producer.sender import Sender
22-
from kafka.producer.transaction_state import TransactionState
22+
from kafka.producer.transaction_manager import TransactionManager
2323
from kafka.record.default_records import DefaultRecordBatchBuilder
2424
from kafka.record.legacy_records import LegacyRecordBatchBuilder
2525
from kafka.serializer import Serializer
@@ -318,6 +318,8 @@ class KafkaProducer(object):
318318
'key_serializer': None,
319319
'value_serializer': None,
320320
'enable_idempotence': False,
321+
'transactional_id': None,
322+
'transaction_timeout_ms': 60000,
321323
'acks': 1,
322324
'bootstrap_topics_filter': set(),
323325
'compression_type': None,
@@ -444,9 +446,28 @@ def __init__(self, **configs):
444446
assert checker(), "Libraries for {} compression codec not found".format(ct)
445447
self.config['compression_attrs'] = compression_attrs
446448

447-
self._transaction_state = None
449+
self._metadata = client.cluster
450+
self._transaction_manager = None
451+
self._init_transactions_result = None
452+
if 'enable_idempotence' in user_provided_configs and not self.config['enable_idempotence'] and self.config['transactional_id']:
453+
raise Errors.KafkaConfigurationError("Cannot set transactional_id without enable_idempotence.")
454+
455+
if self.config['transactional_id']:
456+
self.config['enable_idempotence'] = True
457+
448458
if self.config['enable_idempotence']:
449-
self._transaction_state = TransactionState()
459+
self._transaction_manager = TransactionManager(
460+
transactional_id=self.config['transactional_id'],
461+
transaction_timeout_ms=self.config['transaction_timeout_ms'],
462+
retry_backoff_ms=self.config['retry_backoff_ms'],
463+
api_version=self.config['api_version'],
464+
metadata=self._metadata,
465+
)
466+
if self._transaction_manager.is_transactional():
467+
log.info("Instantiated a transactional producer.")
468+
else:
469+
log.info("Instantiated an idempotent producer.")
470+
450471
if 'retries' not in user_provided_configs:
451472
log.info("Overriding the default 'retries' config to 3 since the idempotent producer is enabled.")
452473
self.config['retries'] = 3
@@ -470,15 +491,14 @@ def __init__(self, **configs):
470491

471492
message_version = self.max_usable_produce_magic(self.config['api_version'])
472493
self._accumulator = RecordAccumulator(
473-
transaction_state=self._transaction_state,
494+
transaction_manager=self._transaction_manager,
474495
message_version=message_version,
475496
**self.config)
476-
self._metadata = client.cluster
477497
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
478498
self._sender = Sender(client, self._metadata,
479499
self._accumulator,
480500
metrics=self._metrics,
481-
transaction_state=self._transaction_state,
501+
transaction_manager=self._transaction_manager,
482502
guarantee_message_order=guarantee_message_order,
483503
**self.config)
484504
self._sender.daemon = True
@@ -610,6 +630,79 @@ def _estimate_size_in_bytes(self, key, value, headers=[]):
610630
return LegacyRecordBatchBuilder.estimate_size_in_bytes(
611631
magic, self.config['compression_type'], key, value)
612632

633+
def init_transactions(self):
634+
"""
635+
Needs to be called before any other methods when the transactional.id is set in the configuration.
636+
637+
This method does the following:
638+
1. Ensures any transactions initiated by previous instances of the producer with the same
639+
transactional_id are completed. If the previous instance had failed with a transaction in
640+
progress, it will be aborted. If the last transaction had begun completion,
641+
but not yet finished, this method awaits its completion.
642+
2. Gets the internal producer id and epoch, used in all future transactional
643+
messages issued by the producer.
644+
645+
Note that this method will raise KafkaTimeoutError if the transactional state cannot
646+
be initialized before expiration of `max_block_ms`. It is safe to retry, but once the
647+
transactional state has been successfully initialized, this method should no longer be used.
648+
649+
Raises:
650+
IllegalStateError: if no transactional_id has been configured
651+
AuthorizationError: fatal error indicating that the configured
652+
transactional_id is not authorized.
653+
KafkaError: if the producer has encountered a previous fatal error or for any other unexpected error
654+
KafkaTimeoutError: if the time taken for initialize the transaction has surpassed `max.block.ms`.
655+
"""
656+
if not self._transaction_manager:
657+
raise Errors.IllegalStateError("Cannot call init_transactions without setting a transactional_id.")
658+
if self._init_transactions_result is None:
659+
self._init_transactions_result = self._transaction_manager.initialize_transactions()
660+
self._sender.wakeup()
661+
662+
if self._init_transactions_result.wait(timeout_ms=self.config['max_block_ms']):
663+
self._init_transactions_result = None
664+
else:
665+
raise Errors.KafkaTimeoutError("Timeout expired while initializing transactional state in %s ms." % (self.config['max_block_ms'],))
666+
667+
def begin_transaction(self):
668+
""" Should be called before the start of each new transaction.
669+
670+
Note that prior to the first invocation of this method,
671+
you must invoke `init_transactions()` exactly one time.
672+
673+
Raises:
674+
ProducerFencedError if another producer is with the same
675+
transactional_id is active.
676+
"""
677+
# Set the transactional bit in the producer.
678+
if not self._transaction_manager:
679+
raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions")
680+
self._transaction_manager.begin_transaction()
681+
682+
def commit_transaction(self):
683+
""" Commits the ongoing transaction.
684+
685+
Raises: ProducerFencedError if another producer with the same
686+
transactional_id is active.
687+
"""
688+
if not self._transaction_manager:
689+
raise Errors.IllegalStateError("Cannot commit transaction since transactions are not enabled")
690+
result = self._transaction_manager.begin_commit()
691+
self._sender.wakeup()
692+
result.wait()
693+
694+
def abort_transaction(self):
695+
""" Aborts the ongoing transaction.
696+
697+
Raises: ProducerFencedError if another producer with the same
698+
transactional_id is active.
699+
"""
700+
if not self._transaction_manager:
701+
raise Errors.IllegalStateError("Cannot abort transaction since transactions are not enabled.")
702+
result = self._transaction_manager.begin_abort()
703+
self._sender.wakeup()
704+
result.wait()
705+
613706
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
614707
"""Publish a message to a topic.
615708
@@ -687,6 +780,10 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
687780

688781
tp = TopicPartition(topic, partition)
689782
log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
783+
784+
if self._transaction_manager and self._transaction_manager.is_transactional():
785+
self._transaction_manager.maybe_add_partition_to_transaction(tp)
786+
690787
result = self._accumulator.append(tp, timestamp_ms,
691788
key_bytes, value_bytes, headers)
692789
future, batch_is_full, new_batch_created = result

0 commit comments

Comments
 (0)