Skip to content

Commit 8060091

Browse files
author
pryanikov
committed
no sugar, it's harmful & don't copy, use pointers
1 parent 6fd91ee commit 8060091

File tree

8 files changed

+90
-110
lines changed

8 files changed

+90
-110
lines changed

build.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ git submodule update --init --recursive
22
git submodule update --init --recursive
33
cmake -DCMAKE_BUILD_TYPE=Release -DYAML_CPP_BUILD_TOOLS=OFF -DYAML_CPP_BUILD_CONTRIB=OFF -DTARANTOOL_C_EMBEDDED=1 .
44
make
5-
strip -s replicatord
5+
#strip -s replicatord

dbreader.cpp

+38-38
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,13 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
7777

7878
// send binlog position update event
7979
if (!stopped) {
80-
SerializableBinlogEvent ev;
81-
ev.binlog_name = binlog_name;
82-
ev.binlog_pos = binlog_pos;
83-
// ev.seconds_behind_master = GetSecondsBehindMaster();
84-
ev.unix_timestamp = long(time(NULL));
85-
ev.event = "IGNORE";
86-
stopped = cb(ev);
80+
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
81+
ev->binlog_name = binlog_name;
82+
ev->binlog_pos = binlog_pos;
83+
// ev->seconds_behind_master = GetSecondsBehindMaster();
84+
ev->unix_timestamp = long(time(NULL));
85+
ev->event = "IGNORE";
86+
stopped = cb(std::move(ev));
8787
}
8888

8989
//tempslave.close_connection();
@@ -118,41 +118,41 @@ void DBReader::EventCallback(const slave::RecordSet& event, const std::map<std::
118118
{
119119
last_event_when = event.when;
120120

121-
SerializableBinlogEvent ev;
122-
ev.binlog_name = state.getMasterLogName();
123-
ev.binlog_pos = state.getMasterLogPos();
124-
// ev.seconds_behind_master = GetSecondsBehindMaster();
125-
ev.unix_timestamp = long(time(NULL));
126-
ev.database = event.db_name;
127-
ev.table = event.tbl_name;
121+
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
122+
ev->binlog_name = state.getMasterLogName();
123+
ev->binlog_pos = state.getMasterLogPos();
124+
// ev->seconds_behind_master = GetSecondsBehindMaster();
125+
ev->unix_timestamp = long(time(NULL));
126+
ev->database = event.db_name;
127+
ev->table = event.tbl_name;
128128

129129
switch (event.type_event) {
130-
case slave::RecordSet::Delete: ev.event = "DELETE"; break;
131-
case slave::RecordSet::Update: ev.event = "UPDATE"; break;
132-
case slave::RecordSet::Write: ev.event = "INSERT"; break;
133-
default: ev.event = "IGNORE";
130+
case slave::RecordSet::Delete: ev->event = "DELETE"; break;
131+
case slave::RecordSet::Update: ev->event = "UPDATE"; break;
132+
case slave::RecordSet::Write: ev->event = "INSERT"; break;
133+
default: ev->event = "IGNORE";
134134
}
135135
for (auto fi = filter.begin(), end = filter.end(); fi != end; ++fi) {
136136
const auto ri = event.m_row.find(fi->first);
137137
if (ri != event.m_row.end()) {
138-
ev.row[ fi->second ] = ri->second;
138+
ev->row[ fi->second ] = ri->second;
139139
}
140140
}
141-
stopped = cb(ev);
141+
stopped = cb(std::move(ev));
142142
}
143143

144144
void DBReader::XidEventCallback(unsigned int server_id, BinlogEventCallback cb)
145145
{
146146
last_event_when = ::time(NULL);
147147

148148
// send binlog position update event
149-
SerializableBinlogEvent ev;
150-
ev.binlog_name = state.getMasterLogName();
151-
ev.binlog_pos = state.getMasterLogPos();
152-
// ev.seconds_behind_master = GetSecondsBehindMaster();
153-
ev.unix_timestamp = long(time(NULL));
154-
ev.event = "IGNORE";
155-
stopped = cb(ev);
149+
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
150+
ev->binlog_name = state.getMasterLogName();
151+
ev->binlog_pos = state.getMasterLogPos();
152+
// ev->seconds_behind_master = GetSecondsBehindMaster();
153+
ev->unix_timestamp = long(time(NULL));
154+
ev->event = "IGNORE";
155+
stopped = cb(std::move(ev));
156156
}
157157

158158
void DBReader::DumpTablesCallback(
@@ -162,27 +162,27 @@ void DBReader::DumpTablesCallback(
162162
const nanomysql::fields_t& fields,
163163
BinlogEventCallback cb
164164
) {
165-
SerializableBinlogEvent ev;
166-
ev.binlog_name = "";
167-
ev.binlog_pos = 0;
168-
ev.database = db_name;
169-
ev.table = tbl_name;
170-
ev.event = "INSERT";
171-
// ev.seconds_behind_master = GetSecondsBehindMaster();
172-
ev.unix_timestamp = long(time(NULL));
165+
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
166+
ev->binlog_name = "";
167+
ev->binlog_pos = 0;
168+
ev->database = db_name;
169+
ev->table = tbl_name;
170+
ev->event = "INSERT";
171+
// ev->seconds_behind_master = GetSecondsBehindMaster();
172+
ev->unix_timestamp = long(time(NULL));
173173

174174
for (const auto& it : filter) {
175175
slave::PtrField ptr_field = it.second;
176176
const auto& field = fields.at(ptr_field->field_name);
177177
if (field.is_null) {
178-
ev.row[ it.first ] = boost::any();
178+
ev->row[ it.first ] = boost::any();
179179
} else {
180180
ptr_field->unpack_str(field.data);
181-
ev.row[ it.first ] = ptr_field->field_data;
181+
ev->row[ it.first ] = ptr_field->field_data;
182182
}
183183
}
184184
if (!stopped) {
185-
stopped = cb(ev);
185+
stopped = cb(std::move(ev));
186186
}
187187
}
188188

dbreader.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
namespace replicator {
1616

17-
typedef std::function<bool (const SerializableBinlogEvent &ev)> BinlogEventCallback;
17+
typedef std::function<bool (SerializableBinlogEventPtr&&)> BinlogEventCallback;
1818

1919
struct DBTable
2020
{

main.cpp

+6-9
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ static DBReader *dbreader = NULL;
3434

3535
static void sigint_handler(int sig);
3636

37-
static Queue<SerializableBinlogEvent> queue(50);
37+
static Queue<SerializableBinlogEventPtr> queue(50);
3838

3939
// ===============
4040

@@ -48,15 +48,12 @@ static void tpwriter_worker()
4848
try {
4949
tpwriter->ReadBinlogPos(binlog_name, binlog_pos);
5050
reset = true;
51-
5251
const std::chrono::milliseconds timeout(1000);
53-
const auto cb_fetch = std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1);
5452

5553
while (!is_term) {
56-
// for (unsigned cnt = queue.size(); cnt > 0; --cnt) {
57-
// tpwriter->BinlogEventCallback(queue.pop());
58-
// }
59-
queue.try_fetch(cb_fetch, timeout);
54+
for (unsigned cnt = queue.wait(timeout); cnt > 0; --cnt) {
55+
tpwriter->BinlogEventCallback(queue.pop());
56+
}
6057
tpwriter->Sync();
6158
tpwriter->RecvAll();
6259
}
@@ -78,13 +75,13 @@ static void tpwriter_worker()
7875

7976
// ====================
8077

81-
static bool dbread_callback(const SerializableBinlogEvent &ev)
78+
static bool dbread_callback(SerializableBinlogEventPtr&& ev)
8279
{
8380
if (is_term || reset) {
8481
return true;
8582
}
8683

87-
queue.push(ev);
84+
queue.push(std::forward<SerializableBinlogEventPtr>(ev));
8885
return false;
8986
}
9087

queue.h

+24-44
Original file line numberDiff line numberDiff line change
@@ -9,83 +9,63 @@
99

1010
template<typename T> class Queue
1111
{
12-
private:
13-
template<typename M> class ulock : public std::unique_lock<M> {
14-
std::condition_variable& cv;
15-
public:
16-
ulock(M& m, std::condition_variable& cv_) : std::unique_lock<M>(m), cv(cv_) {}
17-
void unlock() { std::unique_lock<M>::unlock(); cv.notify_all(); }
18-
};
19-
2012
public:
2113
Queue(const unsigned limit_) : limit(limit_) {}
2214

2315
inline T pop()
2416
{
25-
// std::unique_lock<std::mutex> lock_(mutex);
26-
ulock<std::mutex> lock_(mutex, cv2);
17+
std::unique_lock<std::mutex> lock(mutex);
2718

2819
if (queue.empty()) {
29-
cv1.wait(lock_, [this] { return !queue.empty(); });
20+
cv1.wait(lock, [this] { return !queue.empty(); });
3021
}
3122

32-
T item = queue.front();
23+
T item = std::move(queue.front());
3324
queue.pop_front();
34-
// lock_.unlock();
35-
// cv2.notify_all();
25+
26+
lock.unlock();
27+
cv2.notify_all();
3628

3729
return item;
3830
}
3931

40-
inline void try_fetch(const std::function<void (T&)>& cb, const std::chrono::milliseconds timeout)
32+
inline void push(T&& item)
4133
{
42-
// std::unique_lock<std::mutex> lock_(mutex);
43-
ulock<std::mutex> lock_(mutex, cv2);
34+
std::unique_lock<std::mutex> lock(mutex);
4435

45-
if (queue.empty() && !cv1.wait_for(lock_, timeout, [this] { return !queue.empty(); })) {
46-
return;
36+
if (queue.size() >= limit) {
37+
cv2.wait(lock, [this] { return queue.size() < limit; });
4738
}
4839

49-
unsigned cnt = queue.size();
50-
do {
51-
T item = queue.front();
52-
queue.pop_front();
53-
lock_.unlock();
54-
//cv2.notify_all();
40+
queue.push_back(std::forward<T>(item));
5541

56-
cb(item);
57-
58-
if (--cnt) {
59-
lock_.lock();
60-
continue;
61-
}
62-
} while (false);
42+
lock.unlock();
43+
cv1.notify_one();
6344
}
6445

65-
inline void push(const T& item)
46+
inline unsigned wait(const std::chrono::milliseconds timeout) const
6647
{
67-
// std::unique_lock<std::mutex> lock_(mutex);
68-
ulock<std::mutex> lock_(mutex, cv1);
48+
std::unique_lock<std::mutex> lock(mutex);
6949

70-
if (queue.size() >= limit) {
71-
cv2.wait(lock_, [this] { return queue.size() < limit; });
72-
}
50+
if (!queue.empty())
51+
return queue.size();
52+
53+
if (cv1.wait_for(lock, timeout, [this] { return !queue.empty(); }))
54+
return queue.size();
7355

74-
queue.push_back(item);
75-
// lock_.unlock();
76-
// cv1.notify_one();
56+
return 0;
7757
}
7858

7959
inline unsigned size() const {
80-
// std::lock_guard<std::mutex> lock_(mutex);
60+
// std::lock_guard<std::mutex> lock(mutex);
8161
return queue.size();
8262
}
8363

8464
private:
8565
std::deque<T> queue;
8666
mutable std::mutex mutex;
87-
std::condition_variable cv1;
88-
std::condition_variable cv2;
67+
mutable std::condition_variable cv1;
68+
mutable std::condition_variable cv2;
8969
const unsigned limit;
9070
};
9171

serializable.h

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <string>
55
#include <map>
66
#include <sstream>
7+
#include <memory>
78
#include <boost/any.hpp>
89

910
namespace replicator {
@@ -82,6 +83,8 @@ struct SerializableBinlogEvent
8283
std::map<unsigned, SerializableValue> row;
8384
};
8485

86+
typedef std::unique_ptr<SerializableBinlogEvent> SerializableBinlogEventPtr;
87+
8588
} // replicator
8689

8790
#endif // REPLICATOR_SERIALIZABLE_H

0 commit comments

Comments
 (0)