Skip to content

Commit a19a0d6

Browse files
committed
feat: Add before/after fork functions for executor fork safety
1 parent 35371cf commit a19a0d6

File tree

4 files changed

+53
-2
lines changed

4 files changed

+53
-2
lines changed

conanfile.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ def build(self):
7070
cmake = CMake(self)
7171
cmake.configure()
7272
cmake.build()
73-
if self.should_install and self.develop:
74-
cmake.install()
73+
cmake.install()
7574

7675
def package_info(self):
7776
self.cpp_info.libs = ["tconcurrent"]

include/tconcurrent/executor.hpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ class executor
5151
return _p->signal_error(e);
5252
}
5353

54+
void stop_before_fork()
55+
{
56+
return _p->stop_before_fork();
57+
}
58+
59+
void resume_after_fork()
60+
{
61+
return _p->resume_after_fork();
62+
}
63+
5464
explicit operator bool() const
5565
{
5666
return !!_p;
@@ -65,6 +75,8 @@ class executor
6575
virtual bool is_single_threaded() const = 0;
6676
virtual bool is_in_this_context() const = 0;
6777
virtual void signal_error(std::exception_ptr const& e) = 0;
78+
virtual void stop_before_fork() = 0;
79+
virtual void resume_after_fork() = 0;
6880
};
6981

7082
template <typename T>
@@ -100,6 +112,16 @@ class executor
100112
return _context.signal_error(e);
101113
}
102114

115+
void stop_before_fork() override
116+
{
117+
return _context.stop_before_fork();
118+
}
119+
120+
void resume_after_fork() override
121+
{
122+
return _context.resume_after_fork();
123+
}
124+
103125
private:
104126
T& _context;
105127
};

include/tconcurrent/thread_pool.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ class TCONCURRENT_EXPORT thread_pool
4343
void start(unsigned int thread_count);
4444
void stop(bool cancel_work = false);
4545

46+
void stop_before_fork();
47+
void resume_after_fork();
48+
4649
bool is_running() const;
4750

4851
bool is_in_this_context() const;

src/thread_pool.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ struct thread_pool::impl
2525
std::atomic<unsigned> _num_running_threads{0};
2626
std::atomic<bool> _dead{false};
2727

28+
// We need to be fork-safe, which means stopping all our threads before
29+
// a fork and restoring them after, so that they restart from a clean state
30+
std::atomic<unsigned> _num_threads_before_fork{0};
31+
2832
error_handler_cb _error_cb{detail::default_error_cb};
2933
task_trace_handler_cb _task_trace_handler;
3034
};
@@ -144,6 +148,29 @@ void thread_pool::stop(bool cancel_work)
144148
}
145149
}
146150

151+
void thread_pool::stop_before_fork()
152+
{
153+
assert(!_p->_num_threads_before_fork);
154+
_p->_num_threads_before_fork.store(_p->_num_running_threads);
155+
156+
// Note that this can't _p.release(), that only happens on Windows during
157+
// process exit, so _p will still be valid in resume_after_fork
158+
stop(true);
159+
}
160+
161+
void thread_pool::resume_after_fork()
162+
{
163+
unsigned num_threads = _p->_num_threads_before_fork.load();
164+
auto error_cb = std::move(_p->_error_cb);
165+
auto task_trace_handler_cb = std::move(_p->_task_trace_handler);
166+
167+
// Fresh start
168+
_p.reset(new impl);
169+
_p->_error_cb = std::move(error_cb);
170+
_p->_task_trace_handler = std::move(task_trace_handler_cb);
171+
this->start(num_threads);
172+
}
173+
147174
bool thread_pool::is_running() const
148175
{
149176
return _p->_work != std::nullopt;

0 commit comments

Comments
 (0)