Skip to content

Commit

Permalink
chore: make offset_commit async (#4402)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev authored Feb 19, 2025
1 parent 9c4d47f commit 5533332
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ k8-diff = { version = "0.1.2" }
trybuild = { branch = "check_option", git = "https://github.com/infinyon/trybuild" }

# Internal fluvio dependencies
fluvio = { version = "0.25.0", path = "crates/fluvio" }
fluvio = { version = "0.26.0", path = "crates/fluvio" }
fluvio-auth = { path = "crates/fluvio-auth" }
fluvio-benchmark = { path = "crates/fluvio-benchmark" }
fluvio-channel = { path = "crates/fluvio-channel" }
Expand Down
2 changes: 1 addition & 1 deletion connector/sink-test-connector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn process_item(
stream: &mut impl ConsumerStream,
) -> Result<()> {
sink.send(str).await?;
stream.offset_commit()?;
stream.offset_commit().await?;
stream.offset_flush().await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-cli/src/client/consume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ mod cmd {
}

if self.consumer.is_some() {
stream.get_mut().offset_commit()?;
stream.get_mut().offset_commit().await?;
stream.get_mut().offset_flush().await?;
}

Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-test/src/tests/consumer_offsets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async fn test_strategy_none(client: &Fluvio, topic: &str, partitions: usize) ->
for _ in 0..RECORDS_COUNT {
ensure_read(&mut stream, &mut counts).await?;
}
ensure!(stream.offset_commit().is_err());
ensure!(stream.offset_commit().await.is_err());
ensure!(stream.offset_flush().await.is_err());
Ok(())
}
Expand All @@ -125,7 +125,7 @@ async fn test_strategy_manual(client: &Fluvio, topic: &str, partitions: usize) -
for _ in chunk {
ensure_read(&mut stream, &mut counts).await?;
}
stream.offset_commit()?;
stream.offset_commit().await?;
stream.offset_flush().await?;
}
// no more records for this consumer
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.25.0"
version = "0.26.0"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
Expand Down
31 changes: 17 additions & 14 deletions crates/fluvio/src/consumer/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use super::{offset::OffsetLocalStore, StreamToServer};
pub trait ConsumerStream: Stream<Item = Result<Record, ErrorCode>> + Unpin {
/// Mark the offset of the last yelded record as committed. Depending on [`OffsetManagementStrategy`]
/// it may require a subsequent `offset_flush()` call to take any effect.
fn offset_commit(&mut self) -> Result<(), ErrorCode>;
fn offset_commit(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>>;

/// Send the committed offset to the server. The method waits for the server's acknowledgment before it finishes.
fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>>;
Expand Down Expand Up @@ -114,7 +114,7 @@ impl<T> ConsumerStream for futures_util::stream::TakeUntil<T, async_channel::Rec
where
T: ConsumerStream,
{
fn offset_commit(&mut self) -> Result<(), ErrorCode> {
fn offset_commit(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> {
self.get_mut().offset_commit()
}

Expand All @@ -126,8 +126,8 @@ where
impl<T: Stream<Item = Result<Record, ErrorCode>> + Unpin> ConsumerStream
for SinglePartitionConsumerStream<T>
{
fn offset_commit(&mut self) -> Result<(), ErrorCode> {
self.offset_mngt.commit()
fn offset_commit(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> {
Box::pin(async { self.offset_mngt.commit() })
}

fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> {
Expand All @@ -138,11 +138,14 @@ impl<T: Stream<Item = Result<Record, ErrorCode>> + Unpin> ConsumerStream
impl<T: Stream<Item = Result<Record, ErrorCode>> + Unpin> ConsumerStream
for MultiplePartitionConsumerStream<T>
{
fn offset_commit(&mut self) -> Result<(), ErrorCode> {
fn offset_commit(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> {
for partition in &self.offset_mgnts {
partition.commit()?;
if let Err(err) = partition.commit() {
return Box::pin(async { Err(err) });
}
}
Ok(())

Box::pin(async { Ok(()) })
}

fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> {
Expand Down Expand Up @@ -350,7 +353,7 @@ mod tests {
);

//when
let res = partition_stream.offset_commit();
let res = partition_stream.offset_commit().await;

//then
assert_eq!(res, Err(ErrorCode::OffsetManagementDisabled));
Expand Down Expand Up @@ -388,9 +391,9 @@ mod tests {
//when
assert!(partition_stream.next().await.is_some()); // seen = 0
assert!(partition_stream.next().await.is_some()); // seen = 1
let _ = partition_stream.offset_commit(); // comitted = 1
let _ = partition_stream.offset_commit().await; // comitted = 1
assert!(partition_stream.next().await.is_some()); // seen = 2
let _ = partition_stream.offset_commit(); // comitted = 2
let _ = partition_stream.offset_commit().await; // comitted = 2

//then
fluvio_future::task::spawn(async move {
Expand Down Expand Up @@ -436,9 +439,9 @@ mod tests {
//when
assert!(multi_stream.next().await.is_some()); // p1 seen = 0
assert!(multi_stream.next().await.is_some()); // p2 seen = 0
let _ = multi_stream.offset_commit(); // both comitted = 0
let _ = multi_stream.offset_commit().await; // both comitted = 0
assert!(multi_stream.next().await.is_some()); // p2 seen = 1
let _ = multi_stream.offset_commit(); // comitted p1 = 0, p2 = 1
let _ = multi_stream.offset_commit().await; // comitted p1 = 0, p2 = 1

//then
fluvio_future::task::spawn(async move {
Expand Down Expand Up @@ -700,7 +703,7 @@ mod tests {
});

assert!(partition_stream.next().await.is_some()); // seen = 0
let _ = partition_stream.offset_commit();
let _ = partition_stream.offset_commit().await;
let flush_res = partition_stream.offset_flush().await;

//then
Expand Down Expand Up @@ -730,7 +733,7 @@ mod tests {
//when
assert!(multi_stream.next().await.is_some()); // p1 seen = 0
assert!(multi_stream.next().await.is_some()); // p2 seen = 0
let _ = multi_stream.offset_commit();
let _ = multi_stream.offset_commit().await;
fluvio_future::task::spawn(async move {
let message = rx1.recv().await;
assert!(
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/src/fluvio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl Fluvio {
/// .await?;
/// while let Some(Ok(record)) = stream.next().await {
/// println!("{}", String::from_utf8_lossy(record.as_ref()));
/// stream.offset_commit()?;
/// stream.offset_commit().await?;
/// stream.offset_flush().await?;
/// }
/// Ok(())
Expand Down
2 changes: 1 addition & 1 deletion rfc/offset-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn offset_example() -> Result<()> {
if some_condition {
break;
}
stream.offset_commit();
stream.offset_commit().await?;
}

// synchronously flush for shutdown (or none if intentionally ending processing)
Expand Down

0 comments on commit 5533332

Please sign in to comment.