From 13b210c37189b1719dd74f9dba4014ab28884fc4 Mon Sep 17 00:00:00 2001 From: glendc Date: Mon, 20 Nov 2023 15:10:24 +0100 Subject: [PATCH] syn trace body with tower-http (david's work) now on_eos is called again, woohoo --- tower-async-http/src/trace/body.rs | 64 +++++++++++++++--------------- tower-async-http/src/trace/mod.rs | 5 +-- 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/tower-async-http/src/trace/body.rs b/tower-async-http/src/trace/body.rs index a7acdc8..6b4b142 100644 --- a/tower-async-http/src/trace/body.rs +++ b/tower-async-http/src/trace/body.rs @@ -46,33 +46,51 @@ where ) -> Poll, Self::Error>>> { let this = self.project(); let _guard = this.span.enter(); - - let result = if let Some(result) = ready!(this.inner.poll_frame(cx)) { - result - } else { - return Poll::Ready(None); - }; + let result = ready!(this.inner.poll_frame(cx)); let latency = this.start.elapsed(); *this.start = Instant::now(); - match &result { - Ok(frame) => { - if let Some(data) = frame.data_ref() { - this.on_body_chunk.on_body_chunk(data, latency, this.span); - } + match result { + Some(Ok(frame)) => { + let frame = match frame.into_data() { + Ok(chunk) => { + this.on_body_chunk.on_body_chunk(&chunk, latency, this.span); + Frame::data(chunk) + } + Err(frame) => frame, + }; + + let frame = match frame.into_trailers() { + Ok(trailers) => { + if let Some((on_eos, stream_start)) = this.on_eos.take() { + on_eos.on_eos(Some(&trailers), stream_start.elapsed(), this.span); + } + Frame::trailers(trailers) + } + Err(frame) => frame, + }; + + Poll::Ready(Some(Ok(frame))) } - Err(err) => { + Some(Err(err)) => { if let Some((classify_eos, on_failure)) = this.classify_eos.take().zip(this.on_failure.take()) { - let failure_class = classify_eos.classify_error(err); + let failure_class = classify_eos.classify_error(&err); on_failure.on_failure(failure_class, latency, this.span); } + + Poll::Ready(Some(Err(err))) } - } + None => { + if let Some((on_eos, stream_start)) = this.on_eos.take() { + on_eos.on_eos(None, stream_start.elapsed(), this.span); + } - Poll::Ready(Some(result)) + Poll::Ready(None) + } + } } fn is_end_stream(&self) -> bool { @@ -83,19 +101,3 @@ where self.inner.size_hint() } } - -impl Default - for ResponseBody -{ - fn default() -> Self { - Self { - inner: Default::default(), - classify_eos: Default::default(), - on_eos: Default::default(), - on_body_chunk: Default::default(), - on_failure: Default::default(), - start: Instant::now(), - span: Span::current(), - } - } -} diff --git a/tower-async-http/src/trace/mod.rs b/tower-async-http/src/trace/mod.rs index d86748d..3695ee5 100644 --- a/tower-async-http/src/trace/mod.rs +++ b/tower-async-http/src/trace/mod.rs @@ -219,8 +219,7 @@ //! ### `on_eos` //! //! The `on_eos` callback is called when a streaming response body ends, that is -//! when `http_body::Body::poll_trailers` returns `Poll::Ready(Ok(trailers))`. -//! TODO: is this still used?! +//! when `http_body::Body::poll_frame` returns `Poll::Ready(None)`. //! //! `on_eos` is called even if the trailers produced are `None`. //! @@ -231,7 +230,6 @@ //! - The inner [`Service`]'s response future resolves to an error. //! - A response is classified as a failure. //! - [`http_body::Body::poll_frame`] returns an error. -//! // TODO: is poll_trailers correctly transitioned?! //! - An end-of-stream is classified as a failure. //! //! # Recording fields on the span @@ -378,7 +376,6 @@ //! [`TraceLayer::make_span_with`]: crate::trace::TraceLayer::make_span_with //! [`Span`]: tracing::Span //! [`ServerErrorsAsFailures`]: crate::classify::ServerErrorsAsFailures -//! TODO: is on_eos still ever called? use std::{fmt, time::Duration};