Skip to content

Improved logging #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions lib/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,55 @@ var termTypes = protodef.Term.TermType;
var datumTypes = protodef.Datum.DatumType;
var net = require('net');

var logLevels = {
error: 0,
warn: 1,
info: 2,
debug: 3,
};

function createLogger(poolMaster, silent) {
return function(message) {
if (silent !== true) {
console.error(message);
function createLogger(poolMaster, logLevel, logger) {
var maxLevel = logLevels[logLevel];
if (maxLevel === undefined) {
throw new Error('Unsupported log level: ' + logLevel);
}

var log = Function.prototype;
if (logger === undefined) {
log = function(level, message) {
if (level === 'error') {
console.error(message);
} else if (level === 'warn') {
console.warn(message);
} else {
console.log(message);
}
};
} else if (logger) {
if (typeof logger === 'function') {
log = logger;
} else if (typeof logger === 'object') {
log = function(level, message) {
logger[level](message);
};
} else {
throw new TypeError('`options.log` must be an object or function');
}
poolMaster.emit('log', message);
}

return {
log: function(level, message) {
if (logLevels[level] <= maxLevel) {
log(level, message);
poolMaster.emit('log', message, level);
}
},
error: function(error) {
log('error', error.stack);
poolMaster.emit('log', error.stack, 'error');
poolMaster.emit('error', error);
}
};
}
module.exports.createLogger = createLogger;

Expand Down
20 changes: 12 additions & 8 deletions lib/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function Pool(r, options) {
}
}, 0);
this.id = Math.floor(Math.random()*100000);
this._log('Creating a pool connected to '+this.getAddress());
this._log('info', 'Creating a pool connected to: ' + this.getAddress());
}

util.inherits(Pool, events.EventEmitter);
Expand Down Expand Up @@ -152,7 +152,8 @@ Pool.prototype.putConnection = function(connection) {
else if (self._extraConnections > 0) {
self._extraConnections--;
connection.close().error(function(error) {
self._log('Fail to properly close a connection. Error:'+JSON.stringify(error));
self._log('info', 'Failed to close a connection properly');
self._log.error(error);
});
clearTimeout(connection.timeout);
}
Expand All @@ -162,7 +163,8 @@ Pool.prototype.putConnection = function(connection) {
// Note that because we have available connections here, the pool master has no pending
// queries.
connection.close().error(function(error) {
self._log('Fail to properly close a connection. Error:'+JSON.stringify(error));
self._log('info', 'Failed to close a connection properly');
self._log.error(error);
});
clearTimeout(connection.timeout);
}
Expand Down Expand Up @@ -219,7 +221,7 @@ Pool.prototype.createConnection = function() {
}
// Need another flag
else if ((self._slowlyGrowing === true) && (self._slowGrowth === true) && (self._consecutiveFails > 0)) {
self._log('Exiting slow growth mode');
self._log('warn', 'Exiting slow growth mode');
self._consecutiveFails = 0;
self._slowGrowth = false;
self._slowlyGrowing = false;
Expand All @@ -231,7 +233,8 @@ Pool.prototype.createConnection = function() {
connection.on('error', function(error) {
// We are going to close connection, but we don't want another process to use it before
// So we remove it from the pool now (if it's inside)
self._log('Error emitted by a connection: '+JSON.stringify(error));
self._log('info', 'Error emitted by a connection');
self._log.error(error);
for(var i=0; i<self.getAvailableLength(); i++) {
if (self._pool.get(i) === this) {
self._pool.delete(i);
Expand Down Expand Up @@ -292,12 +295,13 @@ Pool.prototype.createConnection = function() {

self._slowGrowth = true;
if (self._slowlyGrowing === false) {
self._log('Entering slow growth mode');
self._log('warn', 'Entering slow growth mode');
}
self._slowlyGrowing = true;

// Log an error
self._log('Fail to create a new connection for the connection pool. Error:'+JSON.stringify(error));
self._log('debug', 'Failed to create a new connection');
self._log.error(error);

if (self._openingConnections === 0) {
self._consecutiveFails++;
Expand Down Expand Up @@ -371,7 +375,7 @@ Pool.prototype.drainLocalhost = function() {
Pool.prototype.drain = function() {
var self = this;
self._draining = true;
self._log('Draining the pool connected to '+this.getAddress());
self._log('debug', 'Draining the pool connected to: ' + this.getAddress());
self.emit('draining');
var p = new Promise(function(resolve, reject) {
var connection = self._pool.pop();
Expand Down
57 changes: 26 additions & 31 deletions lib/pool_master.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ function PoolMaster(r, options) {
self._options = options;
self._options.buffer = options.buffer || 50;
self._options.max = options.max || 1000;
self._log = helper.createLogger(self, options.silent || false);
if (typeof options.log == 'function') {
self.on('log', options.log);
}
self._draining = false;
self._numConnections = 0;
self._numAvailableConnections = 0;
Expand All @@ -39,6 +35,10 @@ function PoolMaster(r, options) {
self._timeoutError = options.timeoutError || 1000; // How long should we wait before recreating a connection that failed?
self._maxExponent = options.maxExponent || 6; // Maximum timeout is 2^maxExponent*timeoutError

var logger = helper.createLogger(self, options.logLevel || 'debug', options.silent ? null : options.log);
self._log = logger.log;
self._log.error = logger.error;

//TODO
//self._usingPool = true; // If we have used the pool
self._seed = 0;
Expand Down Expand Up @@ -168,7 +168,7 @@ PoolMaster.prototype.handleAllServersResponse = function(servers) {
var found = false;
for(var j=0; j<self._pools[UNKNOWN_POOLS].length; j++) {
if (found) break;
var pool = self._pools[UNKNOWN_POOLS][j];
var pool = self._pools[UNKNOWN_POOLS][j];
// If a pool is created with localhost, it will probably match the first server even though it may not the the one
// So it gets an id
for(var k=0; k<server.network.canonical_addresses.length; k++) {
Expand Down Expand Up @@ -211,13 +211,12 @@ PoolMaster.prototype.handleAllServersResponse = function(servers) {
for(var i=0;i<self._pools[UNKNOWN_POOLS].length; i++) {
// These pools does not match any server returned by RethinkDB.
var pool = self._pools[UNKNOWN_POOLS].splice(i, 1)[0];
self._log('Removing pool connected to: '+pool.getAddress())
self._log('info', 'Removing pool connected to: ' + pool.getAddress());
pool.drain().then(function() {
pool.removeAllListeners();
}).error(function(error) {
self._log('Pool connected to: '+self._pools[UNKNOWN_POOLS][i].getAddress()+' could not be properly drained.')
self._log(error.message);
self._log(error.stack);
self._log('debug', 'Pool failed to drain properly: ' + pool.getAddress());
self._log.error(error);
});
}
}
Expand Down Expand Up @@ -264,13 +263,12 @@ PoolMaster.prototype.createPool = function(server) {
PoolMaster.prototype.deletePool = function(key) {
var self = this;
var pool = self._pools[key];
self._log('Removing pool connected to: '+pool.getAddress())
self._log('info', 'Removing pool connected to: ' + pool.getAddress());
pool.drain().then(function() {
pool.removeAllListeners();
}).error(function(error) {
self._log('Pool connected to: '+self._pools[key].getAddress()+' could not be properly drained.')
self._log(error.message);
self._log(error.stack);
self._log('debug', 'Pool failed to drain properly: ' + pool.getAddress());
self._log.error(error);
});
delete self._pools[key];
self.resetBufferParameters();
Expand Down Expand Up @@ -307,9 +305,10 @@ PoolMaster.prototype.fetchServers = function(useSeeds) {
self._feed = feed;
var initializing = true;
var servers = [];
feed.each(function(err, change) {
if (err) {
self._log('The changefeed on server_status returned an error: '+err.toString());
feed.each(function(error, change) {
if (error) {
self._log('warn', 'The changefeed on `server_status` returned an error');
self._log.error(error);
// We have to refetch everything as the server that was serving the feed may
// have died.
if (!self._draining) {
Expand All @@ -329,7 +328,8 @@ PoolMaster.prototype.fetchServers = function(useSeeds) {
self._r.db('rethinkdb').table('server_status').run({cursor: false}).then(function(servers) {
self.handleAllServersResponse(servers);
}).error(function(error) {
self._log('Fail to retrieve a second copy of server_status');
self._log('debug', 'Failed to fetch another copy of `server_status`');
self._log.error(error);
//TODO Retry
});
}, 1000);
Expand Down Expand Up @@ -359,24 +359,21 @@ PoolMaster.prototype.fetchServers = function(useSeeds) {
found = true;

(function (pool) {
self._log('Removing pool connected to: '+pool.getAddress())
self._log('info', 'Removing pool connected to: ' + pool.getAddress());
var pool = self._pools[UNKNOWN_POOLS].splice(i, 1)[0];
pool.drain().then(function() {
pool.removeAllListeners();
}).error(function(error) {
if (self._options.silent !== true) {
self._log('Pool connected to: '+pool.getAddress()+' could not be properly drained.')
self._log(error.message);
self._log(error.stack);
}
self._log('debug', 'Pool failed to drain properly: ' + pool.getAddress());
self._log.error(error);
});
})(self._pools[UNKNOWN_POOLS][i]);
break;
}
}
}
if (found === false) {
self._log('A server was removed but no pool for this server exists...')
self._log('info', 'Removed server has no associated pool');
}
}
// We ignore this change since this it doesn't affect whether the server
Expand All @@ -385,8 +382,9 @@ PoolMaster.prototype.fetchServers = function(useSeeds) {
});
return null;
}).error(function(error) {
self._log('Could not retrieve the data from server_status: '+JSON.stringify(error));

self._log('warn', 'Failed to get data from `server_status` feed');
self._log.error(error);

var timeout;
if (self._consecutiveFails === -1) {
timeout = 0;
Expand Down Expand Up @@ -526,11 +524,8 @@ PoolMaster.prototype.drain = function() {
pools[i].removeAllListeners();
}
}).error(function(error) {
if (self._options.silent !== true) {
self._log('Failed to drain all the pools:');
self._log(error.message);
self._log(error.stack);
}
self._log('debug', 'Failed to drain all the pools');
self._log.error(error);
});
}

Expand Down