41
41
#include < atomic>
42
42
#include < condition_variable>
43
43
#include < utility>
44
+ #include < stdexcept>
44
45
#include " promise-cpp/promise.hpp"
45
46
46
47
@@ -55,11 +56,13 @@ class Service {
55
56
Tasks tasks_;
56
57
std::recursive_mutex mutex_;
57
58
std::condition_variable_any cond_;
58
- std::atomic<bool > isAutoExit_;
59
+ std::atomic<bool > isAutoStop_;
60
+ std::atomic<bool > isStop_;
59
61
60
62
public:
61
63
Service ()
62
- : isAutoExit_(true ) {
64
+ : isAutoStop_(true )
65
+ , isStop_(false ) {
63
66
}
64
67
65
68
// delay for milliseconds
@@ -92,9 +95,9 @@ class Service {
92
95
}
93
96
94
97
// Set if the io thread will auto exist if no waiting tasks and timers.
95
- void setAutoExit (bool isAutoExit) {
98
+ void setAutoStop (bool isAutoExit) {
96
99
std::lock_guard<std::recursive_mutex> lock (mutex_);
97
- isAutoExit_ = isAutoExit;
100
+ isAutoStop_ = isAutoExit;
98
101
cond_.notify_one ();
99
102
}
100
103
@@ -103,14 +106,14 @@ class Service {
103
106
void run () {
104
107
std::unique_lock<std::recursive_mutex> lock (mutex_);
105
108
106
- while (!isAutoExit_ || tasks_.size () > 0 || timers_.size () > 0 ){
109
+ while (!isStop_ && (!isAutoStop_ || tasks_.size () > 0 || timers_.size () > 0 )) {
107
110
108
111
if (tasks_.size () == 0 && timers_.size () == 0 ) {
109
112
cond_.wait (lock);
110
113
continue ;
111
114
}
112
115
113
- while (timers_.size () > 0 ) {
116
+ while (!isStop_ && timers_.size () > 0 ) {
114
117
TimePoint now = std::chrono::steady_clock::now ();
115
118
TimePoint time = timers_.begin ()->first ;
116
119
if (time <= now) {
@@ -128,7 +131,7 @@ class Service {
128
131
}
129
132
130
133
// Check fixed size of tasks in this loop, so that timer have a chance to run.
131
- if (tasks_.size () > 0 ) {
134
+ if (!isStop_ && tasks_.size () > 0 ) {
132
135
size_t size = tasks_.size ();
133
136
for (size_t i = 0 ; i < size; ++i){
134
137
Defer defer = tasks_.front ();
@@ -137,6 +140,27 @@ class Service {
137
140
}
138
141
}
139
142
}
143
+
144
+ // Clear pending timers and tasks
145
+ while (timers_.size () > 0 || tasks_.size ()) {
146
+ while (timers_.size () > 0 ) {
147
+ Defer defer = timers_.begin ()->second ;
148
+ timers_.erase (timers_.begin ());
149
+ defer.reject (std::runtime_error (" service stopped" ));
150
+ }
151
+ while (tasks_.size () > 0 ) {
152
+ Defer defer = tasks_.front ();
153
+ tasks_.pop_front ();
154
+ defer.reject (std::runtime_error (" service stopped" ));
155
+ }
156
+ }
157
+ }
158
+
159
+ // stop the service loop
160
+ void stop () {
161
+ std::lock_guard<std::recursive_mutex> lock (mutex_);
162
+ isStop_ = true ;
163
+ cond_.notify_one ();
140
164
}
141
165
};
142
166
0 commit comments