From 112b0945f7faa66c03facdc5361aac64d96dc97f Mon Sep 17 00:00:00 2001 From: Max Loginov Date: Sat, 19 Aug 2017 12:05:10 -0700 Subject: [PATCH] Global hook to determine if Workers are allowed to fetch new jobs from Redis for execution. Hook function can be set up with kue.createQueue (through the options object): let kuePaused = true; const kuePostponeJobsPickupForExecutionStrategy = function(jobType: string) { const pause = kuePaused || memoryUsageSummaryMB >= (MEMORY_AVAILABLE_MB - 60); console.log( 'Kue strategy to postpone new jobs pickup for execution, if memory is low.', 'Memory usage (MB)', memoryUsageSummaryMB, 'out of', MEMORY_AVAILABLE_MB, pause ? 'KUE PAUSED' : '' ); return pause ? 5000 : 0; }; const q = kue.createQueue({ ... postponeWorker: kuePostponeJobsPickupForExecutionStrategy, ... }); Hook function should return a number: 0 if fetching a job is allowed immediately (with zero delay), or a positive integer representing a timeout in milliseconds to postpone this Worker's attempts to fetch new jobs for execution by this timeout. After the timeout, Worker is going to make an attempt to fetch a new job again, and will request postponeWorker hook function again, to determine if this time it is allowed to pick up a job for execution, or should delay again. And so on and on. Possible use case for this is to globally prevent Kue from uncontrollably grabbing new jobs for execution (for all job types or for a certain job type), if JS node is hitting upper memory limit, or if it's too much CPU used. With this feature, adaptive dynamic balancers can be implemented for better control over stability and scalability of Kue-based applications between many nodes (there is a good usage for this feature for single-noded applications as well, to control if the application is staying within host's quotas). --- lib/queue/worker.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/queue/worker.js b/lib/queue/worker.js index 2213dfcf..c5f8fdfa 100644 --- a/lib/queue/worker.js +++ b/lib/queue/worker.js @@ -43,6 +43,8 @@ function Worker( queue, type ) { this.client = Worker.client || (Worker.client = redis.createClient()); this.running = true; this.job = null; + this.postpone = (queue && queue._options && queue._options.postponeWorker && typeof queue._options.postponeWorker === 'function') ? + queue._options.postponeWorker : function() { return 0 }; } /** @@ -268,6 +270,10 @@ Worker.prototype.getJob = function( fn ) { if( !self.running ) { return fn('Already Shutdown'); } + var postponeTimeoutMs = self.postpone(self.type); + if ( postponeTimeoutMs > 0 ) { + return setTimeout(fn, postponeTimeoutMs); + } // alloc a client for this job type var client = clients[ self.type ] || (clients[ self.type ] = redis.createClient()); // BLPOP indicates we have a new inactive job to process