Skip to content

Commit

Permalink
syn trace body with tower-http (david's work)
Browse files Browse the repository at this point in the history
now on_eos is called again, woohoo
  • Loading branch information
glendc committed Nov 20, 2023
1 parent c3df76a commit 13b210c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 35 deletions.
64 changes: 33 additions & 31 deletions tower-async-http/src/trace/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,51 @@ where
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.project();
let _guard = this.span.enter();

let result = if let Some(result) = ready!(this.inner.poll_frame(cx)) {
result
} else {
return Poll::Ready(None);
};
let result = ready!(this.inner.poll_frame(cx));

let latency = this.start.elapsed();
*this.start = Instant::now();

match &result {
Ok(frame) => {
if let Some(data) = frame.data_ref() {
this.on_body_chunk.on_body_chunk(data, latency, this.span);
}
match result {
Some(Ok(frame)) => {
let frame = match frame.into_data() {
Ok(chunk) => {
this.on_body_chunk.on_body_chunk(&chunk, latency, this.span);
Frame::data(chunk)
}
Err(frame) => frame,
};

let frame = match frame.into_trailers() {
Ok(trailers) => {
if let Some((on_eos, stream_start)) = this.on_eos.take() {
on_eos.on_eos(Some(&trailers), stream_start.elapsed(), this.span);
}
Frame::trailers(trailers)
}
Err(frame) => frame,
};

Poll::Ready(Some(Ok(frame)))
}
Err(err) => {
Some(Err(err)) => {
if let Some((classify_eos, on_failure)) =
this.classify_eos.take().zip(this.on_failure.take())
{
let failure_class = classify_eos.classify_error(err);
let failure_class = classify_eos.classify_error(&err);
on_failure.on_failure(failure_class, latency, this.span);
}

Poll::Ready(Some(Err(err)))
}
}
None => {
if let Some((on_eos, stream_start)) = this.on_eos.take() {
on_eos.on_eos(None, stream_start.elapsed(), this.span);
}

Poll::Ready(Some(result))
Poll::Ready(None)
}
}
}

fn is_end_stream(&self) -> bool {
Expand All @@ -83,19 +101,3 @@ where
self.inner.size_hint()
}
}

impl<B: Default, C: Default, OnBodyChunk: Default, OnEos: Default, OnFailure: Default> Default
for ResponseBody<B, C, OnBodyChunk, OnEos, OnFailure>
{
fn default() -> Self {
Self {
inner: Default::default(),
classify_eos: Default::default(),
on_eos: Default::default(),
on_body_chunk: Default::default(),
on_failure: Default::default(),
start: Instant::now(),
span: Span::current(),
}
}
}
5 changes: 1 addition & 4 deletions tower-async-http/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@
//! ### `on_eos`
//!
//! The `on_eos` callback is called when a streaming response body ends, that is
//! when `http_body::Body::poll_trailers` returns `Poll::Ready(Ok(trailers))`.
//! TODO: is this still used?!
//! when `http_body::Body::poll_frame` returns `Poll::Ready(None)`.
//!
//! `on_eos` is called even if the trailers produced are `None`.
//!
Expand All @@ -231,7 +230,6 @@
//! - The inner [`Service`]'s response future resolves to an error.
//! - A response is classified as a failure.
//! - [`http_body::Body::poll_frame`] returns an error.
//! // TODO: is poll_trailers correctly transitioned?!
//! - An end-of-stream is classified as a failure.
//!
//! # Recording fields on the span
Expand Down Expand Up @@ -378,7 +376,6 @@
//! [`TraceLayer::make_span_with`]: crate::trace::TraceLayer::make_span_with
//! [`Span`]: tracing::Span
//! [`ServerErrorsAsFailures`]: crate::classify::ServerErrorsAsFailures
//! TODO: is on_eos still ever called?

use std::{fmt, time::Duration};

Expand Down

0 comments on commit 13b210c

Please sign in to comment.