Skip to content

Commit

Permalink
Protobuf 3 support (#615)
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Barman <[email protected]>
  • Loading branch information
lb034582341 authored Aug 9, 2023
1 parent 90dad05 commit 2821787
Show file tree
Hide file tree
Showing 59 changed files with 11,987 additions and 303 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ jobs:
- run: cargo xtask bindgen
- run: cargo build --no-default-features
- run: cargo build --no-default-features --features protobuf-codec
- run: cargo build --no-default-features --features protobufv3-codec
- run: cargo build --no-default-features --features prost-codec
- run: cd proto && cargo build --no-default-features --features prost-codec
- run: cargo build
Expand Down Expand Up @@ -80,6 +81,7 @@ jobs:
- run: cargo xtask submodule
- run: cargo build --no-default-features
- run: cargo build --no-default-features --features protobuf-codec
- run: cargo build --no-default-features --features protobufv3-codec
- run: cargo build --no-default-features --features prost-codec
- run: cargo build
- run: cargo test --all
Expand All @@ -98,6 +100,7 @@ jobs:
- run: cargo xtask bindgen
- run: cargo build --no-default-features
- run: cargo build --no-default-features --features "protobuf-codec"
- run: cargo build --no-default-features --features "protobufv3-codec"
- run: cargo build --no-default-features --features "prost-codec"
- run: cargo build
- run: cargo test --all
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ libc = "0.2"
futures-executor = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] }
protobuf = { version = "2.0", optional = true }
protobufv3 = { package = "protobuf", version = "3.2", optional = true }
prost = { version = "0.11", optional = true }
bytes = { version = "1.0", optional = true }
log = "0.4"
Expand All @@ -44,6 +45,7 @@ exclude = ["xtask"]
default = ["protobuf-codec", "boringssl"]
_secure = []
protobuf-codec = ["protobuf"]
protobufv3-codec = ["protobufv3"]
prost-codec = ["prost", "bytes"]
nightly = []
boringssl = ["grpcio-sys/boringssl", "_secure"]
Expand Down
7 changes: 5 additions & 2 deletions benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ publish = false

[features]
default = ["protobuf-codec"]
protobuf-codec = ["grpcio/protobuf-codec", "grpcio-proto/protobuf-codec"]
protobuf-codec = ["grpcio/protobuf-codec", "grpcio-proto/protobuf-codec", "dep:protobuf"]
protobufv3-codec = ["grpcio/protobufv3-codec", "grpcio-proto/protobufv3-codec", "dep:protobufv3"]
prost-codec = ["grpcio/prost-codec", "grpcio-proto/prost-codec"]

[dependencies]
grpcio = { path = ".." }
grpcio = { path = "..", default-features = false, features = ["boringssl"] }
grpcio-proto = { path = "../proto", default-features = false }
futures-channel = "0.3"
futures-executor = "0.3"
Expand All @@ -28,6 +29,8 @@ slog-async = "2.1"
slog-stdlog = "4.0"
slog-scope = "4.0"
slog-term = "2.2"
protobuf = { version = "2", optional = true }
protobufv3 = { package = "protobuf", version = "3.2", optional = true }

[[bin]]
name = "qps_worker"
Expand Down
9 changes: 5 additions & 4 deletions benchmark/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use grpc_proto::util;
use grpcio::GrpcSlice;

fn gen_resp(req: &SimpleRequest) -> SimpleResponse {
let payload = util::new_payload(req.get_response_size() as usize);
let mut resp = SimpleResponse::default();
resp.set_payload(payload);
resp
let payload = util::new_payload(req.response_size as usize);
SimpleResponse {
payload: Some(payload).into(),
..SimpleResponse::default()
}
}

#[derive(Clone)]
Expand Down
142 changes: 107 additions & 35 deletions benchmark/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ use futures_util::{
use grpcio::{
CallOption, Channel, ChannelBuilder, Client as GrpcClient, EnvBuilder, Environment, WriteFlags,
};
use grpcio_proto::testing::control::SecurityParams;
use grpcio_proto::testing::control::{ClientConfig, ClientType, RpcType};
use grpcio_proto::testing::messages::SimpleRequest;
use grpcio_proto::testing::payloads::PayloadConfig;
use grpcio_proto::testing::services_grpc::BenchmarkServiceClient;
use grpcio_proto::testing::stats::ClientStats;
use grpcio_proto::util as proto_util;
Expand All @@ -27,14 +29,23 @@ use rand_xorshift::XorShiftRng;
use crate::bench;
use crate::util::{self, CpuRecorder, Histogram};

#[cfg(feature = "protobuf-codec")]
fn gen_req(cfg: &ClientConfig) -> SimpleRequest {
let mut req = SimpleRequest::default();
let payload_config = cfg.get_payload_config();
let simple_params = payload_config.get_simple_params();
req.set_payload(proto_util::new_payload(
simple_params.get_req_size() as usize
));
req.set_response_size(simple_params.get_resp_size());
req.payload = Some(proto_util::new_payload(simple_params.req_size as usize)).into();
req.response_size = simple_params.resp_size;
req
}

#[cfg(feature = "protobufv3-codec")]
fn gen_req(cfg: &ClientConfig) -> SimpleRequest {
let mut req = SimpleRequest::default();
let payload_config = &cfg.payload_config;
let simple_params = payload_config.simple_params();
req.payload = Some(proto_util::new_payload(simple_params.req_size as usize)).into();
req.response_size = simple_params.resp_size;
req
}

Expand Down Expand Up @@ -138,7 +149,11 @@ struct GenericExecutor<B> {

impl<B: Backoff + Send + 'static> GenericExecutor<B> {
fn new(ctx: ExecutorContext<B>, channel: Channel, cfg: &ClientConfig) -> GenericExecutor<B> {
#[cfg(feature = "protobuf-codec")]
let cap = cfg.get_payload_config().get_bytebuf_params().get_req_size();
#[cfg(feature = "protobufv3-codec")]
let cap = cfg.payload_config.bytebuf_params().req_size;

let req = vec![0; cap as usize];
GenericExecutor {
ctx,
Expand Down Expand Up @@ -297,6 +312,23 @@ impl<B: Backoff + Send + 'static> RequestExecutor<B> {
}
}

#[cfg(feature = "protobuf-codec")]
fn get_payload_cfg(cfg: &ClientConfig) -> &PayloadConfig {
cfg.get_payload_config()
}
#[cfg(feature = "protobufv3-codec")]
fn get_payload_cfg(cfg: &ClientConfig) -> &PayloadConfig {
&cfg.payload_config
}
#[cfg(feature = "protobuf-codec")]
fn get_rpc_type(cfg: &ClientConfig) -> RpcType {
cfg.get_rpc_type()
}
#[cfg(feature = "protobufv3-codec")]
fn get_rpc_type(cfg: &ClientConfig) -> RpcType {
cfg.rpc_type.enum_value().unwrap()
}

fn execute<B: Backoff + Send + 'static>(
ctx: ExecutorContext<B>,
ch: Channel,
Expand All @@ -305,27 +337,27 @@ fn execute<B: Backoff + Send + 'static>(
) {
match client_type {
ClientType::SYNC_CLIENT => {
if cfg.get_payload_config().has_bytebuf_params() {
if get_payload_cfg(cfg).has_bytebuf_params() {
panic!("only async_client is supported for generic service.");
}
RequestExecutor::new(ctx, ch, cfg).execute_unary()
}
ClientType::ASYNC_CLIENT => match cfg.get_rpc_type() {
ClientType::ASYNC_CLIENT => match get_rpc_type(cfg) {
RpcType::UNARY => {
if cfg.get_payload_config().has_bytebuf_params() {
if get_payload_cfg(cfg).has_bytebuf_params() {
panic!("only ping pong streaming is supported for generic service.");
}
RequestExecutor::new(ctx, ch, cfg).execute_unary_async()
}
RpcType::STREAMING => {
if cfg.get_payload_config().has_bytebuf_params() {
if get_payload_cfg(cfg).has_bytebuf_params() {
GenericExecutor::new(ctx, ch, cfg).execute_stream()
} else {
RequestExecutor::new(ctx, ch, cfg).execute_stream_ping_pong()
}
}
RpcType::STREAMING_FROM_CLIENT => {
if cfg.get_payload_config().has_bytebuf_params() {
if get_payload_cfg(cfg).has_bytebuf_params() {
panic!("only ping pong streaming is supported for generic service.");
}
RequestExecutor::new(ctx, ch, cfg).execute_stream_from_client()
Expand All @@ -344,68 +376,108 @@ pub struct Client {
running_reqs: Option<Vec<Receiver<()>>>,
}

#[cfg(feature = "protobuf-codec")]
fn get_security_params(cfg: &ClientConfig) -> Option<&SecurityParams> {
match cfg.has_security_params() {
true => Some(cfg.get_security_params()),
false => None,
}
}
#[cfg(feature = "protobufv3-codec")]
fn get_security_params(cfg: &ClientConfig) -> Option<&SecurityParams> {
cfg.security_params.0.as_deref()
}

impl Client {
pub fn new(cfg: &ClientConfig) -> Client {
let mut builder = EnvBuilder::new();
let thd_cnt = cfg.get_async_client_threads() as usize;
let thd_cnt = cfg.async_client_threads as usize;
if thd_cnt != 0 {
builder = builder.cq_count(thd_cnt);
}
let env = Arc::new(builder.build());
if cfg.get_core_limit() > 0 {
if cfg.core_limit > 0 {
error!("client config core limit is set but ignored");
}

let ch_env = env.clone();
let channels = (0..cfg.get_client_channels())
.zip(cfg.get_server_targets().iter().cycle())
let channels = (0..cfg.client_channels)
.zip(cfg.server_targets.iter().cycle())
.map(|(_, addr)| {
let mut builder = ChannelBuilder::new(ch_env.clone());
for arg in cfg.get_channel_args() {
let key = CString::new(arg.get_name()).unwrap();
for arg in &cfg.channel_args {
let key = CString::new(arg.name.clone()).unwrap();
#[cfg(feature = "protobuf-codec")]
if arg.has_str_value() {
builder =
builder.raw_cfg_string(key, CString::new(arg.get_str_value()).unwrap());
} else if arg.has_int_value() {
builder = builder.raw_cfg_int(key, arg.get_int_value());
}
#[cfg(feature = "protobufv3-codec")]
if arg.has_str_value() {
builder =
builder.raw_cfg_string(key, CString::new(arg.str_value()).unwrap());
} else if arg.has_int_value() {
builder = builder.raw_cfg_int(key, arg.int_value());
}
}
// Check https://github.com/grpc/grpc/issues/31465.
builder = builder.enable_retry(false);
if cfg.has_security_params() {
let params = cfg.get_security_params();
if !params.get_server_host_override().is_empty() {
builder = builder
.override_ssl_target(params.get_server_host_override().to_owned());
if let Some(params) = get_security_params(cfg) {
if !params.server_host_override.is_empty() {
builder =
builder.override_ssl_target(params.server_host_override.to_owned());
}
builder =
builder.set_credentials(proto_util::create_test_channel_credentials());
}
builder.connect(addr)
});

let client_type = cfg.get_client_type();
let load_params = cfg.get_load_params();
let client_channels = cfg.get_client_channels() as usize;
let outstanding_rpcs_per_channel = cfg.get_outstanding_rpcs_per_channel() as usize;

let recorder = CpuRecorder::new();
#[cfg(feature = "protobuf-codec")]
let client_type = cfg.client_type;
#[cfg(feature = "protobuf-codec")]
let his_param = cfg.get_histogram_params();
#[cfg(feature = "protobuf-codec")]
let his = Arc::new(Mutex::new(Histogram::new(
his_param.get_resolution(),
his_param.get_max_possible(),
)));
#[cfg(feature = "protobuf-codec")]
let has_poisson = cfg.get_load_params().has_poisson();

#[cfg(feature = "protobufv3-codec")]
let client_type = cfg.client_type.enum_value().unwrap();
#[cfg(feature = "protobufv3-codec")]
let his_param = &cfg.histogram_params;
#[cfg(feature = "protobufv3-codec")]
let his = Arc::new(Mutex::new(Histogram::new(
his_param.resolution,
his_param.max_possible,
)));
#[cfg(feature = "protobufv3-codec")]
let has_poisson = cfg.load_params.has_poisson();

let client_channels = cfg.client_channels as usize;
let outstanding_rpcs_per_channel = cfg.outstanding_rpcs_per_channel as usize;

let recorder = CpuRecorder::new();
let keep_running = Arc::new(AtomicBool::new(true));
let mut running_reqs = Vec::with_capacity(client_channels * outstanding_rpcs_per_channel);

for ch in channels {
for _ in 0..cfg.get_outstanding_rpcs_per_channel() {
for _ in 0..cfg.outstanding_rpcs_per_channel {
let his = his.clone();
let ch = ch.clone();
let rx = if load_params.has_poisson() {
let lambda = load_params.get_poisson().get_offered_load()
/ client_channels as f64
/ outstanding_rpcs_per_channel as f64;
let rx = if has_poisson {
#[cfg(feature = "protobuf-codec")]
let offered_load = cfg.get_load_params().get_poisson().get_offered_load();
#[cfg(feature = "protobufv3-codec")]
let offered_load = cfg.load_params.poisson().offered_load;

let lambda =
offered_load / client_channels as f64 / outstanding_rpcs_per_channel as f64;
let poisson = Poisson::new(lambda);
let (ctx, rx) = ExecutorContext::new(his, keep_running.clone(), poisson);
execute(ctx, ch, client_type, cfg);
Expand All @@ -432,13 +504,13 @@ impl Client {
let mut stats = ClientStats::default();

let sample = self.recorder.cpu_time(reset);
stats.set_time_elapsed(sample.real_time);
stats.set_time_user(sample.user_time);
stats.set_time_system(sample.sys_time);
stats.time_elapsed = sample.real_time;
stats.time_user = sample.user_time;
stats.time_system = sample.sys_time;

{
let mut his = self.histogram.lock().unwrap();
stats.set_latencies(his.report(reset));
stats.latencies = Some(his.report(reset)).into();
}

stats
Expand Down
Loading

0 comments on commit 2821787

Please sign in to comment.