Skip to content

Commit

Permalink
feat(telemeter): support gzip
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Apr 5, 2024
1 parent 9da83d4 commit f332b6b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- coop: expose `coop::consume_budget()` to call in long computations. See documentation of the `coop` module for details.
- coop: prefer a time-based budgeting if the telemetry is enabled.
- proxy: add `Proxy::try_send_to()` and `Proxy::request_to()`.
- telemeter: support gzip.

### Fixed
- telemetry: now `elfo_message_handling_time_seconds` doesn't include the time of task switching if an actor is preempted due to elfo's budget system.
Expand Down
3 changes: 3 additions & 0 deletions elfo-telemeter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ elfo-core = { version = "0.2.0-alpha.14", path = "../elfo-core", features = ["un
tokio = "1"
hyper = { version = "1.0.1", features = ["server", "http1"] }
hyper-util = { version = "0.1.1", features = ["tokio"] }
http-body-util = "0.1"
bytes = "1"
pin-project-lite = "0.2"
serde = { version = "1.0.120", features = ["derive"] }
metrics = "0.17"
Expand All @@ -29,3 +31,4 @@ fxhash = "0.2.1"
humantime-serde = "1"
cow-utils = "0.1.2"
stability = "0.1.1"
flate2 = "1"
72 changes: 65 additions & 7 deletions elfo-telemeter/src/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
use std::{
convert::Infallible,
future::Future,
io::{self, Write},
net::SocketAddr,
pin::Pin,
string::ToString,
task::Poll,
time::{Duration, Instant},
};

use hyper::{body::Body, rt, server::conn, service, Method, Request, Response, StatusCode};
use http_body_util::Full;
use hyper::{
body::Body,
header::{HeaderMap, ACCEPT_ENCODING, CONTENT_ENCODING},
rt,
server::conn,
service, Method, Request, Response, StatusCode,
};
use hyper_util::rt::TokioIo;
use pin_project_lite::pin_project;
use tokio::{net::TcpListener, time::timeout};
Expand All @@ -23,9 +31,9 @@ const SERVE_TIMEOUT: Duration = Duration::from_secs(10);

/// Runs a simple HTTP server that responds to `GET /metrics` requests.
/// * It supports only HTTP/1.
/// * It supports gzip compression.
/// * It doesn't support keep-alive connections.
/// * It doesn't support TLS.
/// * It doesn't support compression.
/// * It handles requests one by one with some reasonable timeouts.
pub(crate) async fn server(addr: SocketAddr, ctx: Context) -> ServerFailed {
let listener = match TcpListener::bind(addr).await {
Expand Down Expand Up @@ -68,36 +76,86 @@ pub(crate) async fn server(addr: SocketAddr, ctx: Context) -> ServerFailed {
}
}

type ResBody = Full<io::Cursor<Vec<u8>>>;

// Supports only `GET /metrics` requests.
async fn handle(req: Request<impl Body>, ctx: Context) -> Result<Response<String>, Infallible> {
async fn handle(req: Request<impl Body>, ctx: Context) -> Result<Response<ResBody>, Infallible> {
if req.method() != Method::GET {
return Ok(Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(String::new())
.body(<_>::default())
.unwrap());
}

if req.uri().path() != "/metrics" {
return Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(String::new())
.body(<_>::default())
.unwrap());
}

let use_gzip = use_gzip(req.headers());

ctx.request_to(ctx.addr(), Render)
.resolve()
.await
.map(|Rendered(text)| Response::new(text))
.map(|Rendered(text)| {
let builder = Response::builder();

let gzipped = if use_gzip {
match try_gzip(text.as_bytes()) {
Ok(gzipped) => Some(gzipped),
Err(err) => {
warn!(error = %err, "failed to gzip metrics, sending uncompressed");
None
}
}
} else {
None
};

if let Some(gzipped) = gzipped {
builder
.header(CONTENT_ENCODING, "gzip")
.body(into_res_body(gzipped))
} else {
builder.body(into_res_body(text.into_bytes()))
}
.unwrap()
})
.or_else(|err| {
warn!(error = %err, "failed to render metrics for HTTP response");

Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(String::new())
.body(<_>::default())
.unwrap())
})
}

fn use_gzip(headers: &HeaderMap) -> bool {
let Some(encoding) = headers.get(ACCEPT_ENCODING) else {
return false;
};

let Ok(encoding) = encoding.to_str() else {
return false;
};

encoding.contains("gzip")
}

fn try_gzip(data: &[u8]) -> io::Result<Vec<u8>> {
let out = Vec::with_capacity(data.len() / 4); // good enough estimation
let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::default());
encoder.write_all(data)?;
encoder.finish()
}

fn into_res_body(data: Vec<u8>) -> ResBody {
Full::new(io::Cursor::new(data))
}

fn flat_error(res: Result<Result<(), impl ToString>, impl ToString>) -> Result<(), String> {
match res {
Ok(Ok(())) => Ok(()),
Expand Down

0 comments on commit f332b6b

Please sign in to comment.