From 338aacceb62c99dc0470c9436afc2ae1d6f8bc95 Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 9 Jan 2013 08:22:43 -0500 Subject: [PATCH 1/7] Adding not_queues option The not_queues option was added to the command via 0e3371e in DJ Master not_queues allows instructs the worker to NOT work on specified queues --- lib/delayed/backend/active_record.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index 64f36953..86cd20c9 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -60,6 +60,7 @@ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_ti scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority scope = scope.scoped(:conditions => ["queue IN (?)", Worker.queues]) if Worker.queues.any? + scope = scope.scoped(:conditions => ["queues NOT IN (?)", Worker.not_queues]) if Worker.not_queues.any? ::ActiveRecord::Base.silence do scope.by_priority.all(:limit => limit) From ed81a3c6fdb5380b64c8a69818eb272007969ee2 Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 9 Jan 2013 16:52:33 -0500 Subject: [PATCH 2/7] Fixing typo --- lib/delayed/backend/active_record.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index 86cd20c9..b3b8cd78 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -60,7 +60,7 @@ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_ti scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority scope = scope.scoped(:conditions => ["queue IN (?)", Worker.queues]) if Worker.queues.any? - scope = scope.scoped(:conditions => ["queues NOT IN (?)", Worker.not_queues]) if Worker.not_queues.any? + scope = scope.scoped(:conditions => ["queue NOT IN (?)", Worker.not_queues]) if Worker.not_queues.any? ::ActiveRecord::Base.silence do scope.by_priority.all(:limit => limit) From 9b4af3d8743c3ba04317e08fb0a9e17f47498a7f Mon Sep 17 00:00:00 2001 From: Brian Malinconico Date: Sun, 6 Oct 2013 17:24:45 -0400 Subject: [PATCH 3/7] Adding check for deadlock --- lib/delayed/backend/active_record.rb | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index b3b8cd78..f6608ff5 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -73,7 +73,15 @@ def lock_exclusively!(max_run_time, worker) now = self.class.db_time_now affected_rows = if locked_by != worker # We don't own this job so we will update the locked_by name and the locked_at - self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now]) + begin + self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now]) + rescue ActiveRecord::StatementInvalid => e + if e.message =~ /Deadlock found when trying to get lock/ + 0 + else + raise e + end + end else # We already own this job, this may happen if the job queue crashes. # Simply resume and update the locked_at From 803735a9f3623fda783ed1209fec424cf7d6964f Mon Sep 17 00:00:00 2001 From: Brian Malinconico Date: Sun, 6 Oct 2013 20:36:41 -0400 Subject: [PATCH 4/7] Making generic AR catch --- lib/delayed/backend/active_record.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index f6608ff5..1f10e660 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -75,7 +75,7 @@ def lock_exclusively!(max_run_time, worker) # We don't own this job so we will update the locked_by name and the locked_at begin self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now]) - rescue ActiveRecord::StatementInvalid => e + rescue Exception => e if e.message =~ /Deadlock found when trying to get lock/ 0 else From 62cec7de604b1caab7ea5b377343930cb396b633 Mon Sep 17 00:00:00 2001 From: Brian Malinconico Date: Tue, 19 Nov 2013 15:20:38 -0500 Subject: [PATCH 5/7] Shuffleing the records as they are returned to reduce contention for the first job --- lib/delayed/backend/active_record.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index 1f10e660..ef39d5c0 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -63,7 +63,7 @@ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_ti scope = scope.scoped(:conditions => ["queue NOT IN (?)", Worker.not_queues]) if Worker.not_queues.any? ::ActiveRecord::Base.silence do - scope.by_priority.all(:limit => limit) + scope.by_priority.all(:limit => limit).shuffle end end From f25dc7b2fb00a4300e319b9665d132e476f31969 Mon Sep 17 00:00:00 2001 From: Brian Malinconico Date: Wed, 8 Jan 2014 08:38:25 -0500 Subject: [PATCH 6/7] removing shuffle option --- lib/delayed/backend/active_record.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index ef39d5c0..1f10e660 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -63,7 +63,7 @@ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_ti scope = scope.scoped(:conditions => ["queue NOT IN (?)", Worker.not_queues]) if Worker.not_queues.any? ::ActiveRecord::Base.silence do - scope.by_priority.all(:limit => limit).shuffle + scope.by_priority.all(:limit => limit) end end From 3e5f5f5439139f833d22833538ab760dc8c6449a Mon Sep 17 00:00:00 2001 From: Brian Malinconico Date: Sat, 1 Mar 2014 09:15:55 -0500 Subject: [PATCH 7/7] Revert "removing shuffle option" This reverts commit f25dc7b2fb00a4300e319b9665d132e476f31969. --- lib/delayed/backend/active_record.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index 1f10e660..ef39d5c0 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -63,7 +63,7 @@ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_ti scope = scope.scoped(:conditions => ["queue NOT IN (?)", Worker.not_queues]) if Worker.not_queues.any? ::ActiveRecord::Base.silence do - scope.by_priority.all(:limit => limit) + scope.by_priority.all(:limit => limit).shuffle end end