Skip to content

Commit

Permalink
saftey with dropping redis connections
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Tahler committed Mar 20, 2014
1 parent 3c6cf72 commit 207171b
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 46 deletions.
28 changes: 27 additions & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ var connection = function(options){
options[i] = defaults[i];
}
}
self.options = options;
self.options = options;
self.connected = false;
}

connection.prototype.defaults = function(){
Expand All @@ -20,10 +21,35 @@ connection.prototype.defaults = function(){
}
}

connection.prototype.ensureConnected = function(parentCallback, callack){
var self = this;
if(self.connected === false){
var err = new Error('not connected to redis');
if(typeof parentCallback === 'function'){
parentCallback( new Error('not connected to redis') );
}else{
throw err;
}
}else{
callack();
}
}

connection.prototype.connect = function(callback){
var self = this;
var options = self.options;
self.redis = options.redis || options.package.createClient(options.port, options.host, options.options);

self.redis.on('error', function(err){
// catch to prevent bubble up of error
});
self.redis.on('connect', function(){
self.connected = true;
});
self.redis.on('end', function(){
self.connected = false;
});

if(options.password != null && options.password != "" && self.options.fake != true){
self.redis.auth(options.password, function(err){
self.redis.select(options.database, function(err){
Expand Down
100 changes: 57 additions & 43 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var queue = function(options, jobs, callback){
self.runPlugins = pluginRunner.runPlugins

self.connection = new connection(options.connection);
self.connection.connect(function(){
self.connection.connect(function(err){
if(typeof callback == 'function'){ callback(); }
});
}
Expand Down Expand Up @@ -44,10 +44,12 @@ queue.prototype.enqueue = function(q, func, args, callback){
if(toRun == false){
if(typeof callback == "function"){ callback(err, toRun); }
}else{
self.connection.redis.sadd(self.connection.key('queues'), q, function(){
self.connection.redis.rpush(self.connection.key('queue', q), self.encode(q, func, args), function(){
self.runPlugins('after_enqueue', func, q, job, args, function(){
if(typeof callback == "function"){ callback(err, toRun); }
self.connection.ensureConnected(callback, function(){
self.connection.redis.sadd(self.connection.key('queues'), q, function(){
self.connection.redis.rpush(self.connection.key('queue', q), self.encode(q, func, args), function(){
self.runPlugins('after_enqueue', func, q, job, args, function(){
if(typeof callback == "function"){ callback(err, toRun); }
});
});
});
});
Expand All @@ -58,12 +60,14 @@ queue.prototype.enqueue = function(q, func, args, callback){
queue.prototype.enqueueAt = function(timestamp, q, func, args, callback){
// Don't run plugins here, they should be run by scheduler at the enqueue step
var self = this;
var item = self.encode(q, func, args);
var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms
self.connection.redis.rpush(self.connection.key("delayed:" + rTimestamp), item, function(){
self.connection.redis.sadd(self.connection.key("timestamps:" + item), self.connection.key("delayed:" + rTimestamp), function(){
self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){
if(typeof callback == "function"){ callback(); }
self.connection.ensureConnected(callback, function(){
var item = self.encode(q, func, args);
var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms
self.connection.redis.rpush(self.connection.key("delayed:" + rTimestamp), item, function(){
self.connection.redis.sadd(self.connection.key("timestamps:" + item), self.connection.key("delayed:" + rTimestamp), function(){
self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){
if(typeof callback == "function"){ callback(); }
});
});
});
});
Expand All @@ -79,15 +83,19 @@ queue.prototype.enqueueIn = function(time, q, func, args, callback){

queue.prototype.queues = function(callback){
var self = this;
self.connection.redis.smembers(self.connection.key('queues'), function(err, queues){
callback(err, queues);
self.connection.ensureConnected(callback, function(){
self.connection.redis.smembers(self.connection.key('queues'), function(err, queues){
callback(err, queues);
});
});
}

queue.prototype.length = function(q, callback){
var self = this;
self.connection.redis.llen(self.connection.key('queue', q), function(err, length){
callback(err, length);
self.connection.ensureConnected(callback, function(){
self.connection.redis.llen(self.connection.key('queue', q), function(err, length){
callback(err, length);
});
});
}

Expand All @@ -97,46 +105,52 @@ queue.prototype.del = function(q, func, args, count, callback){
callback = count;
count = 0; // remove first enqueued items that match
}
self.connection.redis.lrem(self.connection.key('queue', q), count, self.encode(q, func, args), function(err, count){
if(typeof callback == "function"){ callback(err, count); }
self.connection.ensureConnected(callback, function(){
self.connection.redis.lrem(self.connection.key('queue', q), count, self.encode(q, func, args), function(err, count){
if(typeof callback == "function"){ callback(err, count); }
});
});
}

queue.prototype.delDelayed = function(q, func, args, callback){
var self = this;
var search = self.encode(q, func, args);
var timestamps = self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){
if(members.length == 0 ){ if(typeof callback == "function"){ callback(err, []); } }
else{
var started = 0;
var timestamps = [];
members.forEach(function(key){
started++;
self.connection.redis.lrem(key, 0, search, function(){
self.connection.redis.srem(self.connection.key("timestamps:" + search), key, function(){
timestamps.push(key.split(":")[key.split(":").length - 1]);
started--;
if(started == 0){
if(typeof callback == "function"){ callback(err, timestamps); }
}
self.connection.ensureConnected(callback, function(){
var timestamps = self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){
if(members.length == 0 ){ if(typeof callback == "function"){ callback(err, []); } }
else{
var started = 0;
var timestamps = [];
members.forEach(function(key){
started++;
self.connection.redis.lrem(key, 0, search, function(){
self.connection.redis.srem(self.connection.key("timestamps:" + search), key, function(){
timestamps.push(key.split(":")[key.split(":").length - 1]);
started--;
if(started == 0){
if(typeof callback == "function"){ callback(err, timestamps); }
}
})
})
})
});
}
})
});
}
});
});
}

queue.prototype.scheduledAt = function(q, func, args, callback){
var self = this;
var search = self.encode(q, func, args);
self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){
var timestamps = [];
if(members != null){
members.forEach(function(key){
timestamps.push(key.split(":")[key.split(":").length - 1]);
})
}
if(typeof callback == "function"){ callback(err, timestamps); }
self.connection.ensureConnected(callback, function(){
self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){
var timestamps = [];
if(members != null){
members.forEach(function(key){
timestamps.push(key.split(":")[key.split(":").length - 1]);
})
}
if(typeof callback == "function"){ callback(err, timestamps); }
});
});
}

Expand Down
1 change: 0 additions & 1 deletion lib/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ var scheduler = function(options, jobs, callback){
self.connection = new connection(options.connection);
self.running = false;
self.connection.connect(function(err){
if(err != null){ self.emit('error', err); }
self.queue = new queue({connection: options.connection}, jobs, function(){
if(typeof callback == 'function'){ callback(); }
});
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"author": "Evan Tahler <[email protected]>",
"name": "node-resque",
"description": "an opinionated implementation of resque in node",
"version": "0.3.7",
"version": "0.4.0",
"homepage": "http://github.com/taskrabbit/node-resque",
"repository": {
"type": "git",
Expand Down

0 comments on commit 207171b

Please sign in to comment.