From 2b20304161198a335352162376ba7929208ad3e8 Mon Sep 17 00:00:00 2001 From: Phil Date: Thu, 2 Nov 2023 10:28:45 -0400 Subject: [PATCH] airbyte-to-flow: use write_all instead of write for responses This fixes an issue that was seen in production where the output of airbyte-to-flow could not be parsed as JSON. The source of the issue seems likely to be that ATF is calling `write` instead of `write_all` when it writes the responses to stdout (the return value of `write` was ignored). --- airbyte-to-flow/src/connector_runner.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/airbyte-to-flow/src/connector_runner.rs b/airbyte-to-flow/src/connector_runner.rs index 847704b9105b..b090378a5e1b 100644 --- a/airbyte-to-flow/src/connector_runner.rs +++ b/airbyte-to-flow/src/connector_runner.rs @@ -248,8 +248,7 @@ async fn streaming_all( response_stream_writer: Arc>>>, response_finished_sender: oneshot::Sender, ) -> Result<(), Error> { - let mut request_stream_reader = - StreamReader::new(request_stream); + let mut request_stream_reader = StreamReader::new(request_stream); let request_stream_copy = async move { copy(&mut request_stream_reader, &mut request_stream_writer).await?; @@ -263,8 +262,8 @@ async fn streaming_all( while let Some(result) = response_stream.next().await { match result { Ok(bytes) => { - writer.write(&bytes).await?; - }, + writer.write_all(&bytes).await?; + } // This error usually happens because there is an underlying error // in the connector. We don't want this error to obscure the real error // so we just log it as a debug and let the last output error @@ -274,7 +273,7 @@ async fn streaming_all( } Err(e) => Err::<(), std::io::Error>(e.into())?, } - }; + } response_finished_sender .send(true)