diff --git a/Cargo.lock b/Cargo.lock index 34317baeb0bc..a4f63ea2a72b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11196,6 +11196,7 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", + "flate2", "h2", "http", "http-body", @@ -11213,6 +11214,7 @@ dependencies = [ "tower-layer", "tower-service", "tracing", + "zstd 0.12.4", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8390a4f0f710..13a62f6682c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,7 +169,7 @@ tokio = { version = "1.36", features = ["full"] } tokio-stream = { version = "0.1" } tokio-util = { version = "0.7", features = ["io-util", "compat"] } toml = "0.8.8" -tonic = { version = "0.11", features = ["tls"] } +tonic = { version = "0.11", features = ["tls", "gzip", "zstd"] } uuid = { version = "1.7", features = ["serde", "v4", "fast-rng"] } zstd = "0.13" diff --git a/Makefile b/Makefile index cb8e0ce273bb..6fa1f02f87dc 100644 --- a/Makefile +++ b/Makefile @@ -199,7 +199,7 @@ config-docs: ## Generate configuration documentation from toml files. docker run --rm \ -v ${PWD}:/greptimedb \ -w /greptimedb/config \ - toml2docs/toml2docs:latest \ + toml2docs/toml2docs:v0.1.1 \ -p '##' \ -t ./config-docs-template.md \ -o ./config.md diff --git a/src/client/src/client.rs b/src/client/src/client.rs index 5e82295c16f6..ea072a822d39 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -22,6 +22,7 @@ use arrow_flight::flight_service_client::FlightServiceClient; use common_grpc::channel_manager::ChannelManager; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt}; +use tonic::codec::CompressionEncoding; use tonic::transport::Channel; use crate::load_balance::{LoadBalance, Loadbalancer}; @@ -151,24 +152,34 @@ impl Client { pub fn make_flight_client(&self) -> Result { let (addr, channel) = self.find_channel()?; - Ok(FlightClient { - addr, - client: FlightServiceClient::new(channel) - .max_decoding_message_size(self.max_grpc_recv_message_size()) - .max_encoding_message_size(self.max_grpc_send_message_size()), - }) + + let client = FlightServiceClient::new(channel) + .max_decoding_message_size(self.max_grpc_recv_message_size()) + .max_encoding_message_size(self.max_grpc_send_message_size()) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Zstd); + + Ok(FlightClient { addr, client }) } pub(crate) fn raw_region_client(&self) -> Result> { let (_, channel) = self.find_channel()?; - Ok(PbRegionClient::new(channel) + let client = PbRegionClient::new(channel) .max_decoding_message_size(self.max_grpc_recv_message_size()) - .max_encoding_message_size(self.max_grpc_send_message_size())) + .max_encoding_message_size(self.max_grpc_send_message_size()) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Zstd); + Ok(client) } pub fn make_prometheus_gateway_client(&self) -> Result> { let (_, channel) = self.find_channel()?; - Ok(PrometheusGatewayClient::new(channel)) + let client = PrometheusGatewayClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Zstd); + Ok(client) } pub async fn health_check(&self) -> Result<()> { diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 042c31a78741..8ef02b411d04 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -27,6 +27,7 @@ pub mod mock { use common_runtime::{Builder as RuntimeBuilder, Runtime}; use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler}; use tokio::sync::mpsc; + use tonic::codec::CompressionEncoding; use tonic::transport::Server; use tower::service_fn; @@ -57,7 +58,13 @@ pub mod mock { tokio::spawn(async move { Server::builder() - .add_service(RegionServer::new(handler)) + .add_service( + RegionServer::new(handler) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Zstd), + ) .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)])) .await }); diff --git a/src/servers/src/grpc/builder.rs b/src/servers/src/grpc/builder.rs index e0e8e813748f..2155b36462a2 100644 --- a/src/servers/src/grpc/builder.rs +++ b/src/servers/src/grpc/builder.rs @@ -23,6 +23,7 @@ use common_runtime::Runtime; use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer; use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; use tokio::sync::Mutex; +use tonic::codec::CompressionEncoding; use tonic::transport::server::RoutesBuilder; use tower::ServiceBuilder; @@ -45,11 +46,15 @@ macro_rules! add_service { let max_recv_message_size = $builder.config().max_recv_message_size; let max_send_message_size = $builder.config().max_send_message_size; - $builder.routes_builder_mut().add_service( - $service - .max_decoding_message_size(max_recv_message_size) - .max_encoding_message_size(max_send_message_size), - ) + let service_builder = $service + .max_decoding_message_size(max_recv_message_size) + .max_encoding_message_size(max_send_message_size) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Zstd); + + $builder.routes_builder_mut().add_service(service_builder); }; } @@ -123,16 +128,26 @@ impl GrpcServerBuilder { otlp_handler: OpenTelemetryProtocolHandlerRef, user_provider: Option, ) -> Self { + let tracing_service = TraceServiceServer::new(OtlpService::new(otlp_handler.clone())) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Zstd); + let trace_server = ServiceBuilder::new() .layer(AuthMiddlewareLayer::with(user_provider.clone())) - .service(TraceServiceServer::new(OtlpService::new( - otlp_handler.clone(), - ))); + .service(tracing_service); self.routes_builder.add_service(trace_server); + let metrics_service = MetricsServiceServer::new(OtlpService::new(otlp_handler)) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Zstd); + let metrics_server = ServiceBuilder::new() .layer(AuthMiddlewareLayer::with(user_provider)) - .service(MetricsServiceServer::new(OtlpService::new(otlp_handler))); + .service(metrics_service); self.routes_builder.add_service(metrics_server); self diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index 183fabb5d44e..4155f5eac738 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -34,6 +34,7 @@ use table::TableRef; use tests_integration::database::Database; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; +use tonic::codec::CompressionEncoding; use crate::{create_testing_grpc_query_handler, LOCALHOST_WITH_0}; @@ -64,6 +65,10 @@ impl MockGrpcServer { ) .into(); FlightServiceServer::new(service) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Zstd) } } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 4131d36b30d0..bfa59966ae8e 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -53,6 +53,7 @@ use servers::grpc::region_server::RegionServerRequestHandler; use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; use tempfile::TempDir; +use tonic::codec::CompressionEncoding; use tonic::transport::Server; use tower::service_fn; use uuid::Uuid; @@ -436,8 +437,20 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) { let _handle = tokio::spawn(async move { Server::builder() - .add_service(FlightServiceServer::new(flight_handler)) - .add_service(RegionServer::new(region_server_handler)) + .add_service( + FlightServiceServer::new(flight_handler) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Zstd), + ) + .add_service( + RegionServer::new(region_server_handler) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Zstd), + ) .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await }); diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 5a14751bbc45..b6a56aace7dd 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -70,6 +70,7 @@ macro_rules! grpc_tests { test_insert_and_select, test_dbname, test_grpc_message_size_ok, + test_grpc_zstd_compression, test_grpc_message_size_limit_recv, test_grpc_message_size_limit_send, test_grpc_auth, @@ -142,6 +143,25 @@ pub async fn test_grpc_message_size_ok(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_grpc_zstd_compression(store_type: StorageType) { + // server and client both support gzip + let config = GrpcServerConfig { + max_recv_message_size: 1024, + max_send_message_size: 1024, + }; + let (addr, mut guard, fe_grpc_server) = + setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await; + + let grpc_client = Client::with_urls(vec![addr]); + let db = Database::new_with_dbname( + format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME), + grpc_client, + ); + db.sql("show tables;").await.unwrap(); + let _ = fe_grpc_server.shutdown().await; + guard.remove_all().await; +} + pub async fn test_grpc_message_size_limit_send(store_type: StorageType) { let config = GrpcServerConfig { max_recv_message_size: 1024,