Skip to content

Commit

Permalink
fix: do not depend on cors
Browse files Browse the repository at this point in the history
Signed-off-by: hi-rustin <[email protected]>
  • Loading branch information
Rustin170506 committed Feb 4, 2024
1 parent d65a852 commit 8476963
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 170 deletions.
4 changes: 2 additions & 2 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ keywords = [
default = ["env-filter"]
parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"]
env-filter = ["tracing-subscriber/env-filter"]
grpc-web = ["tonic-web", "tower-http"]
grpc-web = ["tonic-web"]

[dependencies]
crossbeam-utils = "0.8.7"
Expand All @@ -55,13 +55,13 @@ crossbeam-channel = "0.5"

# Only for the web feature:
tonic-web = { version = "0.10.2", optional = true }
tower-http = { version = "0.4", features = ["cors"], optional = true }

[dev-dependencies]
tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] }
tower = { version = "0.4", default-features = false }
futures = "0.3"
http = "0.2"
tower-http = { version = "0.4", features = ["cors"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
195 changes: 63 additions & 132 deletions console-subscriber/examples/grpc_web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,101 +4,80 @@
//! ```sh
//! cargo run --example grpc_web --features grpc-web
//! ```
use std::time::Duration;
use std::{thread, time::Duration};

use console_subscriber::ConsoleLayer;
use console_subscriber::{ConsoleLayer, ServerParts};
use http::header::HeaderName;
use tonic_web::GrpcWebLayer;
use tower_http::cors::{AllowOrigin, CorsLayer};

static HELP: &str = r#"
Example console-instrumented app with gRPC-Web support.
USAGE:
app [OPTIONS]
OPTIONS:
-h, help prints this message
blocks Includes a (misbehaving) blocking task
burn Includes a (misbehaving) task that spins CPU with self-wakes
coma Includes a (misbehaving) task that forgets to register a waker
noyield Includes a (misbehaving) task that spawns tasks that never yield
"#;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
["grpc-status", "grpc-message", "grpc-status-details-bin"];
const DEFAULT_ALLOW_HEADERS: [&str; 4] =
["x-grpc-web", "content-type", "x-user-agent", "grpc-timeout"];
const DEFAULT_ALLOW_HEADERS: [&str; 5] = [
"x-grpc-web",
"content-type",
"x-user-agent",
"grpc-timeout",
"user-agent",
];

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cors = CorsLayer::new()
.allow_origin(AllowOrigin::mirror_request())
.allow_credentials(true)
.max_age(DEFAULT_MAX_AGE)
.expose_headers(
DEFAULT_EXPOSED_HEADERS
.iter()
.cloned()
.map(HeaderName::from_static)
.collect::<Vec<HeaderName>>(),
)
.allow_headers(
DEFAULT_ALLOW_HEADERS
.iter()
.cloned()
.map(HeaderName::from_static)
.collect::<Vec<HeaderName>>(),
);
ConsoleLayer::builder()
.with_default_env()
.with_cors(cors)
.init();
// spawn optional extras from CLI args
// skip first which is command name
for opt in std::env::args().skip(1) {
match &*opt {
"blocks" => {
tokio::task::Builder::new()
.name("blocks")
.spawn(double_sleepy(1, 10))
.unwrap();
}
"coma" => {
tokio::task::Builder::new()
.name("coma")
.spawn(std::future::pending::<()>())
.unwrap();
}
"burn" => {
tokio::task::Builder::new()
.name("burn")
.spawn(burn(1, 10))
.unwrap();
}
"noyield" => {
tokio::task::Builder::new()
.name("noyield")
.spawn(no_yield(20))
.unwrap();
}
"blocking" => {
tokio::task::Builder::new()
.name("spawns_blocking")
.spawn(spawn_blocking(5))
.unwrap();
}
"help" | "-h" => {
eprintln!("{}", HELP);
return Ok(());
}
wat => {
return Err(
format!("unknown option: {:?}, run with '-h' to see options", wat).into(),
let (console_layer, server) = ConsoleLayer::builder().with_default_env().build();
thread::Builder::new()
.name("subscriber".into())
.spawn(move || {
// Do not trace anything in this thread.
let _subscriber_guard;
_subscriber_guard =
tracing::subscriber::set_default(tracing_core::subscriber::NoSubscriber::default());
// Custom CORS configuration.
let cors = CorsLayer::new()
.allow_origin(AllowOrigin::mirror_request())
.allow_credentials(true)
.max_age(DEFAULT_MAX_AGE)
.expose_headers(
DEFAULT_EXPOSED_HEADERS
.iter()
.cloned()
.map(HeaderName::from_static)
.collect::<Vec<HeaderName>>(),
)
}
}
}
.allow_headers(
DEFAULT_ALLOW_HEADERS
.iter()
.cloned()
.map(HeaderName::from_static)
.collect::<Vec<HeaderName>>(),
);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("console subscriber runtime initialization failed");
runtime.block_on(async move {
let ServerParts {
instrument_server,
aggregator,
..
} = server.into_parts();
tokio::spawn(aggregator.run());
let router = tonic::transport::Server::builder()
// Accept gRPC-Web requests and enable CORS.
.accept_http1(true)
.layer(cors)
.layer(GrpcWebLayer::new())
.add_service(instrument_server);
let serve = router.serve(std::net::SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
9999,
));
serve.await.expect("console subscriber server failed");
});
})
.expect("console subscriber could not spawn thread");
tracing_subscriber::registry().with(console_layer).init();

let task1 = tokio::task::Builder::new()
.name("task1")
Expand Down Expand Up @@ -141,51 +120,3 @@ async fn wait(seconds: u64) {
tokio::time::sleep(Duration::from_secs(seconds)).await;
tracing::trace!("done!");
}

#[tracing::instrument]
async fn double_sleepy(min: u64, max: u64) {
loop {
for i in min..max {
// woops!
std::thread::sleep(Duration::from_secs(i));
tokio::time::sleep(Duration::from_secs(max - i)).await;
}
}
}

#[tracing::instrument]
async fn burn(min: u64, max: u64) {
loop {
for i in min..max {
for _ in 0..i {
tokio::task::yield_now().await;
}
tokio::time::sleep(Duration::from_secs(i - min)).await;
}
}
}

#[tracing::instrument]
async fn no_yield(seconds: u64) {
loop {
let handle = tokio::task::Builder::new()
.name("greedy")
.spawn(async move {
std::thread::sleep(Duration::from_secs(seconds));
})
.expect("Couldn't spawn greedy task");

_ = handle.await;
}
}

#[tracing::instrument]
async fn spawn_blocking(seconds: u64) {
loop {
let seconds = seconds;
_ = tokio::task::spawn_blocking(move || {
std::thread::sleep(Duration::from_secs(seconds));
})
.await;
}
}
19 changes: 8 additions & 11 deletions console-subscriber/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ pub struct Builder {
pub(super) scheduled_duration_max: Duration,

#[cfg(feature = "grpc-web")]
/// Cors layer for grpc-web.
cors_layer: Option<tower_http::cors::CorsLayer>,
/// Whether to enable the grpc-web support.
enable_grpc_web: bool,
}

impl Default for Builder {
Expand All @@ -76,7 +76,7 @@ impl Default for Builder {
filter_env_var: "RUST_LOG".to_string(),
self_trace: false,
#[cfg(feature = "grpc-web")]
cors_layer: None,
enable_grpc_web: false,
}
}
}
Expand Down Expand Up @@ -275,9 +275,9 @@ impl Builder {
}

#[cfg(feature = "grpc-web")]
pub fn with_cors(self, cors: tower_http::cors::CorsLayer) -> Self {
pub fn enable_grpc_web(self, enable_grpc_web: bool) -> Self {
Self {
cors_layer: Some(cors),
enable_grpc_web,
..self
}
}
Expand Down Expand Up @@ -496,7 +496,7 @@ impl Builder {

let self_trace = self.self_trace;
#[cfg(feature = "grpc-web")]
let cors_layer = self.cors_layer.clone();
let enable_grpc_web = self.enable_grpc_web;

let (layer, server) = self.build();
let filter =
Expand All @@ -519,12 +519,9 @@ impl Builder {
.expect("console subscriber runtime initialization failed");
runtime.block_on(async move {
#[cfg(feature = "grpc-web")]
if cors_layer.is_some() {
if enable_grpc_web {
server
.serve_with_grpc_web(
tonic::transport::Server::builder(),
cors_layer.unwrap(),
)
.serve_with_grpc_web(tonic::transport::Server::builder())
.await
.expect("console subscriber server failed");
return;
Expand Down
47 changes: 22 additions & 25 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,28 +923,30 @@ impl Server {
/// ```
/// [`serve_with`]: Server::serve_with
pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
self.serve_with(tonic::transport::Server::builder()).await
self.serve_with(tonic::transport::Server::default()).await
}

#[cfg(feature = "grpc-web")]
/// Starts the gRPC service with the default gRPC settings and gRPC-Web
/// support.
pub async fn serve_with_grpc_web(
/// Starts the gRPC service with the given [`tonic`] gRPC transport server
/// `builder`.
///
/// The `builder` parameter may be used to configure gRPC-specific settings
/// prior to starting the server.
///
/// This spawns both the server task and the event aggregation worker
/// task on the current async runtime.
///
/// [`tonic`]: https://docs.rs/tonic/
pub async fn serve_with(
self,
builder: tonic::transport::Server,
cors: tower_http::cors::CorsLayer,
mut builder: tonic::transport::Server,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let addr = self.addr.clone();
let ServerParts {
instrument_server,
aggregator,
} = self.into_parts();
let router = builder
.accept_http1(true)
.layer(cors)
.layer(tonic_web::GrpcWebLayer::new())
.add_service(instrument_server);
let aggregate = spawn_named(aggregator.run(), "console::aggregate");
let router = builder.add_service(instrument_server);
let res = match addr {
ServerAddr::Tcp(addr) => {
let serve = router.serve(addr);
Expand All @@ -961,27 +963,22 @@ impl Server {
res?.map_err(Into::into)
}

/// Starts the gRPC service with the given [`tonic`] gRPC transport server
/// `builder`.
///
/// The `builder` parameter may be used to configure gRPC-specific settings
/// prior to starting the server.
///
/// This spawns both the server task and the event aggregation worker
/// task on the current async runtime.
///
/// [`tonic`]: https://docs.rs/tonic/
pub async fn serve_with(
#[cfg(feature = "grpc-web")]
/// Starts the gRPC service with the default gRPC settings and gRPC-Web
/// support.
pub async fn serve_with_grpc_web(
self,
mut builder: tonic::transport::Server,
builder: tonic::transport::Server,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let addr = self.addr.clone();
let ServerParts {
instrument_server,
aggregator,
} = self.into_parts();
let router = builder
.accept_http1(true)
.add_service(tonic_web::enable(instrument_server));
let aggregate = spawn_named(aggregator.run(), "console::aggregate");
let router = builder.add_service(instrument_server);
let res = match addr {
ServerAddr::Tcp(addr) => {
let serve = router.serve(addr);
Expand Down

0 comments on commit 8476963

Please sign in to comment.