Skip to content

Commit b2ac0d7

Browse files
committed
added pool instant destruction flag and feedback methods
Destroying without waiting may be wished too. Feedback about workers number and queue size allows more user control like avoiding out-of-memory or creating tasks more fitting to the number of concurrent executions.
1 parent 51737c3 commit b2ac0d7

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

ThreadPool.hpp

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ freely, subject to the following restrictions:
4343
class ThreadPool {
4444
public:
4545
// the constructor just launches some amount of workers
46-
ThreadPool(size_t threads_n = std::thread::hardware_concurrency()) : stop(false)
46+
ThreadPool(size_t threads_n = std::thread::hardware_concurrency(), const bool terminate = false) : stop(false), terminate(terminate)
4747
{
4848
if(!threads_n)
4949
throw std::invalid_argument("more than zero threads expected");
@@ -77,6 +77,15 @@ class ThreadPool {
7777
ThreadPool& operator=(const ThreadPool&) = delete;
7878
ThreadPool(ThreadPool&&) = delete;
7979
ThreadPool& operator=(ThreadPool&&) = delete;
80+
// number of workers
81+
size_t size() const{
82+
return this->workers.size();
83+
}
84+
// current number of tasks in queue
85+
size_t pending(){
86+
std::unique_lock<std::mutex> lock(this->queue_mutex);
87+
return this->tasks.size();
88+
}
8089
// add new work item to the pool
8190
template<class F, class... Args>
8291
std::future<typename std::result_of<F(Args...)>::type> enqueue(F&& f, Args&&... args)
@@ -94,13 +103,15 @@ class ThreadPool {
94103
this->condition.notify_one();
95104
return res;
96105
}
97-
// the destructor joins all threads
106+
// the destructor joins or terminates all threads
98107
virtual ~ThreadPool()
99108
{
100-
this->stop = true;
101-
this->condition.notify_all();
102-
for(std::thread& worker : this->workers)
103-
worker.join();
109+
if(!this->terminate){
110+
this->stop = true;
111+
this->condition.notify_all();
112+
for(std::thread& worker : this->workers)
113+
worker.join();
114+
}
104115
}
105116
private:
106117
// need to keep track of threads so we can join them
@@ -113,6 +124,9 @@ class ThreadPool {
113124
std::condition_variable condition;
114125
// workers finalization flag
115126
std::atomic_bool stop;
127+
128+
// immediate thread termination flag
129+
const bool terminate;
116130
};
117131

118132
#endif // THREAD_POOL_HPP

example.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
#include <iostream>
22
#include <vector>
33
#include <chrono>
4+
#include <sstream>
45

56
#include "ThreadPool.hpp"
67

78
int main()
89
{
910

10-
ThreadPool pool(4);
11+
ThreadPool pool;
1112
std::vector< std::future<int> > results;
13+
std::ostringstream buf;
14+
15+
std::cout << "Workers: " << pool.size() << std::endl;
1216

1317
for(int i = 0; i < 8; ++i) {
1418
results.emplace_back(
@@ -21,6 +25,9 @@ int main()
2125
);
2226
}
2327

28+
buf << '<' << pool.pending() << '>';
29+
std::cout << buf.str();
30+
2431
for(auto && result: results)
2532
std::cout << result.get() << ' ';
2633
std::cout << std::endl;

0 commit comments

Comments
 (0)