Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""
Kafka admin client: create, view, alter, and delete topics and resources.
"""
import gc
import warnings
import concurrent.futures
from typing import Any, Dict, List, Optional, Union, Tuple, Set
Expand Down Expand Up @@ -135,6 +136,8 @@ def _make_topics_result(f: concurrent.futures.Future, futmap: Dict[str, concurre
Map per-topic results to per-topic futures in futmap.
The result value of each (successful) future is None.
"""
gc_was_enabled = gc.isenabled()
gc.disable()
try:
result = f.result()
for topic, error in result.items():
Expand All @@ -152,6 +155,9 @@ def _make_topics_result(f: concurrent.futures.Future, futmap: Dict[str, concurre
# Request-level exception, raise the same for all topics
for topic, fut in futmap.items():
fut.set_exception(e)
finally:
if gc_was_enabled:
gc.enable()

@staticmethod
def _make_resource_result(f: concurrent.futures.Future,
Expand All @@ -160,6 +166,8 @@ def _make_resource_result(f: concurrent.futures.Future,
Map per-resource results to per-resource futures in futmap.
The result value of each (successful) future is a ConfigResource.
"""
gc_was_enabled = gc.isenabled()
gc.disable()
try:
result = f.result()
for resource, configs in result.items():
Expand All @@ -178,6 +186,9 @@ def _make_resource_result(f: concurrent.futures.Future,
# Request-level exception, raise the same for all resources
for resource, fut in futmap.items():
fut.set_exception(e)
finally:
if gc_was_enabled:
gc.enable()

@staticmethod
def _make_list_consumer_groups_result(f: concurrent.futures.Future, futmap: Any) -> None:
Expand All @@ -189,8 +200,9 @@ def _make_consumer_groups_result(f: concurrent.futures.Future,
"""
Map per-group results to per-group futures in futmap.
"""
gc_was_enabled = gc.isenabled()
gc.disable()
try:

results = f.result()
futmap_values = list(futmap.values())
len_results = len(results)
Expand All @@ -208,6 +220,9 @@ def _make_consumer_groups_result(f: concurrent.futures.Future,
# Request-level exception, raise the same for all groups
for _, fut in futmap.items():
fut.set_exception(e)
finally:
if gc_was_enabled:
gc.enable()

@staticmethod
def _make_consumer_group_offsets_result(f: concurrent.futures.Future,
Expand All @@ -216,8 +231,13 @@ def _make_consumer_group_offsets_result(f: concurrent.futures.Future,
Map per-group results to per-group futures in futmap.
The result value of each (successful) future is ConsumerGroupTopicPartitions.
"""
try:
# Disable GC during callback to prevent AdminClient destruction from librdkafka thread.
# This callback runs in librdkafka's background thread, and if GC runs here, it may
# try to destroy AdminClient objects, which librdkafka forbids from its own threads.
gc_was_enabled = gc.isenabled()
gc.disable()

try:
results = f.result()
futmap_values = list(futmap.values())
len_results = len(results)
Expand All @@ -235,6 +255,10 @@ def _make_consumer_group_offsets_result(f: concurrent.futures.Future,
# Request-level exception, raise the same for all groups
for _, fut in futmap.items():
fut.set_exception(e)
finally:
# Re-enable GC if it was enabled before
if gc_was_enabled:
gc.enable()

@staticmethod
def _make_acls_result(f: concurrent.futures.Future, futmap: Dict[Any, concurrent.futures.Future]) -> None:
Expand All @@ -243,6 +267,8 @@ def _make_acls_result(f: concurrent.futures.Future, futmap: Dict[Any, concurrent
For create_acls the result value of each (successful) future is None.
For delete_acls the result value of each (successful) future is the list of deleted AclBindings.
"""
gc_was_enabled = gc.isenabled()
gc.disable()
try:
results = f.result()
futmap_values = list(futmap.values())
Expand All @@ -261,12 +287,16 @@ def _make_acls_result(f: concurrent.futures.Future, futmap: Dict[Any, concurrent
# Request-level exception, raise the same for all the AclBindings or AclBindingFilters
for resource, fut in futmap.items():
fut.set_exception(e)
finally:
if gc_was_enabled:
gc.enable()

@staticmethod
def _make_futmap_result_from_list(f: concurrent.futures.Future,
futmap: Dict[Any, concurrent.futures.Future]) -> None:
gc_was_enabled = gc.isenabled()
gc.disable()
try:

results = f.result()
futmap_values = list(futmap.values())
len_results = len(results)
Expand All @@ -284,9 +314,14 @@ def _make_futmap_result_from_list(f: concurrent.futures.Future,
# Request-level exception, raise the same for all topics
for _, fut in futmap.items():
fut.set_exception(e)
finally:
if gc_was_enabled:
gc.enable()

@staticmethod
def _make_futmap_result(f: concurrent.futures.Future, futmap: Dict[str, concurrent.futures.Future]) -> None:
gc_was_enabled = gc.isenabled()
gc.disable()
try:
results = f.result()
len_results = len(results)
Expand All @@ -306,6 +341,9 @@ def _make_futmap_result(f: concurrent.futures.Future, futmap: Dict[str, concurre
except Exception as e:
for _, fut in futmap.items():
fut.set_exception(e)
finally:
if gc_was_enabled:
gc.enable()

@staticmethod
def _create_future() -> concurrent.futures.Future:
Expand Down Expand Up @@ -1126,7 +1164,6 @@ def list_consumer_group_offsets( # type: ignore[override]
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""

AdminClient._check_list_consumer_group_offsets_request(list_consumer_group_offsets_request)

f, futmap = AdminClient._make_futures(
Expand Down Expand Up @@ -1163,7 +1200,6 @@ def alter_consumer_group_offsets( # type: ignore[override]
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""

AdminClient._check_alter_consumer_group_offsets_request(alter_consumer_group_offsets_request)

f, futmap = AdminClient._make_futures([request.group_id for request in alter_consumer_group_offsets_request],
Expand Down