Skip to content

Commit 6fd91ee

Browse files
author
pryanikov
committed
some sugar
1 parent 26ce92b commit 6fd91ee

File tree

2 files changed

+42
-29
lines changed

2 files changed

+42
-29
lines changed

main.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ static void tpwriter_worker()
5353
const auto cb_fetch = std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1);
5454

5555
while (!is_term) {
56+
// for (unsigned cnt = queue.size(); cnt > 0; --cnt) {
57+
// tpwriter->BinlogEventCallback(queue.pop());
58+
// }
5659
queue.try_fetch(cb_fetch, timeout);
5760
tpwriter->Sync();
5861
tpwriter->RecvAll();

queue.h

+39-29
Original file line numberDiff line numberDiff line change
@@ -9,65 +9,75 @@
99

1010
template<typename T> class Queue
1111
{
12+
private:
13+
template<typename M> class ulock : public std::unique_lock<M> {
14+
std::condition_variable& cv;
15+
public:
16+
ulock(M& m, std::condition_variable& cv_) : std::unique_lock<M>(m), cv(cv_) {}
17+
void unlock() { std::unique_lock<M>::unlock(); cv.notify_all(); }
18+
};
19+
1220
public:
1321
Queue(const unsigned limit_) : limit(limit_) {}
1422

1523
inline T pop()
1624
{
17-
std::unique_lock<std::mutex> lock(mutex);
25+
// std::unique_lock<std::mutex> lock_(mutex);
26+
ulock<std::mutex> lock_(mutex, cv2);
1827

1928
if (queue.empty()) {
20-
cv1.wait(lock, [this] { return !queue.empty(); });
29+
cv1.wait(lock_, [this] { return !queue.empty(); });
2130
}
2231

2332
T item = queue.front();
2433
queue.pop_front();
25-
lock.unlock();
26-
cv2.notify_all();
34+
// lock_.unlock();
35+
// cv2.notify_all();
2736

2837
return item;
2938
}
3039

31-
void try_fetch(const std::function<void (T&)>& cb, const std::chrono::milliseconds timeout)
40+
inline void try_fetch(const std::function<void (T&)>& cb, const std::chrono::milliseconds timeout)
3241
{
33-
std::unique_lock<std::mutex> lock(mutex);
34-
35-
if (!queue.empty() || cv1.wait_for(lock, timeout, [this] { return !queue.empty(); })) {
36-
unsigned cnt = queue.size();
37-
do {
38-
T item = queue.front();
39-
queue.pop_front();
40-
lock.unlock();
41-
cv2.notify_all();
42-
43-
cb(item);
44-
45-
if (--cnt) {
46-
lock.lock();
47-
continue;
48-
}
49-
} while (false);
42+
// std::unique_lock<std::mutex> lock_(mutex);
43+
ulock<std::mutex> lock_(mutex, cv2);
44+
45+
if (queue.empty() && !cv1.wait_for(lock_, timeout, [this] { return !queue.empty(); })) {
46+
return;
5047
}
5148

52-
lock.unlock();
53-
cv2.notify_all();
49+
unsigned cnt = queue.size();
50+
do {
51+
T item = queue.front();
52+
queue.pop_front();
53+
lock_.unlock();
54+
//cv2.notify_all();
55+
56+
cb(item);
57+
58+
if (--cnt) {
59+
lock_.lock();
60+
continue;
61+
}
62+
} while (false);
5463
}
5564

5665
inline void push(const T& item)
5766
{
58-
std::unique_lock<std::mutex> lock(mutex);
67+
// std::unique_lock<std::mutex> lock_(mutex);
68+
ulock<std::mutex> lock_(mutex, cv1);
5969

6070
if (queue.size() >= limit) {
61-
cv2.wait(lock, [this] { return queue.size() < limit; });
71+
cv2.wait(lock_, [this] { return queue.size() < limit; });
6272
}
6373

6474
queue.push_back(item);
65-
lock.unlock();
66-
cv1.notify_one();
75+
// lock_.unlock();
76+
// cv1.notify_one();
6777
}
6878

6979
inline unsigned size() const {
70-
// std::lock_guard<std::mutex> lock(mutex);
80+
// std::lock_guard<std::mutex> lock_(mutex);
7181
return queue.size();
7282
}
7383

0 commit comments

Comments
 (0)