Skip to content

Commit

Permalink
source-kafka: do not use async stdout
Browse files Browse the repository at this point in the history
Using the regular stdout is a pretty huge performance increase, so use that
instead of the async version.
  • Loading branch information
williamhbaker committed Nov 6, 2024
1 parent 0ac7a0e commit 253ee5e
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 24 deletions.
31 changes: 14 additions & 17 deletions source-kafka/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::io::Write;

use anyhow::{Context, Result};
use configuration::{schema_for, EndpointConfig, Resource};
use discover::do_discover;
Expand All @@ -10,7 +12,6 @@ use proto_flow::capture::{
};
use pull::do_pull;
use rdkafka::consumer::Consumer;
use tokio::io::AsyncWriteExt;
use tokio::io::{self, AsyncBufReadExt};

pub mod configuration;
Expand All @@ -23,7 +24,7 @@ const KAFKA_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

pub async fn run_connector(
mut stdin: io::BufReader<io::Stdin>,
mut stdout: io::Stdout,
mut stdout: std::io::Stdout,
) -> Result<(), anyhow::Error> {
tracing::info!("running connector");

Expand All @@ -47,7 +48,7 @@ pub async fn run_connector(
..Default::default()
};

write_capture_response(res, &mut stdout).await?;
write_capture_response(res, &mut stdout)?;
} else if let Some(req) = request.discover {
let res = Response {
discovered: Some(Discovered {
Expand All @@ -56,7 +57,7 @@ pub async fn run_connector(
..Default::default()
};

write_capture_response(res, &mut stdout).await?;
write_capture_response(res, &mut stdout)?;
} else if let Some(req) = request.validate {
let res = Response {
validated: Some(Validated {
Expand All @@ -65,7 +66,7 @@ pub async fn run_connector(
..Default::default()
};

write_capture_response(res, &mut stdout).await?;
write_capture_response(res, &mut stdout)?;
} else if request.apply.is_some() {
let res = Response {
applied: Some(Applied {
Expand All @@ -74,7 +75,7 @@ pub async fn run_connector(
..Default::default()
};

write_capture_response(res, &mut stdout).await?;
write_capture_response(res, &mut stdout)?;
} else if let Some(req) = request.open {
write_capture_response(
Response {
Expand All @@ -84,8 +85,7 @@ pub async fn run_connector(
..Default::default()
},
&mut stdout,
)
.await?;
)?;

let eof = tokio::spawn(async move {
match stdin.read_line(&mut line).await? {
Expand All @@ -110,18 +110,15 @@ pub async fn run_connector(
Ok(())
}

pub async fn write_capture_response(
pub fn write_capture_response(
response: Response,
stdout: &mut io::Stdout,
stdout: &mut std::io::Stdout,
) -> anyhow::Result<()> {
let resp = serde_json::to_vec(&response).context("serializing response")?;
stdout.write_all(&resp).await.context("writing response")?;
stdout
.write_u8(b'\n')
.await
.context("writing response newline")?;
serde_json::to_writer(&mut *stdout, &response).context("serializing response")?;
writeln!(stdout).context("writing response newline")?;

if response.captured.is_none() {
stdout.flush().await?;
stdout.flush().context("flushing stdout")?;
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion source-kafka/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ fn main() -> anyhow::Result<()> {
let runtime = start_runtime()?;

let stdin = io::BufReader::new(io::stdin());
let stdout = io::stdout();
let stdout = std::io::stdout();

let result = runtime.block_on(run_connector(stdin, stdout));

Expand Down
9 changes: 3 additions & 6 deletions source-kafka/src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use serde::{Deserialize, Serialize};
use serde_json::{json, Map};
use std::collections::{hash_map::Entry, HashMap};
use time::{format_description, OffsetDateTime};
use tokio::io::{self};

#[derive(Debug, Deserialize, Serialize, Default)]
struct CaptureState {
Expand Down Expand Up @@ -74,7 +73,7 @@ enum MetaTimestamp {
LogAppendTime(String),
}

pub async fn do_pull(req: Open, mut stdout: io::Stdout) -> Result<()> {
pub async fn do_pull(req: Open, mut stdout: std::io::Stdout) -> Result<()> {
let spec = req.capture.expect("open must contain a capture spec");

let state = if req.state_json == "{}" {
Expand Down Expand Up @@ -187,8 +186,7 @@ pub async fn do_pull(req: Open, mut stdout: io::Stdout) -> Result<()> {
..Default::default()
},
&mut stdout,
)
.await?;
)?;

write_capture_response(
Response {
Expand All @@ -201,8 +199,7 @@ pub async fn do_pull(req: Open, mut stdout: io::Stdout) -> Result<()> {
..Default::default()
},
&mut stdout,
)
.await?;
)?;
}
}

Expand Down

0 comments on commit 253ee5e

Please sign in to comment.