diff --git a/CHANGELOG.md b/CHANGELOG.md index f5e1c96b..e68dd508 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,9 +8,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate ### Changed -- deps: update `quanta` to v0.12 and `erased-serde` to v0.4. +- deps: update `quanta` to v0.12, `hyper` to v1 and `erased-serde` to v0.4. - context: slightly improve performance of time measurements. - utils: slightly improve performance of `RateLimiter`. +- telemeter: handle only `GET /metrics` requests. +- telemeter: rename the `address` config parameter to `listen`, keeping the old name as an alias. + +### Fixed +- telemeter: avoid spawning tasks for an HTTP server, which improves common metrics. ## [0.2.0-alpha.11] - 2023-11-10 ### Added diff --git a/elfo-telemeter/Cargo.toml b/elfo-telemeter/Cargo.toml index 9d3b6a9e..e1cb422a 100644 --- a/elfo-telemeter/Cargo.toml +++ b/elfo-telemeter/Cargo.toml @@ -17,7 +17,9 @@ unstable = [] elfo-core = { version = "0.2.0-alpha.11", path = "../elfo-core", features = ["unstable"] } # TODO: do not need tokio = "1" -hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] } +hyper = { version = "1.0.1", features = ["server", "http1"] } +hyper-util = { version = "0.1.1", features = ["tokio"] } +pin-project-lite = "0.2" serde = { version = "1.0.120", features = ["derive"] } metrics = "0.17" metrics-util = "0.10" diff --git a/elfo-telemeter/src/actor.rs b/elfo-telemeter/src/actor.rs index 21973940..8bda21d6 100644 --- a/elfo-telemeter/src/actor.rs +++ b/elfo-telemeter/src/actor.rs @@ -1,17 +1,17 @@ use std::sync::Arc; use metrics::gauge; -use tokio::task::JoinHandle; use tracing::{error, info}; use elfo_core::{ - message, messages::ConfigUpdated, msg, scope, time::Interval, tracing::TraceId, ActorGroup, - Blueprint, Context, MoveOwnership, + message, messages::ConfigUpdated, msg, stream::Stream, time::Interval, ActorGroup, Blueprint, + Context, SourceHandle, 
}; use crate::{ config::{Config, Retention, Sink}, - protocol::{GetSnapshot, Snapshot}, + hyper, + protocol::{GetSnapshot, Render, Rendered, ServerFailed, Snapshot}, render::Renderer, storage::Storage, }; @@ -19,23 +19,15 @@ use crate::{ struct Telemeter { ctx: Context, interval: Interval, + server: Option>, storage: Arc, snapshot: Arc, renderer: Renderer, } -#[message(ret = Rendered)] -struct Render; - -#[message] -struct Rendered(#[serde(serialize_with = "elfo_core::dumping::hide")] String); - #[message] struct CompactionTick; -#[message] -struct ServerFailed(MoveOwnership); - pub(crate) fn new(storage: Arc) -> Blueprint { ActorGroup::new() .config::() @@ -50,6 +42,7 @@ impl Telemeter { Self { interval: ctx.attach(Interval::new(CompactionTick)), + server: None, storage, snapshot: Default::default(), renderer, @@ -61,8 +54,8 @@ impl Telemeter { // Now only prometheus is supported. assert_eq!(self.ctx.config().sink, Sink::Prometheus); - let mut address = self.ctx.config().address; - let mut server = start_server(&self.ctx); + let mut listen = self.ctx.config().listen; + self.start_server(); self.interval.start(self.ctx.config().compaction_interval); @@ -71,14 +64,17 @@ impl Telemeter { ConfigUpdated => { let config = self.ctx.config(); - if config.address != address { - info!("address changed, rerun the server"); - server.abort(); - address = config.address; - server = start_server(&self.ctx); - } - self.renderer.configure(config); + + if config.listen != listen { + info!( + message = "listen address changed, rerun the server", + old = %listen, + new = %config.listen, + ); + listen = config.listen; + self.start_server(); + } } (GetSnapshot, token) => { // Rendering includes compaction, skip extra compaction tick. 
@@ -105,14 +101,12 @@ impl Telemeter { CompactionTick => { self.fill_snapshot(/* only_histograms = */ true); } - ServerFailed(error) => { - error!(error = %&error.take().unwrap(), "server failed"); - panic!("server failed"); + ServerFailed(err) => { + error!(error = %err, "server failed"); + panic!("server failed, cannot continue"); } }); } - - server.abort(); } fn fill_snapshot(&mut self, only_histograms: bool) { @@ -130,59 +124,18 @@ impl Telemeter { let snapshot = Arc::make_mut(&mut self.snapshot); snapshot.distributions_mut().for_each(|d| d.reset()); } -} -fn start_server(ctx: &Context) -> JoinHandle<()> { - use hyper::{ - server::{conn::AddrStream, Server}, - service::{make_service_fn, service_fn}, - Body, Error as HyperError, Response, - }; - - let address = ctx.config().address; - let ctx = Arc::new(ctx.pruned()); - let ctx1 = ctx.clone(); - - let scope = scope::expose(); - let scope1 = scope.clone(); - - let serving = async move { - let server = Server::try_bind(&address)?; - let make_svc = make_service_fn(move |_socket: &AddrStream| { - let ctx = ctx.clone(); - let scope = scope.clone(); - - async move { - Ok::<_, HyperError>(service_fn(move |_| { - let ctx = ctx.clone(); - let scope = scope.clone(); - - let f = async move { - let Rendered(output) = ctx - .request_to(ctx.addr(), Render) - .resolve() - .await - .expect("failed to send to the telemeter"); - - Ok::<_, HyperError>(Response::new(Body::from(output))) - }; - - scope.set_trace_id(TraceId::generate()); - scope.within(f) - })) - } - }); - server.serve(make_svc).await - }; - - tokio::spawn(async move { - if let Err(err) = serving.await { - let f = async { - let _ = ctx1.send_to(ctx1.group(), ServerFailed(err.into())).await; - }; - - scope1.set_trace_id(TraceId::generate()); - scope1.within(f).await; + fn start_server(&mut self) { + // Terminate a running server. + if let Some(source) = self.server.take() { + source.terminate(); } - }) + + // Start a new one. 
+ let listen = self.ctx.config().listen; + let pruned_ctx = self.ctx.pruned(); + let source = Stream::once(hyper::server(listen, pruned_ctx)); + + self.server = Some(self.ctx.attach(source)); + } } diff --git a/elfo-telemeter/src/config.rs b/elfo-telemeter/src/config.rs index acf9c8c7..1962c31b 100644 --- a/elfo-telemeter/src/config.rs +++ b/elfo-telemeter/src/config.rs @@ -7,7 +7,8 @@ pub(crate) struct Config { /// The sink's type. pub(crate) sink: Sink, /// The address to expose for scraping. - pub(crate) address: SocketAddr, + #[serde(alias = "address")] + pub(crate) listen: SocketAddr, /// How long samples should be considered in summaries. #[serde(default)] pub(crate) retention: Retention, diff --git a/elfo-telemeter/src/hyper.rs b/elfo-telemeter/src/hyper.rs new file mode 100644 index 00000000..39ea70c2 --- /dev/null +++ b/elfo-telemeter/src/hyper.rs @@ -0,0 +1,157 @@ +use std::{ + convert::Infallible, + future::Future, + net::SocketAddr, + pin::Pin, + string::ToString, + task::Poll, + time::{Duration, Instant}, +}; + +use hyper::{body::Body, rt, server::conn, service, Method, Request, Response, StatusCode}; +use hyper_util::rt::TokioIo; +use pin_project_lite::pin_project; +use tokio::{net::TcpListener, time::timeout}; +use tracing::{debug, info, warn}; + +use elfo_core::{scope, tracing::TraceId, Context}; + +use crate::protocol::{Render, Rendered, ServerFailed}; + +const HEADER_READ_TIMEOUT: Duration = Duration::from_secs(3); +const SERVE_TIMEOUT: Duration = Duration::from_secs(10); + +/// Runs a simple HTTP server that responds to `GET /metrics` requests. +/// * It supports only HTTP/1. +/// * It doesn't support keep-alive connections. +/// * It doesn't support TLS. +/// * It doesn't support compression. +/// * It handles requests one by one with some reasonable timeouts. 
+pub(crate) async fn server(addr: SocketAddr, ctx: Context) -> ServerFailed { + let listener = match TcpListener::bind(addr).await { + Ok(listener) => listener, + Err(err) => return ServerFailed(format!("cannot bind a listener: {err}")), + }; + + info!(bind = %addr, "listening TCP connections"); + + loop { + let (stream, peer) = match listener.accept().await { + Ok(pair) => pair, + Err(err) => return ServerFailed(format!("cannot accept a connection: {err}")), + }; + + // The server doesn't support keep-alive connections, so every connection is a + // new request. Thus, we can start a new trace right here. + scope::set_trace_id(TraceId::generate()); + + debug!(peer = %peer, "accepted a TCP connection"); + let ctx = ctx.clone(); + + let serving = conn::http1::Builder::new() + .timer(TokioTimer) + .keep_alive(false) // KA is meaningless for rare requests. + .header_read_timeout(HEADER_READ_TIMEOUT) + .serve_connection( + TokioIo::new(stream), + service::service_fn(move |req| handle(req, ctx.clone())), + ); + + match flat_error(timeout(SERVE_TIMEOUT, serving).await) { + Ok(()) => debug!(peer = %peer, "finished serving a HTTP connection"), + Err(err) => warn!( + message = "failed to serve a HTTP connection", + error = %err, + peer = %peer, + ), + } + } +} + +// Supports only `GET /metrics` requests. 
+/// Produces the HTTP response for one request.
+///
+/// * Any method other than `GET` gets `405 Method Not Allowed`.
+/// * Any path other than `/metrics` gets `404 Not Found`.
+/// * Otherwise a `Render` request is sent to `ctx.addr()` and the rendered text
+///   becomes the response body; if that request fails, a warning is logged and
+///   `500 Internal Server Error` is returned.
+async fn handle(req: Request<impl Body>, ctx: Context) -> Result<Response<String>, Infallible> {
+    if req.method() != Method::GET {
+        return Ok(Response::builder()
+            .status(StatusCode::METHOD_NOT_ALLOWED)
+            // RFC 9110 requires a 405 response to list the supported methods.
+            .header(::hyper::header::ALLOW, "GET")
+            .body(String::new())
+            .unwrap());
+    }
+
+    if req.uri().path() != "/metrics" {
+        return Ok(Response::builder()
+            .status(StatusCode::NOT_FOUND)
+            .body(String::new())
+            .unwrap());
+    }
+
+    ctx.request_to(ctx.addr(), Render)
+        .resolve()
+        .await
+        .map(|Rendered(text)| Response::new(text))
+        .or_else(|err| {
+            warn!(error = %err, "failed to render metrics for HTTP response");
+
+            Ok(Response::builder()
+                .status(StatusCode::INTERNAL_SERVER_ERROR)
+                .body(String::new())
+                .unwrap())
+        })
+}
+
+/// Collapses a nested result into a flat one, turning either error into its
+/// string form.
+fn flat_error(res: Result<Result<(), impl ToString>, impl ToString>) -> Result<(), String> {
+    match res {
+        Ok(Ok(())) => Ok(()),
+        Ok(Err(err)) => Err(err.to_string()),
+        Err(err) => Err(err.to_string()),
+    }
+}
+
+// === TokioTimer ===
+// TODO: Replace once https://github.com/hyperium/hyper-util/pull/73 is released.
+// Don't forget to remove `pin-project-lite` from `Cargo.toml`.
+
+/// A `hyper::rt::Timer` implementation backed by tokio's timers.
+#[derive(Clone, Debug)]
+struct TokioTimer;
+
+impl rt::Timer for TokioTimer {
+    fn sleep(&self, duration: Duration) -> Pin<Box<dyn rt::Sleep>> {
+        Box::pin(TokioSleep {
+            inner: tokio::time::sleep(duration),
+        })
+    }
+
+    fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn rt::Sleep>> {
+        Box::pin(TokioSleep {
+            inner: tokio::time::sleep_until(deadline.into()),
+        })
+    }
+
+    fn reset(&self, sleep: &mut Pin<Box<dyn rt::Sleep>>, new_deadline: Instant) {
+        // Only sleeps created by this timer can be reset in place; anything else
+        // is left untouched.
+        if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
+            sleep.reset(new_deadline)
+        }
+    }
+}
+
+pin_project! {
+    // Wraps `tokio::time::Sleep` so that it can implement `hyper::rt::Sleep`.
+    struct TokioSleep {
+        #[pin]
+        inner: tokio::time::Sleep,
+    }
+}
+
+impl Future for TokioSleep {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+        self.project().inner.poll(cx)
+    }
+}
+
+impl rt::Sleep for TokioSleep {}
+
+impl TokioSleep {
+    /// Moves the deadline of the wrapped tokio sleep to `deadline`.
+    fn reset(self: Pin<&mut Self>, deadline: Instant) {
+        self.project().inner.as_mut().reset(deadline.into());
+    }
+}
diff --git a/elfo-telemeter/src/lib.rs b/elfo-telemeter/src/lib.rs index b95b858f..86ab3529 100644 --- a/elfo-telemeter/src/lib.rs +++ b/elfo-telemeter/src/lib.rs @@ -27,6 +27,7 @@ pub mod protocol; mod actor; mod config; +mod hyper; mod recorder; mod render; mod storage; diff --git a/elfo-telemeter/src/protocol.rs b/elfo-telemeter/src/protocol.rs index f50586e3..06b745d5 100644 --- a/elfo-telemeter/src/protocol.rs +++ b/elfo-telemeter/src/protocol.rs @@ -8,6 +8,15 @@ use metrics_util::Summary; use elfo_core::{message, ActorMeta, Local}; +#[message(ret = Rendered)] +pub(crate) struct Render; + +#[message] +pub(crate) struct Rendered(#[serde(serialize_with = "elfo_core::dumping::hide")] pub(crate) String); + +#[message] +pub(crate) struct ServerFailed(pub(crate) String); + /// A command to get actual snapshot of all metrics. /// The response is restricted to be local only for now. #[message(ret = Local>)] diff --git a/examples/examples/usage/config.toml b/examples/examples/usage/config.toml index 7a56c205..8f8eee2d 100644 --- a/examples/examples/usage/config.toml +++ b/examples/examples/usage/config.toml @@ -31,7 +31,7 @@ [system.telemeters] sink = "Prometheus" -address = "0.0.0.0:9042" +listen = "0.0.0.0:9042" #global_labels = [["label", "value"]] #quantiles = [0.75, 0.9, 0.95, 0.99]