Skip to content

Commit

Permalink
feat: support compression on gRPC server (#3961)
Browse files Browse the repository at this point in the history
* feat: enable gzip in grpc server side

* feat: add enable_gzip_compression config

* test: add grpc compression test

* feat: support user configured compression on grpc server

* chore: update doc

* chore: add tests

* fix: make config-docs

* chore: fix cr issue

* chore: add test

* refactor: remove config on server side, auto enable all compression support

* chore: minor update

* chore: remove unused code

* refactor: enable zstd compression internally by default

* chore: minor fix
  • Loading branch information
shuiyisong authored May 20, 2024
1 parent 533ada7 commit 19543f9
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 23 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ tokio = { version = "1.36", features = ["full"] }
tokio-stream = { version = "0.1" }
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.11", features = ["tls"] }
tonic = { version = "0.11", features = ["tls", "gzip", "zstd"] }
uuid = { version = "1.7", features = ["serde", "v4", "fast-rng"] }
zstd = "0.13"

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ config-docs: ## Generate configuration documentation from toml files.
docker run --rm \
-v ${PWD}:/greptimedb \
-w /greptimedb/config \
toml2docs/toml2docs:latest \
toml2docs/toml2docs:v0.1.1 \
-p '##' \
-t ./config-docs-template.md \
-o ./config.md
Expand Down
29 changes: 20 additions & 9 deletions src/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use arrow_flight::flight_service_client::FlightServiceClient;
use common_grpc::channel_manager::ChannelManager;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

use crate::load_balance::{LoadBalance, Loadbalancer};
Expand Down Expand Up @@ -151,24 +152,34 @@ impl Client {

pub fn make_flight_client(&self) -> Result<FlightClient> {
let (addr, channel) = self.find_channel()?;
Ok(FlightClient {
addr,
client: FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()),
})

let client = FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);

Ok(FlightClient { addr, client })
}

pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
let (_, channel) = self.find_channel()?;
Ok(PbRegionClient::new(channel)
let client = PbRegionClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()))
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
Ok(client)
}

pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
let (_, channel) = self.find_channel()?;
Ok(PrometheusGatewayClient::new(channel))
let client = PrometheusGatewayClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);
Ok(client)
}

pub async fn health_check(&self) -> Result<()> {
Expand Down
9 changes: 8 additions & 1 deletion src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod mock {
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
use tokio::sync::mpsc;
use tonic::codec::CompressionEncoding;
use tonic::transport::Server;
use tower::service_fn;

Expand Down Expand Up @@ -57,7 +58,13 @@ pub mod mock {

tokio::spawn(async move {
Server::builder()
.add_service(RegionServer::new(handler))
.add_service(
RegionServer::new(handler)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd),
)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
.await
});
Expand Down
33 changes: 24 additions & 9 deletions src/servers/src/grpc/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_runtime::Runtime;
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
use tokio::sync::Mutex;
use tonic::codec::CompressionEncoding;
use tonic::transport::server::RoutesBuilder;
use tower::ServiceBuilder;

Expand All @@ -45,11 +46,15 @@ macro_rules! add_service {
let max_recv_message_size = $builder.config().max_recv_message_size;
let max_send_message_size = $builder.config().max_send_message_size;

$builder.routes_builder_mut().add_service(
$service
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size),
)
let service_builder = $service
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

$builder.routes_builder_mut().add_service(service_builder);
};
}

Expand Down Expand Up @@ -123,16 +128,26 @@ impl GrpcServerBuilder {
otlp_handler: OpenTelemetryProtocolHandlerRef,
user_provider: Option<UserProviderRef>,
) -> Self {
let tracing_service = TraceServiceServer::new(OtlpService::new(otlp_handler.clone()))
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

let trace_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(user_provider.clone()))
.service(TraceServiceServer::new(OtlpService::new(
otlp_handler.clone(),
)));
.service(tracing_service);
self.routes_builder.add_service(trace_server);

let metrics_service = MetricsServiceServer::new(OtlpService::new(otlp_handler))
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

let metrics_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(user_provider))
.service(MetricsServiceServer::new(OtlpService::new(otlp_handler)));
.service(metrics_service);
self.routes_builder.add_service(metrics_server);

self
Expand Down
5 changes: 5 additions & 0 deletions src/servers/tests/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use table::TableRef;
use tests_integration::database::Database;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::codec::CompressionEncoding;

use crate::{create_testing_grpc_query_handler, LOCALHOST_WITH_0};

Expand Down Expand Up @@ -64,6 +65,10 @@ impl MockGrpcServer {
)
.into();
FlightServiceServer::new(service)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd)
}
}

Expand Down
17 changes: 15 additions & 2 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use servers::grpc::region_server::RegionServerRequestHandler;
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use tempfile::TempDir;
use tonic::codec::CompressionEncoding;
use tonic::transport::Server;
use tower::service_fn;
use uuid::Uuid;
Expand Down Expand Up @@ -436,8 +437,20 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {

let _handle = tokio::spawn(async move {
Server::builder()
.add_service(FlightServiceServer::new(flight_handler))
.add_service(RegionServer::new(region_server_handler))
.add_service(
FlightServiceServer::new(flight_handler)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd),
)
.add_service(
RegionServer::new(region_server_handler)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd),
)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
Expand Down
20 changes: 20 additions & 0 deletions tests-integration/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ macro_rules! grpc_tests {
test_insert_and_select,
test_dbname,
test_grpc_message_size_ok,
test_grpc_zstd_compression,
test_grpc_message_size_limit_recv,
test_grpc_message_size_limit_send,
test_grpc_auth,
Expand Down Expand Up @@ -142,6 +143,25 @@ pub async fn test_grpc_message_size_ok(store_type: StorageType) {
guard.remove_all().await;
}

pub async fn test_grpc_zstd_compression(store_type: StorageType) {
// server and client both support gzip
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 1024,
};
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;

let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new_with_dbname(
format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
grpc_client,
);
db.sql("show tables;").await.unwrap();
let _ = fe_grpc_server.shutdown().await;
guard.remove_all().await;
}

pub async fn test_grpc_message_size_limit_send(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024,
Expand Down

0 comments on commit 19543f9

Please sign in to comment.