
Commit 5a6c9e0

Fix "disconnected: write(m_post_fd, &buffer, 1): Broken pipe" EventLoop shutdown races.
The EventLoop shutdown sequence has race conditions that can cause it to shut down right before a removeClient write(m_post_fd, ...) call is about to happen, if threads run in an unexpected order, causing the write to fail. Cases where this can happen are described in bitcoin/bitcoin#31151 (comment). The possible causes are that (1) m_mutex is not used to protect some EventLoop member variables that are accessed from multiple threads, and (2) the removeClient method can make unnecessary write(m_post_fd, ...) calls before the loop is supposed to exit because it does not check m_async_fns.empty(), and these extra write calls can make the event loop exit early and cause the last write() call to fail. This commit fixes both issues. Fixes bitcoin/bitcoin#31151
1 parent 621a04a commit 5a6c9e0
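
For context on the pattern the diff below adopts, here is a minimal standalone sketch. The member names (m_mutex, m_num_clients, m_async_fns, m_post_fd) and the done()/removeClient() shape mirror the real EventLoop, but the Loop struct itself and the write placeholder are hypothetical simplifications rather than the library's API. The idea shown: every exit decision is made under m_mutex through a single done() predicate, and the post descriptor is copied into a local while the lock is still held, so the later write cannot read m_post_fd after loop() has closed and cleared it.

// Minimal sketch, not the real EventLoop: shows the done() predicate and the
// "copy the fd while holding the lock" pattern used in the diff below.
#include <cassert>
#include <functional>
#include <list>
#include <mutex>

struct Loop
{
    std::mutex m_mutex;
    int m_num_clients{0};
    std::list<std::function<void()>> m_async_fns;
    int m_post_fd{-1};

    // Single source of truth for "the event loop may exit now";
    // must be called with m_mutex held.
    bool done(std::unique_lock<std::mutex>& lock)
    {
        assert(lock.owns_lock());
        assert(m_num_clients >= 0);
        return m_num_clients == 0 && m_async_fns.empty();
    }

    void removeClient(std::unique_lock<std::mutex>& lock)
    {
        m_num_clients -= 1;
        if (done(lock)) {
            // Copy the descriptor while still holding m_mutex, so the write
            // below cannot observe m_post_fd after loop() resets it.
            int post_fd{m_post_fd};
            lock.unlock();
            // Real code would do: KJ_SYSCALL(write(post_fd, &buffer, 1));
            (void)post_fd;
            lock.lock();
        }
    }
};

A caller would construct std::unique_lock<std::mutex> lock(m_mutex) and pass it in, matching the lock-passing convention of the real addClient/removeClient methods.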

File tree

2 files changed: +26 -19 lines


include/mp/proxy-io.h

+2 lines

@@ -165,6 +165,8 @@ class EventLoop
     //! Add/remove remote client reference counts.
     void addClient(std::unique_lock<std::mutex>& lock);
     void removeClient(std::unique_lock<std::mutex>& lock);
+    //! Check if loop should exit.
+    bool done(std::unique_lock<std::mutex>& lock);
 
     Logger log()
     {

src/mp/proxy.cpp

+24 -19 lines

@@ -188,29 +188,27 @@ void EventLoop::loop()
 
     kj::Own<kj::AsyncIoStream> wait_stream{
         m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
+    int post_fd{m_post_fd};
     char buffer = 0;
     for (;;) {
         size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
-        if (read_bytes == 1) {
-            std::unique_lock<std::mutex> lock(m_mutex);
-            if (m_post_fn) {
-                Unlock(lock, *m_post_fn);
-                m_post_fn = nullptr;
-            }
-        } else {
-            throw std::logic_error("EventLoop wait_stream closed unexpectedly");
-        }
-        m_cv.notify_all();
-        if (m_num_clients == 0 && m_async_fns.empty()) {
-            log() << "EventLoop::loop done, cancelling event listeners.";
-            m_task_set.reset();
-            log() << "EventLoop::loop bye.";
+        if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
+        std::unique_lock<std::mutex> lock(m_mutex);
+        if (m_post_fn) {
+            Unlock(lock, *m_post_fn);
+            m_post_fn = nullptr;
+            m_cv.notify_all();
+        } else if (done(lock)) {
             break;
         }
     }
+    log() << "EventLoop::loop done, cancelling event listeners.";
+    m_task_set.reset();
+    log() << "EventLoop::loop bye.";
     wait_stream = nullptr;
+    KJ_SYSCALL(::close(post_fd));
+    std::unique_lock<std::mutex> lock(m_mutex);
     m_wait_fd = -1;
-    KJ_SYSCALL(::close(m_post_fd));
     m_post_fd = -1;
 }
 
@@ -222,9 +220,10 @@ void EventLoop::post(const std::function<void()>& fn)
     std::unique_lock<std::mutex> lock(m_mutex);
     m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
    m_post_fn = &fn;
+    int post_fd{m_post_fd};
     Unlock(lock, [&] {
         char buffer = 0;
-        KJ_SYSCALL(write(m_post_fd, &buffer, 1));
+        KJ_SYSCALL(write(post_fd, &buffer, 1));
     });
     m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
 }
@@ -233,13 +232,13 @@ void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients +=
 
 void EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
 {
-    assert(m_num_clients > 0);
     m_num_clients -= 1;
-    if (m_num_clients == 0) {
+    if (done(lock)) {
         m_cv.notify_all();
+        int post_fd{m_post_fd};
         Unlock(lock, [&] {
             char buffer = 0;
-            KJ_SYSCALL(write(m_post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
+            KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
         });
     }
 }
@@ -268,6 +267,12 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
     }
 }
 
+bool EventLoop::done(std::unique_lock<std::mutex>& lock)
+{
+    assert(m_num_clients >= 0);
+    return m_num_clients == 0 && m_async_fns.empty();
+}
+
 std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, std::function<Thread::Client()> make_thread)
 {
     std::unique_lock<std::mutex> lock(mutex);
