Skip to content

Commit

Permalink
chore: produce config builder as mutable (#4375)
Browse files Browse the repository at this point in the history
* chore: produce config cloneable

* chore: remove linger 10 when interactive mode
  • Loading branch information
fraidev authored Feb 4, 2025
1 parent 8f6a1cf commit 29d9030
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 96 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ k8-diff = { version = "0.1.2" }
trybuild = { branch = "check_option", git = "https://github.com/infinyon/trybuild" }

# Internal fluvio dependencies
fluvio = { version = "0.24.0", path = "crates/fluvio" }
fluvio = { version = "0.25.0", path = "crates/fluvio" }
fluvio-auth = { path = "crates/fluvio-auth" }
fluvio-benchmark = { path = "crates/fluvio-benchmark" }
fluvio-channel = { path = "crates/fluvio-channel" }
Expand Down
52 changes: 16 additions & 36 deletions crates/fluvio-cli/src/client/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,47 +199,27 @@ mod cmd {
fluvio: &Fluvio,
) -> Result<()> {
init_monitoring(fluvio.metrics());
let config_builder = if self.interactive_mode() {
TopicProducerConfigBuilder::default().linger(std::time::Duration::from_millis(10))
} else {
Default::default()
};

let mut config_builder = TopicProducerConfigBuilder::default();
// Compression
let config_builder = if let Some(compression) = self.compression {
config_builder.compression(compression)
} else {
config_builder
};

if let Some(compression) = self.compression {
config_builder.compression(compression);
}
// Linger
let config_builder = if let Some(linger) = self.linger {
config_builder.linger(linger)
} else {
config_builder
};

if let Some(linger) = self.linger {
config_builder.linger(linger);
}
// Batch size
let config_builder = if let Some(batch_size) = self.batch_size {
config_builder.batch_size(batch_size)
} else {
config_builder
};

if let Some(batch_size) = self.batch_size {
config_builder.batch_size(batch_size);
}
// Max request size
let config_builder = if let Some(max_request_size) = self.max_request_size {
config_builder.max_request_size(max_request_size)
} else {
config_builder
};

if let Some(max_request_size) = self.max_request_size {
config_builder.max_request_size(max_request_size);
}
// Isolation
let config_builder = if let Some(isolation) = self.isolation {
config_builder.isolation(isolation)
} else {
config_builder
};

if let Some(isolation) = self.isolation {
config_builder.isolation(isolation);
}
// Delivery Semantic
if self.delivery_semantic == DeliverySemantic::AtMostOnce && self.isolation.is_some() {
warn!("Isolation is ignored for AtMostOnce delivery semantic");
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-connector-common/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub async fn producer_from_config(config: &ConnectorConfig) -> Result<(Fluvio, T

let fluvio = Fluvio::connect_with_config(&cluster_config).await?;
ensure_topic_exists(config).await?;
let mut config_builder = TopicProducerConfigBuilder::default();
let mut config_builder = &mut TopicProducerConfigBuilder::default();

if let Some(producer_params) = &config.meta().producer() {
// Linger
Expand Down
83 changes: 34 additions & 49 deletions crates/fluvio-test/src/tests/data_generator/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,46 +33,6 @@ pub async fn producer(
let mut producers = Vec::new();

for topic_id in 0..option.environment.topic {
let maybe_builder = match (
option.environment.producer_linger,
option.environment.producer_batch_size,
option.environment.producer_compression,
) {
(Some(linger), Some(batch), Some(compression)) => Some(
TopicProducerConfigBuilder::default()
.linger(Duration::from_millis(linger))
.batch_size(batch)
.compression(compression),
),
(Some(linger), Some(batch), None) => Some(
TopicProducerConfigBuilder::default()
.linger(Duration::from_millis(linger))
.batch_size(batch),
),
(Some(linger), None, None) => {
Some(TopicProducerConfigBuilder::default().linger(Duration::from_millis(linger)))
}
(Some(linger), None, Some(compression)) => Some(
TopicProducerConfigBuilder::default()
.linger(Duration::from_millis(linger))
.compression(compression),
),
(None, Some(batch), Some(compression)) => Some(
TopicProducerConfigBuilder::default()
.batch_size(batch)
.compression(compression),
),

(None, Some(batch), None) => {
Some(TopicProducerConfigBuilder::default().batch_size(batch))
}
(None, None, Some(compression)) => {
Some(TopicProducerConfigBuilder::default().compression(compression))
}

(None, None, None) => None,
};

let env_opts = option.environment.clone();

let test_topic_name = if env_opts.topic > 1 {
Expand All @@ -81,16 +41,41 @@ pub async fn producer(
env_opts.base_topic_name()
};

if let Some(producer_config) = maybe_builder {
let config = producer_config.build().expect("producer builder");
producers.push(
test_driver
.create_producer_with_config(&test_topic_name, config)
.await,
)
} else {
producers.push(test_driver.create_producer(&test_topic_name).await)
let mut builder = TopicProducerConfigBuilder::default();
match (
option.environment.producer_linger,
option.environment.producer_batch_size,
option.environment.producer_compression,
) {
(Some(linger), Some(batch), Some(compression)) => builder
.linger(Duration::from_millis(linger))
.batch_size(batch)
.compression(compression),
(Some(linger), Some(batch), None) => builder
.linger(Duration::from_millis(linger))
.batch_size(batch),
(Some(linger), None, None) => builder.linger(Duration::from_millis(linger)),
(Some(linger), None, Some(compression)) => builder
.linger(Duration::from_millis(linger))
.compression(compression),
(None, Some(batch), Some(compression)) => {
builder.batch_size(batch).compression(compression)
}

(None, Some(batch), None) => builder.batch_size(batch),
(None, None, Some(compression)) => builder.compression(compression),
(None, None, None) => {
producers.push(test_driver.create_producer(&test_topic_name).await);
continue;
}
};

let config = builder.build().expect("producer builder");
producers.push(
test_driver
.create_producer_with_config(&test_topic_name, config)
.await,
)
}

// Create the syncing producer/consumer
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.24.5"
version = "0.25.0"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
Expand Down
14 changes: 7 additions & 7 deletions crates/fluvio/src/producer/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::{self, Debug, Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use derive_builder::Builder;
Expand Down Expand Up @@ -44,8 +45,8 @@ fn default_linger_duration() -> Duration {
Duration::from_millis(DEFAULT_LINGER_MS)
}

fn default_partitioner() -> Box<dyn Partitioner + Send + Sync> {
Box::new(SiphashRoundRobinPartitioner::new())
fn default_partitioner() -> Arc<dyn Partitioner + Send + Sync> {
Arc::new(SiphashRoundRobinPartitioner::new())
}

fn default_timeout() -> Duration {
Expand All @@ -71,8 +72,7 @@ impl fmt::Debug for Box<dyn Partitioner + Send + Sync> {
/// Create this struct with [`TopicProducerConfigBuilder`].
///
/// Create a producer with a custom config with [`crate::Fluvio::topic_producer_with_config()`].
#[derive(Builder)]
#[builder(pattern = "owned")]
#[derive(Builder, Clone)]
pub struct TopicProducerConfig {
/// Maximum amount of bytes accumulated by the records before sending the batch.
#[builder(default = "default_batch_size()")]
Expand All @@ -88,7 +88,7 @@ pub struct TopicProducerConfig {
pub(crate) linger: Duration,
/// Partitioner assigns the partition to each record that needs to be send
#[builder(default = "default_partitioner()")]
pub(crate) partitioner: Box<dyn Partitioner + Send + Sync>,
pub(crate) partitioner: Arc<dyn Partitioner + Send + Sync>,

/// Compression algorithm used by Fluvio producer to compress data.
/// If there is a topic level compression and it is not compatible with this setting, the producer
Expand Down Expand Up @@ -125,8 +125,8 @@ pub struct TopicProducerConfig {
}

impl TopicProducerConfigBuilder {
pub fn set_specific_partitioner(self, partition_id: PartitionId) -> Self {
self.partitioner(Box::new(SpecificPartitioner::new(partition_id)))
pub fn set_specific_partitioner(&mut self, partition_id: PartitionId) -> &mut Self {
self.partitioner(Arc::new(SpecificPartitioner::new(partition_id)))
}
}

Expand Down

0 comments on commit 29d9030

Please sign in to comment.