Skip to content

Commit 2f95590

Browse files
authored
Wait for next heartbeat in thread loop; check for connected coordinator (#2622)
1 parent 8a424e9 commit 2f95590

File tree

2 files changed

+24
-15
lines changed

2 files changed

+24
-15
lines changed

kafka/coordinator/base.py

+23-15
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,11 @@ def coordinator(self):
250250
else:
251251
return self.coordinator_id
252252

253+
def connected(self):
254+
"""Return True iff the coordinator node is connected"""
255+
with self._lock:
256+
return self.coordinator_id is not None and self._client.connected(self.coordinator_id)
257+
253258
def ensure_coordinator_ready(self, timeout_ms=None):
254259
"""Block until the coordinator for this group is known.
255260
@@ -1058,28 +1063,28 @@ def _run_once(self):
10581063
self.coordinator._client._lock.acquire()
10591064
self.coordinator._lock.acquire()
10601065
try:
1061-
if self.enabled and self.coordinator.state is MemberState.STABLE:
1062-
# TODO: When consumer.wakeup() is implemented, we need to
1063-
# disable here to prevent propagating an exception to this
1064-
# heartbeat thread
1065-
# must get client._lock, or maybe deadlock at heartbeat
1066-
# failure callback in consumer poll
1067-
self.coordinator._client.poll(timeout_ms=0)
1068-
10691066
if not self.enabled:
10701067
heartbeat_log.debug('Heartbeat disabled. Waiting')
10711068
self.coordinator._client._lock.release()
10721069
self.coordinator._lock.wait()
1073-
heartbeat_log.debug('Heartbeat re-enabled.')
1070+
if self.enabled:
1071+
heartbeat_log.debug('Heartbeat re-enabled.')
1072+
return
10741073

1075-
elif self.coordinator.state is not MemberState.STABLE:
1074+
if self.coordinator.state is not MemberState.STABLE:
10761075
# the group is not stable (perhaps because we left the
10771076
# group or because the coordinator kicked us out), so
10781077
# disable heartbeats and wait for the main thread to rejoin.
10791078
heartbeat_log.debug('Group state is not stable, disabling heartbeats')
10801079
self.disable()
1080+
return
1081+
1082+
# TODO: When consumer.wakeup() is implemented, we need to
1083+
# disable here to prevent propagating an exception to this
1084+
# heartbeat thread
1085+
self.coordinator._client.poll(timeout_ms=0)
10811086

1082-
elif self.coordinator.coordinator_unknown():
1087+
if self.coordinator.coordinator_unknown():
10831088
future = self.coordinator.lookup_coordinator()
10841089
if not future.is_done or future.failed():
10851090
# the immediate future check ensures that we backoff
@@ -1088,6 +1093,10 @@ def _run_once(self):
10881093
self.coordinator._client._lock.release()
10891094
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
10901095

1096+
elif not self.coordinator.connected():
1097+
self.coordinator._client._lock.release()
1098+
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
1099+
10911100
elif self.coordinator.heartbeat.session_timeout_expired():
10921101
# the session timeout has expired without seeing a
10931102
# successful heartbeat, so we should probably make sure
@@ -1103,11 +1112,10 @@ def _run_once(self):
11031112
self.coordinator.maybe_leave_group()
11041113

11051114
elif not self.coordinator.heartbeat.should_heartbeat():
1106-
# poll again after waiting for the retry backoff in case
1107-
# the heartbeat failed or the coordinator disconnected
1108-
heartbeat_log.log(0, 'Not ready to heartbeat, waiting')
1115+
next_hb = self.coordinator.heartbeat.time_to_next_heartbeat()
1116+
heartbeat_log.debug('Waiting %0.1f secs to send next heartbeat', next_hb)
11091117
self.coordinator._client._lock.release()
1110-
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
1118+
self.coordinator._lock.wait(next_hb)
11111119

11121120
else:
11131121
self.coordinator.heartbeat.sent_heartbeat()

test/test_coordinator.py

+1
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,7 @@ def test_heartbeat(mocker, patched_coord):
658658
heartbeat.enable()
659659
patched_coord.state = MemberState.STABLE
660660
mocker.spy(patched_coord, '_send_heartbeat_request')
661+
mocker.patch.object(patched_coord, 'connected', return_value=True)
661662
mocker.patch.object(patched_coord.heartbeat, 'should_heartbeat', return_value=True)
662663
heartbeat._run_once()
663664
assert patched_coord._send_heartbeat_request.call_count == 1

0 commit comments

Comments
 (0)