Skip to content

Commit

Permalink
feature: add latency to delayed job metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
martinramirez7 committed Jul 25, 2023
1 parent 8d77c53 commit 320c6a6
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 6 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ end
| Type | Name | Description | Labels |
| --- | --- | --- | --- |
| Counter | `delayed_job_duration_seconds` | Total time spent in delayed jobs | `job_name` |
| Counter | `delayed_job_latency_seconds_total` | Total delayed jobs latency | `job_name` |
| Counter | `delayed_jobs_total` | Total number of delayed jobs executed | `job_name` |
| Gauge | `delayed_jobs_enqueued` | Number of enqueued delayed jobs | - |
| Gauge | `delayed_jobs_pending` | Number of pending delayed jobs | - |
Expand All @@ -543,6 +544,7 @@ end
| Summary | `delayed_job_attempts_summary` | Summary of the amount of attempts it takes delayed jobs to succeed | - |

All metrics have labels for `job_name` and `queue_name`.
`delayed_job_latency_seconds_total` is considering delayed job's [sleep_delay](https://github.com/collectiveidea/delayed_job#:~:text=If%20no%20jobs%20are%20found%2C%20the%20worker%20sleeps%20for%20the%20amount%20of%20time%20specified%20by%20the%20sleep%20delay%20option.%20Set%20Delayed%3A%3AWorker.sleep_delay%20%3D%2060%20for%20a%2060%20second%20sleep%20time.) parameter, so please be aware of this in case you are looking for high latency precision.

#### Hutch Message Processing Tracer

Expand Down
2 changes: 2 additions & 0 deletions lib/prometheus_exporter/instrumentation/delayed_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def initialize(client: nil)
def call(job, max_attempts, enqueued_count, pending_count, *args, &block)
success = false
start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
latency = Time.current - job.run_at
attempts = job.attempts + 1 # Increment because we're adding the current attempt
result = block.call(job, *args)
success = true
Expand All @@ -44,6 +45,7 @@ def call(job, max_attempts, enqueued_count, pending_count, *args, &block)
queue_name: job.queue,
success: success,
duration: duration,
latency: latency,
attempts: attempts,
max_attempts: max_attempts,
enqueued: enqueued_count,
Expand Down
8 changes: 7 additions & 1 deletion lib/prometheus_exporter/server/delayed_job_collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class DelayedJobCollector < TypeCollector
def initialize
@delayed_jobs_total = nil
@delayed_job_duration_seconds = nil
@delayed_job_latency_seconds_total = nil
@delayed_jobs_total = nil
@delayed_failed_jobs_total = nil
@delayed_jobs_max_attempts_reached_total = nil
Expand All @@ -25,6 +26,7 @@ def collect(obj)

ensure_delayed_job_metrics
@delayed_job_duration_seconds.observe(obj["duration"], counter_labels)
@delayed_job_latency_seconds_total.observe(obj["latency"], counter_labels)
@delayed_jobs_total.observe(1, counter_labels)
@delayed_failed_jobs_total.observe(1, counter_labels) if !obj["success"]
@delayed_jobs_max_attempts_reached_total.observe(1, counter_labels) if obj["attempts"] >= obj["max_attempts"]
Expand All @@ -38,7 +40,7 @@ def collect(obj)

def metrics
if @delayed_jobs_total
[@delayed_job_duration_seconds, @delayed_jobs_total, @delayed_failed_jobs_total,
[@delayed_job_duration_seconds, @delayed_job_latency_seconds_total, @delayed_jobs_total, @delayed_failed_jobs_total,
@delayed_jobs_max_attempts_reached_total, @delayed_job_duration_seconds_summary, @delayed_job_attempts_summary,
@delayed_jobs_enqueued, @delayed_jobs_pending]
else
Expand All @@ -55,6 +57,10 @@ def ensure_delayed_job_metrics
PrometheusExporter::Metric::Counter.new(
"delayed_job_duration_seconds", "Total time spent in delayed jobs.")

@delayed_job_latency_seconds_total =
PrometheusExporter::Metric::Counter.new(
"delayed_job_latency_seconds_total", "Total delayed jobs latency.")

@delayed_jobs_total =
PrometheusExporter::Metric::Counter.new(
"delayed_jobs_total", "Total number of delayed jobs executed.")
Expand Down
22 changes: 17 additions & 5 deletions test/server/collector_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -739,19 +739,24 @@ def test_it_can_collect_delayed_job_metrics

instrument = PrometheusExporter::Instrumentation::DelayedJob.new(client: client)

current_time = Time.current
job = Minitest::Mock.new
job.expect(:handler, "job_class: Class")
job.expect(:queue, "my_queue")
job.expect(:attempts, 0)
job.expect(:run_at, current_time - 10.seconds)

instrument.call(job, 20, 10, 0, nil, "default") do
# nothing
Time.stub(:current, current_time) do
instrument.call(job, 20, 10, 0, nil, "default") do
# nothing
end
end

failed_job = Minitest::Mock.new
failed_job.expect(:handler, "job_class: Object")
failed_job.expect(:queue, "my_queue")
failed_job.expect(:attempts, 1)
failed_job.expect(:run_at, 30.seconds.ago)

begin
instrument.call(failed_job, 25, 10, 0, nil, "default") do
Expand All @@ -765,6 +770,7 @@ def test_it_can_collect_delayed_job_metrics
assert(result.include?("delayed_failed_jobs_total{queue_name=\"my_queue\",job_name=\"Object\"} 1"), "has failed job")
assert(result.include?("delayed_jobs_total{queue_name=\"my_queue\",job_name=\"Class\"} 1"), "has working job")
assert(result.include?("delayed_job_duration_seconds{queue_name=\"my_queue\",job_name=\"Class\"}"), "has duration")
assert(result.include?("delayed_job_latency_seconds_total{queue_name=\"my_queue\",job_name=\"Class\"}"), "has latency")
assert(result.include?("delayed_jobs_enqueued{queue_name=\"my_queue\"} 10"), "has enqueued count")
assert(result.include?("delayed_jobs_pending{queue_name=\"my_queue\"} 0"), "has pending count")
job.verify
Expand All @@ -777,19 +783,23 @@ def test_it_can_collect_delayed_job_metrics_with_custom_labels

instrument = PrometheusExporter::Instrumentation::DelayedJob.new(client: client)

current_time = Time.current
job = Minitest::Mock.new
job.expect(:handler, "job_class: Class")
job.expect(:queue, "my_queue")
job.expect(:attempts, 0)
job.expect(:run_at, current_time - 10.seconds)

instrument.call(job, 25, 10, 0, nil, "default") do
# nothing
Time.stub(:current, current_time) do
instrument.call(job, 25, 10, 0, nil, "default") do
# nothing
end
end

failed_job = Minitest::Mock.new
failed_job.expect(:handler, "job_class: Object")
failed_job.expect(:queue, "my_queue")
failed_job.expect(:attempts, 1)
failed_job.expect(:run_at, 30.seconds.ago)

begin
instrument.call(failed_job, 25, 10, 0, nil, "default") do
Expand All @@ -803,6 +813,7 @@ def test_it_can_collect_delayed_job_metrics_with_custom_labels
assert(result.include?('delayed_failed_jobs_total{queue_name="my_queue",service="service1",job_name="Object"} 1'), "has failed job")
assert(result.include?('delayed_jobs_total{queue_name="my_queue",service="service1",job_name="Class"} 1'), "has working job")
assert(result.include?('delayed_job_duration_seconds{queue_name="my_queue",service="service1",job_name="Class"}'), "has duration")
assert(result.include?('delayed_job_latency_seconds_total{queue_name="my_queue",service="service1",job_name="Class"} 10.0'), "has latency")
assert(result.include?('delayed_jobs_enqueued{queue_name="my_queue",service="service1"} 10'), "has enqueued count")
assert(result.include?('delayed_jobs_pending{queue_name="my_queue",service="service1"} 0'), "has pending count")
job.verify
Expand All @@ -820,6 +831,7 @@ def test_it_can_collect_delayed_job_metrics_in_histogram_mode
job.expect(:handler, "job_class: Class")
job.expect(:queue, "my_queue")
job.expect(:attempts, 0)
job.expect(:run_at, 10.seconds.ago)

instrument.call(job, 20, 10, 0, nil, "default") do
# nothing
Expand Down

0 comments on commit 320c6a6

Please sign in to comment.