diff --git a/Cargo.lock b/Cargo.lock index 49360ee23a..f721d872ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2333,7 +2333,7 @@ dependencies = [ [[package]] name = "fluvio" -version = "0.24.5" +version = "0.25.0" dependencies = [ "anyhow", "async-channel 1.9.0", diff --git a/Cargo.toml b/Cargo.toml index 01ef6a52a5..e27f7182a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -173,7 +173,7 @@ k8-diff = { version = "0.1.2" } trybuild = { branch = "check_option", git = "https://github.com/infinyon/trybuild" } # Internal fluvio dependencies -fluvio = { version = "0.24.0", path = "crates/fluvio" } +fluvio = { version = "0.25.0", path = "crates/fluvio" } fluvio-auth = { path = "crates/fluvio-auth" } fluvio-benchmark = { path = "crates/fluvio-benchmark" } fluvio-channel = { path = "crates/fluvio-channel" } diff --git a/crates/fluvio-cli/src/client/produce/mod.rs b/crates/fluvio-cli/src/client/produce/mod.rs index 84ec2f2eb4..8426a75a0e 100644 --- a/crates/fluvio-cli/src/client/produce/mod.rs +++ b/crates/fluvio-cli/src/client/produce/mod.rs @@ -199,47 +199,27 @@ mod cmd { fluvio: &Fluvio, ) -> Result<()> { init_monitoring(fluvio.metrics()); - let config_builder = if self.interactive_mode() { - TopicProducerConfigBuilder::default().linger(std::time::Duration::from_millis(10)) - } else { - Default::default() - }; - + let mut config_builder = TopicProducerConfigBuilder::default(); // Compression - let config_builder = if let Some(compression) = self.compression { - config_builder.compression(compression) - } else { - config_builder - }; - + if let Some(compression) = self.compression { + config_builder.compression(compression); + } // Linger - let config_builder = if let Some(linger) = self.linger { - config_builder.linger(linger) - } else { - config_builder - }; - + if let Some(linger) = self.linger { + config_builder.linger(linger); + } // Batch size - let config_builder = if let Some(batch_size) = self.batch_size { - config_builder.batch_size(batch_size) - } else { - config_builder - }; - + if let Some(batch_size) = self.batch_size { + config_builder.batch_size(batch_size); + } // Max request size - let config_builder = if let Some(max_request_size) = self.max_request_size { - config_builder.max_request_size(max_request_size) - } else { - config_builder - }; - + if let Some(max_request_size) = self.max_request_size { + config_builder.max_request_size(max_request_size); + } // Isolation - let config_builder = if let Some(isolation) = self.isolation { - config_builder.isolation(isolation) - } else { - config_builder - }; - + if let Some(isolation) = self.isolation { + config_builder.isolation(isolation); + } // Delivery Semantic if self.delivery_semantic == DeliverySemantic::AtMostOnce && self.isolation.is_some() { warn!("Isolation is ignored for AtMostOnce delivery semantic"); diff --git a/crates/fluvio-connector-common/src/producer.rs b/crates/fluvio-connector-common/src/producer.rs index 9f4d8b52f9..29939fe656 100644 --- a/crates/fluvio-connector-common/src/producer.rs +++ b/crates/fluvio-connector-common/src/producer.rs @@ -9,7 +9,7 @@ pub async fn producer_from_config(config: &ConnectorConfig) -> Result<(Fluvio, T let fluvio = Fluvio::connect_with_config(&cluster_config).await?; ensure_topic_exists(config).await?; - let mut config_builder = TopicProducerConfigBuilder::default(); + let mut config_builder = &mut TopicProducerConfigBuilder::default(); if let Some(producer_params) = &config.meta().producer() { // Linger diff --git a/crates/fluvio-test/src/tests/data_generator/producer.rs b/crates/fluvio-test/src/tests/data_generator/producer.rs index d5b070bcae..af0ab70ffe 100644 --- a/crates/fluvio-test/src/tests/data_generator/producer.rs +++ b/crates/fluvio-test/src/tests/data_generator/producer.rs @@ -33,46 +33,6 @@ pub async fn producer( let mut producers = Vec::new(); for topic_id in 0..option.environment.topic { - let maybe_builder = match ( - option.environment.producer_linger, - option.environment.producer_batch_size, - option.environment.producer_compression, - ) { - (Some(linger), Some(batch), Some(compression)) => Some( - TopicProducerConfigBuilder::default() - .linger(Duration::from_millis(linger)) - .batch_size(batch) - .compression(compression), - ), - (Some(linger), Some(batch), None) => Some( - TopicProducerConfigBuilder::default() - .linger(Duration::from_millis(linger)) - .batch_size(batch), - ), - (Some(linger), None, None) => { - Some(TopicProducerConfigBuilder::default().linger(Duration::from_millis(linger))) - } - (Some(linger), None, Some(compression)) => Some( - TopicProducerConfigBuilder::default() - .linger(Duration::from_millis(linger)) - .compression(compression), - ), - (None, Some(batch), Some(compression)) => Some( - TopicProducerConfigBuilder::default() - .batch_size(batch) - .compression(compression), - ), - - (None, Some(batch), None) => { - Some(TopicProducerConfigBuilder::default().batch_size(batch)) - } - (None, None, Some(compression)) => { - Some(TopicProducerConfigBuilder::default().compression(compression)) - } - - (None, None, None) => None, - }; - let env_opts = option.environment.clone(); let test_topic_name = if env_opts.topic > 1 { @@ -81,16 +41,41 @@ pub async fn producer( env_opts.base_topic_name() }; - if let Some(producer_config) = maybe_builder { - let config = producer_config.build().expect("producer builder"); - producers.push( - test_driver - .create_producer_with_config(&test_topic_name, config) - .await, - ) - } else { - producers.push(test_driver.create_producer(&test_topic_name).await) + let mut builder = TopicProducerConfigBuilder::default(); + match ( + option.environment.producer_linger, + option.environment.producer_batch_size, + option.environment.producer_compression, + ) { + (Some(linger), Some(batch), Some(compression)) => builder + .linger(Duration::from_millis(linger)) + .batch_size(batch) + .compression(compression), + (Some(linger), Some(batch), None) => builder + .linger(Duration::from_millis(linger)) + .batch_size(batch), + (Some(linger), None, None) => builder.linger(Duration::from_millis(linger)), + (Some(linger), None, Some(compression)) => builder + .linger(Duration::from_millis(linger)) + .compression(compression), + (None, Some(batch), Some(compression)) => { + builder.batch_size(batch).compression(compression) + } + + (None, Some(batch), None) => builder.batch_size(batch), + (None, None, Some(compression)) => builder.compression(compression), + (None, None, None) => { + producers.push(test_driver.create_producer(&test_topic_name).await); + continue; + } }; + + let config = builder.build().expect("producer builder"); + producers.push( + test_driver + .create_producer_with_config(&test_topic_name, config) + .await, + ) } // Create the syncing producer/consumer diff --git a/crates/fluvio/Cargo.toml b/crates/fluvio/Cargo.toml index 0e4e138df7..458c2c3a90 100644 --- a/crates/fluvio/Cargo.toml +++ b/crates/fluvio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio" -version = "0.24.5" +version = "0.25.0" edition = "2021" license = "Apache-2.0" authors = ["Fluvio Contributors "] diff --git a/crates/fluvio/src/producer/config.rs b/crates/fluvio/src/producer/config.rs index cd4764798a..8e1c5eb01e 100644 --- a/crates/fluvio/src/producer/config.rs +++ b/crates/fluvio/src/producer/config.rs @@ -1,5 +1,6 @@ use std::fmt::{self, Debug, Display, Formatter}; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use derive_builder::Builder; @@ -44,8 +45,8 @@ fn default_linger_duration() -> Duration { Duration::from_millis(DEFAULT_LINGER_MS) } -fn default_partitioner() -> Box { - Box::new(SiphashRoundRobinPartitioner::new()) +fn default_partitioner() -> Arc { + Arc::new(SiphashRoundRobinPartitioner::new()) } fn default_timeout() -> Duration { @@ -71,8 +72,7 @@ impl fmt::Debug for Box { /// Create this struct with [`TopicProducerConfigBuilder`]. /// /// Create a producer with a custom config with [`crate::Fluvio::topic_producer_with_config()`]. -#[derive(Builder)] -#[builder(pattern = "owned")] +#[derive(Builder, Clone)] pub struct TopicProducerConfig { /// Maximum amount of bytes accumulated by the records before sending the batch. #[builder(default = "default_batch_size()")] @@ -88,7 +88,7 @@ pub struct TopicProducerConfig { pub(crate) linger: Duration, /// Partitioner assigns the partition to each record that needs to be send #[builder(default = "default_partitioner()")] - pub(crate) partitioner: Box, + pub(crate) partitioner: Arc, /// Compression algorithm used by Fluvio producer to compress data. /// If there is a topic level compression and it is not compatible with this setting, the producer @@ -125,8 +125,8 @@ pub struct TopicProducerConfig { } impl TopicProducerConfigBuilder { - pub fn set_specific_partitioner(self, partition_id: PartitionId) -> Self { - self.partitioner(Box::new(SpecificPartitioner::new(partition_id))) + pub fn set_specific_partitioner(&mut self, partition_id: PartitionId) -> &mut Self { + self.partitioner(Arc::new(SpecificPartitioner::new(partition_id))) } }