Skip to content

Commit 55ba0e0

Browse files
committed
Merge commit '4f4a8f8f2d667471208557fa934d752b7371ea45'
2 parents ecf0a17 + 4f4a8f8 commit 55ba0e0

File tree

9 files changed

+324
-95
lines changed

9 files changed

+324
-95
lines changed

zongji/.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@ npm-install-stamp
2424
*.swo
2525
*.swp
2626
ben.js
27+
.vagrant
28+
Vagrantfile

zongji/README.md

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ ZongJi (踪迹) is pronounced as `zōng jì` in Chinese.
55

66
This package is a "pure JS" implementation based on [`node-mysql`](https://github.com/felixge/node-mysql). Since v0.2.0, The native part (which was written in C++) has been dropped.
77

8+
This package has been tested with MySQL server 5.5.40 and 5.6.19. All MySQL server versions >= 5.1.15 are supported.
9+
810
## Quick Start
911

1012
```javascript
@@ -106,6 +108,7 @@ Name | Description
106108
* :star2: [All types allowed by `node-mysql`](https://github.com/felixge/node-mysql#type-casting) are supported by this package.
107109
* :speak_no_evil: While 64-bit integers in MySQL (`BIGINT` type) allow values in the range of 2<sup>64</sup> (± ½ × 2<sup>64</sup> for signed values), Javascript's internal storage of numbers limits values to 2<sup>53</sup>, making the allowed range of `BIGINT` fields only `-9007199254740992` to `9007199254740992`. Unsigned 64-bit integers must also not exceed `9007199254740992`.
108110
* :point_right: `TRUNCATE` statement does not cause corresponding `DeleteRows` event. Use unqualified `DELETE FROM` for same effect.
111+
* When using fractional seconds with `DATETIME` and `TIMESTAMP` data types in MySQL > 5.6.4, only millisecond precision is available due to the limit of Javascript's `Date` object.
109112

110113
## Run Tests
111114

@@ -124,6 +127,7 @@ I learnt many things from following resources while making ZongJi.
124127
* http://dev.mysql.com/doc/internals/en/replication-protocol.html
125128
* http://www.cs.wichita.edu/~chang/lecture/cs742/program/how-mysql-c-api.html
126129
* https://github.com/jeremycole/mysql_binlog (Ruby implemenation of MySQL binlog parser)
130+
* http://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html
127131

128132
## License
129133
MIT

zongji/index.js

+43-25
Original file line numberDiff line numberDiff line change
@@ -38,36 +38,54 @@ util.inherits(ZongJi, EventEmitter);
3838

3939
ZongJi.prototype._init = function() {
4040
var self = this;
41+
var binlogOptions = {
42+
tableMap: self.tableMap,
43+
};
4144

42-
this._isChecksumEnabled(function(checksumEnabled) {
43-
self.useChecksum = checksumEnabled;
44-
var options = {
45-
tableMap: self.tableMap,
46-
useChecksum: checksumEnabled,
47-
};
48-
49-
if(self.options.serverId !== undefined){
50-
options.serverId = self.options.serverId;
45+
var asyncMethods = [
46+
{
47+
name: '_isChecksumEnabled',
48+
callback: function(checksumEnabled) {
49+
self.useChecksum = checksumEnabled;
50+
binlogOptions.useChecksum = checksumEnabled
51+
}
52+
},
53+
{
54+
name: '_findBinlogEnd',
55+
callback: function(result){
56+
if(result && self.options.startAtEnd){
57+
binlogOptions.filename = result.Log_name;
58+
binlogOptions.position = result.File_size;
59+
}
60+
}
5161
}
62+
];
63+
64+
var methodIndex = 0;
65+
var nextMethod = function(){
66+
var method = asyncMethods[methodIndex];
67+
self[method.name](function(/* args */){
68+
method.callback.apply(this, arguments);
69+
methodIndex++;
70+
if(methodIndex < asyncMethods.length){
71+
nextMethod();
72+
}else{
73+
ready();
74+
}
75+
});
76+
};
77+
nextMethod();
5278

53-
var ready = function(){
54-
self.binlog = generateBinlog.call(self, options);
55-
self.ready = true;
56-
self._executeCtrlCallbacks();
79+
var ready = function(){
80+
// Run asynchronously from _init(), as serverId option set in start()
81+
if(self.options.serverId !== undefined){
82+
binlogOptions.serverId = self.options.serverId;
5783
}
5884

59-
if(self.options.startAtEnd){
60-
self._findBinlogEnd(function(result){
61-
if(result){
62-
options.filename = result.Log_name;
63-
options.position = result.File_size;
64-
}
65-
ready();
66-
});
67-
}else{
68-
ready();
69-
}
70-
});
85+
self.binlog = generateBinlog.call(self, binlogOptions);
86+
self.ready = true;
87+
self._executeCtrlCallbacks();
88+
};
7189
};
7290

7391
ZongJi.prototype._isChecksumEnabled = function(next) {

zongji/lib/common.js

+96
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ var MysqlTypes = {
1616
'NEWDATE': 14,
1717
'VARCHAR': 15,
1818
'BIT': 16,
19+
// Fractional temporal types in MySQL >=5.6.4
20+
'TIMESTAMP2': 17,
21+
'DATETIME2': 18,
22+
'TIME2': 19,
1923
'NEWDECIMAL': 246,
2024
'ENUM': 247,
2125
'SET': 248,
@@ -81,6 +85,14 @@ var parseSetEnumTypeDef = function(type){
8185
});
8286
};
8387

88+
// @param {string} type - ex. 'time(4)'
89+
// @return {int} - ex. 4
90+
var parseTypeDefIntArg = function(type){
91+
var start = type.indexOf('(') + 1;
92+
if(start === 0) return undefined;
93+
return parseInt(type.substr(start, type.length - start - 1), 10);
94+
};
95+
8496
var zeroPad = function(num, size) {
8597
// Max 32 digits
8698
var s = "00000000000000000000000000000000" + num;
@@ -328,6 +340,36 @@ var convertToMysqlType = exports.convertToMysqlType = function(code) {
328340
return result;
329341
};
330342

343+
var readTemporalFraction = function(parser, columnSchema) {
344+
var fractionPrecision = parseTypeDefIntArg(columnSchema.COLUMN_TYPE);
345+
if(fractionPrecision === undefined || fractionPrecision === 0) return false;
346+
347+
var fractionSize = Math.ceil(fractionPrecision / 2);
348+
var fraction = readIntBE(parser._buffer, parser._offset, fractionSize);
349+
parser._offset += fractionSize;
350+
if(fractionPrecision % 2 !== 0) fraction /= 10; // Not using full space
351+
if(fraction < 0) fraction *= -1; // Negative time, fraction not negative
352+
353+
var fractionStr = zeroPad(fraction, fractionPrecision);
354+
var milliseconds;
355+
if(fractionStr.length > 3){
356+
milliseconds = fractionStr.substr(0, 3);
357+
}else if(fractionStr.length === 2){
358+
milliseconds = fractionStr + '0';
359+
}else if(fractionStr.length === 1){
360+
milliseconds = fractionStr + '00';
361+
}else{
362+
milliseconds = fractionStr;
363+
}
364+
365+
return {
366+
value: fraction,
367+
precision: fractionPrecision,
368+
string: fractionStr,
369+
milliseconds: parseInt(milliseconds, 10)
370+
};
371+
};
372+
331373
exports.readMysqlValue = function(parser, column, columnSchema) {
332374
var result;
333375
// jshint indent: false
@@ -431,11 +473,36 @@ exports.readMysqlValue = function(parser, column, columnSchema) {
431473
var minute = Math.floor((raw % 10000) / 100);
432474
var second = raw % 100;
433475
if(isNegative) second += 1;
476+
477+
result = (isNegative ? '-' : '') +
478+
zeroPad(hour, hour > 99 ? 3 : 2) + ':' +
479+
zeroPad(minute, 2) + ':' +
480+
zeroPad(second, 2);
481+
break;
482+
case MysqlTypes.TIME2:
483+
var raw = readIntBE(parser._buffer, parser._offset, 3);
484+
parser._offset += 3;
485+
var fraction = readTemporalFraction(parser, columnSchema);
486+
487+
var isNegative = (raw & (1 << 23)) === 0;
488+
if(isNegative) raw = raw ^ ((1 << 24) - 1); // flip all bits
489+
490+
var hour = sliceBits(raw, 12, 22);
491+
var minute = sliceBits(raw, 6, 12);
492+
var second = sliceBits(raw, 0, 6);
493+
494+
if(isNegative && (fraction === false || fraction.value === 0)){
495+
second++;
496+
}
434497

435498
result = (isNegative ? '-' : '') +
436499
zeroPad(hour, hour > 99 ? 3 : 2) + ':' +
437500
zeroPad(minute, 2) + ':' +
438501
zeroPad(second, 2);
502+
503+
if(fraction !== false){
504+
result += '.' + fraction.string;
505+
}
439506
break;
440507
case MysqlTypes.DATETIME:
441508
var raw = exports.parseUInt64(parser);
@@ -450,10 +517,39 @@ exports.readMysqlValue = function(parser, column, columnSchema) {
450517
time % 100 // seconds
451518
);
452519
break;
520+
case MysqlTypes.DATETIME2:
521+
// Overlapping high-low to get all data in 32-bit numbers
522+
var rawHigh = readIntBE(parser._buffer, parser._offset, 4);
523+
var rawLow = readIntBE(parser._buffer, parser._offset + 1, 4);
524+
parser._offset += 5;
525+
var fraction = readTemporalFraction(parser, columnSchema);
526+
527+
var yearMonth = sliceBits(rawHigh, 14, 31);
528+
result = new Date(
529+
Math.floor(yearMonth / 13), // year
530+
(yearMonth % 13) - 1, // month
531+
sliceBits(rawLow, 17, 22), // day
532+
sliceBits(rawLow, 12, 17), // hour
533+
sliceBits(rawLow, 6, 12), // minutes
534+
sliceBits(rawLow, 0, 6) // seconds
535+
);
536+
537+
if(fraction !== false){
538+
// Javascript Date only supports milliseconds
539+
result.setMilliseconds(fraction.milliseconds);
540+
}
541+
break;
453542
case MysqlTypes.TIMESTAMP:
454543
var raw = parser.parseUnsignedNumber(4);
455544
result = new Date(raw * 1000);
456545
break;
546+
case MysqlTypes.TIMESTAMP2:
547+
var raw = readIntBE(parser._buffer, parser._offset, 4);
548+
parser._offset += 4;
549+
var fraction = readTemporalFraction(parser, columnSchema);
550+
var milliseconds = fraction !== false ? fraction.milliseconds : 0;
551+
result = new Date((raw * 1000) + milliseconds);
552+
break;
457553
case MysqlTypes.YEAR:
458554
var raw = parser.parseUnsignedNumber(1);
459555
result = raw + 1900;

zongji/lib/rows_event.js

+15
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,21 @@ var Version2Events = [
88
0x20, // DELETE_ROWS_EVENT_V2
99
];
1010

11+
var CHECKSUM_SIZE = 4;
12+
1113
/**
1214
* Generic RowsEvent class
1315
* Attributes:
1416
* position: Position inside next binlog
1517
* binlogName: Name of next binlog file
18+
* zongji: ZongJi instance
1619
**/
1720

1821
function RowsEvent(parser, options, zongji) {
1922
BinlogEvent.apply(this, arguments);
2023
this._readTableId(parser);
2124
this.flags = parser.parseUnsignedNumber(2);
25+
this.useChecksum = zongji.useChecksum;
2226

2327
// Version 2 Events
2428
if (Version2Events.indexOf(options.eventType) !== -1) {
@@ -49,10 +53,21 @@ function RowsEvent(parser, options, zongji) {
4953
parser._offset += columnsPresentBitmapSize;
5054
}
5155

56+
if(this.useChecksum){
57+
// Ignore the checksum at the end of this packet
58+
parser._packetEnd -= CHECKSUM_SIZE;
59+
}
60+
5261
this.rows = [];
5362
while (!parser.reachedPacketEnd()) {
5463
this.rows.push(this._fetchOneRow(parser));
5564
}
65+
66+
if(this.useChecksum){
67+
// Skip past the checksum at the end of the packet
68+
parser._packetEnd += CHECKSUM_SIZE;
69+
parser._offset += CHECKSUM_SIZE;
70+
}
5671
}
5772
}
5873

zongji/test/events.js

+13-10
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ var conn = process.testZongJi || {};
88

99
var checkTableMatches = function(tableName){
1010
return function(test, event){
11-
var tableDetails = event.tableMap[event.tableId];
11+
var tableDetails = event.tableMap[event.tableId];
1212
test.strictEqual(tableDetails.parentSchema, settings.database);
1313
test.strictEqual(tableDetails.tableName, tableName);
1414
};
@@ -65,15 +65,18 @@ module.exports = {
6565
includeEvents: ['tablemap', 'writerows']
6666
});
6767

68-
querySequence(conn.db, [
69-
'INSERT INTO ' + conn.escId(testTable) + ' (col) VALUES (10)',
70-
], function(results){
71-
// Should only have 2 events since ZongJi start
72-
test.equal(events.length, 2);
73-
test.equal(events[1].rows[0].col, 10);
74-
zongji.stop();
75-
test.done();
76-
});
68+
// Give enough time to initialize
69+
setTimeout(function(){
70+
querySequence(conn.db, [
71+
'INSERT INTO ' + conn.escId(testTable) + ' (col) VALUES (10)',
72+
], function(results){
73+
// Should only have 2 events since ZongJi start
74+
test.equal(events.length, 2);
75+
test.equal(events[1].rows[0].col, 10);
76+
zongji.stop();
77+
test.done();
78+
});
79+
}, 200);
7780

7881
});
7982
},

zongji/test/helpers/connector.js

+10-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,16 @@ module.exports = function(settings, callback){
1818
'CREATE DATABASE ' + escId(settings.database),
1919
'USE ' + escId(settings.database),
2020
'RESET MASTER',
21-
], function(){
21+
'SELECT VERSION() AS version'
22+
], function(results){
23+
24+
self.mysqlVersion = results[results.length - 1][0].version
25+
.split('-')[0]
26+
.split('.')
27+
.map(function(part){
28+
return parseInt(part, 10);
29+
});
30+
2231
var zongji = self.zongji = new ZongJi(settings.connection);
2332

2433
zongji.on('binlog', function(event) {

zongji/test/helpers/querySequence.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ module.exports = function(connection, debug, queries, callback){
1919
if(index < sequence.length - 1){
2020
sequence[index + 1]();
2121
}else{
22-
callback(results);
22+
setTimeout(function(){
23+
callback(results);
24+
}, 200);
2325
}
2426
});
2527
}

0 commit comments

Comments
 (0)