From cae942346745b3c726513e19eecc9c71a9fade1f Mon Sep 17 00:00:00 2001 From: David Blewett Date: Fri, 19 Jan 2024 14:45:32 -0500 Subject: [PATCH] Return error instead of panicing. --- src/consumer/stream_consumer.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 5a7f60552..96e9c7cd4 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -207,11 +207,20 @@ where fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult { let native_config = config.create_native_config()?; let poll_interval = { - let millis: u64 = native_config - .get("max.poll.interval.ms")? - .parse() - .expect("librdkafka validated config value is valid u64"); - Duration::from_millis(millis) + let millis = native_config.get("max.poll.interval.ms")?; + match millis.parse() { + Ok(millis) => Duration::from_millis(millis), + Err(e) => { + println!("Config string: '{}'", millis); + println!("Error: '{}'", e); + return Err(KafkaError::ClientConfig( + RDKafkaConfRes::RD_KAFKA_CONF_INVALID, + "max.poll.interval.ms".to_string(), + format!("Invalid integer: {}", e), + millis, + )); + } + } }; let base = Arc::new(BaseConsumer::new(config, native_config, context)?);