Skip to content

Commit a2f8c58

Browse files
committed
Avoid self refcount in log messages; test thread close on all pythons
1 parent 99c08e6 commit a2f8c58

File tree

3 files changed

+51
-55
lines changed

3 files changed

+51
-55
lines changed

kafka/producer/kafka.py

+19-19
Original file line numberDiff line numberDiff line change
@@ -409,9 +409,9 @@ def __init__(self, **configs):
409409
else:
410410
self.config['api_version'] = tuple(map(int, deprecated.split('.')))
411411
log.warning('%s: use api_version=%s [tuple] -- "%s" as str is deprecated',
412-
self, str(self.config['api_version']), deprecated)
412+
str(self), str(self.config['api_version']), deprecated)
413413

414-
log.debug("%s: Starting Kafka producer", self)
414+
log.debug("%s: Starting Kafka producer", str(self))
415415

416416
# Configure metrics
417417
if self.config['metrics_enabled']:
@@ -467,26 +467,26 @@ def __init__(self, **configs):
467467
metadata=self._metadata,
468468
)
469469
if self._transaction_manager.is_transactional():
470-
log.info("%s: Instantiated a transactional producer.", self)
470+
log.info("%s: Instantiated a transactional producer.", str(self))
471471
else:
472-
log.info("%s: Instantiated an idempotent producer.", self)
472+
log.info("%s: Instantiated an idempotent producer.", str(self))
473473

474474
if 'retries' not in user_provided_configs:
475-
log.info("%s: Overriding the default 'retries' config to 3 since the idempotent producer is enabled.", self)
475+
log.info("%s: Overriding the default 'retries' config to 3 since the idempotent producer is enabled.", str(self))
476476
self.config['retries'] = 3
477477
elif self.config['retries'] == 0:
478478
raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.")
479479

480480
if 'max_in_flight_requests_per_connection' not in user_provided_configs:
481-
log.info("%s: Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempontence is enabled.", self)
481+
log.info("%s: Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempontence is enabled.", str(self))
482482
self.config['max_in_flight_requests_per_connection'] = 1
483483
elif self.config['max_in_flight_requests_per_connection'] != 1:
484484
raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to 1 in order"
485485
" to use the idempotent producer."
486486
" Otherwise we cannot guarantee idempotence.")
487487

488488
if 'acks' not in user_provided_configs:
489-
log.info("%s: Overriding the default 'acks' config to 'all' since idempotence is enabled", self)
489+
log.info("%s: Overriding the default 'acks' config to 'all' since idempotence is enabled", str(self))
490490
self.config['acks'] = -1
491491
elif self.config['acks'] != -1:
492492
raise Errors.KafkaConfigurationError("Must set 'acks' config to 'all' in order to use the idempotent"
@@ -510,7 +510,7 @@ def __init__(self, **configs):
510510

511511
self._cleanup = self._cleanup_factory()
512512
atexit.register(self._cleanup)
513-
log.debug("%s: Kafka producer started", self)
513+
log.debug("%s: Kafka producer started", str(self))
514514

515515
def bootstrap_connected(self):
516516
"""Return True if the bootstrap is connected."""
@@ -565,7 +565,7 @@ def __getattr__(self, name):
565565
self._unregister_cleanup()
566566

567567
if not hasattr(self, '_closed') or self._closed:
568-
log.info('%s: Kafka producer closed', self)
568+
log.info('%s: Kafka producer closed', str(self))
569569
return
570570
if timeout is None:
571571
# threading.TIMEOUT_MAX is available in Python3.3+
@@ -575,7 +575,7 @@ def __getattr__(self, name):
575575
else:
576576
assert timeout >= 0
577577

578-
log.info("%s: Closing the Kafka producer with %s secs timeout.", self, timeout)
578+
log.info("%s: Closing the Kafka producer with %s secs timeout.", str(self), timeout)
579579
self.flush(timeout)
580580
invoked_from_callback = bool(threading.current_thread() is self._sender)
581581
if timeout > 0:
@@ -584,7 +584,7 @@ def __getattr__(self, name):
584584
" prevent useless blocking due to self-join. This"
585585
" means you have incorrectly invoked close with a"
586586
" non-zero timeout from the producer call-back.",
587-
self, timeout)
587+
str(self), timeout)
588588
else:
589589
# Try to close gracefully.
590590
if self._sender is not None:
@@ -594,7 +594,7 @@ def __getattr__(self, name):
594594
if self._sender is not None and self._sender.is_alive():
595595
log.info("%s: Proceeding to force close the producer since pending"
596596
" requests could not be completed within timeout %s.",
597-
self, timeout)
597+
str(self), timeout)
598598
self._sender.force_close()
599599

600600
if self._metrics:
@@ -608,7 +608,7 @@ def __getattr__(self, name):
608608
except AttributeError:
609609
pass
610610
self._closed = True
611-
log.debug("%s: The Kafka producer has closed.", self)
611+
log.debug("%s: The Kafka producer has closed.", str(self))
612612

613613
def partitions_for(self, topic):
614614
"""Returns set of all known partitions for the topic."""
@@ -817,7 +817,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
817817
self._ensure_valid_record_size(message_size)
818818

819819
tp = TopicPartition(topic, partition)
820-
log.debug("%s: Sending (key=%r value=%r headers=%r) to %s", self, key, value, headers, tp)
820+
log.debug("%s: Sending (key=%r value=%r headers=%r) to %s", str(self), key, value, headers, tp)
821821

822822
if self._transaction_manager and self._transaction_manager.is_transactional():
823823
self._transaction_manager.maybe_add_partition_to_transaction(tp)
@@ -827,15 +827,15 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
827827
future, batch_is_full, new_batch_created = result
828828
if batch_is_full or new_batch_created:
829829
log.debug("%s: Waking up the sender since %s is either full or"
830-
" getting a new batch", self, tp)
830+
" getting a new batch", str(self), tp)
831831
self._sender.wakeup()
832832

833833
return future
834834
# handling exceptions and record the errors;
835835
# for API exceptions return them in the future,
836836
# for other exceptions raise directly
837837
except Errors.BrokerResponseError as e:
838-
log.error("%s: Exception occurred during message send: %s", self, e)
838+
log.error("%s: Exception occurred during message send: %s", str(self), e)
839839
return FutureRecordMetadata(
840840
FutureProduceResult(TopicPartition(topic, partition)),
841841
-1, None, None,
@@ -866,7 +866,7 @@ def flush(self, timeout=None):
866866
KafkaTimeoutError: failure to flush buffered records within the
867867
provided timeout
868868
"""
869-
log.debug("%s: Flushing accumulated records in producer.", self)
869+
log.debug("%s: Flushing accumulated records in producer.", str(self))
870870
self._accumulator.begin_flush()
871871
self._sender.wakeup()
872872
self._accumulator.await_flush_completion(timeout=timeout)
@@ -912,7 +912,7 @@ def _wait_on_metadata(self, topic, max_wait):
912912
if not metadata_event:
913913
metadata_event = threading.Event()
914914

915-
log.debug("%s: Requesting metadata update for topic %s", self, topic)
915+
log.debug("%s: Requesting metadata update for topic %s", str(self), topic)
916916

917917
metadata_event.clear()
918918
future = self._metadata.request_update()
@@ -926,7 +926,7 @@ def _wait_on_metadata(self, topic, max_wait):
926926
raise Errors.TopicAuthorizationFailedError(set([topic]))
927927
else:
928928
elapsed = time.time() - begin
929-
log.debug("%s: _wait_on_metadata woke after %s secs.", self, elapsed)
929+
log.debug("%s: _wait_on_metadata woke after %s secs.", str(self), elapsed)
930930

931931
def _serialize(self, f, topic, data):
932932
if not f:

0 commit comments

Comments
 (0)