Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing race condition between mkdir and event listener registration in pidfiles and logger plugins. #169

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 80 additions & 81 deletions lib/plugins/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

var fs = require('fs')
, Log = require('log')
, mkdir = require('mkdirp').mkdirp;
, mkdir = require('../utils').mkdirPSync;

/**
* Enable stdout / stderr logs for both the master
Expand Down Expand Up @@ -47,104 +47,103 @@ module.exports = function(dir, level){
return function(master){
dir = master.resolve(dir || 'logs');

mkdir(dir, 0755, function(err){
if (err) throw err;
// master log
var stream = fs.createWriteStream(dir + '/master.log', { flags: 'a' });
var log = master.log = new Log(level || Log.INFO, stream);
mkdir(dir, 0755);

// master events
master.on('start', function(){
log.info('master started');
});
// master log
var stream = fs.createWriteStream(dir + '/master.log', { flags: 'a' });
var log = master.log = new Log(level || Log.INFO, stream);

// master is shutting down
master.on('closing', function(){
log.warning('shutting down master');
});
// master events
master.on('start', function(){
log.info('master started');
});

// master has closed and performed cleanup
master.on('close', function(){
log.info('shutdown complete');
});
// master is shutting down
master.on('closing', function(){
log.warning('shutting down master');
});

// sending signal to all workers
master.on('kill', function(sig){
log.warning('sent kill(%s) to all workers', sig);
});
// master has closed and performed cleanup
master.on('close', function(){
log.info('shutdown complete');
});

// worker was killed
master.on('worker killed', function(worker){
if ('restarting' == master.state) return;
log.error('worker %s died', worker.id);
});
// sending signal to all workers
master.on('kill', function(sig){
log.warning('sent kill(%s) to all workers', sig);
});

// worker exception
master.on('worker exception', function(worker, err){
log.error('worker %s uncaught exception %s', worker.id, err.message);
});
// worker was killed
master.on('worker killed', function(worker){
if ('restarting' == master.state) return;
log.error('worker %s died', worker.id);
});

// worker is waiting on connections to be closed
master.on('worker waiting', function(worker, connections){
log.info('worker %s waiting on %s connections', worker.id, connections);
});
// worker exception
master.on('worker exception', function(worker, err){
log.error('worker %s uncaught exception %s', worker.id, err.message);
});

// worker has timed out
master.on('worker timeout', function(worker, timeout){
log.warning('worker %s timed out after %sms', worker.id, timeout);
});
// worker is waiting on connections to be closed
master.on('worker waiting', function(worker, connections){
log.info('worker %s waiting on %s connections', worker.id, connections);
});

// worker connected to master
master.on('worker connected', function(worker){
log.debug('worker %s connected', worker.id);
});
// worker has timed out
master.on('worker timeout', function(worker, timeout){
log.warning('worker %s timed out after %sms', worker.id, timeout);
});

// cyclic or immediate restart
master.on('cyclic restart', function(){
log.warning('cyclic restart detected, restarting in %sms'
, master.options['restart timeout']);
});
// worker connected to master
master.on('worker connected', function(worker){
log.debug('worker %s connected', worker.id);
});

// restart requested
master.on('restarting', function(){
log.info('restart requested');
});
// cyclic or immediate restart
master.on('cyclic restart', function(){
log.warning('cyclic restart detected, restarting in %sms'
, master.options['restart timeout']);
});

// restart complete
master.on('restart', function(){
log.info('restart complete');
});
// restart requested
master.on('restarting', function(){
log.info('restart requested');
});

// restart complete
master.on('restart', function(){
log.info('restart complete');
});

// repl socket connection established
master.on('repl socket', function(sock){
var from = sock.remoteAddress
? 'from ' + sock.remoteAddress
: '';
sock.on('connect', function(){
log.info('repl connection %s', from);
});
sock.on('close', function(){
log.info('repl disconnect %s', from);
});
// repl socket connection established
master.on('repl socket', function(sock){
var from = sock.remoteAddress
? 'from ' + sock.remoteAddress
: '';
sock.on('connect', function(){
log.info('repl connection %s', from);
});
sock.on('close', function(){
log.info('repl disconnect %s', from);
});
});

// override fds
master.customFds = [-1, -1];
// override fds
master.customFds = [-1, -1];

// children
master.on('worker', function(worker){
var proc = worker.proc;
// children
master.on('worker', function(worker){
var proc = worker.proc;

log.info('spawned worker ' + worker.id);
log.info('spawned worker ' + worker.id);

// worker log streams
var access = fs.createWriteStream(dir + '/workers.access.log', { flags: 'a' })
, error = fs.createWriteStream(dir + '/workers.error.log', { flags: 'a' });
// worker log streams
var access = fs.createWriteStream(dir + '/workers.access.log', { flags: 'a' })
, error = fs.createWriteStream(dir + '/workers.error.log', { flags: 'a' });

// redirect stdout / stderr
proc.stdout.pipe(access);
proc.stderr.pipe(error);
});
// redirect stdout / stderr
proc.stdout.pipe(access);
proc.stderr.pipe(error);
});
}
};
};
32 changes: 15 additions & 17 deletions lib/plugins/pidfiles.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/

var fs = require('fs')
, mkdir = require('mkdirp').mkdirp;
, mkdir = require('../utils').mkdirPSync;

/**
* Save pidfiles to the given `dir` or `./pids`.
Expand Down Expand Up @@ -59,25 +59,23 @@ module.exports = function(dir){
});
};

mkdir(dir, 0755, function(err){
if (err) throw err;
mkdir(dir, 0755);

// save worker pids
master.on('worker', function(worker){
var path = dir + '/worker.' + worker.id + '.pid';
fs.writeFile(path, worker.proc.pid.toString(), 'ascii', function(err){
if (err) throw err;
master.emit('worker pidfile');
});
// save worker pids
master.on('worker', function(worker){
var path = dir + '/worker.' + worker.id + '.pid';
fs.writeFile(path, worker.proc.pid.toString(), 'ascii', function(err){
if (err) throw err;
master.emit('worker pidfile');
});
});

master.on('listening', function(){
// save master pid
fs.writeFile(dir + '/master.pid', process.pid.toString(), 'ascii', function(err){
if (err) throw err;
master.emit('pidfile');
});
master.on('listening', function(){
// save master pid
fs.writeFile(dir + '/master.pid', process.pid.toString(), 'ascii', function(err){
if (err) throw err;
master.emit('pidfile');
});
});
}
};
};
27 changes: 26 additions & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
* MIT Licensed
*/


var path = require('path')
, fs = require('fs');

/**
* Frame the given `obj`.
*
Expand Down Expand Up @@ -95,4 +99,25 @@ exports.unshiftListener = function(obj, event, fn){
} else {
obj._events[event] = [fn, obj._events[event]];
}
};
};


/**
* `mkdir -p`, synchronously
*
* @param {String} dir
* @param {String} mode
* @api private
*/

exports.mkdirPSync = function(dir, mode) {
var buildingPath = [],
components = path.normalize(dir).split('/');

components.slice(1, components.length).forEach(function(component) {
buildingPath = buildingPath.concat(component);
var toCreate = '/' + buildingPath.join('/');
if (!path.existsSync(toCreate))
fs.mkdirSync('/' + buildingPath.join('/'), mode);
});
};