1616Kafka admin client: create, view, alter, and delete topics and resources.
1717"""
1818import gc
19+ # Importing GC to disable it during callback to prevent AdminClient destruction from
20+ # librdkafka thread: if Python's garbage collection triggers during callback, it will
21+ # try to destroy AdminClient objects from librdkafka's background thread, which librdkafka
22+ # explicitly forbids and would send ABORT signal to fail the test.
1923import warnings
2024import concurrent .futures
2125from typing import Any , Dict , List , Optional , Union , Tuple , Set
@@ -137,7 +141,7 @@ def _make_topics_result(f: concurrent.futures.Future, futmap: Dict[str, concurre
137141 The result value of each (successful) future is None.
138142 """
139143 gc_was_enabled = gc .isenabled ()
140- gc .disable ()
144+ gc .disable () # See gc import comment for preventing librdkafka thread segfault
141145 try :
142146 result = f .result ()
143147 for topic , error in result .items ():
@@ -167,7 +171,7 @@ def _make_resource_result(f: concurrent.futures.Future,
167171 The result value of each (successful) future is a ConfigResource.
168172 """
169173 gc_was_enabled = gc .isenabled ()
170- gc .disable ()
174+ gc .disable () # See gc import comment for preventing librdkafka thread segfault
171175 try :
172176 result = f .result ()
173177 for resource , configs in result .items ():
@@ -201,7 +205,7 @@ def _make_consumer_groups_result(f: concurrent.futures.Future,
201205 Map per-group results to per-group futures in futmap.
202206 """
203207 gc_was_enabled = gc .isenabled ()
204- gc .disable ()
208+ gc .disable () # See gc import comment for preventing librdkafka thread segfault
205209 try :
206210 results = f .result ()
207211 futmap_values = list (futmap .values ())
@@ -231,11 +235,8 @@ def _make_consumer_group_offsets_result(f: concurrent.futures.Future,
231235 Map per-group results to per-group futures in futmap.
232236 The result value of each (successful) future is ConsumerGroupTopicPartitions.
233237 """
234- # Disable GC during callback to prevent AdminClient destruction from librdkafka thread.
235- # This callback runs in librdkafka's background thread, and if GC runs here, it may
236- # try to destroy AdminClient objects, which librdkafka forbids from its own threads.
237238 gc_was_enabled = gc .isenabled ()
238- gc .disable ()
239+ gc .disable () # See gc import comment for preventing librdkafka thread segfault
239240
240241 try :
241242 results = f .result ()
@@ -268,7 +269,7 @@ def _make_acls_result(f: concurrent.futures.Future, futmap: Dict[Any, concurrent
268269 For delete_acls the result value of each (successful) future is the list of deleted AclBindings.
269270 """
270271 gc_was_enabled = gc .isenabled ()
271- gc .disable ()
272+ gc .disable () # See gc import comment for preventing librdkafka thread segfault
272273 try :
273274 results = f .result ()
274275 futmap_values = list (futmap .values ())
@@ -295,7 +296,7 @@ def _make_acls_result(f: concurrent.futures.Future, futmap: Dict[Any, concurrent
295296 def _make_futmap_result_from_list (f : concurrent .futures .Future ,
296297 futmap : Dict [Any , concurrent .futures .Future ]) -> None :
297298 gc_was_enabled = gc .isenabled ()
298- gc .disable ()
299+ gc .disable () # See gc import comment for preventing librdkafka thread segfault
299300 try :
300301 results = f .result ()
301302 futmap_values = list (futmap .values ())
@@ -321,7 +322,7 @@ def _make_futmap_result_from_list(f: concurrent.futures.Future,
321322 @staticmethod
322323 def _make_futmap_result (f : concurrent .futures .Future , futmap : Dict [str , concurrent .futures .Future ]) -> None :
323324 gc_was_enabled = gc .isenabled ()
324- gc .disable ()
325+ gc .disable () # See gc import comment for preventing librdkafka thread segfault
325326 try :
326327 results = f .result ()
327328 len_results = len (results )
0 commit comments