Skip to content

Commit

Permalink
airbyte-to-flow: use write_all instead of write for responses
Browse files Browse the repository at this point in the history
This fixes an issue that was seen in production where the output of
airbyte-to-flow could not be parsed as JSON.  The source of the issue seems
likely to be that ATF is calling `write` instead of `write_all` when it writes
the responses to stdout (the return value of `write` was ignored).
  • Loading branch information
psFried committed Nov 2, 2023
1 parent b70c0e5 commit 06b7fba
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions airbyte-to-flow/src/connector_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ async fn streaming_all(
response_stream_writer: Arc<Mutex<Pin<Box<dyn AsyncWrite + Sync + Send>>>>,
response_finished_sender: oneshot::Sender<bool>,
) -> Result<(), Error> {
let mut request_stream_reader =
StreamReader::new(request_stream);
let mut request_stream_reader = StreamReader::new(request_stream);

let request_stream_copy = async move {
copy(&mut request_stream_reader, &mut request_stream_writer).await?;
Expand All @@ -263,8 +262,8 @@ async fn streaming_all(
while let Some(result) = response_stream.next().await {
match result {
Ok(bytes) => {
writer.write(&bytes).await?;
},
writer.write_all(&bytes).await?;
}
// This error usually happens because there is an underlying error
// in the connector. We don't want this error to obscure the real error
// so we just log it as a debug and let the last output error
Expand All @@ -274,7 +273,7 @@ async fn streaming_all(
}
Err(e) => Err::<(), std::io::Error>(e.into())?,
}
};
}

response_finished_sender
.send(true)
Expand Down

0 comments on commit 06b7fba

Please sign in to comment.