Skip to content

Commit 4f4a8f8

Browse files
committed
Squashed 'zongji/' changes from a6a4d82..8f012db
8f012db TIMESTAMP2 data type added and tested f39e91b DATETIME2 data type added and tested 93f498a TIME2 data type added and tested 57ca7ab Recognize checksum in row event 14dee38 Make querySequence callback asynchronous git-subtree-dir: zongji git-subtree-split: 8f012dbe1cc9aa04c900d62e578159bcc799b1eb
1 parent a7a6309 commit 4f4a8f8

File tree

9 files changed

+324
-95
lines changed

9 files changed

+324
-95
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@ npm-install-stamp
2424
*.swo
2525
*.swp
2626
ben.js
27+
.vagrant
28+
Vagrantfile

README.md

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ ZongJi (踪迹) is pronounced as `zōng jì` in Chinese.
55

66
This package is a "pure JS" implementation based on [`node-mysql`](https://github.com/felixge/node-mysql). Since v0.2.0, The native part (which was written in C++) has been dropped.
77

8+
This package has been tested with MySQL server 5.5.40 and 5.6.19. All MySQL server versions >= 5.1.15 are supported.
9+
810
## Quick Start
911

1012
```javascript
@@ -106,6 +108,7 @@ Name | Description
106108
* :star2: [All types allowed by `node-mysql`](https://github.com/felixge/node-mysql#type-casting) are supported by this package.
107109
* :speak_no_evil: While 64-bit integers in MySQL (`BIGINT` type) allow values in the range of 2<sup>64</sup> (± ½ × 2<sup>64</sup> for signed values), Javascript's internal storage of numbers limits values to 2<sup>53</sup>, making the allowed range of `BIGINT` fields only `-9007199254740992` to `9007199254740992`. Unsigned 64-bit integers must also not exceed `9007199254740992`.
108110
* :point_right: `TRUNCATE` statement does not cause corresponding `DeleteRows` event. Use unqualified `DELETE FROM` for same effect.
111+
* When using fractional seconds with `DATETIME` and `TIMESTAMP` data types in MySQL > 5.6.4, only millisecond precision is available due to the limit of Javascript's `Date` object.
109112

110113
## Run Tests
111114

@@ -124,6 +127,7 @@ I learnt many things from following resources while making ZongJi.
124127
* http://dev.mysql.com/doc/internals/en/replication-protocol.html
125128
* http://www.cs.wichita.edu/~chang/lecture/cs742/program/how-mysql-c-api.html
126129
* https://github.com/jeremycole/mysql_binlog (Ruby implemenation of MySQL binlog parser)
130+
* http://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html
127131

128132
## License
129133
MIT

index.js

+43-25
Original file line numberDiff line numberDiff line change
@@ -38,36 +38,54 @@ util.inherits(ZongJi, EventEmitter);
3838

3939
ZongJi.prototype._init = function() {
4040
var self = this;
41+
var binlogOptions = {
42+
tableMap: self.tableMap,
43+
};
4144

42-
this._isChecksumEnabled(function(checksumEnabled) {
43-
self.useChecksum = checksumEnabled;
44-
var options = {
45-
tableMap: self.tableMap,
46-
useChecksum: checksumEnabled,
47-
};
48-
49-
if(self.options.serverId !== undefined){
50-
options.serverId = self.options.serverId;
45+
var asyncMethods = [
46+
{
47+
name: '_isChecksumEnabled',
48+
callback: function(checksumEnabled) {
49+
self.useChecksum = checksumEnabled;
50+
binlogOptions.useChecksum = checksumEnabled
51+
}
52+
},
53+
{
54+
name: '_findBinlogEnd',
55+
callback: function(result){
56+
if(result && self.options.startAtEnd){
57+
binlogOptions.filename = result.Log_name;
58+
binlogOptions.position = result.File_size;
59+
}
60+
}
5161
}
62+
];
63+
64+
var methodIndex = 0;
65+
var nextMethod = function(){
66+
var method = asyncMethods[methodIndex];
67+
self[method.name](function(/* args */){
68+
method.callback.apply(this, arguments);
69+
methodIndex++;
70+
if(methodIndex < asyncMethods.length){
71+
nextMethod();
72+
}else{
73+
ready();
74+
}
75+
});
76+
};
77+
nextMethod();
5278

53-
var ready = function(){
54-
self.binlog = generateBinlog.call(self, options);
55-
self.ready = true;
56-
self._executeCtrlCallbacks();
79+
var ready = function(){
80+
// Run asynchronously from _init(), as serverId option set in start()
81+
if(self.options.serverId !== undefined){
82+
binlogOptions.serverId = self.options.serverId;
5783
}
5884

59-
if(self.options.startAtEnd){
60-
self._findBinlogEnd(function(result){
61-
if(result){
62-
options.filename = result.Log_name;
63-
options.position = result.File_size;
64-
}
65-
ready();
66-
});
67-
}else{
68-
ready();
69-
}
70-
});
85+
self.binlog = generateBinlog.call(self, binlogOptions);
86+
self.ready = true;
87+
self._executeCtrlCallbacks();
88+
};
7189
};
7290

7391
ZongJi.prototype._isChecksumEnabled = function(next) {

lib/common.js

+96
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ var MysqlTypes = {
1616
'NEWDATE': 14,
1717
'VARCHAR': 15,
1818
'BIT': 16,
19+
// Fractional temporal types in MySQL >=5.6.4
20+
'TIMESTAMP2': 17,
21+
'DATETIME2': 18,
22+
'TIME2': 19,
1923
'NEWDECIMAL': 246,
2024
'ENUM': 247,
2125
'SET': 248,
@@ -81,6 +85,14 @@ var parseSetEnumTypeDef = function(type){
8185
});
8286
};
8387

88+
// @param {string} type - ex. 'time(4)'
89+
// @return {int} - ex. 4
90+
var parseTypeDefIntArg = function(type){
91+
var start = type.indexOf('(') + 1;
92+
if(start === 0) return undefined;
93+
return parseInt(type.substr(start, type.length - start - 1), 10);
94+
};
95+
8496
var zeroPad = function(num, size) {
8597
// Max 32 digits
8698
var s = "00000000000000000000000000000000" + num;
@@ -328,6 +340,36 @@ var convertToMysqlType = exports.convertToMysqlType = function(code) {
328340
return result;
329341
};
330342

343+
var readTemporalFraction = function(parser, columnSchema) {
344+
var fractionPrecision = parseTypeDefIntArg(columnSchema.COLUMN_TYPE);
345+
if(fractionPrecision === undefined || fractionPrecision === 0) return false;
346+
347+
var fractionSize = Math.ceil(fractionPrecision / 2);
348+
var fraction = readIntBE(parser._buffer, parser._offset, fractionSize);
349+
parser._offset += fractionSize;
350+
if(fractionPrecision % 2 !== 0) fraction /= 10; // Not using full space
351+
if(fraction < 0) fraction *= -1; // Negative time, fraction not negative
352+
353+
var fractionStr = zeroPad(fraction, fractionPrecision);
354+
var milliseconds;
355+
if(fractionStr.length > 3){
356+
milliseconds = fractionStr.substr(0, 3);
357+
}else if(fractionStr.length === 2){
358+
milliseconds = fractionStr + '0';
359+
}else if(fractionStr.length === 1){
360+
milliseconds = fractionStr + '00';
361+
}else{
362+
milliseconds = fractionStr;
363+
}
364+
365+
return {
366+
value: fraction,
367+
precision: fractionPrecision,
368+
string: fractionStr,
369+
milliseconds: parseInt(milliseconds, 10)
370+
};
371+
};
372+
331373
exports.readMysqlValue = function(parser, column, columnSchema) {
332374
var result;
333375
// jshint indent: false
@@ -431,11 +473,36 @@ exports.readMysqlValue = function(parser, column, columnSchema) {
431473
var minute = Math.floor((raw % 10000) / 100);
432474
var second = raw % 100;
433475
if(isNegative) second += 1;
476+
477+
result = (isNegative ? '-' : '') +
478+
zeroPad(hour, hour > 99 ? 3 : 2) + ':' +
479+
zeroPad(minute, 2) + ':' +
480+
zeroPad(second, 2);
481+
break;
482+
case MysqlTypes.TIME2:
483+
var raw = readIntBE(parser._buffer, parser._offset, 3);
484+
parser._offset += 3;
485+
var fraction = readTemporalFraction(parser, columnSchema);
486+
487+
var isNegative = (raw & (1 << 23)) === 0;
488+
if(isNegative) raw = raw ^ ((1 << 24) - 1); // flip all bits
489+
490+
var hour = sliceBits(raw, 12, 22);
491+
var minute = sliceBits(raw, 6, 12);
492+
var second = sliceBits(raw, 0, 6);
493+
494+
if(isNegative && (fraction === false || fraction.value === 0)){
495+
second++;
496+
}
434497

435498
result = (isNegative ? '-' : '') +
436499
zeroPad(hour, hour > 99 ? 3 : 2) + ':' +
437500
zeroPad(minute, 2) + ':' +
438501
zeroPad(second, 2);
502+
503+
if(fraction !== false){
504+
result += '.' + fraction.string;
505+
}
439506
break;
440507
case MysqlTypes.DATETIME:
441508
var raw = exports.parseUInt64(parser);
@@ -450,10 +517,39 @@ exports.readMysqlValue = function(parser, column, columnSchema) {
450517
time % 100 // seconds
451518
);
452519
break;
520+
case MysqlTypes.DATETIME2:
521+
// Overlapping high-low to get all data in 32-bit numbers
522+
var rawHigh = readIntBE(parser._buffer, parser._offset, 4);
523+
var rawLow = readIntBE(parser._buffer, parser._offset + 1, 4);
524+
parser._offset += 5;
525+
var fraction = readTemporalFraction(parser, columnSchema);
526+
527+
var yearMonth = sliceBits(rawHigh, 14, 31);
528+
result = new Date(
529+
Math.floor(yearMonth / 13), // year
530+
(yearMonth % 13) - 1, // month
531+
sliceBits(rawLow, 17, 22), // day
532+
sliceBits(rawLow, 12, 17), // hour
533+
sliceBits(rawLow, 6, 12), // minutes
534+
sliceBits(rawLow, 0, 6) // seconds
535+
);
536+
537+
if(fraction !== false){
538+
// Javascript Date only supports milliseconds
539+
result.setMilliseconds(fraction.milliseconds);
540+
}
541+
break;
453542
case MysqlTypes.TIMESTAMP:
454543
var raw = parser.parseUnsignedNumber(4);
455544
result = new Date(raw * 1000);
456545
break;
546+
case MysqlTypes.TIMESTAMP2:
547+
var raw = readIntBE(parser._buffer, parser._offset, 4);
548+
parser._offset += 4;
549+
var fraction = readTemporalFraction(parser, columnSchema);
550+
var milliseconds = fraction !== false ? fraction.milliseconds : 0;
551+
result = new Date((raw * 1000) + milliseconds);
552+
break;
457553
case MysqlTypes.YEAR:
458554
var raw = parser.parseUnsignedNumber(1);
459555
result = raw + 1900;

lib/rows_event.js

+15
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,21 @@ var Version2Events = [
88
0x20, // DELETE_ROWS_EVENT_V2
99
];
1010

11+
var CHECKSUM_SIZE = 4;
12+
1113
/**
1214
* Generic RowsEvent class
1315
* Attributes:
1416
* position: Position inside next binlog
1517
* binlogName: Name of next binlog file
18+
* zongji: ZongJi instance
1619
**/
1720

1821
function RowsEvent(parser, options, zongji) {
1922
BinlogEvent.apply(this, arguments);
2023
this._readTableId(parser);
2124
this.flags = parser.parseUnsignedNumber(2);
25+
this.useChecksum = zongji.useChecksum;
2226

2327
// Version 2 Events
2428
if (Version2Events.indexOf(options.eventType) !== -1) {
@@ -49,10 +53,21 @@ function RowsEvent(parser, options, zongji) {
4953
parser._offset += columnsPresentBitmapSize;
5054
}
5155

56+
if(this.useChecksum){
57+
// Ignore the checksum at the end of this packet
58+
parser._packetEnd -= CHECKSUM_SIZE;
59+
}
60+
5261
this.rows = [];
5362
while (!parser.reachedPacketEnd()) {
5463
this.rows.push(this._fetchOneRow(parser));
5564
}
65+
66+
if(this.useChecksum){
67+
// Skip past the checksum at the end of the packet
68+
parser._packetEnd += CHECKSUM_SIZE;
69+
parser._offset += CHECKSUM_SIZE;
70+
}
5671
}
5772
}
5873

test/events.js

+13-10
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ var conn = process.testZongJi || {};
88

99
var checkTableMatches = function(tableName){
1010
return function(test, event){
11-
var tableDetails = event.tableMap[event.tableId];
11+
var tableDetails = event.tableMap[event.tableId];
1212
test.strictEqual(tableDetails.parentSchema, settings.database);
1313
test.strictEqual(tableDetails.tableName, tableName);
1414
};
@@ -65,15 +65,18 @@ module.exports = {
6565
includeEvents: ['tablemap', 'writerows']
6666
});
6767

68-
querySequence(conn.db, [
69-
'INSERT INTO ' + conn.escId(testTable) + ' (col) VALUES (10)',
70-
], function(results){
71-
// Should only have 2 events since ZongJi start
72-
test.equal(events.length, 2);
73-
test.equal(events[1].rows[0].col, 10);
74-
zongji.stop();
75-
test.done();
76-
});
68+
// Give enough time to initialize
69+
setTimeout(function(){
70+
querySequence(conn.db, [
71+
'INSERT INTO ' + conn.escId(testTable) + ' (col) VALUES (10)',
72+
], function(results){
73+
// Should only have 2 events since ZongJi start
74+
test.equal(events.length, 2);
75+
test.equal(events[1].rows[0].col, 10);
76+
zongji.stop();
77+
test.done();
78+
});
79+
}, 200);
7780

7881
});
7982
},

test/helpers/connector.js

+10-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,16 @@ module.exports = function(settings, callback){
1818
'CREATE DATABASE ' + escId(settings.database),
1919
'USE ' + escId(settings.database),
2020
'RESET MASTER',
21-
], function(){
21+
'SELECT VERSION() AS version'
22+
], function(results){
23+
24+
self.mysqlVersion = results[results.length - 1][0].version
25+
.split('-')[0]
26+
.split('.')
27+
.map(function(part){
28+
return parseInt(part, 10);
29+
});
30+
2231
var zongji = self.zongji = new ZongJi(settings.connection);
2332

2433
zongji.on('binlog', function(event) {

test/helpers/querySequence.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ module.exports = function(connection, debug, queries, callback){
1919
if(index < sequence.length - 1){
2020
sequence[index + 1]();
2121
}else{
22-
callback(results);
22+
setTimeout(function(){
23+
callback(results);
24+
}, 200);
2325
}
2426
});
2527
}

0 commit comments

Comments
 (0)