Skip to content

Commit

Permalink
Fix rate limit metric distinction
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jan 29, 2025
1 parent a18e288 commit c28f6a3
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 183 deletions.
5 changes: 4 additions & 1 deletion quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ impl From<IngestFailure> for IngestServiceError {
IngestServiceError::Unavailable("no shards available".to_string())
}
IngestFailureReason::ShardRateLimited => {
IngestServiceError::RateLimited(RateLimitingCause::ShardRateLimiting)
IngestServiceError::RateLimited(RateLimitingCause::AttemptedShardsRateLimited)
}
IngestFailureReason::AllShardsRateLimited => {
IngestServiceError::RateLimited(RateLimitingCause::AllShardsRateLimited)
}
IngestFailureReason::WalFull => {
IngestServiceError::RateLimited(RateLimitingCause::WalFull)
Expand Down
8 changes: 6 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ pub(crate) struct IngestResultMetrics {
pub source_not_found: IntCounter,
pub internal: IntCounter,
pub no_shards_available: IntCounter,
pub shard_rate_limited: IntCounter,
pub attempted_shards_rate_limited: IntCounter,
pub all_shards_rate_limited: IntCounter,
pub wal_full: IntCounter,
pub timeout: IntCounter,
pub router_timeout: IntCounter,
Expand All @@ -58,7 +59,10 @@ impl Default for IngestResultMetrics {
source_not_found: ingest_result_total_vec.with_label_values(["source_not_found"]),
internal: ingest_result_total_vec.with_label_values(["internal"]),
no_shards_available: ingest_result_total_vec.with_label_values(["no_shards_available"]),
shard_rate_limited: ingest_result_total_vec.with_label_values(["shard_rate_limited"]),
attempted_shards_rate_limited: ingest_result_total_vec
.with_label_values(["attempted_shards_rate_limited"]),
all_shards_rate_limited: ingest_result_total_vec
.with_label_values(["all_shards_rate_limited"]),
wal_full: ingest_result_total_vec.with_label_values(["wal_full"]),
timeout: ingest_result_total_vec.with_label_values(["timeout"]),
router_timeout: ingest_result_total_vec.with_label_values(["router_timeout"]),
Expand Down
24 changes: 16 additions & 8 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ impl IngestRouter {

// Subrequests for which no shards are available to route the subrequests to.
let mut no_shards_available_subrequest_ids: Vec<SubrequestId> = Vec::new();
// Subrequests for which the shards are rate limited.
let mut rate_limited_subrequest_ids: Vec<SubrequestId> = Vec::new();
// Subrequests for which all the shards are rate limited.
let mut all_shards_rate_limited_subrequest_ids: Vec<SubrequestId> = Vec::new();

let mut per_leader_persist_subrequests: HashMap<&LeaderId, Vec<PersistSubrequest>> =
HashMap::new();
Expand All @@ -385,7 +385,7 @@ impl IngestRouter {
let next_open_shard = match next_open_shard_res_opt {
Some(Ok(next_open_shard)) => next_open_shard,
Some(Err(NextOpenShardError::RateLimited)) => {
rate_limited_subrequest_ids.push(subrequest.subrequest_id);
all_shards_rate_limited_subrequest_ids.push(subrequest.subrequest_id);
continue;
}
Some(Err(NextOpenShardError::NoShardsAvailable)) | None => {
Expand Down Expand Up @@ -450,8 +450,8 @@ impl IngestRouter {
for subrequest_id in no_shards_available_subrequest_ids {
workbench.record_no_shards_available(subrequest_id);
}
for subrequest_id in rate_limited_subrequest_ids {
workbench.record_rate_limited(subrequest_id);
for subrequest_id in all_shards_rate_limited_subrequest_ids {
workbench.record_all_shards_rate_limited(subrequest_id);
}
self.process_persist_results(workbench, persist_futures)
.await;
Expand Down Expand Up @@ -539,7 +539,10 @@ fn update_ingest_metrics(ingest_result: &IngestV2Result<IngestResponseV2>, num_s
ingest_results_metrics.no_shards_available.inc()
}
IngestFailureReason::ShardRateLimited => {
ingest_results_metrics.shard_rate_limited.inc()
ingest_results_metrics.attempted_shards_rate_limited.inc()
}
IngestFailureReason::AllShardsRateLimited => {
ingest_results_metrics.all_shards_rate_limited.inc();
}
IngestFailureReason::WalFull => ingest_results_metrics.wal_full.inc(),
IngestFailureReason::Timeout => ingest_results_metrics.timeout.inc(),
Expand Down Expand Up @@ -568,9 +571,14 @@ fn update_ingest_metrics(ingest_result: &IngestV2Result<IngestResponseV2>, num_s
.circuit_breaker
.inc_by(num_subrequests);
}
RateLimitingCause::ShardRateLimiting => {
RateLimitingCause::AttemptedShardsRateLimited => {
ingest_results_metrics
.attempted_shards_rate_limited
.inc_by(num_subrequests);
}
RateLimitingCause::AllShardsRateLimited => {
ingest_results_metrics
.shard_rate_limited
.all_shards_rate_limited
.inc_by(num_subrequests);
}
RateLimitingCause::Unknown => {
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ impl RoutingTableEntry {

#[derive(Debug, PartialEq, Eq)]
pub(super) enum NextOpenShardError {
/// no open shard
NoShardsAvailable,
/// all open shards are rate limited
RateLimited,
}

Expand Down
11 changes: 8 additions & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ impl IngestWorkbench {
self.record_failure(subrequest_id, SubworkbenchFailure::NoShardsAvailable);
}

pub fn record_rate_limited(&mut self, subrequest_id: SubrequestId) {
pub fn record_all_shards_rate_limited(&mut self, subrequest_id: SubrequestId) {
self.record_failure(
subrequest_id,
SubworkbenchFailure::RateLimited(RateLimitingCause::ShardRateLimiting),
SubworkbenchFailure::RateLimited(RateLimitingCause::AllShardsRateLimited),
);
}

Expand Down Expand Up @@ -359,7 +359,12 @@ impl SubworkbenchFailure {
RateLimitingCause::LoadShedding => IngestFailureReason::RouterLoadShedding,
RateLimitingCause::WalFull => IngestFailureReason::WalFull,
RateLimitingCause::CircuitBreaker => IngestFailureReason::CircuitBreaker,
RateLimitingCause::ShardRateLimiting => IngestFailureReason::ShardRateLimited,
RateLimitingCause::AttemptedShardsRateLimited => {
IngestFailureReason::ShardRateLimited
}
RateLimitingCause::AllShardsRateLimited => {
IngestFailureReason::AllShardsRateLimited
}
RateLimitingCause::Unknown => IngestFailureReason::Unspecified,
},
Self::Persist(persist_failure_reason) => (*persist_failure_reason).into(),
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/router.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ enum IngestFailureReason {
INGEST_FAILURE_REASON_SOURCE_NOT_FOUND = 2;
INGEST_FAILURE_REASON_INTERNAL = 3;
INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE = 4;
// the shards we tried to write to are rate limited
INGEST_FAILURE_REASON_SHARD_RATE_LIMITED = 5;
INGEST_FAILURE_REASON_WAL_FULL = 6;
INGEST_FAILURE_REASON_TIMEOUT = 7;
INGEST_FAILURE_REASON_ROUTER_LOAD_SHEDDING = 8;
INGEST_FAILURE_REASON_LOAD_SHEDDING = 9;
INGEST_FAILURE_REASON_CIRCUIT_BREAKER = 10;
// all the known open shards are rate limited
INGEST_FAILURE_REASON_ALL_SHARDS_RATE_LIMITED = 11;
}

message IngestFailure {
Expand Down
Loading

0 comments on commit c28f6a3

Please sign in to comment.