Skip to content

Commit

Permalink
feat: consumer stream should be able to reconnect (#4394)
Browse files Browse the repository at this point in the history
* chore: make offset_commit async

* feat: consumer stream should be able to reconnect

* chore: wait commit offset on retry

* chore: wait by notification on retry

* chore: offset_commit as async
  • Loading branch information
fraidev authored Feb 20, 2025
1 parent 5533332 commit 3d29f07
Show file tree
Hide file tree
Showing 10 changed files with 491 additions and 43 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/fluvio-cli/src/client/consume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,10 @@ mod cmd {
continue;
}
*/
Err(ErrorCode::MaxRetryReached) => {
eprintln!("Max limit of retry connection reached");
break;
}
Err(other) => return Err(other.into()),
};

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-protocol"
edition = "2021"
version = "0.12.0"
version = "0.12.1"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Fluvio streaming protocol"
repository = "https://github.com/infinyon/fluvio"
Expand Down
3 changes: 3 additions & 0 deletions crates/fluvio-protocol/src/link/error_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ pub enum ErrorCode {
#[fluvio(tag = 3004)]
#[error("the offset management is disabled for the stream")]
OffsetManagementDisabled,
#[fluvio(tag = 3005)]
#[error("max retry attempts reached")]
MaxRetryReached,

// Managed Connector Errors
#[fluvio(tag = 5000)]
Expand Down
120 changes: 87 additions & 33 deletions crates/fluvio-test/src/tests/reconnection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,85 +1,139 @@
use std::time::Duration;

use futures_lite::stream::StreamExt;
use clap::Parser;

use fluvio::{Offset, RecordKey};
use fluvio_controlplane_metadata::partition::PartitionSpec;
use fluvio_future::timer::sleep;
use clap::Parser;

use fluvio_test_derive::fluvio_test;
use fluvio_test_case_derive::MyTestCase;
use fluvio_test_util::{test_meta::environment::EnvDetail, test_runner::test_driver::TestDriver};

// time to wait for ack, in seconds
const ACK_WAIT: u64 = 20;
const ACK_WAIT: u64 = 5;

#[derive(Debug, Clone, Parser, Default, Eq, PartialEq, MyTestCase)]
#[command(name = "Fluvio reconnection Test")]
pub struct ReconnectionTestOption {}

// Entry point of the reconnection test: runs the producer scenario first,
// then the consumer scenario, each against the same driver and test case.
#[fluvio_test(topic = "reconnection", async)]
pub async fn reconnection(mut test_driver: TestDriver, mut test_case: TestCase) {
// Producer scenario.
reconnect_producer(&test_driver, &test_case).await;

// Consumer scenario; starts only after the producer scenario has completed.
reconnect_consumer(&test_driver, &test_case).await;
}

async fn reconnect_producer(test_driver: &TestDriver, test_case: &TestCase) {
println!("Starting reconnection test");

// first, create a simple message
let topic_name = test_case.environment.base_topic_name();
let producer = test_driver.create_producer(&topic_name).await;
println!("sending first record");

producer
.send(RecordKey::NULL, "msg1")
.send(RecordKey::NULL, "msg1_producer")
.await
.expect("sending");

producer.flush().await.expect("flushing");

let admin = test_driver.client().admin().await;
let leader = get_leader(test_driver).await;
terminate_spu(test_driver, leader).await;
sleep(Duration::from_secs(ACK_WAIT)).await;
start_spu(test_driver, leader).await;
sleep(Duration::from_secs(ACK_WAIT)).await;
producer.clear_errors().await;

let partitions = admin.all::<PartitionSpec>().await.expect("partitions");
println!("sending second record");
producer
.send(RecordKey::NULL, "msg2_producer")
.await
.expect("sending");

let test_topic = &partitions[0];
let leader = test_topic.spec.leader;
println!("spu id is: {leader}");
producer.flush().await.expect("flushing");

let cluster_manager = test_driver
.get_cluster()
.expect("cluster")
.env_driver()
.create_cluster_manager();
let mut stream = test_driver
.get_consumer_with_start(&topic_name, 0, Offset::absolute(0).expect("offset"))
.await;

println!("terminating spu: {}", &leader);
println!("checking msg1");
let records = stream.next().await.expect("get next").expect("next");
assert_eq!(records.value(), "msg1_producer".as_bytes());

cluster_manager.terminate_spu(leader).expect("terminate");
println!("checking msg2");
let records = stream.next().await.expect("get next").expect("next");
assert_eq!(records.value(), "msg2_producer".as_bytes());
}

sleep(Duration::from_secs(ACK_WAIT)).await;
async fn reconnect_consumer(test_driver: &TestDriver, test_case: &TestCase) {
println!("Starting reconnection test");

println!("starting spu again: {}", &leader);
let topic_name = test_case.environment.base_topic_name();
let producer = test_driver.create_producer(&topic_name).await;
println!("sending first record");

let leader_spu = cluster_manager.create_spu_absolute(leader as u16);
leader_spu.start().expect("start");
let mut stream = test_driver
.get_consumer_with_start(&topic_name, 0, Offset::end())
.await;

sleep(Duration::from_secs(ACK_WAIT)).await;
producer
.send(RecordKey::NULL, "msg1_consume")
.await
.expect("sending");

producer.flush().await.expect("flushing");

println!("checking msg1");
let records = stream.next().await.expect("get next").expect("next");
assert_eq!(records.value(), "msg1_consume".as_bytes());

let leader = get_leader(test_driver).await;
terminate_spu(test_driver, leader).await;
sleep(Duration::from_secs(ACK_WAIT)).await;
start_spu(test_driver, leader).await;
sleep(Duration::from_secs(ACK_WAIT)).await;
producer.clear_errors().await;

println!("sending second record");
// Use the same producer
producer
.send(RecordKey::NULL, "msg2")
.send(RecordKey::NULL, "msg2_consume")
.await
.expect("sending");

producer.flush().await.expect("flushing");

let mut stream = test_driver
.get_consumer_with_start(&topic_name, 0, Offset::absolute(0).expect("offset"))
.await;

println!("checking msg1");
let records = stream.next().await.expect("get next").expect("next");
assert_eq!(records.value(), "msg1".as_bytes());

println!("checking msg2");
let records = stream.next().await.expect("get next").expect("next");
assert_eq!(records.value(), "msg2".as_bytes());
assert_eq!(records.value(), "msg2_consume".as_bytes());
}

/// Returns the leader SPU id of the first partition listed by the admin client.
///
/// Panics if the partition listing fails or contains no entries.
async fn get_leader(test_driver: &TestDriver) -> i32 {
    let admin_client = test_driver.client().admin().await;
    let all_partitions = admin_client
        .all::<PartitionSpec>()
        .await
        .expect("partitions");
    // Only the first listed partition is inspected.
    let leader = all_partitions[0].spec.leader;
    println!("spu id is: {leader}");
    leader
}

/// Terminates the SPU identified by `leader` through the test cluster manager.
///
/// Panics if the driver has no cluster or the termination call fails.
async fn terminate_spu(test_driver: &TestDriver, leader: i32) {
    let cluster = test_driver.get_cluster().expect("cluster");
    let manager = cluster.env_driver().create_cluster_manager();
    println!("terminating spu: {}", leader);
    manager.terminate_spu(leader).expect("terminate");
}

/// Starts the SPU identified by `leader` again through the test cluster manager.
///
/// Panics if the driver has no cluster, if `leader` does not fit in a `u16`,
/// or if starting the SPU fails.
async fn start_spu(test_driver: &TestDriver, leader: i32) {
    let cluster_manager = test_driver
        .get_cluster()
        .expect("cluster")
        .env_driver()
        .create_cluster_manager();
    println!("starting spu again: {}", &leader);
    // A bare `as u16` cast would silently wrap a negative or oversized id and
    // start a different SPU than the one that was terminated; fail loudly instead.
    let spu_id = u16::try_from(leader).expect("spu id must fit in u16");
    let leader_spu = cluster_manager.create_spu_absolute(spu_id);
    leader_spu.start().expect("start");
}
1 change: 1 addition & 0 deletions crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ nightly = []
unstable = []

[dependencies]
adaptive_backoff = { workspace = true }
async-channel = { workspace = true }
async-lock = { workspace = true }
async-trait = { workspace = true }
Expand Down
14 changes: 13 additions & 1 deletion crates/fluvio/src/consumer/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ pub enum OffsetManagementStrategy {
Auto,
}

/// Retry policy selectable through the `retry_mode` field of `ConsumerConfigExt`.
///
/// The default variant is `TryForever`.
// NOTE(review): variant semantics below are inferred from the names; confirm
// against the retry implementation before relying on them.
#[derive(Debug, Default, Clone, PartialEq)]
pub enum RetryMode {
// Presumably: never attempt to reconnect.
Disabled,
// Presumably: give up after the given number of attempts.
TryUntil(u32),
// Presumably: keep retrying without an upper bound; this is the default variant.
#[default]
TryForever,
}

#[derive(Debug, Builder, Clone)]
#[builder(build_fn(private, name = "build_impl"))]
pub struct ConsumerConfigExt {
Expand All @@ -70,13 +78,15 @@ pub struct ConsumerConfigExt {
#[builder(default = "DEFAULT_OFFSET_FLUSH_PERIOD")]
pub offset_flush: Duration,
#[builder(default)]
disable_continuous: bool,
pub disable_continuous: bool,
#[builder(default = "*MAX_FETCH_BYTES")]
pub max_bytes: i32,
#[builder(default)]
pub isolation: Isolation,
#[builder(default)]
pub smartmodule: Vec<SmartModuleInvocation>,
#[builder(default)]
pub retry_mode: RetryMode,
}

impl ConsumerConfigExt {
Expand Down Expand Up @@ -105,6 +115,7 @@ impl ConsumerConfigExt {
smartmodule,
offset_strategy,
offset_flush,
retry_mode: _,
} = self;

let config = ConsumerConfig {
Expand Down Expand Up @@ -162,6 +173,7 @@ impl From<ConsumerConfigExt> for ConsumerConfig {
max_bytes,
isolation,
smartmodule,
retry_mode: _,
} = value;

Self {
Expand Down
4 changes: 3 additions & 1 deletion crates/fluvio/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
mod config;
mod stream;
mod offset;
mod retry;

use std::sync::Arc;

Expand Down Expand Up @@ -32,9 +33,10 @@ use crate::offset::{Offset, fetch_offsets};
use crate::spu::{SpuDirectory, SpuSocketPool};

pub use config::{ConsumerConfig, ConsumerConfigBuilder};
pub use config::{ConsumerConfigExt, ConsumerConfigExtBuilder, OffsetManagementStrategy};
pub use config::{ConsumerConfigExt, ConsumerConfigExtBuilder, OffsetManagementStrategy, RetryMode};
pub use stream::{ConsumerStream, MultiplePartitionConsumerStream, SinglePartitionConsumerStream};
pub use offset::ConsumerOffset;
pub use retry::ConsumerWithRetry;

pub use fluvio_protocol::record::ConsumerRecord as Record;
pub use fluvio_spu_schema::server::smartmodule::SmartModuleInvocation;
Expand Down
Loading

0 comments on commit 3d29f07

Please sign in to comment.