Skip to content

Commit 8a424e9

Browse files
authored
Acquire client lock in heartbeat thread before sending requests (#2620)
1 parent 00a5e6c commit 8a424e9

File tree

2 files changed

+43
-14
lines changed

2 files changed

+43
-14
lines changed

kafka/coordinator/base.py

+17-12
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ def _reset_find_coordinator_future(self, result):
309309
self._find_coordinator_future = None
310310

311311
def lookup_coordinator(self):
312-
with self._lock:
312+
with self._client._lock, self._lock:
313313
if self._find_coordinator_future is not None:
314314
return self._find_coordinator_future
315315

@@ -883,6 +883,7 @@ def _handle_leave_group_response(self, response):
883883

884884
def _send_heartbeat_request(self):
885885
"""Send a heartbeat request"""
886+
# Note: acquire both client + coordinator lock before calling
886887
if self.coordinator_unknown():
887888
e = Errors.CoordinatorNotAvailableError(self.coordinator_id)
888889
return Future().failure(e)
@@ -1054,7 +1055,9 @@ def run(self):
10541055
heartbeat_log.debug('Heartbeat thread closed')
10551056

10561057
def _run_once(self):
1057-
with self.coordinator._client._lock, self.coordinator._lock:
1058+
self.coordinator._client._lock.acquire()
1059+
self.coordinator._lock.acquire()
1060+
try:
10581061
if self.enabled and self.coordinator.state is MemberState.STABLE:
10591062
# TODO: When consumer.wakeup() is implemented, we need to
10601063
# disable here to prevent propagating an exception to this
@@ -1063,27 +1066,26 @@ def _run_once(self):
10631066
# failure callback in consumer poll
10641067
self.coordinator._client.poll(timeout_ms=0)
10651068

1066-
with self.coordinator._lock:
10671069
if not self.enabled:
10681070
heartbeat_log.debug('Heartbeat disabled. Waiting')
1071+
self.coordinator._client._lock.release()
10691072
self.coordinator._lock.wait()
10701073
heartbeat_log.debug('Heartbeat re-enabled.')
1071-
return
10721074

1073-
if self.coordinator.state is not MemberState.STABLE:
1075+
elif self.coordinator.state is not MemberState.STABLE:
10741076
# the group is not stable (perhaps because we left the
10751077
# group or because the coordinator kicked us out), so
10761078
# disable heartbeats and wait for the main thread to rejoin.
10771079
heartbeat_log.debug('Group state is not stable, disabling heartbeats')
10781080
self.disable()
1079-
return
10801081

1081-
if self.coordinator.coordinator_unknown():
1082+
elif self.coordinator.coordinator_unknown():
10821083
future = self.coordinator.lookup_coordinator()
10831084
if not future.is_done or future.failed():
10841085
# the immediate future check ensures that we backoff
10851086
# properly in the case that no brokers are available
10861087
# to connect to (and the future is automatically failed).
1088+
self.coordinator._client._lock.release()
10871089
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
10881090

10891091
elif self.coordinator.heartbeat.session_timeout_expired():
@@ -1098,24 +1100,27 @@ def _run_once(self):
10981100
# foreground thread has stalled in between calls to
10991101
# poll(), so we explicitly leave the group.
11001102
heartbeat_log.warning('Heartbeat poll expired, leaving group')
1101-
### XXX
1102-
# maybe_leave_group acquires client + coordinator lock;
1103-
# if we hold coordinator lock before calling, we risk deadlock
1104-
# release() is safe here because this is the last code in the current context
1105-
self.coordinator._lock.release()
11061103
self.coordinator.maybe_leave_group()
11071104

11081105
elif not self.coordinator.heartbeat.should_heartbeat():
11091106
# poll again after waiting for the retry backoff in case
11101107
# the heartbeat failed or the coordinator disconnected
11111108
heartbeat_log.log(0, 'Not ready to heartbeat, waiting')
1109+
self.coordinator._client._lock.release()
11121110
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
11131111

11141112
else:
11151113
self.coordinator.heartbeat.sent_heartbeat()
11161114
future = self.coordinator._send_heartbeat_request()
11171115
future.add_callback(self._handle_heartbeat_success)
11181116
future.add_errback(self._handle_heartbeat_failure)
1117+
finally:
1118+
self.coordinator._lock.release()
1119+
try:
1120+
# Possibly released in block above to allow coordinator lock wait()
1121+
self.coordinator._client._lock.release()
1122+
except RuntimeError:
1123+
pass
11191124

11201125
def _handle_heartbeat_success(self, result):
11211126
with self.coordinator._lock:

test/integration/test_consumer_group.py

+26-2
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,20 @@ def consumer_thread(i):
125125
for partition in range(num_partitions)])
126126
logging.info('Assignment looks good!')
127127

128+
logging.info('Verifying heartbeats')
129+
while True:
130+
for c in range(num_consumers):
131+
heartbeat = consumers[c]._coordinator.heartbeat
132+
last_hb = time.time() - 0.5
133+
if (heartbeat.heartbeat_failed or
134+
heartbeat.last_receive < last_hb or
135+
heartbeat.last_reset > last_hb):
136+
time.sleep(0.1)
137+
continue
138+
else:
139+
break
140+
logging.info('Heartbeats look good')
141+
128142
finally:
129143
logging.info('Shutting down %s consumers', num_consumers)
130144
for c in range(num_consumers):
@@ -163,18 +177,28 @@ def test_heartbeat_thread(kafka_broker, topic):
163177
heartbeat_interval_ms=500)
164178

165179
# poll until we have joined group / have assignment
180+
start = time.time()
166181
while not consumer.assignment():
167182
consumer.poll(timeout_ms=100)
168183

169184
assert consumer._coordinator.state is MemberState.STABLE
170185
last_poll = consumer._coordinator.heartbeat.last_poll
171-
last_beat = consumer._coordinator.heartbeat.last_send
186+
187+
# wait until we receive first heartbeat
188+
while consumer._coordinator.heartbeat.last_receive < start:
189+
time.sleep(0.1)
190+
191+
last_send = consumer._coordinator.heartbeat.last_send
192+
last_recv = consumer._coordinator.heartbeat.last_receive
193+
assert last_poll > start
194+
assert last_send > start
195+
assert last_recv > start
172196

173197
timeout = time.time() + 30
174198
while True:
175199
if time.time() > timeout:
176200
raise RuntimeError('timeout waiting for heartbeat')
177-
if consumer._coordinator.heartbeat.last_send > last_beat:
201+
if consumer._coordinator.heartbeat.last_receive > last_recv:
178202
break
179203
time.sleep(0.5)
180204

0 commit comments

Comments (0)