Skip to content

Commit

Permalink
ref(project-cache): Instrument task duration (#3117)
Browse files Browse the repository at this point in the history
We have observability into how long the project cache service takes to
process each message, but not each task. In other words, we know how
long it takes to process _one_ branch of the `tokio::select` but not the
rest. This PR adds observability there.
  • Loading branch information
iker-barriocanal authored Feb 15, 2024
1 parent ac1ff14 commit 37d2bbd
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
44 changes: 33 additions & 11 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,20 +1113,42 @@ impl Service for ProjectCacheService {
biased;

Ok(()) = subscription.changed() => {
match subscription.borrow().clone() {
global_config::Status::Ready(_) => broker.set_global_config_ready(),
// The watch should only be updated if it gets a new value.
// This would imply a logical bug.
global_config::Status::Pending => relay_log::error!("still waiting for the global config"),
}
metric!(timer(RelayTimers::EventProcessingDeserialize), task = "update_global_config", {
match subscription.borrow().clone() {
global_config::Status::Ready(_) => broker.set_global_config_ready(),
// The watch should only be updated if it gets a new value.
// This would imply a logical bug.
global_config::Status::Pending => relay_log::error!("still waiting for the global config"),
}
})
},
Some(message) = state_rx.recv() => broker.merge_state(message),
Some(message) = state_rx.recv() => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "merge_state", {
broker.merge_state(message)
})
}
// Buffer will not dequeue the envelopes from the spool if there is not enough
// permits in `BufferGuard` available. Currently this is 50%.
Some(UnspooledEnvelope{managed_envelope, key}) = buffer_rx.recv() => broker.handle_processing(key, managed_envelope),
_ = ticker.tick() => broker.evict_stale_project_caches(),
() = &mut broker.buffer_unspool_handle => broker.handle_periodic_unspool(),
Some(message) = rx.recv() => broker.handle_message(message),
Some(UnspooledEnvelope{managed_envelope, key}) = buffer_rx.recv() => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_processing", {
broker.handle_processing(key, managed_envelope)
})
},
_ = ticker.tick() => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "evict_project_caches", {
broker.evict_stale_project_caches()
})
}
() = &mut broker.buffer_unspool_handle => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "periodic_unspool", {
broker.handle_periodic_unspool()
})
}
Some(message) = rx.recv() => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_message", {
broker.handle_message(message)
})
}
else => break,
}
}
Expand Down
9 changes: 9 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,14 @@ pub enum RelayTimers {
///
/// - `message`: The type of message that was processed.
BufferMessageProcessDuration,
/// Timing in milliseconds for processing a task in the project cache service.
///
/// A task is a unit of work the service does. Each branch of the
/// `tokio::select` is a different task type.
///
/// This metric is tagged with:
/// - `task`: The type of the task the processor does.
ProjectCacheTaskDuration,
}

impl TimerMetric for RelayTimers {
Expand Down Expand Up @@ -396,6 +404,7 @@ impl TimerMetric for RelayTimers {
RelayTimers::ProcessMessageDuration => "processor.message.duration",
RelayTimers::ProjectCacheMessageDuration => "project_cache.message.duration",
RelayTimers::BufferMessageProcessDuration => "buffer.message.duration",
RelayTimers::ProjectCacheTaskDuration => "project_cache.task.duration",
}
}
}
Expand Down

0 comments on commit 37d2bbd

Please sign in to comment.