Acquire client lock in heartbeat thread before sending requests #2620

Merged · merged 2 commits on May 15, 2025
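At its core, the change makes the heartbeat thread take the client lock before the coordinator lock, matching the order used by the consumer poll path, so the two threads can never wait on each other's locks in a cycle. A minimal sketch of the failure mode this ordering rule prevents, using stand-in locks rather than kafka-python's actual objects:

```python
import threading

# Stand-ins for the client and coordinator locks (names assumed; these are
# not the real kafka-python objects).
client_lock = threading.RLock()
coordinator_lock = threading.RLock()

def consumer_poll():
    # Main thread: client lock first, then coordinator lock.
    with client_lock:
        with coordinator_lock:
            pass  # poll the network, update group state

def heartbeat_before_fix():
    # Heartbeat thread taking the locks in the opposite order: if it runs
    # concurrently with consumer_poll(), each thread can hold one lock and
    # block forever on the other -- a classic lock-ordering deadlock.
    with coordinator_lock:
        with client_lock:
            pass  # send HeartbeatRequest

def heartbeat_after_fix():
    # Same order as consumer_poll(), so no circular wait is possible.
    with client_lock:
        with coordinator_lock:
            pass  # send HeartbeatRequest
```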
29 changes: 17 additions & 12 deletions kafka/coordinator/base.py
@@ -309,7 +309,7 @@ def _reset_find_coordinator_future(self, result):
         self._find_coordinator_future = None
 
     def lookup_coordinator(self):
-        with self._lock:
+        with self._client._lock, self._lock:
             if self._find_coordinator_future is not None:
                 return self._find_coordinator_future
@@ -883,6 +883,7 @@ def _handle_leave_group_response(self, response):

     def _send_heartbeat_request(self):
         """Send a heartbeat request"""
+        # Note: acquire both client + coordinator lock before calling
         if self.coordinator_unknown():
             e = Errors.CoordinatorNotAvailableError(self.coordinator_id)
             return Future().failure(e)
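The comment added above documents a locking precondition for callers rather than enforcing one. A hedged sketch of the documented calling convention; the wrapper function below is hypothetical and not part of the PR:

```python
def send_heartbeat_holding_locks(coordinator):
    # Hypothetical helper: take the client lock, then the coordinator lock,
    # before calling _send_heartbeat_request(), as the note requires.
    with coordinator._client._lock, coordinator._lock:
        return coordinator._send_heartbeat_request()
```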
@@ -1054,7 +1055,9 @@ def run(self):
         heartbeat_log.debug('Heartbeat thread closed')
 
     def _run_once(self):
-        with self.coordinator._client._lock, self.coordinator._lock:
+        self.coordinator._client._lock.acquire()
+        self.coordinator._lock.acquire()
+        try:
             if self.enabled and self.coordinator.state is MemberState.STABLE:
                 # TODO: When consumer.wakeup() is implemented, we need to
                 # disable here to prevent propagating an exception to this
@@ -1063,27 +1066,26 @@ def _run_once(self):
                 # failure callback in consumer poll
                 self.coordinator._client.poll(timeout_ms=0)
 
-            with self.coordinator._lock:
             if not self.enabled:
                 heartbeat_log.debug('Heartbeat disabled. Waiting')
+                self.coordinator._client._lock.release()
                 self.coordinator._lock.wait()
                 heartbeat_log.debug('Heartbeat re-enabled.')
                 return
 
-            if self.coordinator.state is not MemberState.STABLE:
+            elif self.coordinator.state is not MemberState.STABLE:
                 # the group is not stable (perhaps because we left the
                 # group or because the coordinator kicked us out), so
                 # disable heartbeats and wait for the main thread to rejoin.
                 heartbeat_log.debug('Group state is not stable, disabling heartbeats')
                 self.disable()
                 return
 
-            if self.coordinator.coordinator_unknown():
+            elif self.coordinator.coordinator_unknown():
                 future = self.coordinator.lookup_coordinator()
                 if not future.is_done or future.failed():
                     # the immediate future check ensures that we backoff
                     # properly in the case that no brokers are available
                     # to connect to (and the future is automatically failed).
+                    self.coordinator._client._lock.release()
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
             elif self.coordinator.heartbeat.session_timeout_expired():
@@ -1098,24 +1100,27 @@ def _run_once(self):
                 # foreground thread has stalled in between calls to
                 # poll(), so we explicitly leave the group.
                 heartbeat_log.warning('Heartbeat poll expired, leaving group')
+                # maybe_leave_group acquires client + coordinator lock;
+                # if we hold coordinator lock before calling, we risk deadlock
+                # release() is safe here because this is the last code in the current context
+                self.coordinator._lock.release()
                 self.coordinator.maybe_leave_group()
 
             elif not self.coordinator.heartbeat.should_heartbeat():
                 # poll again after waiting for the retry backoff in case
                 # the heartbeat failed or the coordinator disconnected
                 heartbeat_log.log(0, 'Not ready to heartbeat, waiting')
+                self.coordinator._client._lock.release()
                 self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
             else:
                 self.coordinator.heartbeat.sent_heartbeat()
                 future = self.coordinator._send_heartbeat_request()
                 future.add_callback(self._handle_heartbeat_success)
                 future.add_errback(self._handle_heartbeat_failure)
+        finally:
+            self.coordinator._lock.release()
+            try:
+                # Possibly released in block above to allow coordinator lock wait()
+                self.coordinator._client._lock.release()
+            except RuntimeError:
+                pass

     def _handle_heartbeat_success(self, result):
         with self.coordinator._lock:
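Replacing the `with` statement by explicit `acquire()`/`release()` calls is what lets individual branches drop the client lock before blocking in `Condition.wait()`, while the `finally` block guarantees both locks are released on every path. A standalone sketch of that shape, assuming (as the diff implies) that the client lock is reentrant and the coordinator lock is a `threading.Condition`:

```python
import threading

client_lock = threading.RLock()           # stand-in for client._lock
coordinator_cond = threading.Condition()  # stand-in for coordinator._lock

def run_once_sketch(enabled):
    client_lock.acquire()
    coordinator_cond.acquire()
    try:
        if not enabled:
            # Release the client lock before blocking so other threads can
            # keep using the network client while we wait.
            client_lock.release()
            coordinator_cond.wait(timeout=0.1)
            return
        # ... otherwise do locked work here, e.g. send a heartbeat ...
    finally:
        coordinator_cond.release()
        try:
            # The client lock may already have been released above;
            # releasing an un-acquired RLock raises RuntimeError.
            client_lock.release()
        except RuntimeError:
            pass
```

The `except RuntimeError: pass` mirrors the guard in the diff: whether the client lock is still held when the `finally` block runs depends on which branch executed.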
28 changes: 26 additions & 2 deletions test/integration/test_consumer_group.py
@@ -125,6 +125,20 @@ def consumer_thread(i):
                                     for partition in range(num_partitions)])
             logging.info('Assignment looks good!')
 
+            logging.info('Verifying heartbeats')
+            while True:
+                for c in range(num_consumers):
+                    heartbeat = consumers[c]._coordinator.heartbeat
+                    last_hb = time.time() - 0.5
+                    if (heartbeat.heartbeat_failed or
+                            heartbeat.last_receive < last_hb or
+                            heartbeat.last_reset > last_hb):
+                        time.sleep(0.1)
+                        break
+                else:
+                    break
+            logging.info('Heartbeats look good')
+
         finally:
             logging.info('Shutting down %s consumers', num_consumers)
             for c in range(num_consumers):
@@ -163,18 +177,28 @@ def test_heartbeat_thread(kafka_broker, topic):
         heartbeat_interval_ms=500)
 
     # poll until we have joined group / have assignment
+    start = time.time()
     while not consumer.assignment():
         consumer.poll(timeout_ms=100)
 
     assert consumer._coordinator.state is MemberState.STABLE
     last_poll = consumer._coordinator.heartbeat.last_poll
-    last_beat = consumer._coordinator.heartbeat.last_send
 
+    # wait until we receive first heartbeat
+    while consumer._coordinator.heartbeat.last_receive < start:
+        time.sleep(0.1)
+
+    last_send = consumer._coordinator.heartbeat.last_send
+    last_recv = consumer._coordinator.heartbeat.last_receive
+    assert last_poll > start
+    assert last_send > start
+    assert last_recv > start
 
     timeout = time.time() + 30
     while True:
         if time.time() > timeout:
             raise RuntimeError('timeout waiting for heartbeat')
-        if consumer._coordinator.heartbeat.last_send > last_beat:
+        if consumer._coordinator.heartbeat.last_receive > last_recv:
             break
         time.sleep(0.5)
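The reworked test waits on `last_receive` rather than `last_send`, so it verifies that heartbeats complete round trips instead of merely being written to the socket. The deadline-polling idiom it uses could be factored into a small helper; a hypothetical sketch, not part of the PR:

```python
import time

def wait_until(predicate, timeout=30.0, interval=0.5):
    # Hypothetical test helper: poll `predicate` until it returns True,
    # raising if `timeout` seconds pass first.
    deadline = time.time() + timeout
    while not predicate():
        if time.time() > deadline:
            raise RuntimeError('timeout waiting for condition')
        time.sleep(interval)

# Usage in the spirit of the updated test (attribute names from the diff):
# wait_until(lambda: consumer._coordinator.heartbeat.last_receive > last_recv)
```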
