diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index 916537c02..f5ede7dfe 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -28,7 +28,7 @@ keywords = [ default = ["env-filter"] parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"] env-filter = ["tracing-subscriber/env-filter"] -grpc-web = ["tonic-web", "tower-http"] +grpc-web = ["tonic-web"] [dependencies] crossbeam-utils = "0.8.7" @@ -55,13 +55,13 @@ crossbeam-channel = "0.5" # Only for the web feature: tonic-web = { version = "0.10.2", optional = true } -tower-http = { version = "0.4", features = ["cors"], optional = true } [dev-dependencies] tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] } tower = { version = "0.4", default-features = false } futures = "0.3" http = "0.2" +tower-http = { version = "0.4", features = ["cors"] } [package.metadata.docs.rs] all-features = true diff --git a/console-subscriber/examples/grpc_web.rs b/console-subscriber/examples/grpc_web.rs index e55bab6c3..3d90ac819 100644 --- a/console-subscriber/examples/grpc_web.rs +++ b/console-subscriber/examples/grpc_web.rs @@ -4,101 +4,80 @@ //! ```sh //! cargo run --example grpc_web --features grpc-web //! ``` -use std::time::Duration; +use std::{thread, time::Duration}; -use console_subscriber::ConsoleLayer; +use console_subscriber::{ConsoleLayer, ServerParts}; use http::header::HeaderName; +use tonic_web::GrpcWebLayer; use tower_http::cors::{AllowOrigin, CorsLayer}; - -static HELP: &str = r#" -Example console-instrumented app with gRPC-Web support. - -USAGE: - app [OPTIONS] - -OPTIONS: - -h, help prints this message - blocks Includes a (misbehaving) blocking task - burn Includes a (misbehaving) task that spins CPU with self-wakes - coma Includes a (misbehaving) task that forgets to register a waker - noyield Includes a (misbehaving) task that spawns tasks that never yield -"#; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60); const DEFAULT_EXPOSED_HEADERS: [&str; 3] = ["grpc-status", "grpc-message", "grpc-status-details-bin"]; -const DEFAULT_ALLOW_HEADERS: [&str; 4] = - ["x-grpc-web", "content-type", "x-user-agent", "grpc-timeout"]; +const DEFAULT_ALLOW_HEADERS: [&str; 5] = [ + "x-grpc-web", + "content-type", + "x-user-agent", + "grpc-timeout", + "user-agent", +]; #[tokio::main] async fn main() -> Result<(), Box> { - let cors = CorsLayer::new() - .allow_origin(AllowOrigin::mirror_request()) - .allow_credentials(true) - .max_age(DEFAULT_MAX_AGE) - .expose_headers( - DEFAULT_EXPOSED_HEADERS - .iter() - .cloned() - .map(HeaderName::from_static) - .collect::>(), - ) - .allow_headers( - DEFAULT_ALLOW_HEADERS - .iter() - .cloned() - .map(HeaderName::from_static) - .collect::>(), - ); - ConsoleLayer::builder() - .with_default_env() - .with_cors(cors) - .init(); - // spawn optional extras from CLI args - // skip first which is command name - for opt in std::env::args().skip(1) { - match &*opt { - "blocks" => { - tokio::task::Builder::new() - .name("blocks") - .spawn(double_sleepy(1, 10)) - .unwrap(); - } - "coma" => { - tokio::task::Builder::new() - .name("coma") - .spawn(std::future::pending::<()>()) - .unwrap(); - } - "burn" => { - tokio::task::Builder::new() - .name("burn") - .spawn(burn(1, 10)) - .unwrap(); - } - "noyield" => { - tokio::task::Builder::new() - .name("noyield") - .spawn(no_yield(20)) - .unwrap(); - } - "blocking" => { - tokio::task::Builder::new() - .name("spawns_blocking") - .spawn(spawn_blocking(5)) - .unwrap(); - } - "help" | "-h" => { - eprintln!("{}", HELP); - return Ok(()); - } - wat => { - return Err( - format!("unknown option: {:?}, run with '-h' to see options", wat).into(), + let (console_layer, server) = ConsoleLayer::builder().with_default_env().build(); + thread::Builder::new() + .name("subscriber".into()) + .spawn(move || { + // Do not trace anything in this thread. + let _subscriber_guard; + _subscriber_guard = + tracing::subscriber::set_default(tracing_core::subscriber::NoSubscriber::default()); + // Custom CORS configuration. + let cors = CorsLayer::new() + .allow_origin(AllowOrigin::mirror_request()) + .allow_credentials(true) + .max_age(DEFAULT_MAX_AGE) + .expose_headers( + DEFAULT_EXPOSED_HEADERS + .iter() + .cloned() + .map(HeaderName::from_static) + .collect::>(), ) - } - } - } + .allow_headers( + DEFAULT_ALLOW_HEADERS + .iter() + .cloned() + .map(HeaderName::from_static) + .collect::>(), + ); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("console subscriber runtime initialization failed"); + runtime.block_on(async move { + let ServerParts { + instrument_server, + aggregator, + .. + } = server.into_parts(); + tokio::spawn(aggregator.run()); + let router = tonic::transport::Server::builder() + // Accept gRPC-Web requests and enable CORS. + .accept_http1(true) + .layer(cors) + .layer(GrpcWebLayer::new()) + .add_service(instrument_server); + let serve = router.serve(std::net::SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)), + 9999, + )); + serve.await.expect("console subscriber server failed"); + }); + }) + .expect("console subscriber could not spawn thread"); + tracing_subscriber::registry().with(console_layer).init(); let task1 = tokio::task::Builder::new() .name("task1") @@ -141,51 +120,3 @@ async fn wait(seconds: u64) { tokio::time::sleep(Duration::from_secs(seconds)).await; tracing::trace!("done!"); } - -#[tracing::instrument] -async fn double_sleepy(min: u64, max: u64) { - loop { - for i in min..max { - // woops! - std::thread::sleep(Duration::from_secs(i)); - tokio::time::sleep(Duration::from_secs(max - i)).await; - } - } -} - -#[tracing::instrument] -async fn burn(min: u64, max: u64) { - loop { - for i in min..max { - for _ in 0..i { - tokio::task::yield_now().await; - } - tokio::time::sleep(Duration::from_secs(i - min)).await; - } - } -} - -#[tracing::instrument] -async fn no_yield(seconds: u64) { - loop { - let handle = tokio::task::Builder::new() - .name("greedy") - .spawn(async move { - std::thread::sleep(Duration::from_secs(seconds)); - }) - .expect("Couldn't spawn greedy task"); - - _ = handle.await; - } -} - -#[tracing::instrument] -async fn spawn_blocking(seconds: u64) { - loop { - let seconds = seconds; - _ = tokio::task::spawn_blocking(move || { - std::thread::sleep(Duration::from_secs(seconds)); - }) - .await; - } -} diff --git a/console-subscriber/src/builder.rs b/console-subscriber/src/builder.rs index e7348b915..15ca149fa 100644 --- a/console-subscriber/src/builder.rs +++ b/console-subscriber/src/builder.rs @@ -58,8 +58,8 @@ pub struct Builder { pub(super) scheduled_duration_max: Duration, #[cfg(feature = "grpc-web")] - /// Cors layer for grpc-web. - cors_layer: Option, + /// Whether to enable the grpc-web support. + enable_grpc_web: bool, } impl Default for Builder { @@ -76,7 +76,7 @@ impl Default for Builder { filter_env_var: "RUST_LOG".to_string(), self_trace: false, #[cfg(feature = "grpc-web")] - cors_layer: None, + enable_grpc_web: false, } } } @@ -275,9 +275,9 @@ impl Builder { } #[cfg(feature = "grpc-web")] - pub fn with_cors(self, cors: tower_http::cors::CorsLayer) -> Self { + pub fn enable_grpc_web(self, enable_grpc_web: bool) -> Self { Self { - cors_layer: Some(cors), + enable_grpc_web, ..self } } @@ -496,7 +496,7 @@ impl Builder { let self_trace = self.self_trace; #[cfg(feature = "grpc-web")] - let cors_layer = self.cors_layer.clone(); + let enable_grpc_web = self.enable_grpc_web; let (layer, server) = self.build(); let filter = @@ -519,12 +519,9 @@ impl Builder { .expect("console subscriber runtime initialization failed"); runtime.block_on(async move { #[cfg(feature = "grpc-web")] - if cors_layer.is_some() { + if enable_grpc_web { server - .serve_with_grpc_web( - tonic::transport::Server::builder(), - cors_layer.unwrap(), - ) + .serve_with_grpc_web(tonic::transport::Server::builder()) .await .expect("console subscriber server failed"); return; diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index 839e6bd69..c524aed64 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -923,28 +923,30 @@ impl Server { /// ``` /// [`serve_with`]: Server::serve_with pub async fn serve(self) -> Result<(), Box> { - self.serve_with(tonic::transport::Server::builder()).await + self.serve_with(tonic::transport::Server::default()).await } - #[cfg(feature = "grpc-web")] - /// Starts the gRPC service with the default gRPC settings and gRPC-Web - /// support. - pub async fn serve_with_grpc_web( + /// Starts the gRPC service with the given [`tonic`] gRPC transport server + /// `builder`. + /// + /// The `builder` parameter may be used to configure gRPC-specific settings + /// prior to starting the server. + /// + /// This spawns both the server task and the event aggregation worker + /// task on the current async runtime. + /// + /// [`tonic`]: https://docs.rs/tonic/ + pub async fn serve_with( self, - builder: tonic::transport::Server, - cors: tower_http::cors::CorsLayer, + mut builder: tonic::transport::Server, ) -> Result<(), Box> { let addr = self.addr.clone(); let ServerParts { instrument_server, aggregator, } = self.into_parts(); - let router = builder - .accept_http1(true) - .layer(cors) - .layer(tonic_web::GrpcWebLayer::new()) - .add_service(instrument_server); let aggregate = spawn_named(aggregator.run(), "console::aggregate"); + let router = builder.add_service(instrument_server); let res = match addr { ServerAddr::Tcp(addr) => { let serve = router.serve(addr); @@ -961,27 +963,22 @@ impl Server { res?.map_err(Into::into) } - /// Starts the gRPC service with the given [`tonic`] gRPC transport server - /// `builder`. - /// - /// The `builder` parameter may be used to configure gRPC-specific settings - /// prior to starting the server. - /// - /// This spawns both the server task and the event aggregation worker - /// task on the current async runtime. - /// - /// [`tonic`]: https://docs.rs/tonic/ - pub async fn serve_with( + #[cfg(feature = "grpc-web")] + /// Starts the gRPC service with the default gRPC settings and gRPC-Web + /// support. + pub async fn serve_with_grpc_web( self, - mut builder: tonic::transport::Server, + builder: tonic::transport::Server, ) -> Result<(), Box> { let addr = self.addr.clone(); let ServerParts { instrument_server, aggregator, } = self.into_parts(); + let router = builder + .accept_http1(true) + .add_service(tonic_web::enable(instrument_server)); let aggregate = spawn_named(aggregator.run(), "console::aggregate"); - let router = builder.add_service(instrument_server); let res = match addr { ServerAddr::Tcp(addr) => { let serve = router.serve(addr);