diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b138e6b..a769d49d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - coop: expose `coop::consume_budget()` to call in long computations. See documentation of the `coop` module for details. - coop: prefer a time-based budgeting if the telemetry is enabled. - proxy: add `Proxy::try_send_to()` and `Proxy::request_to()`. +- telemeter: support gzip. ### Fixed - telemetry: now `elfo_message_handling_time_seconds` doesn't include the time of task switching if an actor is preempted due to elfo's budget system. diff --git a/elfo-telemeter/Cargo.toml b/elfo-telemeter/Cargo.toml index 363afd02..ea4a286e 100644 --- a/elfo-telemeter/Cargo.toml +++ b/elfo-telemeter/Cargo.toml @@ -19,6 +19,8 @@ elfo-core = { version = "0.2.0-alpha.14", path = "../elfo-core", features = ["un tokio = "1" hyper = { version = "1.0.1", features = ["server", "http1"] } hyper-util = { version = "0.1.1", features = ["tokio"] } +http-body-util = "0.1" +bytes = "1" pin-project-lite = "0.2" serde = { version = "1.0.120", features = ["derive"] } metrics = "0.17" @@ -29,3 +31,4 @@ fxhash = "0.2.1" humantime-serde = "1" cow-utils = "0.1.2" stability = "0.1.1" +flate2 = "1" diff --git a/elfo-telemeter/src/hyper.rs b/elfo-telemeter/src/hyper.rs index 39ea70c2..be0d73ce 100644 --- a/elfo-telemeter/src/hyper.rs +++ b/elfo-telemeter/src/hyper.rs @@ -1,6 +1,7 @@ use std::{ convert::Infallible, future::Future, + io::{self, Write}, net::SocketAddr, pin::Pin, string::ToString, @@ -8,7 +9,14 @@ use std::{ time::{Duration, Instant}, }; -use hyper::{body::Body, rt, server::conn, service, Method, Request, Response, StatusCode}; +use http_body_util::Full; +use hyper::{ + body::Body, + header::{HeaderMap, ACCEPT_ENCODING, CONTENT_ENCODING}, + rt, + server::conn, + service, Method, Request, Response, StatusCode, +}; use hyper_util::rt::TokioIo; use pin_project_lite::pin_project; use tokio::{net::TcpListener, time::timeout}; @@ -23,9 +31,9 @@ const SERVE_TIMEOUT: Duration = Duration::from_secs(10); /// Runs a simple HTTP server that responds to `GET /metrics` requests. /// * It supports only HTTP/1. +/// * It supports gzip compression. /// * It doesn't support keep-alive connections. /// * It doesn't support TLS. -/// * It doesn't support compression. /// * It handles requests one by one with some reasonable timeouts. pub(crate) async fn server(addr: SocketAddr, ctx: Context) -> ServerFailed { let listener = match TcpListener::bind(addr).await { @@ -68,36 +76,86 @@ pub(crate) async fn server(addr: SocketAddr, ctx: Context) -> ServerFailed { } } +type ResBody = Full>>; + // Supports only `GET /metrics` requests. -async fn handle(req: Request, ctx: Context) -> Result, Infallible> { +async fn handle(req: Request, ctx: Context) -> Result, Infallible> { if req.method() != Method::GET { return Ok(Response::builder() .status(StatusCode::METHOD_NOT_ALLOWED) - .body(String::new()) + .body(<_>::default()) .unwrap()); } if req.uri().path() != "/metrics" { return Ok(Response::builder() .status(StatusCode::NOT_FOUND) - .body(String::new()) + .body(<_>::default()) .unwrap()); } + let use_gzip = use_gzip(req.headers()); + ctx.request_to(ctx.addr(), Render) .resolve() .await - .map(|Rendered(text)| Response::new(text)) + .map(|Rendered(text)| { + let builder = Response::builder(); + + let gzipped = if use_gzip { + match try_gzip(text.as_bytes()) { + Ok(gzipped) => Some(gzipped), + Err(err) => { + warn!(error = %err, "failed to gzip metrics, sending uncompressed"); + None + } + } + } else { + None + }; + + if let Some(gzipped) = gzipped { + builder + .header(CONTENT_ENCODING, "gzip") + .body(into_res_body(gzipped)) + } else { + builder.body(into_res_body(text.into_bytes())) + } + .unwrap() + }) .or_else(|err| { warn!(error = %err, "failed to render metrics for HTTP response"); Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(String::new()) + .body(<_>::default()) .unwrap()) }) } +fn use_gzip(headers: &HeaderMap) -> bool { + let Some(encoding) = headers.get(ACCEPT_ENCODING) else { + return false; + }; + + let Ok(encoding) = encoding.to_str() else { + return false; + }; + + encoding.contains("gzip") +} + +fn try_gzip(data: &[u8]) -> io::Result> { + let out = Vec::with_capacity(data.len() / 4); // good enough estimation + let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::default()); + encoder.write_all(data)?; + encoder.finish() +} + +fn into_res_body(data: Vec) -> ResBody { + Full::new(io::Cursor::new(data)) +} + fn flat_error(res: Result, impl ToString>) -> Result<(), String> { match res { Ok(Ok(())) => Ok(()),