diff --git a/lib/connection.js b/lib/connection.js index d7c56682..14ecde28 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -6,7 +6,8 @@ var connection = function(options){ options[i] = defaults[i]; } } - self.options = options; + self.options = options; + self.connected = false; } connection.prototype.defaults = function(){ @@ -20,10 +21,35 @@ connection.prototype.defaults = function(){ } } +connection.prototype.ensureConnected = function(parentCallback, callack){ + var self = this; + if(self.connected === false){ + var err = new Error('not connected to redis'); + if(typeof parentCallback === 'function'){ + parentCallback( new Error('not connected to redis') ); + }else{ + throw err; + } + }else{ + callack(); + } +} + connection.prototype.connect = function(callback){ var self = this; var options = self.options; self.redis = options.redis || options.package.createClient(options.port, options.host, options.options); + + self.redis.on('error', function(err){ + // catch to prevent bubble up of error + }); + self.redis.on('connect', function(){ + self.connected = true; + }); + self.redis.on('end', function(){ + self.connected = false; + }); + if(options.password != null && options.password != "" && self.options.fake != true){ self.redis.auth(options.password, function(err){ self.redis.select(options.database, function(err){ diff --git a/lib/queue.js b/lib/queue.js index 3ebcae58..358fccf4 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -16,7 +16,7 @@ var queue = function(options, jobs, callback){ self.runPlugins = pluginRunner.runPlugins self.connection = new connection(options.connection); - self.connection.connect(function(){ + self.connection.connect(function(err){ if(typeof callback == 'function'){ callback(); } }); } @@ -44,10 +44,12 @@ queue.prototype.enqueue = function(q, func, args, callback){ if(toRun == false){ if(typeof callback == "function"){ callback(err, toRun); } }else{ - self.connection.redis.sadd(self.connection.key('queues'), q, function(){ - self.connection.redis.rpush(self.connection.key('queue', q), self.encode(q, func, args), function(){ - self.runPlugins('after_enqueue', func, q, job, args, function(){ - if(typeof callback == "function"){ callback(err, toRun); } + self.connection.ensureConnected(callback, function(){ + self.connection.redis.sadd(self.connection.key('queues'), q, function(){ + self.connection.redis.rpush(self.connection.key('queue', q), self.encode(q, func, args), function(){ + self.runPlugins('after_enqueue', func, q, job, args, function(){ + if(typeof callback == "function"){ callback(err, toRun); } + }); }); }); }); @@ -58,12 +60,14 @@ queue.prototype.enqueue = function(q, func, args, callback){ queue.prototype.enqueueAt = function(timestamp, q, func, args, callback){ // Don't run plugins here, they should be run by scheduler at the enqueue step var self = this; - var item = self.encode(q, func, args); - var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms - self.connection.redis.rpush(self.connection.key("delayed:" + rTimestamp), item, function(){ - self.connection.redis.sadd(self.connection.key("timestamps:" + item), self.connection.key("delayed:" + rTimestamp), function(){ - self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){ - if(typeof callback == "function"){ callback(); } + self.connection.ensureConnected(callback, function(){ + var item = self.encode(q, func, args); + var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms + self.connection.redis.rpush(self.connection.key("delayed:" + rTimestamp), item, function(){ + self.connection.redis.sadd(self.connection.key("timestamps:" + item), self.connection.key("delayed:" + rTimestamp), function(){ + self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){ + if(typeof callback == "function"){ callback(); } + }); }); }); }); @@ -79,15 +83,19 @@ queue.prototype.enqueueIn = function(time, q, func, args, callback){ queue.prototype.queues = function(callback){ var self = this; - self.connection.redis.smembers(self.connection.key('queues'), function(err, queues){ - callback(err, queues); + self.connection.ensureConnected(callback, function(){ + self.connection.redis.smembers(self.connection.key('queues'), function(err, queues){ + callback(err, queues); + }); }); } queue.prototype.length = function(q, callback){ var self = this; - self.connection.redis.llen(self.connection.key('queue', q), function(err, length){ - callback(err, length); + self.connection.ensureConnected(callback, function(){ + self.connection.redis.llen(self.connection.key('queue', q), function(err, length){ + callback(err, length); + }); }); } @@ -97,46 +105,52 @@ queue.prototype.del = function(q, func, args, count, callback){ callback = count; count = 0; // remove first enqueued items that match } - self.connection.redis.lrem(self.connection.key('queue', q), count, self.encode(q, func, args), function(err, count){ - if(typeof callback == "function"){ callback(err, count); } + self.connection.ensureConnected(callback, function(){ + self.connection.redis.lrem(self.connection.key('queue', q), count, self.encode(q, func, args), function(err, count){ + if(typeof callback == "function"){ callback(err, count); } + }); }); } queue.prototype.delDelayed = function(q, func, args, callback){ var self = this; var search = self.encode(q, func, args); - var timestamps = self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){ - if(members.length == 0 ){ if(typeof callback == "function"){ callback(err, []); } } - else{ - var started = 0; - var timestamps = []; - members.forEach(function(key){ - started++; - self.connection.redis.lrem(key, 0, search, function(){ - self.connection.redis.srem(self.connection.key("timestamps:" + search), key, function(){ - timestamps.push(key.split(":")[key.split(":").length - 1]); - started--; - if(started == 0){ - if(typeof callback == "function"){ callback(err, timestamps); } - } + self.connection.ensureConnected(callback, function(){ + var timestamps = self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){ + if(members.length == 0 ){ if(typeof callback == "function"){ callback(err, []); } } + else{ + var started = 0; + var timestamps = []; + members.forEach(function(key){ + started++; + self.connection.redis.lrem(key, 0, search, function(){ + self.connection.redis.srem(self.connection.key("timestamps:" + search), key, function(){ + timestamps.push(key.split(":")[key.split(":").length - 1]); + started--; + if(started == 0){ + if(typeof callback == "function"){ callback(err, timestamps); } + } + }) }) - }) - }); - } - }) + }); + } + }); + }); } queue.prototype.scheduledAt = function(q, func, args, callback){ var self = this; var search = self.encode(q, func, args); - self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){ - var timestamps = []; - if(members != null){ - members.forEach(function(key){ - timestamps.push(key.split(":")[key.split(":").length - 1]); - }) - } - if(typeof callback == "function"){ callback(err, timestamps); } + self.connection.ensureConnected(callback, function(){ + self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){ + var timestamps = []; + if(members != null){ + members.forEach(function(key){ + timestamps.push(key.split(":")[key.split(":").length - 1]); + }) + } + if(typeof callback == "function"){ callback(err, timestamps); } + }); }); } diff --git a/lib/scheduler.js b/lib/scheduler.js index 7dc8fb53..054951b1 100644 --- a/lib/scheduler.js +++ b/lib/scheduler.js @@ -21,7 +21,6 @@ var scheduler = function(options, jobs, callback){ self.connection = new connection(options.connection); self.running = false; self.connection.connect(function(err){ - if(err != null){ self.emit('error', err); } self.queue = new queue({connection: options.connection}, jobs, function(){ if(typeof callback == 'function'){ callback(); } }); diff --git a/package.json b/package.json index cea6c181..19cabf39 100755 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "author": "Evan Tahler ", "name": "node-resque", "description": "an opinionated implementation of resque in node", - "version": "0.3.7", + "version": "0.4.0", "homepage": "http://github.com/taskrabbit/node-resque", "repository": { "type": "git",