
Commit 72f6669

Merge #129: Fix "disconnected: write(m_post_fd, &buffer, 1): Broken pipe" EventLoop shutdown races.
0e4f88d Fix "disconnected: write(m_post_fd, &buffer, 1): Broken pipe" EventLoop shutdown races. (Ryan Ofsky)

Pull request description:

  The EventLoop shutdown sequence has race conditions that can make it shut down just before a `removeClient` `write(m_post_fd, ...)` call happens, if threads run in an unexpected order, causing the write to fail. Cases where this can happen are described in bitcoin/bitcoin#31151 (comment). The possible causes are that (1) `EventLoop::m_mutex` is not used to protect some EventLoop member variables that are accessed from multiple threads (particularly `m_num_clients` and `m_async_fns`), and (2) the `removeClient` method can make unnecessary `write(m_post_fd, ...)` calls before the loop is supposed to exit, because it does not check the `m_async_fns.empty()` condition, and these extra write calls can make the event loop exit early and cause the final `write()` call to fail. In practice, only the second cause seems to actually trigger this bug, but this PR fixes both possible causes.

  Fixes bitcoin/bitcoin#31151

Top commit has no ACKs.

Tree-SHA512: 90fef8965d21ce80ca1dd4cb5a9cf39746a741685f1f4b62baf30b54d05e5eb658a4ef429ad77bd72abfef99ceb48634108f2fc3eb71ecabd3c7ffe53367f435
2 parents 3b2617b + 0e4f88d commit 72f6669
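For context, EventLoop wakes its event thread through a pipe: post() and removeClient() write one byte to m_post_fd, and loop() reads it from m_wait_fd and then decides whether to run a posted function or exit. Below is a minimal, self-contained sketch of that pattern with the fix applied (MiniLoop and its members are invented names, not the library's API; error handling is elided):

// Minimal sketch of the self-pipe shutdown pattern (invented names, not the
// library's API). The key points mirrored from the fix: (1) shared state is
// only read under the mutex, and (2) the waker checks the full done()
// condition before writing, so the loop cannot exit early.
#include <cassert>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <unistd.h>

struct MiniLoop
{
    std::mutex m_mutex;
    int m_num_clients{0};
    std::list<std::function<void()>> m_async_fns;
    int m_fds[2]; // m_fds[0]: read end the loop blocks on; m_fds[1]: write end used to wake it

    MiniLoop() { [[maybe_unused]] int rc = ::pipe(m_fds); assert(rc == 0); }

    // Same shape as the new EventLoop::done(): callers must hold the mutex.
    bool done(std::unique_lock<std::mutex>& lock)
    {
        assert(lock.owns_lock() && lock.mutex() == &m_mutex);
        return m_num_clients == 0 && m_async_fns.empty();
    }

    void removeClient(std::unique_lock<std::mutex>& lock)
    {
        m_num_clients -= 1;
        // Checking done(lock), not just m_num_clients == 0, is the heart of
        // the fix: spurious wakeups no longer make the loop exit (and close
        // the pipe) while another thread's final write() is still pending.
        if (done(lock)) {
            int post_fd{m_fds[1]}; // snapshot the fd while the mutex is held
            lock.unlock();
            char buffer = 0;
            [[maybe_unused]] ssize_t n = ::write(post_fd, &buffer, 1);
            lock.lock();
        }
    }

    void loop()
    {
        char buffer;
        for (;;) {
            [[maybe_unused]] ssize_t n = ::read(m_fds[0], &buffer, 1); // wait for a wakeup
            std::unique_lock<std::mutex> lock(m_mutex);
            if (done(lock)) break; // exit only when the full condition holds
        }
        ::close(m_fds[0]);
        ::close(m_fds[1]);
    }
};

int main()
{
    MiniLoop loop;
    {
        std::unique_lock<std::mutex> lock(loop.m_mutex);
        loop.m_num_clients = 1;
    }
    std::thread client([&] {
        std::unique_lock<std::mutex> lock(loop.m_mutex);
        loop.removeClient(lock); // last client going away wakes the loop to exit
    });
    loop.loop();
    client.join();
}

In the pre-fix code, removeClient() wrote to the pipe whenever m_num_clients hit zero, without checking m_async_fns.empty(), and those extra wakeups could make loop() exit early, so a later, final write() hit a closed pipe; routing every exit decision through done() closes that gap.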

File tree

2 files changed (+32, -19 lines)


include/mp/proxy-io.h (+2)

@@ -165,6 +165,8 @@ class EventLoop
     //! Add/remove remote client reference counts.
     void addClient(std::unique_lock<std::mutex>& lock);
     void removeClient(std::unique_lock<std::mutex>& lock);
+    //! Check if loop should exit.
+    bool done(std::unique_lock<std::mutex>& lock);
 
     Logger log()
     {
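The new done() method takes a std::unique_lock<std::mutex>& argument, following the convention addClient() and removeClient() already use: the lock parameter exists to prove the caller holds m_mutex. A small self-contained illustration of that idiom (invented names, not from the diff):

// Lock-witness idiom: functions that require a mutex take a
// std::unique_lock& so the requirement is visible in the signature
// and checkable with asserts, instead of locking internally.
#include <cassert>
#include <mutex>

class Counter
{
public:
    void increment(std::unique_lock<std::mutex>& lock)
    {
        assert(lock.owns_lock() && lock.mutex() == &m_mutex);
        ++m_value;
    }
    std::mutex m_mutex;

private:
    int m_value{0};
};

int main()
{
    Counter c;
    std::unique_lock<std::mutex> lock(c.m_mutex);
    c.increment(lock); // caller must be able to produce the held lock
}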

src/mp/proxy.cpp (+30, -19)

@@ -188,29 +188,31 @@ void EventLoop::loop()
 
     kj::Own<kj::AsyncIoStream> wait_stream{
         m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
+    int post_fd{m_post_fd};
     char buffer = 0;
     for (;;) {
         size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
-        if (read_bytes == 1) {
-            std::unique_lock<std::mutex> lock(m_mutex);
-            if (m_post_fn) {
-                Unlock(lock, *m_post_fn);
-                m_post_fn = nullptr;
-            }
-        } else {
-            throw std::logic_error("EventLoop wait_stream closed unexpectedly");
-        }
-        m_cv.notify_all();
-        if (m_num_clients == 0 && m_async_fns.empty()) {
-            log() << "EventLoop::loop done, cancelling event listeners.";
-            m_task_set.reset();
-            log() << "EventLoop::loop bye.";
+        if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
+        std::unique_lock<std::mutex> lock(m_mutex);
+        if (m_post_fn) {
+            Unlock(lock, *m_post_fn);
+            m_post_fn = nullptr;
+            m_cv.notify_all();
+        } else if (done(lock)) {
+            // Intentionally do not break if m_post_fn was set, even if done()
+            // would return true, to ensure that the removeClient write(post_fd)
+            // call always succeeds and the loop does not exit between the time
+            // that the done condition is set and the write call is made.
             break;
         }
     }
+    log() << "EventLoop::loop done, cancelling event listeners.";
+    m_task_set.reset();
+    log() << "EventLoop::loop bye.";
     wait_stream = nullptr;
+    KJ_SYSCALL(::close(post_fd));
+    std::unique_lock<std::mutex> lock(m_mutex);
     m_wait_fd = -1;
-    KJ_SYSCALL(::close(m_post_fd));
     m_post_fd = -1;
 }
 
@@ -222,9 +224,10 @@ void EventLoop::post(const std::function<void()>& fn)
     std::unique_lock<std::mutex> lock(m_mutex);
     m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
     m_post_fn = &fn;
+    int post_fd{m_post_fd};
     Unlock(lock, [&] {
         char buffer = 0;
-        KJ_SYSCALL(write(m_post_fd, &buffer, 1));
+        KJ_SYSCALL(write(post_fd, &buffer, 1));
     });
     m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
 }
@@ -233,13 +236,13 @@ void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients +=
 
 void EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
 {
-    assert(m_num_clients > 0);
     m_num_clients -= 1;
-    if (m_num_clients == 0) {
+    if (done(lock)) {
         m_cv.notify_all();
+        int post_fd{m_post_fd};
         Unlock(lock, [&] {
             char buffer = 0;
-            KJ_SYSCALL(write(m_post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
+            KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
         });
     }
 }
@@ -268,6 +271,14 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
     }
 }
 
+bool EventLoop::done(std::unique_lock<std::mutex>& lock)
+{
+    assert(m_num_clients >= 0);
+    assert(lock.owns_lock());
+    assert(lock.mutex() == &m_mutex);
+    return m_num_clients == 0 && m_async_fns.empty();
+}
+
 std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, std::function<Thread::Client()> make_thread)
 {
     std::unique_lock<std::mutex> lock(mutex);
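Besides the done() check, each write site in this diff snapshots m_post_fd into a local post_fd while m_mutex is held and uses the copy after unlocking, so the member itself is only ever accessed under the mutex. A free-standing sketch of that snapshot pattern (invented names; error handling elided):

// Snapshot-under-lock pattern: read the shared member while the mutex is
// held, then use the local copy after unlocking around the blocking write.
#include <mutex>
#include <unistd.h>

std::mutex g_mutex;
int g_post_fd = -1; // shared; only read or written while g_mutex is held

void wake(std::unique_lock<std::mutex>& lock)
{
    int post_fd{g_post_fd}; // snapshot under the lock
    lock.unlock();          // drop the lock around the potentially blocking write
    char buffer = 0;
    [[maybe_unused]] ssize_t n = ::write(post_fd, &buffer, 1); // uses the snapshot, never g_post_fd
    lock.lock();
}

int main()
{
    int fds[2];
    if (::pipe(fds) != 0) return 1;
    {
        std::unique_lock<std::mutex> lock(g_mutex);
        g_post_fd = fds[1];
    }
    std::unique_lock<std::mutex> lock(g_mutex);
    wake(lock); // writes one wakeup byte using the snapshotted fd
    ::close(fds[0]);
    ::close(fds[1]);
}

loop() uses the same snapshot so that close(post_fd) can run outside the lock, while m_wait_fd and m_post_fd are reset to -1 under it.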
