From c28f6a3c35c53f88cea04b9ca050acbd7eb76780 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 29 Jan 2025 15:55:40 +0100 Subject: [PATCH] Fix rate limit metric distinction --- quickwit/quickwit-ingest/src/error.rs | 5 +- .../quickwit-ingest/src/ingest_v2/metrics.rs | 8 +- .../quickwit-ingest/src/ingest_v2/router.rs | 24 +- .../src/ingest_v2/routing_table.rs | 2 + .../src/ingest_v2/workbench.rs | 11 +- .../protos/quickwit/router.proto | 3 + .../quickwit/quickwit.ingest.router.rs | 286 ++++++++---------- quickwit/quickwit-proto/src/ingest/mod.rs | 6 +- 8 files changed, 162 insertions(+), 183 deletions(-) diff --git a/quickwit/quickwit-ingest/src/error.rs b/quickwit/quickwit-ingest/src/error.rs index 4a60bdfceaa..7ee65ec2467 100644 --- a/quickwit/quickwit-ingest/src/error.rs +++ b/quickwit/quickwit-ingest/src/error.rs @@ -114,7 +114,10 @@ impl From for IngestServiceError { IngestServiceError::Unavailable("no shards available".to_string()) } IngestFailureReason::ShardRateLimited => { - IngestServiceError::RateLimited(RateLimitingCause::ShardRateLimiting) + IngestServiceError::RateLimited(RateLimitingCause::AttemptedShardsRateLimited) + } + IngestFailureReason::AllShardsRateLimited => { + IngestServiceError::RateLimited(RateLimitingCause::AllShardsRateLimited) } IngestFailureReason::WalFull => { IngestServiceError::RateLimited(RateLimitingCause::WalFull) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index d86cff7c8c4..01b4adf5a88 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -31,7 +31,8 @@ pub(crate) struct IngestResultMetrics { pub source_not_found: IntCounter, pub internal: IntCounter, pub no_shards_available: IntCounter, - pub shard_rate_limited: IntCounter, + pub attempted_shards_rate_limited: IntCounter, + pub all_shards_rate_limited: IntCounter, pub wal_full: IntCounter, pub timeout: IntCounter, pub router_timeout: IntCounter, @@ -58,7 +59,10 @@ impl Default for IngestResultMetrics { source_not_found: ingest_result_total_vec.with_label_values(["source_not_found"]), internal: ingest_result_total_vec.with_label_values(["internal"]), no_shards_available: ingest_result_total_vec.with_label_values(["no_shards_available"]), - shard_rate_limited: ingest_result_total_vec.with_label_values(["shard_rate_limited"]), + attempted_shards_rate_limited: ingest_result_total_vec + .with_label_values(["attempted_shards_rate_limited"]), + all_shards_rate_limited: ingest_result_total_vec + .with_label_values(["all_shards_rate_limited"]), wal_full: ingest_result_total_vec.with_label_values(["wal_full"]), timeout: ingest_result_total_vec.with_label_values(["timeout"]), router_timeout: ingest_result_total_vec.with_label_values(["router_timeout"]), diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index d1823e0754a..db991d60e09 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -366,8 +366,8 @@ impl IngestRouter { // Subrequests for which no shards are available to route the subrequests to. let mut no_shards_available_subrequest_ids: Vec = Vec::new(); - // Subrequests for which the shards are rate limited. - let mut rate_limited_subrequest_ids: Vec = Vec::new(); + // Subrequests for which all the shards are rate limited. + let mut all_shards_rate_limited_subrequest_ids: Vec = Vec::new(); let mut per_leader_persist_subrequests: HashMap<&LeaderId, Vec> = HashMap::new(); @@ -385,7 +385,7 @@ impl IngestRouter { let next_open_shard = match next_open_shard_res_opt { Some(Ok(next_open_shard)) => next_open_shard, Some(Err(NextOpenShardError::RateLimited)) => { - rate_limited_subrequest_ids.push(subrequest.subrequest_id); + all_shards_rate_limited_subrequest_ids.push(subrequest.subrequest_id); continue; } Some(Err(NextOpenShardError::NoShardsAvailable)) | None => { @@ -450,8 +450,8 @@ impl IngestRouter { for subrequest_id in no_shards_available_subrequest_ids { workbench.record_no_shards_available(subrequest_id); } - for subrequest_id in rate_limited_subrequest_ids { - workbench.record_rate_limited(subrequest_id); + for subrequest_id in all_shards_rate_limited_subrequest_ids { + workbench.record_all_shards_rate_limited(subrequest_id); } self.process_persist_results(workbench, persist_futures) .await; @@ -539,7 +539,10 @@ fn update_ingest_metrics(ingest_result: &IngestV2Result, num_s ingest_results_metrics.no_shards_available.inc() } IngestFailureReason::ShardRateLimited => { - ingest_results_metrics.shard_rate_limited.inc() + ingest_results_metrics.attempted_shards_rate_limited.inc() + } + IngestFailureReason::AllShardsRateLimited => { + ingest_results_metrics.all_shards_rate_limited.inc(); } IngestFailureReason::WalFull => ingest_results_metrics.wal_full.inc(), IngestFailureReason::Timeout => ingest_results_metrics.timeout.inc(), @@ -568,9 +571,14 @@ fn update_ingest_metrics(ingest_result: &IngestV2Result, num_s .circuit_breaker .inc_by(num_subrequests); } - RateLimitingCause::ShardRateLimiting => { + RateLimitingCause::AttemptedShardsRateLimited => { + ingest_results_metrics + .attempted_shards_rate_limited + .inc_by(num_subrequests); + } + RateLimitingCause::AllShardsRateLimited => { ingest_results_metrics - .shard_rate_limited + .all_shards_rate_limited .inc_by(num_subrequests); } RateLimitingCause::Unknown => { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index b9c9864ca48..09ed205a3a8 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -330,7 +330,9 @@ impl RoutingTableEntry { #[derive(Debug, PartialEq, Eq)] pub(super) enum NextOpenShardError { + /// no open shard NoShardsAvailable, + /// all open shards are rate limited RateLimited, } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs index 686c2fd9c0a..b2e0376276e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs @@ -240,10 +240,10 @@ impl IngestWorkbench { self.record_failure(subrequest_id, SubworkbenchFailure::NoShardsAvailable); } - pub fn record_rate_limited(&mut self, subrequest_id: SubrequestId) { + pub fn record_all_shards_rate_limited(&mut self, subrequest_id: SubrequestId) { self.record_failure( subrequest_id, - SubworkbenchFailure::RateLimited(RateLimitingCause::ShardRateLimiting), + SubworkbenchFailure::RateLimited(RateLimitingCause::AllShardsRateLimited), ); } @@ -359,7 +359,12 @@ impl SubworkbenchFailure { RateLimitingCause::LoadShedding => IngestFailureReason::RouterLoadShedding, RateLimitingCause::WalFull => IngestFailureReason::WalFull, RateLimitingCause::CircuitBreaker => IngestFailureReason::CircuitBreaker, - RateLimitingCause::ShardRateLimiting => IngestFailureReason::ShardRateLimited, + RateLimitingCause::AttemptedShardsRateLimited => { + IngestFailureReason::ShardRateLimited + } + RateLimitingCause::AllShardsRateLimited => { + IngestFailureReason::AllShardsRateLimited + } RateLimitingCause::Unknown => IngestFailureReason::Unspecified, }, Self::Persist(persist_failure_reason) => (*persist_failure_reason).into(), diff --git a/quickwit/quickwit-proto/protos/quickwit/router.proto b/quickwit/quickwit-proto/protos/quickwit/router.proto index 11f6d2bdf63..2ff88c79903 100644 --- a/quickwit/quickwit-proto/protos/quickwit/router.proto +++ b/quickwit/quickwit-proto/protos/quickwit/router.proto @@ -62,12 +62,15 @@ enum IngestFailureReason { INGEST_FAILURE_REASON_SOURCE_NOT_FOUND = 2; INGEST_FAILURE_REASON_INTERNAL = 3; INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE = 4; + // the shards we tried to write to are rate limited INGEST_FAILURE_REASON_SHARD_RATE_LIMITED = 5; INGEST_FAILURE_REASON_WAL_FULL = 6; INGEST_FAILURE_REASON_TIMEOUT = 7; INGEST_FAILURE_REASON_ROUTER_LOAD_SHEDDING = 8; INGEST_FAILURE_REASON_LOAD_SHEDDING = 9; INGEST_FAILURE_REASON_CIRCUIT_BREAKER = 10; + // all the known open shards are rate limited + INGEST_FAILURE_REASON_ALL_SHARDS_RATE_LIMITED = 11; } message IngestFailure { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs index 1f43bd342ca..84186fb3165 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs @@ -75,12 +75,14 @@ pub enum IngestFailureReason { SourceNotFound = 2, Internal = 3, NoShardsAvailable = 4, + /// the shards we attempted to write to are rate limited ShardRateLimited = 5, WalFull = 6, Timeout = 7, RouterLoadShedding = 8, LoadShedding = 9, CircuitBreaker = 10, + AllShardsRateLimited = 11, } impl IngestFailureReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -91,24 +93,17 @@ impl IngestFailureReason { match self { IngestFailureReason::Unspecified => "INGEST_FAILURE_REASON_UNSPECIFIED", IngestFailureReason::IndexNotFound => "INGEST_FAILURE_REASON_INDEX_NOT_FOUND", - IngestFailureReason::SourceNotFound => { - "INGEST_FAILURE_REASON_SOURCE_NOT_FOUND" - } + IngestFailureReason::SourceNotFound => "INGEST_FAILURE_REASON_SOURCE_NOT_FOUND", IngestFailureReason::Internal => "INGEST_FAILURE_REASON_INTERNAL", - IngestFailureReason::NoShardsAvailable => { - "INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE" - } - IngestFailureReason::ShardRateLimited => { - "INGEST_FAILURE_REASON_SHARD_RATE_LIMITED" - } + IngestFailureReason::NoShardsAvailable => "INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE", + IngestFailureReason::ShardRateLimited => "INGEST_FAILURE_REASON_SHARD_RATE_LIMITED", IngestFailureReason::WalFull => "INGEST_FAILURE_REASON_WAL_FULL", IngestFailureReason::Timeout => "INGEST_FAILURE_REASON_TIMEOUT", - IngestFailureReason::RouterLoadShedding => { - "INGEST_FAILURE_REASON_ROUTER_LOAD_SHEDDING" - } + IngestFailureReason::RouterLoadShedding => "INGEST_FAILURE_REASON_ROUTER_LOAD_SHEDDING", IngestFailureReason::LoadShedding => "INGEST_FAILURE_REASON_LOAD_SHEDDING", - IngestFailureReason::CircuitBreaker => { - "INGEST_FAILURE_REASON_CIRCUIT_BREAKER" + IngestFailureReason::CircuitBreaker => "INGEST_FAILURE_REASON_CIRCUIT_BREAKER", + IngestFailureReason::AllShardsRateLimited => { + "INGEST_FAILURE_REASON_ALL_SHARDS_RATE_LIMITED" } } } @@ -123,11 +118,10 @@ impl IngestFailureReason { "INGEST_FAILURE_REASON_SHARD_RATE_LIMITED" => Some(Self::ShardRateLimited), "INGEST_FAILURE_REASON_WAL_FULL" => Some(Self::WalFull), "INGEST_FAILURE_REASON_TIMEOUT" => Some(Self::Timeout), - "INGEST_FAILURE_REASON_ROUTER_LOAD_SHEDDING" => { - Some(Self::RouterLoadShedding) - } + "INGEST_FAILURE_REASON_ROUTER_LOAD_SHEDDING" => Some(Self::RouterLoadShedding), "INGEST_FAILURE_REASON_LOAD_SHEDDING" => Some(Self::LoadShedding), "INGEST_FAILURE_REASON_CIRCUIT_BREAKER" => Some(Self::CircuitBreaker), + "INGEST_FAILURE_REASON_ALL_SHARDS_RATE_LIMITED" => Some(Self::AllShardsRateLimited), _ => None, } } @@ -135,8 +129,9 @@ impl IngestFailureReason { /// BEGIN quickwit-codegen #[allow(unused_imports)] use std::str::FromStr; -use tower::{Layer, Service, ServiceExt}; + use quickwit_common::tower::RpcName; +use tower::{Layer, Service, ServiceExt}; impl RpcName for IngestRequestV2 { fn rpc_name() -> &'static str { "ingest" @@ -160,14 +155,12 @@ pub struct IngestRouterServiceClient { struct InnerIngestRouterServiceClient(std::sync::Arc); impl IngestRouterServiceClient { pub fn new(instance: T) -> Self - where - T: IngestRouterService, - { + where T: IngestRouterService { #[cfg(any(test, feature = "testsuite"))] assert!( - std::any::TypeId::of:: < T > () != std::any::TypeId::of:: < - MockIngestRouterService > (), - "`MockIngestRouterService` must be wrapped in a `MockIngestRouterServiceWrapper`: use `IngestRouterServiceClient::from_mock(mock)` to instantiate the client" + std::any::TypeId::of::() != std::any::TypeId::of::(), + "`MockIngestRouterService` must be wrapped in a `MockIngestRouterServiceWrapper`: use \ + `IngestRouterServiceClient::from_mock(mock)` to instantiate the client" ); Self { inner: InnerIngestRouterServiceClient(std::sync::Arc::new(instance)), @@ -189,18 +182,12 @@ impl IngestRouterServiceClient { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, ) -> Self { - let (_, connection_keys_watcher) = tokio::sync::watch::channel( - std::collections::HashSet::from_iter([addr]), - ); - let client = ingest_router_service_grpc_client::IngestRouterServiceGrpcClient::new( - channel, - ) + let (_, connection_keys_watcher) = + tokio::sync::watch::channel(std::collections::HashSet::from_iter([addr])); + let client = ingest_router_service_grpc_client::IngestRouterServiceGrpcClient::new(channel) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); - let adapter = IngestRouterServiceGrpcClientAdapter::new( - client, - connection_keys_watcher, - ); + let adapter = IngestRouterServiceGrpcClientAdapter::new(client, connection_keys_watcher); Self::new(adapter) } pub fn from_balance_channel( @@ -208,15 +195,11 @@ impl IngestRouterServiceClient { max_message_size: bytesize::ByteSize, ) -> IngestRouterServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let client = ingest_router_service_grpc_client::IngestRouterServiceGrpcClient::new( - balance_channel, - ) - .max_decoding_message_size(max_message_size.0 as usize) - .max_encoding_message_size(max_message_size.0 as usize); - let adapter = IngestRouterServiceGrpcClientAdapter::new( - client, - connection_keys_watcher, - ); + let client = + ingest_router_service_grpc_client::IngestRouterServiceGrpcClient::new(balance_channel) + .max_decoding_message_size(max_message_size.0 as usize) + .max_encoding_message_size(max_message_size.0 as usize); + let adapter = IngestRouterServiceGrpcClientAdapter::new(client, connection_keys_watcher); Self::new(adapter) } pub fn from_mailbox(mailbox: quickwit_actors::Mailbox) -> Self @@ -267,9 +250,8 @@ pub mod mock_ingest_router_service { } } } -pub type BoxFuture = std::pin::Pin< - Box> + Send + 'static>, ->; +pub type BoxFuture = + std::pin::Pin> + Send + 'static>>; impl tower::Service for InnerIngestRouterServiceClient { type Response = IngestResponseV2; type Error = crate::ingest::IngestV2Error; @@ -329,7 +311,10 @@ impl IngestRouterServiceTowerLayerStack { IngestResponseV2, crate::ingest::IngestV2Error, >, - > + Clone + Send + Sync + 'static, + > + Clone + + Send + + Sync + + 'static, + Clone + Send + Sync + 'static, + > + Clone + + Send + + Sync + + 'static, <, >>::Service as tower::Service>::Future: Send + 'static, { - self.ingest_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.ingest_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } pub fn stack_ingest_layer(mut self, layer: L) -> Self @@ -360,21 +349,25 @@ impl IngestRouterServiceTowerLayerStack { IngestResponseV2, crate::ingest::IngestV2Error, >, - > + Send + Sync + 'static, + > + Send + + Sync + + 'static, L::Service: tower::Service< IngestRequestV2, Response = IngestResponseV2, Error = crate::ingest::IngestV2Error, - > + Clone + Send + Sync + 'static, + > + Clone + + Send + + Sync + + 'static, >::Future: Send + 'static, { - self.ingest_layers.push(quickwit_common::tower::BoxLayer::new(layer)); + self.ingest_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn build(self, instance: T) -> IngestRouterServiceClient - where - T: IngestRouterService, - { + where T: IngestRouterService { let inner_client = InnerIngestRouterServiceClient(std::sync::Arc::new(instance)); self.build_from_inner_client(inner_client) } @@ -384,11 +377,7 @@ impl IngestRouterServiceTowerLayerStack { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, ) -> IngestRouterServiceClient { - let client = IngestRouterServiceClient::from_channel( - addr, - channel, - max_message_size, - ); + let client = IngestRouterServiceClient::from_channel(addr, channel, max_message_size); let inner_client = client.inner; self.build_from_inner_client(inner_client) } @@ -397,10 +386,8 @@ impl IngestRouterServiceTowerLayerStack { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, ) -> IngestRouterServiceClient { - let client = IngestRouterServiceClient::from_balance_channel( - balance_channel, - max_message_size, - ); + let client = + IngestRouterServiceClient::from_balance_channel(balance_channel, max_message_size); let inner_client = client.inner; self.build_from_inner_client(inner_client) } @@ -412,16 +399,13 @@ impl IngestRouterServiceTowerLayerStack { A: quickwit_actors::Actor + std::fmt::Debug + Send + 'static, IngestRouterServiceMailbox: IngestRouterService, { - let inner_client = InnerIngestRouterServiceClient( - std::sync::Arc::new(IngestRouterServiceMailbox::new(mailbox)), - ); + let inner_client = InnerIngestRouterServiceClient(std::sync::Arc::new( + IngestRouterServiceMailbox::new(mailbox), + )); self.build_from_inner_client(inner_client) } #[cfg(any(test, feature = "testsuite"))] - pub fn build_from_mock( - self, - mock: MockIngestRouterService, - ) -> IngestRouterServiceClient { + pub fn build_from_mock(self, mock: MockIngestRouterService) -> IngestRouterServiceClient { let client = IngestRouterServiceClient::from_mock(mock); let inner_client = client.inner; self.build_from_inner_client(inner_client) @@ -430,14 +414,10 @@ impl IngestRouterServiceTowerLayerStack { self, inner_client: InnerIngestRouterServiceClient, ) -> IngestRouterServiceClient { - let ingest_svc = self - .ingest_layers - .into_iter() - .rev() - .fold( - quickwit_common::tower::BoxService::new(inner_client.clone()), - |svc, layer| layer.layer(svc), - ); + let ingest_svc = self.ingest_layers.into_iter().rev().fold( + quickwit_common::tower::BoxService::new(inner_client.clone()), + |svc, layer| layer.layer(svc), + ); let tower_svc_stack = IngestRouterServiceTowerServiceStack { inner: inner_client, ingest_svc, @@ -451,8 +431,7 @@ struct MailboxAdapter { phantom: std::marker::PhantomData, } impl std::ops::Deref for MailboxAdapter -where - A: quickwit_actors::Actor, +where A: quickwit_actors::Actor { type Target = quickwit_actors::Mailbox; fn deref(&self) -> &Self::Target { @@ -484,7 +463,8 @@ impl Clone for IngestRouterServiceMailbox { impl tower::Service for IngestRouterServiceMailbox where A: quickwit_actors::Actor - + quickwit_actors::DeferableReplyHandler> + Send + + quickwit_actors::DeferableReplyHandler> + + Send + 'static, M: std::fmt::Debug + Send + 'static, T: Send + 'static, @@ -506,7 +486,10 @@ where fn call(&mut self, message: M) -> Self::Future { let mailbox = self.inner.clone(); let fut = async move { - mailbox.ask_for_res(message).await.map_err(|error| error.into()) + mailbox + .ask_for_res(message) + .await + .map_err(|error| error.into()) }; Box::pin(fut) } @@ -515,9 +498,7 @@ where impl IngestRouterService for IngestRouterServiceMailbox where A: quickwit_actors::Actor + std::fmt::Debug, - IngestRouterServiceMailbox< - A, - >: tower::Service< + IngestRouterServiceMailbox: tower::Service< IngestRequestV2, Response = IngestResponseV2, Error = crate::ingest::IngestV2Error, @@ -535,9 +516,8 @@ where pub struct IngestRouterServiceGrpcClientAdapter { inner: T, #[allow(dead_code)] - connection_addrs_rx: tokio::sync::watch::Receiver< - std::collections::HashSet, - >, + connection_addrs_rx: + tokio::sync::watch::Receiver>, } impl IngestRouterServiceGrpcClientAdapter { pub fn new( @@ -554,15 +534,18 @@ impl IngestRouterServiceGrpcClientAdapter { } #[async_trait::async_trait] impl IngestRouterService -for IngestRouterServiceGrpcClientAdapter< - ingest_router_service_grpc_client::IngestRouterServiceGrpcClient, -> + for IngestRouterServiceGrpcClientAdapter< + ingest_router_service_grpc_client::IngestRouterServiceGrpcClient, + > where - T: tonic::client::GrpcService + std::fmt::Debug + Clone + Send - + Sync + 'static, + T: tonic::client::GrpcService + + std::fmt::Debug + + Clone + + Send + + Sync + + 'static, T::ResponseBody: tonic::codegen::Body + Send + 'static, - ::Error: Into - + Send, + ::Error: Into + Send, T::Future: Send, { async fn ingest( @@ -574,10 +557,9 @@ where .ingest(request) .await .map(|response| response.into_inner()) - .map_err(|status| crate::error::grpc_status_to_service_error( - status, - IngestRequestV2::rpc_name(), - )) + .map_err(|status| { + crate::error::grpc_status_to_service_error(status, IngestRequestV2::rpc_name()) + }) } } #[derive(Debug)] @@ -586,9 +568,7 @@ pub struct IngestRouterServiceGrpcServerAdapter { } impl IngestRouterServiceGrpcServerAdapter { pub fn new(instance: T) -> Self - where - T: IngestRouterService, - { + where T: IngestRouterService { Self { inner: InnerIngestRouterServiceClient(std::sync::Arc::new(instance)), } @@ -596,7 +576,8 @@ impl IngestRouterServiceGrpcServerAdapter { } #[async_trait::async_trait] impl ingest_router_service_grpc_server::IngestRouterServiceGrpc -for IngestRouterServiceGrpcServerAdapter { + for IngestRouterServiceGrpcServerAdapter +{ async fn ingest( &self, request: tonic::Request, @@ -612,8 +593,8 @@ for IngestRouterServiceGrpcServerAdapter { /// Generated client implementations. pub mod ingest_router_service_grpc_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; #[derive(Debug, Clone)] pub struct IngestRouterServiceGrpcClient { inner: tonic::client::Grpc, @@ -657,13 +638,10 @@ pub mod ingest_router_service_grpc_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { - IngestRouterServiceGrpcClient::new( - InterceptedService::new(inner, interceptor), - ) + IngestRouterServiceGrpcClient::new(InterceptedService::new(inner, interceptor)) } /// Compress requests with the given encoding. /// @@ -701,31 +679,22 @@ pub mod ingest_router_service_grpc_client { pub async fn ingest( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/quickwit.ingest.router.IngestRouterService/Ingest", ); let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "quickwit.ingest.router.IngestRouterService", - "Ingest", - ), - ); + req.extensions_mut().insert(GrpcMethod::new( + "quickwit.ingest.router.IngestRouterService", + "Ingest", + )); self.inner.unary(req, path, codec).await } } @@ -734,7 +703,8 @@ pub mod ingest_router_service_grpc_client { pub mod ingest_router_service_grpc_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with IngestRouterServiceGrpcServer. + /// Generated trait containing gRPC methods that should be implemented for use with + /// IngestRouterServiceGrpcServer. #[async_trait] pub trait IngestRouterServiceGrpc: Send + Sync + 'static { /// Ingests batches of documents for one or multiple indexes. @@ -742,10 +712,7 @@ pub mod ingest_router_service_grpc_server { async fn ingest( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct IngestRouterServiceGrpcServer { @@ -770,13 +737,8 @@ pub mod ingest_router_service_grpc_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService - where - F: tonic::service::Interceptor, - { + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + where F: tonic::service::Interceptor { InterceptedService::new(Self::new(inner), interceptor) } /// Enable decompressing requests with the given encoding. @@ -808,8 +770,7 @@ pub mod ingest_router_service_grpc_server { self } } - impl tonic::codegen::Service> - for IngestRouterServiceGrpcServer + impl tonic::codegen::Service> for IngestRouterServiceGrpcServer where T: IngestRouterServiceGrpc, B: Body + Send + 'static, @@ -830,15 +791,11 @@ pub mod ingest_router_service_grpc_server { "/quickwit.ingest.router.IngestRouterService/Ingest" => { #[allow(non_camel_case_types)] struct IngestSvc(pub Arc); - impl< - T: IngestRouterServiceGrpc, - > tonic::server::UnaryService - for IngestSvc { + impl + tonic::server::UnaryService for IngestSvc + { type Response = super::IngestResponseV2; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -871,18 +828,14 @@ pub mod ingest_router_service_grpc_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } @@ -908,8 +861,7 @@ pub mod ingest_router_service_grpc_server { write!(f, "{:?}", self.0) } } - impl tonic::server::NamedService - for IngestRouterServiceGrpcServer { + impl tonic::server::NamedService for IngestRouterServiceGrpcServer { const NAME: &'static str = "quickwit.ingest.router.IngestRouterService"; } } diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index 3afb0c5e36f..8b65d0db9b4 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -42,8 +42,10 @@ pub enum RateLimitingCause { WalFull, #[error("circuit breaker")] CircuitBreaker, - #[error("shard rate limiting")] - ShardRateLimiting, + #[error("attempted shards rate limited")] + AttemptedShardsRateLimited, + #[error("all shards rate limited")] + AllShardsRateLimited, #[error("unknown")] Unknown, }