@@ -6,23 +6,9 @@ using std::placeholders::_1;
6
6
7
7
namespace replicator {
8
8
9
- DBReader::DBReader (nanomysql::mysql_conn_opts &opts, unsigned connect_retry)
10
- : state(), slave(slave::MasterInfo(opts, connect_retry), state), stopped(false ), last_event_when(0 ) {}
11
-
12
- DBReader::~DBReader ()
13
- {
14
- slave.close_connection ();
15
- }
16
-
17
- void DBReader::AddTable (const std::string& db, const std::string& table, std::map<std::string, unsigned >& filter, bool do_dump)
18
- {
19
- tables.emplace_back (db, table, filter, do_dump);
20
- }
21
-
22
- void DBReader::DumpTables (std::string &binlog_name, unsigned long &binlog_pos, BinlogEventCallback cb)
9
+ void DBReader::DumpTables (std::string& binlog_name, unsigned long & binlog_pos, const BinlogEventCallback& cb)
23
10
{
24
- slave::callback dummycallback = std::bind (&DBReader::DummyEventCallback, this , _1);
25
-
11
+ const static slave::callback dummycallback = [] (const slave::RecordSet& event) {};
26
12
// start temp slave to read DB structure
27
13
slave::Slave tempslave (slave.masterInfo (), state);
28
14
for (auto i = tables.begin (), end = tables.end (); i != end; ++i) {
@@ -32,7 +18,7 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
32
18
tempslave.init ();
33
19
tempslave.createDatabaseStructure ();
34
20
35
- last_event_when = ::time (NULL );
21
+ // last_event_when = ::time(NULL);
36
22
37
23
slave::Slave::binlog_pos_t bp = tempslave.getLastBinlog ();
38
24
binlog_name = bp.first ;
@@ -42,12 +28,9 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
42
28
43
29
// dump tables
44
30
nanomysql::Connection conn (slave.masterInfo ().conn_options );
45
-
46
31
conn.query (" SET NAMES utf8" );
47
32
48
33
for (auto table = tables.begin (), end = tables.end (); !stopped && table != end; ++table) {
49
- if (!table->do_dump ) continue ;
50
-
51
34
// build field_name -> field_ptr map for filtered columns
52
35
std::vector<std::pair<unsigned , slave::PtrField>> filter_;
53
36
const auto rtable = tempslave.getRli ().getTable (table->name );
@@ -56,7 +39,7 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
56
39
for (const auto ptr_field : rtable->fields ) {
57
40
const auto it = table->filter .find (ptr_field->field_name );
58
41
if (it != table->filter .end ()) {
59
- filter_.emplace_back (it->second , ptr_field);
42
+ filter_.emplace_back (it->second . first , ptr_field);
60
43
s_fields += ptr_field->field_name ;
61
44
s_fields += ' ,' ;
62
45
}
@@ -71,7 +54,7 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
71
54
std::cref (table->name .second ),
72
55
std::cref (filter_),
73
56
_1,
74
- cb
57
+ std::cref (cb)
75
58
));
76
59
}
77
60
@@ -81,15 +64,15 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
81
64
ev->binlog_name = binlog_name;
82
65
ev->binlog_pos = binlog_pos;
83
66
// ev->seconds_behind_master = GetSecondsBehindMaster();
84
- ev->unix_timestamp = long (time (NULL ));
67
+ // ev->unix_timestamp = long(time(NULL));
85
68
ev->event = " IGNORE" ;
86
69
stopped = cb (std::move (ev));
87
70
}
88
71
89
72
// tempslave.close_connection();
90
73
}
91
74
92
- void DBReader::ReadBinlog (const std::string & binlog_name, unsigned long binlog_pos, BinlogEventCallback cb)
75
+ void DBReader::ReadBinlog (const std::string& binlog_name, unsigned long binlog_pos, const BinlogEventCallback& cb)
93
76
{
94
77
stopped = false ;
95
78
state.setMasterLogNamePos (binlog_name, binlog_pos);
@@ -98,10 +81,15 @@ void DBReader::ReadBinlog(const std::string &binlog_name, unsigned long binlog_p
98
81
slave.setCallback (
99
82
table->name .first ,
100
83
table->name .second ,
101
- std::bind (&DBReader::EventCallback, this , _1, std::cref (table->filter ), cb)
84
+ std::bind (
85
+ table->is_primary
86
+ ? &DBReader::EventCallbackNormal
87
+ : &DBReader::EventCallbackNullify,
88
+ this , _1, std::cref (table->filter ), std::cref (cb)
89
+ )
102
90
);
103
91
}
104
- slave.setXidCallback (std::bind (&DBReader::XidEventCallback, this , _1, cb ));
92
+ slave.setXidCallback (std::bind (&DBReader::XidEventCallback, this , _1, std::cref (cb) ));
105
93
slave.init ();
106
94
slave.createDatabaseStructure ();
107
95
@@ -114,62 +102,106 @@ void DBReader::Stop()
114
102
slave.close_connection ();
115
103
}
116
104
117
- void DBReader::EventCallback (const slave::RecordSet& event, const std::map<std::string, unsigned >& filter, BinlogEventCallback cb)
118
- {
119
- last_event_when = event.when ;
105
+ void DBReader::EventCallbackNormal (
106
+ const slave::RecordSet& event,
107
+ const std::map<std::string, std::pair<unsigned , bool >>& filter,
108
+ const BinlogEventCallback& cb
109
+ ) {
110
+ if (stopped) return ;
111
+ // last_event_when = event.when;
120
112
121
113
SerializableBinlogEventPtr ev (new SerializableBinlogEvent);
114
+
115
+ switch (event.type_event ) {
116
+ case slave::RecordSet::Delete: ev->event = " DELETE" ; break ;
117
+ case slave::RecordSet::Update: ev->event = " UPDATE" ; break ;
118
+ case slave::RecordSet::Write: ev->event = " INSERT" ; break ;
119
+ default : return ;
120
+ }
121
+
122
122
ev->binlog_name = state.getMasterLogName ();
123
123
ev->binlog_pos = state.getMasterLogPos ();
124
124
// ev->seconds_behind_master = GetSecondsBehindMaster();
125
- ev->unix_timestamp = long (time (NULL ));
125
+ // ev->unix_timestamp = long(time(NULL));
126
126
ev->database = event.db_name ;
127
127
ev->table = event.tbl_name ;
128
128
129
+ for (auto fi = filter.begin (), end = filter.end (); fi != end; ++fi) {
130
+ const auto ri = event.m_row .find (fi->first );
131
+ if (ri != event.m_row .end ()) {
132
+ ev->row [ fi->second .first ] = ri->second ;
133
+ }
134
+ }
135
+ stopped = cb (std::move (ev));
136
+ }
137
+
138
+ void DBReader::EventCallbackNullify (
139
+ const slave::RecordSet& event,
140
+ const std::map<std::string, std::pair<unsigned , bool >>& filter,
141
+ const BinlogEventCallback& cb
142
+ ) {
143
+ if (stopped) return ;
144
+ // last_event_when = event.when;
145
+
146
+ SerializableBinlogEventPtr ev (new SerializableBinlogEvent);
147
+ bool is_delete = false ;
148
+
129
149
switch (event.type_event ) {
130
- case slave::RecordSet::Delete: ev->event = " DELETE" ; break ;
150
+ case slave::RecordSet::Delete: ev->event = " DELETE" ; is_delete = true ;
131
151
case slave::RecordSet::Update: ev->event = " UPDATE" ; break ;
132
152
case slave::RecordSet::Write: ev->event = " INSERT" ; break ;
133
- default : ev-> event = " IGNORE " ;
153
+ default : return ;
134
154
}
155
+
156
+ ev->binlog_name = state.getMasterLogName ();
157
+ ev->binlog_pos = state.getMasterLogPos ();
158
+ // ev->seconds_behind_master = GetSecondsBehindMaster();
159
+ // ev->unix_timestamp = long(time(NULL));
160
+ ev->database = event.db_name ;
161
+ ev->table = event.tbl_name ;
162
+
135
163
for (auto fi = filter.begin (), end = filter.end (); fi != end; ++fi) {
136
164
const auto ri = event.m_row .find (fi->first );
137
165
if (ri != event.m_row .end ()) {
138
- ev->row [ fi->second ] = ri->second ;
166
+ // if it's not a key and event is delete - don't actually delete, just nullify
167
+ ev->row [ fi->second .first ] = !fi->second .second && is_delete ? boost::any () : ri->second ;
139
168
}
140
169
}
141
170
stopped = cb (std::move (ev));
142
171
}
143
172
144
- void DBReader::XidEventCallback (unsigned int server_id, BinlogEventCallback cb)
173
+ void DBReader::XidEventCallback (unsigned int server_id, const BinlogEventCallback& cb)
145
174
{
146
- last_event_when = ::time (NULL );
175
+ if (stopped) return ;
176
+ // last_event_when = ::time(NULL);
147
177
148
178
// send binlog position update event
149
179
SerializableBinlogEventPtr ev (new SerializableBinlogEvent);
150
180
ev->binlog_name = state.getMasterLogName ();
151
181
ev->binlog_pos = state.getMasterLogPos ();
152
182
// ev->seconds_behind_master = GetSecondsBehindMaster();
153
- ev->unix_timestamp = long (time (NULL ));
183
+ // ev->unix_timestamp = long(time(NULL));
154
184
ev->event = " IGNORE" ;
155
185
stopped = cb (std::move (ev));
156
186
}
157
187
158
188
void DBReader::DumpTablesCallback (
159
- const std::string & db_name,
160
- const std::string & tbl_name,
189
+ const std::string& db_name,
190
+ const std::string& tbl_name,
161
191
const std::vector<std::pair<unsigned , slave::PtrField>>& filter,
162
192
const nanomysql::fields_t & fields,
163
- BinlogEventCallback cb
193
+ const BinlogEventCallback& cb
164
194
) {
195
+ if (stopped) return ;
196
+
165
197
SerializableBinlogEventPtr ev (new SerializableBinlogEvent);
166
198
ev->binlog_name = " " ;
167
199
ev->binlog_pos = 0 ;
168
200
ev->database = db_name;
169
201
ev->table = tbl_name;
170
202
ev->event = " INSERT" ;
171
203
// ev->seconds_behind_master = GetSecondsBehindMaster();
172
- ev->unix_timestamp = long (time (NULL ));
204
+ // ev->unix_timestamp = long(time(NULL));
173
205
174
206
for (const auto & it : filter) {
175
207
slave::PtrField ptr_field = it.second ;
@@ -181,13 +213,11 @@ void DBReader::DumpTablesCallback(
181
213
ev->row [ it.first ] = ptr_field->field_data ;
182
214
}
183
215
}
184
- if (!stopped) {
185
- stopped = cb (std::move (ev));
186
- }
216
+ stopped = cb (std::move (ev));
187
217
}
188
218
189
- unsigned DBReader::GetSecondsBehindMaster () const {
190
- return std::max (::time (NULL ) - last_event_when, 0L );
191
- }
219
+ // unsigned DBReader::GetSecondsBehindMaster() const {
220
+ // return std::max(::time(NULL) - last_event_when, 0L);
221
+ // }
192
222
193
223
} // replicator
0 commit comments