From 19f32bf824f5c5bd4268d6dcc6adb69a7afc0b08 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Fri, 11 Aug 2023 17:00:15 -0400 Subject: [PATCH 01/24] Move to Event-based API --- src/admin.rs | 2 +- src/client.rs | 242 +++++++++++++++++--------------- src/consumer/base_consumer.rs | 211 ++++++++++++++++++---------- src/consumer/stream_consumer.rs | 5 - src/message.rs | 48 ++++++- src/producer/base_producer.rs | 93 +++++++----- 6 files changed, 362 insertions(+), 239 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index bc642a30d..69dba537b 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -403,7 +403,7 @@ fn start_poll_thread(queue: Arc, should_stop: Arc) -> J .expect("Failed to start polling thread") } -type NativeEvent = NativePtr; +pub(crate) type NativeEvent = NativePtr; unsafe impl KafkaDrop for RDKafkaEvent { const TYPE: &'static str = "event"; diff --git a/src/client.rs b/src/client.rs index c8c31ea3b..635dcb451 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,19 +11,19 @@ //! [`consumer`]: crate::consumer //! [`producer`]: crate::producer -use std::convert::TryFrom; use std::error::Error; use std::ffi::{CStr, CString}; use std::mem::ManuallyDrop; -use std::os::raw::{c_char, c_void}; +use std::os::raw::c_char; use std::ptr; -use std::slice; use std::string::ToString; use std::sync::Arc; +use libc::c_void; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; +use crate::admin::NativeEvent; use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel}; use crate::consumer::RebalanceProtocol; use crate::error::{IsError, KafkaError, KafkaResult}; @@ -239,21 +239,6 @@ impl Client { Arc::as_ptr(&context) as *mut c_void, ) }; - unsafe { rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::)) }; - unsafe { - rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::)) - }; - unsafe { - rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::)) - }; - if C::ENABLE_REFRESH_OAUTH_TOKEN { - unsafe { - rdsys::rd_kafka_conf_set_oauthbearer_token_refresh_cb( - native_config.ptr(), - Some(native_oauth_refresh_cb::), - ) - }; - } let client_ptr = unsafe { let native_config = ManuallyDrop::new(native_config); @@ -293,6 +278,126 @@ impl Client { &self.context } + pub(crate) fn poll_event( + &self, + queue: Arc, + timeout: Timeout, + ) -> Option { + let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) }; + if let Some(ev) = event { + let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; + match evtype { + rdsys::RD_KAFKA_EVENT_LOG => self.handle_log_event(ev.ptr()), + rdsys::RD_KAFKA_EVENT_STATS => self.handle_stats_event(ev.ptr()), + rdsys::RD_KAFKA_EVENT_ERROR => self.handle_error_event(ev.ptr()), + rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH => { + if C::ENABLE_REFRESH_OAUTH_TOKEN { + self.handle_oauth_refresh_event(ev.ptr()); + } + } + _ => { + return Some(ev); + } + } + } + None + } + + fn handle_log_event(&self, event: *mut RDKafkaEvent) { + let mut fac: *const c_char = std::ptr::null(); + let mut str_: *const c_char = std::ptr::null(); + let mut level: i32 = 0; + let result = unsafe { rdsys::rd_kafka_event_log(event, &mut fac, &mut str_, &mut level) }; + if result == 0 { + let fac = unsafe { CStr::from_ptr(fac).to_string_lossy() }; + let log_message = unsafe { CStr::from_ptr(str_).to_string_lossy() }; + self.context().log( + RDKafkaLogLevel::from_int(level), + fac.trim(), + log_message.trim(), + ); + } + } + + fn handle_stats_event(&self, event: *mut RDKafkaEvent) { + let json = unsafe { 
CStr::from_ptr(rdsys::rd_kafka_event_stats(event)) }; + self.context().stats_raw(json.to_bytes()); + } + + fn handle_error_event(&self, event: *mut RDKafkaEvent) { + let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event) }; + let error = KafkaError::Global(rdkafka_err.into()); + let reason = + unsafe { CStr::from_ptr(rdsys::rd_kafka_event_error_string(event)).to_string_lossy() }; + self.context().error(error, reason.trim()); + } + + fn handle_oauth_refresh_event(&self, event: *mut RDKafkaEvent) { + let oauthbearer_config = unsafe { rdsys::rd_kafka_event_config_string(event) }; + let res: Result<_, Box> = (|| { + let oauthbearer_config = match oauthbearer_config.is_null() { + true => None, + false => unsafe { Some(util::cstr_to_owned(oauthbearer_config)) }, + }; + let token_info = self + .context() + .generate_oauth_token(oauthbearer_config.as_deref())?; + let token = CString::new(token_info.token)?; + let principal_name = CString::new(token_info.principal_name)?; + Ok((token, principal_name, token_info.lifetime_ms)) + })(); + match res { + Ok((token, principal_name, lifetime_ms)) => { + let mut err_buf = ErrBuf::new(); + let code = unsafe { + rdkafka_sys::rd_kafka_oauthbearer_set_token( + self.native_ptr(), + token.as_ptr(), + lifetime_ms, + principal_name.as_ptr(), + ptr::null_mut(), + 0, + err_buf.as_mut_ptr(), + err_buf.capacity(), + ) + }; + if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR { + debug!("successfully set refreshed OAuth token"); + } else { + debug!( + "failed to set refreshed OAuth token (code {:?}): {}", + code, err_buf + ); + unsafe { + rdkafka_sys::rd_kafka_oauthbearer_set_token_failure( + self.native_ptr(), + err_buf.as_mut_ptr(), + ) + }; + } + } + Err(e) => { + debug!("failed to refresh OAuth token: {}", e); + let message = match CString::new(e.to_string()) { + Ok(message) => message, + Err(e) => { + error!("error message generated while refreshing OAuth token has embedded null character: {}", e); + CString::new( + "error while refreshing OAuth token has embedded null character", + ) + .expect("known to be a valid CString") + } + }; + unsafe { + rdkafka_sys::rd_kafka_oauthbearer_set_token_failure( + self.native_ptr(), + message.as_ptr(), + ) + }; + } + } + } + /// Returns the metadata information for the specified topic, or for all topics in the cluster /// if no topic is specified. pub fn fetch_metadata>( @@ -442,6 +547,11 @@ impl Client { pub(crate) fn consumer_queue(&self) -> Option { unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_consumer(self.native_ptr())) } } + + /// Returns a NativeQueue for the main librdkafka event queue from the current client. 
+ pub(crate) fn main_queue(&self) -> NativeQueue { + unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_main(self.native_ptr())).unwrap() } + } } pub(crate) type NativeTopic = NativePtr; @@ -471,48 +581,6 @@ impl NativeQueue { } } -pub(crate) unsafe extern "C" fn native_log_cb( - client: *const RDKafka, - level: i32, - fac: *const c_char, - buf: *const c_char, -) { - let fac = CStr::from_ptr(fac).to_string_lossy(); - let log_message = CStr::from_ptr(buf).to_string_lossy(); - - let context = &mut *(rdsys::rd_kafka_opaque(client) as *mut C); - context.log( - RDKafkaLogLevel::from_int(level), - fac.trim(), - log_message.trim(), - ); -} - -pub(crate) unsafe extern "C" fn native_stats_cb( - _conf: *mut RDKafka, - json: *mut c_char, - json_len: usize, - opaque: *mut c_void, -) -> i32 { - let context = &mut *(opaque as *mut C); - context.stats_raw(slice::from_raw_parts(json as *mut u8, json_len)); - 0 // librdkafka will free the json buffer -} - -pub(crate) unsafe extern "C" fn native_error_cb( - _client: *mut RDKafka, - err: i32, - reason: *const c_char, - opaque: *mut c_void, -) { - let err = RDKafkaRespErr::try_from(err).expect("global error not an rd_kafka_resp_err_t"); - let error = KafkaError::Global(err.into()); - let reason = CStr::from_ptr(reason).to_string_lossy(); - - let context = &mut *(opaque as *mut C); - context.error(error, reason.trim()); -} - /// A generated OAuth token and its associated metadata. /// /// When using the `OAUTHBEARER` SASL authentication method, this type is @@ -529,60 +597,6 @@ pub struct OAuthToken { pub lifetime_ms: i64, } -pub(crate) unsafe extern "C" fn native_oauth_refresh_cb( - client: *mut RDKafka, - oauthbearer_config: *const c_char, - opaque: *mut c_void, -) { - let res: Result<_, Box> = (|| { - let context = &mut *(opaque as *mut C); - let oauthbearer_config = match oauthbearer_config.is_null() { - true => None, - false => Some(util::cstr_to_owned(oauthbearer_config)), - }; - let token_info = context.generate_oauth_token(oauthbearer_config.as_deref())?; - let token = CString::new(token_info.token)?; - let principal_name = CString::new(token_info.principal_name)?; - Ok((token, principal_name, token_info.lifetime_ms)) - })(); - match res { - Ok((token, principal_name, lifetime_ms)) => { - let mut err_buf = ErrBuf::new(); - let code = rdkafka_sys::rd_kafka_oauthbearer_set_token( - client, - token.as_ptr(), - lifetime_ms, - principal_name.as_ptr(), - ptr::null_mut(), - 0, - err_buf.as_mut_ptr(), - err_buf.capacity(), - ); - if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR { - debug!("successfully set refreshed OAuth token"); - } else { - debug!( - "failed to set refreshed OAuth token (code {:?}): {}", - code, err_buf - ); - rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, err_buf.as_mut_ptr()); - } - } - Err(e) => { - debug!("failed to refresh OAuth token: {}", e); - let message = match CString::new(e.to_string()) { - Ok(message) => message, - Err(e) => { - error!("error message generated while refreshing OAuth token has embedded null character: {}", e); - CString::new("error while refreshing OAuth token has embedded null character") - .expect("known to be a valid CString") - } - }; - rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, message.as_ptr()); - } - } -} - #[cfg(test)] mod tests { // Just call everything to test there no panics by default, behavior diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index ee03b906b..1b3bcdfe9 100644 --- a/src/consumer/base_consumer.rs +++ 
b/src/consumer/base_consumer.rs @@ -1,16 +1,17 @@ //! Low-level consumers. -use std::cmp; -use std::ffi::CString; +use std::ffi::{CStr, CString}; use std::mem::ManuallyDrop; use std::os::raw::c_void; use std::ptr; use std::sync::Arc; +use std::time::Duration; +use log::warn; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; -use crate::client::{Client, NativeClient, NativeQueue}; +use crate::client::{Client, NativeQueue}; use crate::config::{ ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig, }; @@ -26,41 +27,6 @@ use crate::metadata::Metadata; use crate::topic_partition_list::{Offset, TopicPartitionList}; use crate::util::{cstr_to_owned, NativePtr, Timeout}; -pub(crate) unsafe extern "C" fn native_commit_cb( - _conf: *mut RDKafka, - err: RDKafkaRespErr, - offsets: *mut RDKafkaTopicPartitionList, - opaque_ptr: *mut c_void, -) { - let context = &mut *(opaque_ptr as *mut C); - let commit_error = if err.is_error() { - Err(KafkaError::ConsumerCommit(err.into())) - } else { - Ok(()) - }; - if offsets.is_null() { - let tpl = TopicPartitionList::new(); - context.commit_callback(commit_error, &tpl); - } else { - let tpl = ManuallyDrop::new(TopicPartitionList::from_ptr(offsets)); - context.commit_callback(commit_error, &tpl); - } -} - -/// Native rebalance callback. This callback will run on every rebalance, and it will call the -/// rebalance method defined in the current `Context`. -unsafe extern "C" fn native_rebalance_cb( - rk: *mut RDKafka, - err: RDKafkaRespErr, - native_tpl: *mut RDKafkaTopicPartitionList, - opaque_ptr: *mut c_void, -) { - let context = &mut *(opaque_ptr as *mut C); - let native_client = ManuallyDrop::new(NativeClient::from_ptr(rk)); - let mut tpl = ManuallyDrop::new(TopicPartitionList::from_ptr(native_tpl)); - context.rebalance(&native_client, err, &mut tpl); -} - /// A low-level consumer that requires manual polling. /// /// This consumer must be periodically polled to make progress on rebalancing, @@ -70,7 +36,8 @@ where C: ConsumerContext, { client: Client, - main_queue_min_poll_interval: Timeout, + queue: Arc, + static_member: bool, } impl FromClientConfig for BaseConsumer { @@ -95,58 +62,50 @@ where native_config: NativeClientConfig, context: C, ) -> KafkaResult> { + let mut static_member = false; + if let Some(group_instance_id) = config.get("group.instance.id") { + if !group_instance_id.is_empty() { + static_member = true; + } + } unsafe { - rdsys::rd_kafka_conf_set_rebalance_cb( - native_config.ptr(), - Some(native_rebalance_cb::), - ); - rdsys::rd_kafka_conf_set_offset_commit_cb( + rdsys::rd_kafka_conf_set_events( native_config.ptr(), - Some(native_commit_cb::), - ); - } - let main_queue_min_poll_interval = context.main_queue_min_poll_interval(); + rdsys::RD_KAFKA_EVENT_REBALANCE + | rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT + | rdsys::RD_KAFKA_EVENT_STATS + | rdsys::RD_KAFKA_EVENT_ERROR + | rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH, + ) + }; let client = Client::new( config, native_config, RDKafkaType::RD_KAFKA_CONSUMER, context, )?; + + // Redirect rdkafka's main queue to the consumer queue so that we only + // need to listen to the consumer queue to observe events like + // rebalancings and stats. 
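+        // Note (annotation, not in the original patch): librdkafka does not permit calling
+        // rd_kafka_poll() on this handle once the main queue has been redirected below;
+        // from that point on, every event is served through the consumer queue instead.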
+ unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) }; + let queue = Arc::new(client.consumer_queue().ok_or(KafkaError::ClientCreation( + "rdkafka consumer queue not available".to_string(), + ))?); Ok(BaseConsumer { client, - main_queue_min_poll_interval, + queue, + static_member, }) } - /// Polls the consumer for messages and returns a pointer to the native rdkafka-sys struct. - /// This method is for internal use only. Use poll instead. - pub(crate) fn poll_raw(&self, mut timeout: Timeout) -> Option> { - loop { - unsafe { rdsys::rd_kafka_poll(self.client.native_ptr(), 0) }; - let op_timeout = cmp::min(timeout, self.main_queue_min_poll_interval); - let message_ptr = unsafe { - NativePtr::from_ptr(rdsys::rd_kafka_consumer_poll( - self.client.native_ptr(), - op_timeout.as_millis(), - )) - }; - if let Some(message_ptr) = message_ptr { - break Some(message_ptr); - } - if op_timeout >= timeout { - break None; - } - timeout -= op_timeout; - } - } - /// Polls the consumer for new messages. /// /// It won't block for more than the specified timeout. Use zero `Duration` for non-blocking /// call. With no timeout it blocks until an event is received. /// /// This method should be called at regular intervals, even if no message is expected, - /// to serve any queued callbacks waiting to be called. This is especially important for + /// to serve any queued events waiting to be handled. This is especially important for /// automatic consumer rebalance, as the rebalance function will be executed by the thread /// calling the poll() function. /// @@ -154,8 +113,84 @@ where /// /// The returned message lives in the memory of the consumer and cannot outlive it. pub fn poll>(&self, timeout: T) -> Option>> { - self.poll_raw(timeout.into()) - .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, self) }) + let event = Arc::new( + self.client() + .poll_event(self.queue.clone(), timeout.into())?, + ); + let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + match evtype { + rdsys::RD_KAFKA_EVENT_FETCH => self.handle_fetch_event(event.clone()), + rdsys::RD_KAFKA_EVENT_REBALANCE => { + self.handle_rebalance_event(event.clone()); + None + } + rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => { + self.handle_offset_commit_event(event.clone()); + None + } + _ => { + let buf = unsafe { + let evname = rdsys::rd_kafka_event_name(event.ptr()); + CStr::from_ptr(evname).to_bytes() + }; + let evname = String::from_utf8(buf.to_vec()).unwrap(); + warn!("Ignored event '{}' on consumer poll", evname); + None + } + } + } + + fn handle_fetch_event( + &self, + event: Arc>, + ) -> Option>> { + unsafe { + NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(event.ptr()) as *mut _) + .map(|ptr| BorrowedMessage::from_fetch_event(ptr, event.clone())) + } + } + + fn handle_rebalance_event(&self, event: Arc>) { + let err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) }; + match err { + rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS + | rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => { + let tpl = unsafe { + let native_tpl = rdsys::rd_kafka_event_topic_partition_list(event.ptr()); + TopicPartitionList::from_ptr(native_tpl) + }; + let mut tpl = ManuallyDrop::new(tpl); + self.context() + .rebalance(self.client.native_client(), err, &mut tpl); + } + _ => { + let buf = unsafe { + let err_name = + rdsys::rd_kafka_err2name(rdsys::rd_kafka_event_error(event.ptr())); + CStr::from_ptr(err_name).to_bytes() + }; + let err = String::from_utf8(buf.to_vec()).unwrap(); + warn!("invalid 
rebalance event: {:?}", err); + } + } + } + + fn handle_offset_commit_event(&self, event: Arc>) { + let err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) }; + let commit_error = if err.is_error() { + Err(KafkaError::ConsumerCommit(err.into())) + } else { + Ok(()) + }; + + let offsets = unsafe { rdsys::rd_kafka_event_topic_partition_list(event.ptr()) }; + if offsets.is_null() { + let tpl = TopicPartitionList::new(); + self.context().commit_callback(commit_error, &tpl); + } else { + let tpl = ManuallyDrop::new(unsafe { TopicPartitionList::from_ptr(offsets) }); + self.context().commit_callback(commit_error, &tpl); + } } /// Returns an iterator over the available messages. @@ -607,7 +642,31 @@ where C: ConsumerContext, { fn drop(&mut self) { - trace!("Destroying consumer: {:?}", self.client.native_ptr()); // TODO: fix me (multiple executions ?) + trace!("Destroying consumer: {:?}", self.client.native_ptr()); + // If this consumer is configured for static membership, do not explicitly unsubscribe from + // the group. Note that static members will *not* receive a final revoke event when they + // shutdown. + if !self.static_member { + // We use the Event API rather than the Callback API. + // As we don't register a rebalance_cb, rd_kafka_consumer_close() + // will shortcut the rebalance_cb and do a forced unassign. + // This is undesired as the application might need the final + // revoke events before shutting down. Hence, we trigger + // an Unsubscribe() first, wait for that to propagate, and then + // close the consumer. + unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) }; + + // Poll for rebalance events + loop { + self.poll(Duration::from_secs(5)); + let qlen = unsafe { rdsys::rd_kafka_queue_length(self.queue.ptr()) }; + if qlen == 0 { + break; + } + } + } + + // TODO(sam): do we need to destroy the queue before calling close? unsafe { rdsys::rd_kafka_consumer_close(self.client.native_ptr()) }; trace!("Consumer destroyed: {:?}", self.client.native_ptr()); } @@ -677,7 +736,7 @@ where /// /// Remember that you must also call [`BaseConsumer::poll`] on the /// associated consumer regularly, even if no messages are expected, to - /// serve callbacks. + /// serve events. pub fn poll>(&self, timeout: T) -> Option>> { unsafe { NativePtr::from_ptr(rdsys::rd_kafka_consume_queue( diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 0c959f329..fd59fd138 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -200,11 +200,6 @@ where let base = BaseConsumer::new(config, native_config, context)?; let native_ptr = base.client().native_ptr() as usize; - // Redirect rdkafka's main queue to the consumer queue so that we only - // need to listen to the consumer queue to observe events like - // rebalancings and stats. 
- unsafe { rdsys::rd_kafka_poll_set_consumer(base.client().native_ptr()) }; - let queue = base.client().consumer_queue().ok_or_else(|| { KafkaError::ClientCreation("librdkafka failed to create consumer queue".into()) })?; diff --git a/src/message.rs b/src/message.rs index 0f47baebe..08733903c 100644 --- a/src/message.rs +++ b/src/message.rs @@ -6,11 +6,13 @@ use std::marker::PhantomData; use std::os::raw::c_void; use std::ptr; use std::str; +use std::sync::Arc; use std::time::SystemTime; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; +use crate::admin::NativeEvent; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr}; @@ -306,12 +308,15 @@ impl Headers for BorrowedHeaders { /// [`detach`](BorrowedMessage::detach) method. pub struct BorrowedMessage<'a> { ptr: NativePtr, + _event: Option>, _owner: PhantomData<&'a u8>, } +unsafe extern "C" fn no_op(_: *mut RDKafkaMessage) {} + unsafe impl KafkaDrop for RDKafkaMessage { const TYPE: &'static str = "message"; - const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_message_destroy; + const DROP: unsafe extern "C" fn(*mut Self) = no_op; } impl<'a> fmt::Debug for BorrowedMessage<'a> { @@ -342,22 +347,51 @@ impl<'a> BorrowedMessage<'a> { } else { Ok(BorrowedMessage { ptr, + _event: None, + _owner: PhantomData, + }) + } + } + + /// Creates a new `BorrowedMessage` that wraps the native Kafka message + /// pointer returned by a consumer. The lifetime of the message will be + /// bound to the lifetime of the event passed as parameter. If the message + /// contains an error, only the error is returned and the message structure + /// is freed. + pub(crate) unsafe fn from_fetch_event( + ptr: NativePtr, + event: Arc, + ) -> KafkaResult> { + if ptr.err.is_error() { + let err = match ptr.err { + rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => { + KafkaError::PartitionEOF((*ptr).partition) + } + e => KafkaError::MessageConsumption(e.into()), + }; + Err(err) + } else { + Ok(BorrowedMessage { + ptr, + _event: Some(event), + // TODO(sam): what does it mean this when the event holds the ownership? _owner: PhantomData, }) } } /// Creates a new `BorrowedMessage` that wraps the native Kafka message - /// pointer returned by the delivery callback of a producer. The lifetime of - /// the message will be bound to the lifetime of the reference passed as - /// parameter. This method should only be used with messages coming from the - /// delivery callback. The message will not be freed in any circumstance. - pub(crate) unsafe fn from_dr_callback( + /// pointer returned via the delivery report event. The lifetime of + /// the message will be bound to the lifetime of the event passed as + /// parameter. The message will not be freed in any circumstance. + pub(crate) unsafe fn from_dr_event( ptr: *mut RDKafkaMessage, - _owner: &'a O, + event: Arc, ) -> DeliveryResult<'a> { let borrowed_message = BorrowedMessage { ptr: NativePtr::from_ptr(ptr).unwrap(), + _event: Some(event), + // TODO(sam): what does it mean this when the event holds the ownership? 
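+            // Likely answer (annotation, not in the original patch): the delivery-report event
+            // owns the underlying message memory, so holding the shared event here is what keeps
+            // this message valid, while `_owner` merely carries the `'a` lifetime parameter.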
_owner: PhantomData, }; if (*ptr).err.is_error() { diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 48acd925d..1c6a3ab38 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -57,7 +57,7 @@ use rdkafka_sys as rdsys; use rdkafka_sys::rd_kafka_vtype_t::*; use rdkafka_sys::types::*; -use crate::client::Client; +use crate::client::{Client, NativeQueue}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; use crate::consumer::ConsumerGroupMetadata; use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError}; @@ -67,33 +67,12 @@ use crate::producer::{ DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig, }; use crate::topic_partition_list::TopicPartitionList; -use crate::util::{IntoOpaque, Timeout}; +use crate::util::{IntoOpaque, NativePtr, Timeout}; pub use crate::message::DeliveryResult; use super::NoCustomPartitioner; -/// Callback that gets called from librdkafka every time a message succeeds or fails to be -/// delivered. -unsafe extern "C" fn delivery_cb>( - _client: *mut RDKafka, - msg: *const RDKafkaMessage, - opaque: *mut c_void, -) { - let producer_context = &mut *(opaque as *mut C); - let delivery_opaque = C::DeliveryOpaque::from_ptr((*msg)._private); - let owner = 42u8; - // Wrap the message pointer into a BorrowedMessage that will only live for the body of this - // function. - let delivery_result = BorrowedMessage::from_dr_callback(msg as *mut RDKafkaMessage, &owner); - trace!("Delivery event received: {:?}", delivery_result); - producer_context.delivery(&delivery_result, delivery_opaque); - match delivery_result { - // Do not free the message, librdkafka will do it for us - Ok(message) | Err((_, message)) => mem::forget(message), - } -} - // // ********** BASE PRODUCER ********** // @@ -294,7 +273,13 @@ where } unsafe { - rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::)) + rdsys::rd_kafka_conf_set_events( + native_config.ptr(), + rdsys::RD_KAFKA_EVENT_DR + | rdsys::RD_KAFKA_EVENT_STATS + | rdsys::RD_KAFKA_EVENT_ERROR + | rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH, + ) }; let client = Client::new_context_arc( config, @@ -351,6 +336,7 @@ where C: ProducerContext, { client: Client, + queue: Arc, _partitioner: PhantomData, } @@ -361,18 +347,57 @@ where { /// Creates a base producer starting from a Client. fn from_client(client: Client) -> BaseProducer { + let queue = Arc::new(client.main_queue()); BaseProducer { client, + queue, _partitioner: PhantomData, } } - /// Polls the producer, returning the number of events served. + /// Polls the producer /// /// Regular calls to `poll` are required to process the events and execute /// the message delivery callbacks. 
- pub fn poll>(&self, timeout: T) -> i32 { - unsafe { rdsys::rd_kafka_poll(self.native_ptr(), timeout.into().as_millis()) } + pub fn poll>(&self, timeout: T) { + let event = self.client().poll_event(self.queue.clone(), timeout.into()); + if let Some(ev) = event { + let ev = Arc::new(ev); + let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; + match evtype { + rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev), + _ => { + let buf = unsafe { + let evname = rdsys::rd_kafka_event_name(ev.ptr()); + CStr::from_ptr(evname).to_bytes() + }; + let evname = String::from_utf8(buf.to_vec()).unwrap(); + warn!("Ignored event '{}' on base producer poll", evname); + } + } + } + } + + fn handle_delivery_report_event(&self, event: Arc>) { + let max_messages = unsafe { rdsys::rd_kafka_event_message_count(event.ptr()) }; + let messages: Vec<*const RDKafkaMessage> = Vec::with_capacity(max_messages); + + let mut messages = mem::ManuallyDrop::new(messages); + let messages = unsafe { + let msgs_cnt = rdsys::rd_kafka_event_message_array( + event.ptr(), + messages.as_mut_ptr(), + max_messages, + ); + Vec::from_raw_parts(messages.as_mut_ptr(), msgs_cnt, max_messages) + }; + + for msg in messages { + let delivery_result = + unsafe { BorrowedMessage::from_dr_event(msg as *mut _, event.clone()) }; + let delivery_opaque = unsafe { C::DeliveryOpaque::from_ptr((*msg)._private) }; + self.context().delivery(&delivery_result, delivery_opaque); + } } /// Returns a pointer to the native Kafka client. @@ -618,15 +643,11 @@ where .spawn(move || { trace!("Polling thread loop started"); loop { - let n = producer.poll(Duration::from_millis(100)); - if n == 0 { - if should_stop.load(Ordering::Relaxed) { - // We received nothing and the thread should - // stop, so break the loop. - break; - } - } else { - trace!("Received {} events", n); + producer.poll(Duration::from_millis(100)); + if should_stop.load(Ordering::Relaxed) { + // We received nothing and the thread should + // stop, so break the loop. + break; } } trace!("Polling thread loop terminated"); From b527a3ee648c233951fee9c66bc8162b14b6fb3a Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Tue, 17 Oct 2023 10:54:17 +0200 Subject: [PATCH 02/24] Adapt the StreamConsumer to poll the underlying BaseConsumer --- src/consumer/base_consumer.rs | 4 ++++ src/consumer/stream_consumer.rs | 40 ++++++++++++--------------------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 1b3bcdfe9..f4c56aa30 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -237,6 +237,10 @@ where Iter(self) } + pub(crate) fn get_queue(&self) -> Arc { + self.queue.clone() + } + /// Splits messages for the specified partition into their own queue. /// /// If the `topic` or `partition` is invalid, returns `None`. diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index fd59fd138..df01e56f5 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -38,7 +38,7 @@ unsafe extern "C" fn native_message_queue_nonempty_cb(_: *mut RDKafka, opaque_pt wakers.wake_all(); } -unsafe fn enable_nonempty_callback(queue: &NativeQueue, wakers: &Arc) { +unsafe fn enable_nonempty_callback(queue: &Arc, wakers: &Arc) { rdsys::rd_kafka_queue_cb_event_enable( queue.ptr(), Some(native_message_queue_nonempty_cb), @@ -89,31 +89,24 @@ impl WakerSlab { /// A stream of messages from a [`StreamConsumer`]. 
/// /// See the documentation of [`StreamConsumer::stream`] for details. -pub struct MessageStream<'a> { +pub struct MessageStream<'a, C: ConsumerContext> { wakers: &'a WakerSlab, - queue: &'a NativeQueue, + base: Arc>, slot: usize, } -impl<'a> MessageStream<'a> { - fn new(wakers: &'a WakerSlab, queue: &'a NativeQueue) -> MessageStream<'a> { +impl<'a, C: ConsumerContext> MessageStream<'a, C> { + fn new(wakers: &'a WakerSlab, base: Arc>) -> MessageStream<'a, C> { let slot = wakers.register(); - MessageStream { - wakers, - queue, - slot, - } + MessageStream { wakers, base, slot } } fn poll(&self) -> Option>> { - unsafe { - NativePtr::from_ptr(rdsys::rd_kafka_consume_queue(self.queue.ptr(), 0)) - .map(|p| BorrowedMessage::from_consumer(p, self.queue)) - } + self.base.poll(Duration::ZERO) } } -impl<'a> Stream for MessageStream<'a> { +impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> { type Item = KafkaResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -140,7 +133,7 @@ impl<'a> Stream for MessageStream<'a> { } } -impl<'a> Drop for MessageStream<'a> { +impl<'a, C: ConsumerContext> Drop for MessageStream<'a, C> { fn drop(&mut self) { self.wakers.unregister(self.slot); } @@ -165,8 +158,7 @@ pub struct StreamConsumer where C: ConsumerContext, { - queue: NativeQueue, // queue must be dropped before the base to avoid deadlock - base: BaseConsumer, + base: Arc>, wakers: Arc, _shutdown_trigger: oneshot::Sender<()>, _runtime: PhantomData, @@ -197,14 +189,11 @@ where Duration::from_millis(millis) }; - let base = BaseConsumer::new(config, native_config, context)?; + let base = Arc::new(BaseConsumer::new(config, native_config, context)?); let native_ptr = base.client().native_ptr() as usize; - let queue = base.client().consumer_queue().ok_or_else(|| { - KafkaError::ClientCreation("librdkafka failed to create consumer queue".into()) - })?; let wakers = Arc::new(WakerSlab::new()); - unsafe { enable_nonempty_callback(&queue, &wakers) } + unsafe { enable_nonempty_callback(&base.get_queue(), &wakers) } // We need to make sure we poll the consumer at least once every max // poll interval, *unless* the processing task has wedged. To accomplish @@ -236,7 +225,6 @@ where Ok(StreamConsumer { base, wakers, - queue, _shutdown_trigger: shutdown_trigger, _runtime: PhantomData, }) @@ -259,8 +247,8 @@ where /// /// If you want multiple independent views of a Kafka topic, create multiple /// consumers, not multiple message streams. - pub fn stream(&self) -> MessageStream<'_> { - MessageStream::new(&self.wakers, &self.queue) + pub fn stream(&self) -> MessageStream<'_, C> { + MessageStream::new(&self.wakers, self.base.clone()) } /// Receives the next message from the stream. From b897ec949c71480b9f67b671d52bf6ef08445112 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Wed, 18 Oct 2023 11:03:57 +0200 Subject: [PATCH 03/24] Pass arc by value rather than reference and fix generic type. 
--- src/consumer/stream_consumer.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index df01e56f5..14326a853 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -38,7 +38,7 @@ unsafe extern "C" fn native_message_queue_nonempty_cb(_: *mut RDKafka, opaque_pt wakers.wake_all(); } -unsafe fn enable_nonempty_callback(queue: &Arc, wakers: &Arc) { +unsafe fn enable_nonempty_callback(queue: Arc, wakers: &Arc) { rdsys::rd_kafka_queue_cb_event_enable( queue.ptr(), Some(native_message_queue_nonempty_cb), @@ -193,7 +193,7 @@ where let native_ptr = base.client().native_ptr() as usize; let wakers = Arc::new(WakerSlab::new()); - unsafe { enable_nonempty_callback(&base.get_queue(), &wakers) } + unsafe { enable_nonempty_callback(base.get_queue(), &wakers) } // We need to make sure we poll the consumer at least once every max // poll interval, *unless* the processing task has wedged. To accomplish @@ -332,7 +332,7 @@ where let wakers = Arc::new(WakerSlab::new()); unsafe { rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()); - enable_nonempty_callback(&queue, &wakers); + enable_nonempty_callback(Arc::new(queue), &wakers); } StreamPartitionQueue { queue, @@ -555,7 +555,7 @@ where /// /// If you want multiple independent views of a Kafka partition, create /// multiple consumers, not multiple partition streams. - pub fn stream(&self) -> MessageStream<'_> { + pub fn stream(&self) -> MessageStream<'_, C> { MessageStream::new(&self.wakers, &self.queue) } From ebe7e9cd3b0a1d15cd7455345f881d5e9a1d95b6 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Fri, 20 Oct 2023 12:58:59 +0200 Subject: [PATCH 04/24] Refactor to use references and lifetimes rather than Arc. --- src/client.rs | 2 +- src/consumer/base_consumer.rs | 33 +++++++------- src/message.rs | 86 +++++++++++++---------------------- src/producer/base_producer.rs | 14 +++--- 4 files changed, 55 insertions(+), 80 deletions(-) diff --git a/src/client.rs b/src/client.rs index 635dcb451..e71482e99 100644 --- a/src/client.rs +++ b/src/client.rs @@ -280,7 +280,7 @@ impl Client { pub(crate) fn poll_event( &self, - queue: Arc, + queue: &NativeQueue, timeout: Timeout, ) -> Option { let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) }; diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index f4c56aa30..dd2c1db95 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -36,7 +36,7 @@ where C: ConsumerContext, { client: Client, - queue: Arc, + queue: NativeQueue, static_member: bool, } @@ -89,9 +89,9 @@ where // need to listen to the consumer queue to observe events like // rebalancings and stats. unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) }; - let queue = Arc::new(client.consumer_queue().ok_or(KafkaError::ClientCreation( + let queue = client.consumer_queue().ok_or(KafkaError::ClientCreation( "rdkafka consumer queue not available".to_string(), - ))?); + ))?; Ok(BaseConsumer { client, queue, @@ -113,19 +113,18 @@ where /// /// The returned message lives in the memory of the consumer and cannot outlive it. 
pub fn poll>(&self, timeout: T) -> Option>> { - let event = Arc::new( - self.client() - .poll_event(self.queue.clone(), timeout.into())?, - ); + let event = self + .client() + .poll_event(self.get_queue(), timeout.into())?; let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; match evtype { - rdsys::RD_KAFKA_EVENT_FETCH => self.handle_fetch_event(event.clone()), + rdsys::RD_KAFKA_EVENT_FETCH => self.handle_fetch_event(event), rdsys::RD_KAFKA_EVENT_REBALANCE => { - self.handle_rebalance_event(event.clone()); + self.handle_rebalance_event(event); None } rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => { - self.handle_offset_commit_event(event.clone()); + self.handle_offset_commit_event(event); None } _ => { @@ -142,15 +141,15 @@ where fn handle_fetch_event( &self, - event: Arc>, + event: NativePtr, ) -> Option>> { unsafe { NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(event.ptr()) as *mut _) - .map(|ptr| BorrowedMessage::from_fetch_event(ptr, event.clone())) + .map(|ptr| BorrowedMessage::from_client(ptr, event, self.client())) } } - fn handle_rebalance_event(&self, event: Arc>) { + fn handle_rebalance_event(&self, event: NativePtr) { let err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) }; match err { rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS @@ -175,7 +174,7 @@ where } } - fn handle_offset_commit_event(&self, event: Arc>) { + fn handle_offset_commit_event(&self, event: NativePtr) { let err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) }; let commit_error = if err.is_error() { Err(KafkaError::ConsumerCommit(err.into())) @@ -237,8 +236,8 @@ where Iter(self) } - pub(crate) fn get_queue(&self) -> Arc { - self.queue.clone() + pub(crate) fn get_queue(&self) -> &NativeQueue { + &self.queue } /// Splits messages for the specified partition into their own queue. @@ -717,7 +716,7 @@ where C: ConsumerContext, { consumer: Arc>, - queue: NativeQueue, + pub(crate) queue: NativeQueue, nonempty_callback: Option>>, } diff --git a/src/message.rs b/src/message.rs index 08733903c..e83971382 100644 --- a/src/message.rs +++ b/src/message.rs @@ -6,7 +6,6 @@ use std::marker::PhantomData; use std::os::raw::c_void; use std::ptr; use std::str; -use std::sync::Arc; use std::time::SystemTime; use rdkafka_sys as rdsys; @@ -308,7 +307,7 @@ impl Headers for BorrowedHeaders { /// [`detach`](BorrowedMessage::detach) method. pub struct BorrowedMessage<'a> { ptr: NativePtr, - _event: Option>, + _event: NativeEvent, _owner: PhantomData<&'a u8>, } @@ -332,9 +331,10 @@ impl<'a> BorrowedMessage<'a> { /// should only be used with messages coming from consumers. If the message /// contains an error, only the error is returned and the message structure /// is freed. - pub(crate) unsafe fn from_consumer( + pub(crate) unsafe fn from_client( ptr: NativePtr, - _consumer: &'a C, + event: NativeEvent, + _client: &'a C, ) -> KafkaResult> { if ptr.err.is_error() { let err = match ptr.err { @@ -347,62 +347,38 @@ impl<'a> BorrowedMessage<'a> { } else { Ok(BorrowedMessage { ptr, - _event: None, + _event: event, _owner: PhantomData, }) } } - /// Creates a new `BorrowedMessage` that wraps the native Kafka message - /// pointer returned by a consumer. The lifetime of the message will be - /// bound to the lifetime of the event passed as parameter. If the message - /// contains an error, only the error is returned and the message structure - /// is freed. 
- pub(crate) unsafe fn from_fetch_event( - ptr: NativePtr, - event: Arc, - ) -> KafkaResult> { - if ptr.err.is_error() { - let err = match ptr.err { - rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => { - KafkaError::PartitionEOF((*ptr).partition) - } - e => KafkaError::MessageConsumption(e.into()), - }; - Err(err) - } else { - Ok(BorrowedMessage { - ptr, - _event: Some(event), - // TODO(sam): what does it mean this when the event holds the ownership? - _owner: PhantomData, - }) - } - } - - /// Creates a new `BorrowedMessage` that wraps the native Kafka message - /// pointer returned via the delivery report event. The lifetime of - /// the message will be bound to the lifetime of the event passed as - /// parameter. The message will not be freed in any circumstance. - pub(crate) unsafe fn from_dr_event( - ptr: *mut RDKafkaMessage, - event: Arc, - ) -> DeliveryResult<'a> { - let borrowed_message = BorrowedMessage { - ptr: NativePtr::from_ptr(ptr).unwrap(), - _event: Some(event), - // TODO(sam): what does it mean this when the event holds the ownership? - _owner: PhantomData, - }; - if (*ptr).err.is_error() { - Err(( - KafkaError::MessageProduction((*ptr).err.into()), - borrowed_message, - )) - } else { - Ok(borrowed_message) - } - } + /// Creates a new `BorrowedMessage` that wraps the native Kafka message + /// pointer returned via the delivery report event. The lifetime of + /// the message will be bound to the lifetime of the event passed as + /// parameter. The message will not be freed in any circumstance. + pub(crate) unsafe fn from_dr_event( + ptr: *mut RDKafkaMessage, + //ptr: NativePtr, + event: *mut RDKafkaEvent, + _client: &'a C, + ) -> DeliveryResult<'a> { + let borrowed_message = BorrowedMessage { + ptr: NativePtr::from_ptr(ptr).unwrap(), + //ptr, + _event: NativePtr::from_ptr(event).unwrap(), + // TODO(sam): what does it mean this when the event holds the ownership? + _owner: PhantomData, + }; + if (*ptr).err.is_error() { + Err(( + KafkaError::MessageProduction((*ptr).err.into()), + borrowed_message, + )) + } else { + Ok(borrowed_message) + } + } /// Returns a pointer to the [`RDKafkaMessage`]. pub fn ptr(&self) -> *mut RDKafkaMessage { diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 1c6a3ab38..d1de5052c 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -336,7 +336,7 @@ where C: ProducerContext, { client: Client, - queue: Arc, + queue: NativeQueue, _partitioner: PhantomData, } @@ -347,7 +347,7 @@ where { /// Creates a base producer starting from a Client. fn from_client(client: Client) -> BaseProducer { - let queue = Arc::new(client.main_queue()); + let queue = client.main_queue(); BaseProducer { client, queue, @@ -360,9 +360,8 @@ where /// Regular calls to `poll` are required to process the events and execute /// the message delivery callbacks. 
pub fn poll>(&self, timeout: T) { - let event = self.client().poll_event(self.queue.clone(), timeout.into()); + let event = self.client().poll_event(&self.queue, timeout.into()); if let Some(ev) = event { - let ev = Arc::new(ev); let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; match evtype { rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev), @@ -378,7 +377,7 @@ where } } - fn handle_delivery_report_event(&self, event: Arc>) { + fn handle_delivery_report_event(&self, event: NativePtr) { let max_messages = unsafe { rdsys::rd_kafka_event_message_count(event.ptr()) }; let messages: Vec<*const RDKafkaMessage> = Vec::with_capacity(max_messages); @@ -393,8 +392,9 @@ where }; for msg in messages { - let delivery_result = - unsafe { BorrowedMessage::from_dr_event(msg as *mut _, event.clone()) }; + let delivery_result = unsafe { + BorrowedMessage::from_dr_event(msg as *mut _, event.ptr(), self.client()) + }; let delivery_opaque = unsafe { C::DeliveryOpaque::from_ptr((*msg)._private) }; self.context().delivery(&delivery_result, delivery_opaque); } From 39dec2837eb34e868c8cbf5fa47041d25689e2a0 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Sun, 22 Oct 2023 23:56:41 +0200 Subject: [PATCH 05/24] Work on supporting StreamConsumer via lifetimes instead of Arc. The `Arc>` to `MessageStream` isn't necessary anymore, and the changes to `split_partition_queue` can be reverted as well I think. --- src/consumer/base_consumer.rs | 26 ++++++--- src/consumer/stream_consumer.rs | 95 +++++++++++++++++++-------------- 2 files changed, 74 insertions(+), 47 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index dd2c1db95..1f33644d5 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -741,13 +741,27 @@ where /// associated consumer regularly, even if no messages are expected, to /// serve events. pub fn poll>(&self, timeout: T) -> Option>> { - unsafe { - NativePtr::from_ptr(rdsys::rd_kafka_consume_queue( - self.queue.ptr(), - timeout.into().as_millis(), - )) + let event = self.consumer + .client() + .poll_event(&self.queue, timeout.into())?; + let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + match evtype { + rdsys::RD_KAFKA_EVENT_FETCH => { + unsafe { + NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(event.ptr()) as *mut _) + .map(|ptr| BorrowedMessage::from_client(ptr, event, self)) + } + } + _ => { + let buf = unsafe { + let evname = rdsys::rd_kafka_event_name(event.ptr()); + CStr::from_ptr(evname).to_bytes() + }; + let evname = String::from_utf8(buf.to_vec()).unwrap(); + warn!("Ignored event '{}' on consumer poll", evname); + None + } } - .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, &self.consumer) }) } /// Sets a callback that will be invoked whenever the queue becomes diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 14326a853..09ca27694 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -1,6 +1,6 @@ //! High-level consumers with a [`Stream`](futures_util::Stream) interface. 
-use std::ffi::CString; +use std::ffi::CStr; use std::marker::PhantomData; use std::os::raw::c_void; use std::pin::Pin; @@ -14,14 +14,16 @@ use futures_channel::oneshot; use futures_util::future::{self, Either, FutureExt}; use futures_util::pin_mut; use futures_util::stream::{Stream, StreamExt}; +use log::warn; use slab::Slab; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; +use crate::admin::NativeEvent; use crate::client::{Client, NativeQueue}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; -use crate::consumer::base_consumer::BaseConsumer; +use crate::consumer::base_consumer::{BaseConsumer, PartitionQueue}; use crate::consumer::{ CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, RebalanceProtocol, @@ -38,7 +40,7 @@ unsafe extern "C" fn native_message_queue_nonempty_cb(_: *mut RDKafka, opaque_pt wakers.wake_all(); } -unsafe fn enable_nonempty_callback(queue: Arc, wakers: &Arc) { +unsafe fn enable_nonempty_callback(queue: &NativeQueue, wakers: &Arc) { rdsys::rd_kafka_queue_cb_event_enable( queue.ptr(), Some(native_message_queue_nonempty_cb), @@ -89,24 +91,47 @@ impl WakerSlab { /// A stream of messages from a [`StreamConsumer`]. /// /// See the documentation of [`StreamConsumer::stream`] for details. -pub struct MessageStream<'a, C: ConsumerContext> { +pub struct MessageStream<'a> { wakers: &'a WakerSlab, - base: Arc>, + queue: &'a NativeQueue, slot: usize, } -impl<'a, C: ConsumerContext> MessageStream<'a, C> { - fn new(wakers: &'a WakerSlab, base: Arc>) -> MessageStream<'a, C> { +impl<'a> MessageStream<'a> { + fn new(wakers: &'a WakerSlab, queue: &'a NativeQueue) -> MessageStream<'a> { let slot = wakers.register(); - MessageStream { wakers, base, slot } + MessageStream { wakers, queue, slot } } fn poll(&self) -> Option>> { - self.base.poll(Duration::ZERO) + let timeout: Timeout = Duration::ZERO.into(); + let event = unsafe { NativeEvent::from_ptr(self.queue.poll(timeout)) }; + if let Some(ev) = event { + let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; + match evtype { + rdsys::RD_KAFKA_EVENT_FETCH => { + unsafe { + NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(ev.ptr()) as *mut _) + .map(|ptr| BorrowedMessage::from_client(ptr, ev, self.queue)) + } + } + _ => { + let buf = unsafe { + let evname = rdsys::rd_kafka_event_name(ev.ptr()); + CStr::from_ptr(evname).to_bytes() + }; + let evname = String::from_utf8(buf.to_vec()).unwrap(); + warn!("Ignored event '{}' on consumer poll", evname); + None + } + } + } else { + None + } } } -impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> { +impl<'a> Stream for MessageStream<'a> { type Item = KafkaResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -133,7 +158,7 @@ impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> { } } -impl<'a, C: ConsumerContext> Drop for MessageStream<'a, C> { +impl<'a> Drop for MessageStream<'a> { fn drop(&mut self) { self.wakers.unregister(self.slot); } @@ -159,6 +184,7 @@ where C: ConsumerContext, { base: Arc>, + queue: NativeQueue, wakers: Arc, _shutdown_trigger: oneshot::Sender<()>, _runtime: PhantomData, @@ -190,7 +216,9 @@ where }; let base = Arc::new(BaseConsumer::new(config, native_config, context)?); - let native_ptr = base.client().native_ptr() as usize; + let client = base.client(); + let native_ptr = client.native_ptr() as usize; + let queue = client.main_queue(); let wakers = Arc::new(WakerSlab::new()); unsafe { enable_nonempty_callback(base.get_queue(), &wakers) } @@ 
-225,6 +253,7 @@ where Ok(StreamConsumer { base, wakers, + queue, _shutdown_trigger: shutdown_trigger, _runtime: PhantomData, }) @@ -247,8 +276,8 @@ where /// /// If you want multiple independent views of a Kafka topic, create multiple /// consumers, not multiple message streams. - pub fn stream(&self) -> MessageStream<'_, C> { - MessageStream::new(&self.wakers, self.base.clone()) + pub fn stream(&self) -> MessageStream<'_> { + MessageStream::new(&self.wakers, &self.queue) } /// Receives the next message from the stream. @@ -317,29 +346,13 @@ where topic: &str, partition: i32, ) -> Option> { - let topic = match CString::new(topic) { - Ok(topic) => topic, - Err(_) => return None, - }; - let queue = unsafe { - NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition( - self.base.client().native_ptr(), - topic.as_ptr(), - partition, - )) - }; - queue.map(|queue| { - let wakers = Arc::new(WakerSlab::new()); - unsafe { - rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()); - enable_nonempty_callback(Arc::new(queue), &wakers); - } - StreamPartitionQueue { - queue, - wakers, - _consumer: self.clone(), - } - }) + self.base + .split_partition_queue(topic, partition) + .map(|queue| { + let wakers = Arc::new(WakerSlab::new()); + StreamPartitionQueue { queue, wakers, _consumer: self.clone() } + }) + } } @@ -534,7 +547,7 @@ pub struct StreamPartitionQueue where C: ConsumerContext, { - queue: NativeQueue, + queue: PartitionQueue, wakers: Arc, _consumer: Arc>, } @@ -555,8 +568,8 @@ where /// /// If you want multiple independent views of a Kafka partition, create /// multiple consumers, not multiple partition streams. - pub fn stream(&self) -> MessageStream<'_, C> { - MessageStream::new(&self.wakers, &self.queue) + pub fn stream(&self) -> MessageStream<'_> { + MessageStream::new(&self.wakers, &self.queue.queue) } /// Receives the next message from the stream. @@ -595,6 +608,6 @@ where C: ConsumerContext, { fn drop(&mut self) { - unsafe { disable_nonempty_callback(&self.queue) } + unsafe { disable_nonempty_callback(&self.queue.queue) } } } From 941cd32980e546705a2876dc4c06311a0f70cd27 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Tue, 24 Oct 2023 18:08:29 -0300 Subject: [PATCH 06/24] Use Arc for events in BorrowMessage This is required as multiple write acks are tied to a single event. --- src/consumer/base_consumer.rs | 19 +++++----- src/consumer/stream_consumer.rs | 23 ++++++++----- src/message.rs | 61 +++++++++++++++++---------------- src/producer/base_producer.rs | 6 ++-- 4 files changed, 57 insertions(+), 52 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 1f33644d5..ae6993bbf 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -113,9 +113,7 @@ where /// /// The returned message lives in the memory of the consumer and cannot outlive it. 
pub fn poll>(&self, timeout: T) -> Option>> { - let event = self - .client() - .poll_event(self.get_queue(), timeout.into())?; + let event = self.client().poll_event(self.get_queue(), timeout.into())?; let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; match evtype { rdsys::RD_KAFKA_EVENT_FETCH => self.handle_fetch_event(event), @@ -145,7 +143,7 @@ where ) -> Option>> { unsafe { NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(event.ptr()) as *mut _) - .map(|ptr| BorrowedMessage::from_client(ptr, event, self.client())) + .map(|ptr| BorrowedMessage::from_client(ptr, Arc::new(event), self.client())) } } @@ -741,17 +739,16 @@ where /// associated consumer regularly, even if no messages are expected, to /// serve events. pub fn poll>(&self, timeout: T) -> Option>> { - let event = self.consumer + let event = self + .consumer .client() .poll_event(&self.queue, timeout.into())?; let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; match evtype { - rdsys::RD_KAFKA_EVENT_FETCH => { - unsafe { - NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(event.ptr()) as *mut _) - .map(|ptr| BorrowedMessage::from_client(ptr, event, self)) - } - } + rdsys::RD_KAFKA_EVENT_FETCH => unsafe { + NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(event.ptr()) as *mut _) + .map(|ptr| BorrowedMessage::from_client(ptr, Arc::new(event), self)) + }, _ => { let buf = unsafe { let evname = rdsys::rd_kafka_event_name(event.ptr()); diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 09ca27694..28d672b22 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -100,7 +100,11 @@ pub struct MessageStream<'a> { impl<'a> MessageStream<'a> { fn new(wakers: &'a WakerSlab, queue: &'a NativeQueue) -> MessageStream<'a> { let slot = wakers.register(); - MessageStream { wakers, queue, slot } + MessageStream { + wakers, + queue, + slot, + } } fn poll(&self) -> Option>> { @@ -109,12 +113,10 @@ impl<'a> MessageStream<'a> { if let Some(ev) = event { let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; match evtype { - rdsys::RD_KAFKA_EVENT_FETCH => { - unsafe { - NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(ev.ptr()) as *mut _) - .map(|ptr| BorrowedMessage::from_client(ptr, ev, self.queue)) - } - } + rdsys::RD_KAFKA_EVENT_FETCH => unsafe { + NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(ev.ptr()) as *mut _) + .map(|ptr| BorrowedMessage::from_client(ptr, Arc::new(ev), self.queue)) + }, _ => { let buf = unsafe { let evname = rdsys::rd_kafka_event_name(ev.ptr()); @@ -350,9 +352,12 @@ where .split_partition_queue(topic, partition) .map(|queue| { let wakers = Arc::new(WakerSlab::new()); - StreamPartitionQueue { queue, wakers, _consumer: self.clone() } + StreamPartitionQueue { + queue, + wakers, + _consumer: self.clone(), + } }) - } } diff --git a/src/message.rs b/src/message.rs index e83971382..1915d22b6 100644 --- a/src/message.rs +++ b/src/message.rs @@ -6,6 +6,7 @@ use std::marker::PhantomData; use std::os::raw::c_void; use std::ptr; use std::str; +use std::sync::Arc; use std::time::SystemTime; use rdkafka_sys as rdsys; @@ -307,7 +308,7 @@ impl Headers for BorrowedHeaders { /// [`detach`](BorrowedMessage::detach) method. 
pub struct BorrowedMessage<'a> { ptr: NativePtr, - _event: NativeEvent, + _event: Arc, _owner: PhantomData<&'a u8>, } @@ -320,7 +321,12 @@ unsafe impl KafkaDrop for RDKafkaMessage { impl<'a> fmt::Debug for BorrowedMessage<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Message {{ ptr: {:?} }}", self.ptr()) + write!( + f, + "Message {{ ptr: {:?}, event_ptr: {:?} }}", + self.ptr(), + self._event.ptr() + ) } } @@ -333,7 +339,7 @@ impl<'a> BorrowedMessage<'a> { /// is freed. pub(crate) unsafe fn from_client( ptr: NativePtr, - event: NativeEvent, + event: Arc, _client: &'a C, ) -> KafkaResult> { if ptr.err.is_error() { @@ -353,32 +359,29 @@ impl<'a> BorrowedMessage<'a> { } } - /// Creates a new `BorrowedMessage` that wraps the native Kafka message - /// pointer returned via the delivery report event. The lifetime of - /// the message will be bound to the lifetime of the event passed as - /// parameter. The message will not be freed in any circumstance. - pub(crate) unsafe fn from_dr_event( - ptr: *mut RDKafkaMessage, - //ptr: NativePtr, - event: *mut RDKafkaEvent, - _client: &'a C, - ) -> DeliveryResult<'a> { - let borrowed_message = BorrowedMessage { - ptr: NativePtr::from_ptr(ptr).unwrap(), - //ptr, - _event: NativePtr::from_ptr(event).unwrap(), - // TODO(sam): what does it mean this when the event holds the ownership? - _owner: PhantomData, - }; - if (*ptr).err.is_error() { - Err(( - KafkaError::MessageProduction((*ptr).err.into()), - borrowed_message, - )) - } else { - Ok(borrowed_message) - } - } + /// Creates a new `BorrowedMessage` that wraps the native Kafka message + /// pointer returned via the delivery report event. The lifetime of + /// the message will be bound to the lifetime of the event passed as + /// parameter. The message will not be freed in any circumstance. + pub(crate) unsafe fn from_dr_event( + ptr: *mut RDKafkaMessage, + event: Arc, + _client: &'a C, + ) -> DeliveryResult<'a> { + let borrowed_message = BorrowedMessage { + ptr: NativePtr::from_ptr(ptr).unwrap(), + _event: event, + _owner: PhantomData, + }; + if (*ptr).err.is_error() { + Err(( + KafkaError::MessageProduction((*ptr).err.into()), + borrowed_message, + )) + } else { + Ok(borrowed_message) + } + } /// Returns a pointer to the [`RDKafkaMessage`]. 
pub fn ptr(&self) -> *mut RDKafkaMessage { diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index d1de5052c..c24089185 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -391,10 +391,10 @@ where Vec::from_raw_parts(messages.as_mut_ptr(), msgs_cnt, max_messages) }; + let ev = Arc::new(event); for msg in messages { - let delivery_result = unsafe { - BorrowedMessage::from_dr_event(msg as *mut _, event.ptr(), self.client()) - }; + let delivery_result = + unsafe { BorrowedMessage::from_dr_event(msg as *mut _, ev.clone(), self.client()) }; let delivery_opaque = unsafe { C::DeliveryOpaque::from_ptr((*msg)._private) }; self.context().delivery(&delivery_result, delivery_opaque); } From 438af770ea6a9ec76b56a76826ff3691dc3bb425 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Tue, 24 Oct 2023 18:47:10 -0300 Subject: [PATCH 07/24] Adapt producer Flush to the Event API semantics --- src/producer/base_producer.rs | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index c24089185..b23bb79af 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -338,6 +338,7 @@ where client: Client, queue: NativeQueue, _partitioner: PhantomData, + min_poll_interval: Timeout, } impl BaseProducer @@ -352,6 +353,7 @@ where client, queue, _partitioner: PhantomData, + min_poll_interval: Timeout::After(Duration::from_millis(100)), } } @@ -489,12 +491,28 @@ where &self.client } + // As this library uses the rdkafka Event API, flush will not call rd_kafka_poll() but instead wait for + // the librdkafka-handled message count to reach zero. Runs until value reaches zero or timeout. fn flush>(&self, timeout: T) -> KafkaResult<()> { - let ret = unsafe { rdsys::rd_kafka_flush(self.native_ptr(), timeout.into().as_millis()) }; - if ret.is_error() { - Err(KafkaError::Flush(ret.into())) - } else { - Ok(()) + let mut timeout = timeout.into(); + loop { + let op_timeout = std::cmp::min(timeout, self.min_poll_interval); + if self.in_flight_count() > 0 { + unsafe { rdsys::rd_kafka_flush(self.native_ptr(), 0) }; + self.poll(op_timeout); + } else { + return Ok(()); + } + + if op_timeout >= timeout { + let ret = unsafe { rdsys::rd_kafka_flush(self.native_ptr(), 0) }; + if ret.is_error() { + return Err(KafkaError::Flush(ret.into())); + } else { + return Ok(()); + } + } + timeout -= op_timeout; } } From 0a36b3d57c5c675e7787e3575908720b9a859614 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Wed, 25 Oct 2023 13:00:35 -0300 Subject: [PATCH 08/24] Explain why the TPL need to be manuallyDrop on the consumer events handler --- src/consumer/base_consumer.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index ae6993bbf..2c3813301 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -156,6 +156,8 @@ where let native_tpl = rdsys::rd_kafka_event_topic_partition_list(event.ptr()); TopicPartitionList::from_ptr(native_tpl) }; + // The TPL is owned by the Event and will be destroyed when the event is destroyed. + // Dropping it here will lead to double free. 
let mut tpl = ManuallyDrop::new(tpl); self.context() .rebalance(self.client.native_client(), err, &mut tpl); @@ -185,6 +187,8 @@ where let tpl = TopicPartitionList::new(); self.context().commit_callback(commit_error, &tpl); } else { + // The TPL is owned by the Event and will be destroyed when the event is destroyed. + // Dropping it here will lead to double free. let tpl = ManuallyDrop::new(unsafe { TopicPartitionList::from_ptr(offsets) }); self.context().commit_callback(commit_error, &tpl); } From 0b885a50a0e0e5fc80a2d678637ffd4acea23cce Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Wed, 25 Oct 2023 13:31:11 -0300 Subject: [PATCH 09/24] Add comment for no-op method used on RDKafkaMessage impl of the KafkaDrop trait --- src/message.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/message.rs b/src/message.rs index 1915d22b6..2ebda9071 100644 --- a/src/message.rs +++ b/src/message.rs @@ -312,6 +312,7 @@ pub struct BorrowedMessage<'a> { _owner: PhantomData<&'a u8>, } +// When using the Event API, messages must not be freed with rd_kafka_message_destroy unsafe extern "C" fn no_op(_: *mut RDKafkaMessage) {} unsafe impl KafkaDrop for RDKafkaMessage { From 32b0d249a531e99674b9b29a7c6f7202f107e301 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Wed, 25 Oct 2023 13:35:36 -0300 Subject: [PATCH 10/24] Update doc comment for BorrowedMessage::from_dr_event --- src/message.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/message.rs b/src/message.rs index 2ebda9071..696a50100 100644 --- a/src/message.rs +++ b/src/message.rs @@ -362,8 +362,8 @@ impl<'a> BorrowedMessage<'a> { /// Creates a new `BorrowedMessage` that wraps the native Kafka message /// pointer returned via the delivery report event. The lifetime of - /// the message will be bound to the lifetime of the event passed as - /// parameter. The message will not be freed in any circumstance. + /// the message will be bound to the lifetime of the client passed as + /// parameter. pub(crate) unsafe fn from_dr_event( ptr: *mut RDKafkaMessage, event: Arc, From 64d2e3200fb6890c2b8e45810b6a336ba2743abb Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Wed, 25 Oct 2023 14:44:50 -0300 Subject: [PATCH 11/24] Replace poll with flush on baseProducer drop One poll call might not be enough to serve the delivery report callbacks of the purged messages. The current flush impl will call poll multiple times until the queue is empty or timeout. 
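As a usage note (not part of this patch): the bounded flush in `drop` only waits 500ms, so applications that care about delivery reports should still flush explicitly before the producer goes out of scope. A minimal sketch, assuming a reachable broker at a placeholder address and an existing topic:

    use std::time::Duration;

    use rdkafka::config::ClientConfig;
    use rdkafka::producer::{BaseProducer, BaseRecord, Producer};

    fn produce_and_shut_down() -> rdkafka::error::KafkaResult<()> {
        let producer: BaseProducer = ClientConfig::new()
            .set("bootstrap.servers", "localhost:9092") // placeholder address
            .create()?;

        producer
            .send(BaseRecord::to("example-topic").key("key").payload("payload"))
            .map_err(|(err, _record)| err)?;

        // Serve delivery reports and wait for outstanding messages; the Drop impl
        // only flushes for 500ms, so an explicit flush with a generous timeout is safer.
        producer.flush(Duration::from_secs(5))?;
        Ok(())
    }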
--- src/producer/base_producer.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index b23bb79af..73fafc94d 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -611,8 +611,13 @@ where { fn drop(&mut self) { self.purge(PurgeConfig::default().queue().inflight()); - // Still have to poll after purging to get the results that have been made ready by the purge - self.poll(Timeout::After(Duration::ZERO)); + // Still have to flush after purging to get the results that have been made ready by the purge + if let Err(err) = self.flush(Timeout::After(Duration::from_millis(500))) { + warn!( + "Failed to flush outstanding messages while dropping the producer: {:?}", + err + ); + } } } From f3173d57fac6be6a2c8c9aecaaa5f303082232ef Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Thu, 26 Oct 2023 13:21:23 -0300 Subject: [PATCH 12/24] StreamConsumer Stream impl fixes for the event API --- src/client.rs | 6 +-- src/consumer/base_consumer.rs | 31 ++++--------- src/consumer/stream_consumer.rs | 81 ++++++++++++++++----------------- 3 files changed, 51 insertions(+), 67 deletions(-) diff --git a/src/client.rs b/src/client.rs index e71482e99..7475f287c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -278,11 +278,7 @@ impl Client { &self.context } - pub(crate) fn poll_event( - &self, - queue: &NativeQueue, - timeout: Timeout, - ) -> Option { + pub(crate) fn poll_event(&self, queue: &NativeQueue, timeout: Timeout) -> Option { let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) }; if let Some(ev) = event { let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 2c3813301..4f7d0e399 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -113,7 +113,15 @@ where /// /// The returned message lives in the memory of the consumer and cannot outlive it. pub fn poll>(&self, timeout: T) -> Option>> { - let event = self.client().poll_event(self.get_queue(), timeout.into())?; + self.poll_queue(self.get_queue(), timeout) + } + + pub(crate) fn poll_queue>( + &self, + queue: &NativeQueue, + timeout: T, + ) -> Option>> { + let event = self.client().poll_event(queue, timeout.into())?; let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; match evtype { rdsys::RD_KAFKA_EVENT_FETCH => self.handle_fetch_event(event), @@ -743,26 +751,7 @@ where /// associated consumer regularly, even if no messages are expected, to /// serve events. 
pub fn poll>(&self, timeout: T) -> Option>> { - let event = self - .consumer - .client() - .poll_event(&self.queue, timeout.into())?; - let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - match evtype { - rdsys::RD_KAFKA_EVENT_FETCH => unsafe { - NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(event.ptr()) as *mut _) - .map(|ptr| BorrowedMessage::from_client(ptr, Arc::new(event), self)) - }, - _ => { - let buf = unsafe { - let evname = rdsys::rd_kafka_event_name(event.ptr()); - CStr::from_ptr(evname).to_bytes() - }; - let evname = String::from_utf8(buf.to_vec()).unwrap(); - warn!("Ignored event '{}' on consumer poll", evname); - None - } - } + self.consumer.poll_queue(&self.queue, timeout) } /// Sets a callback that will be invoked whenever the queue becomes diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 28d672b22..5a7f60552 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -1,6 +1,5 @@ //! High-level consumers with a [`Stream`](futures_util::Stream) interface. -use std::ffi::CStr; use std::marker::PhantomData; use std::os::raw::c_void; use std::pin::Pin; @@ -14,13 +13,11 @@ use futures_channel::oneshot; use futures_util::future::{self, Either, FutureExt}; use futures_util::pin_mut; use futures_util::stream::{Stream, StreamExt}; -use log::warn; use slab::Slab; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; -use crate::admin::NativeEvent; use crate::client::{Client, NativeQueue}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; use crate::consumer::base_consumer::{BaseConsumer, PartitionQueue}; @@ -33,7 +30,7 @@ use crate::groups::GroupList; use crate::message::BorrowedMessage; use crate::metadata::Metadata; use crate::topic_partition_list::{Offset, TopicPartitionList}; -use crate::util::{AsyncRuntime, DefaultRuntime, NativePtr, Timeout}; +use crate::util::{AsyncRuntime, DefaultRuntime, Timeout}; unsafe extern "C" fn native_message_queue_nonempty_cb(_: *mut RDKafka, opaque_ptr: *mut c_void) { let wakers = &*(opaque_ptr as *const WakerSlab); @@ -91,49 +88,50 @@ impl WakerSlab { /// A stream of messages from a [`StreamConsumer`]. /// /// See the documentation of [`StreamConsumer::stream`] for details. 
-pub struct MessageStream<'a> { +pub struct MessageStream<'a, C: ConsumerContext> { wakers: &'a WakerSlab, - queue: &'a NativeQueue, + consumer: &'a BaseConsumer, + partition_queue: Option<&'a NativeQueue>, slot: usize, } -impl<'a> MessageStream<'a> { - fn new(wakers: &'a WakerSlab, queue: &'a NativeQueue) -> MessageStream<'a> { +impl<'a, C: ConsumerContext> MessageStream<'a, C> { + fn new(wakers: &'a WakerSlab, consumer: &'a BaseConsumer) -> MessageStream<'a, C> { + Self::new_with_optional_partition_queue(wakers, consumer, None) + } + + fn new_with_partition_queue( + wakers: &'a WakerSlab, + consumer: &'a BaseConsumer, + partition_queue: &'a NativeQueue, + ) -> MessageStream<'a, C> { + Self::new_with_optional_partition_queue(wakers, consumer, Some(partition_queue)) + } + + fn new_with_optional_partition_queue( + wakers: &'a WakerSlab, + consumer: &'a BaseConsumer, + partition_queue: Option<&'a NativeQueue>, + ) -> MessageStream<'a, C> { let slot = wakers.register(); MessageStream { wakers, - queue, + consumer, + partition_queue, slot, } } fn poll(&self) -> Option>> { - let timeout: Timeout = Duration::ZERO.into(); - let event = unsafe { NativeEvent::from_ptr(self.queue.poll(timeout)) }; - if let Some(ev) = event { - let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; - match evtype { - rdsys::RD_KAFKA_EVENT_FETCH => unsafe { - NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(ev.ptr()) as *mut _) - .map(|ptr| BorrowedMessage::from_client(ptr, Arc::new(ev), self.queue)) - }, - _ => { - let buf = unsafe { - let evname = rdsys::rd_kafka_event_name(ev.ptr()); - CStr::from_ptr(evname).to_bytes() - }; - let evname = String::from_utf8(buf.to_vec()).unwrap(); - warn!("Ignored event '{}' on consumer poll", evname); - None - } - } + if let Some(queue) = self.partition_queue { + self.consumer.poll_queue(queue, Duration::ZERO) } else { - None + self.consumer.poll(Duration::ZERO) } } } -impl<'a> Stream for MessageStream<'a> { +impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> { type Item = KafkaResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -160,7 +158,7 @@ impl<'a> Stream for MessageStream<'a> { } } -impl<'a> Drop for MessageStream<'a> { +impl<'a, C: ConsumerContext> Drop for MessageStream<'a, C> { fn drop(&mut self) { self.wakers.unregister(self.slot); } @@ -186,7 +184,6 @@ where C: ConsumerContext, { base: Arc>, - queue: NativeQueue, wakers: Arc, _shutdown_trigger: oneshot::Sender<()>, _runtime: PhantomData, @@ -218,9 +215,7 @@ where }; let base = Arc::new(BaseConsumer::new(config, native_config, context)?); - let client = base.client(); - let native_ptr = client.native_ptr() as usize; - let queue = client.main_queue(); + let native_ptr = base.client().native_ptr() as usize; let wakers = Arc::new(WakerSlab::new()); unsafe { enable_nonempty_callback(base.get_queue(), &wakers) } @@ -255,7 +250,6 @@ where Ok(StreamConsumer { base, wakers, - queue, _shutdown_trigger: shutdown_trigger, _runtime: PhantomData, }) @@ -278,8 +272,8 @@ where /// /// If you want multiple independent views of a Kafka topic, create multiple /// consumers, not multiple message streams. - pub fn stream(&self) -> MessageStream<'_> { - MessageStream::new(&self.wakers, &self.queue) + pub fn stream(&self) -> MessageStream<'_, C> { + MessageStream::new(&self.wakers, &self.base) } /// Receives the next message from the stream. @@ -322,7 +316,7 @@ where /// `StreamConsumer::recv`. 
/// /// You must periodically await `StreamConsumer::recv`, even if no messages - /// are expected, to serve callbacks. Consider using a background task like: + /// are expected, to serve events. Consider using a background task like: /// /// ``` /// # use rdkafka::consumer::StreamConsumer; @@ -352,6 +346,7 @@ where .split_partition_queue(topic, partition) .map(|queue| { let wakers = Arc::new(WakerSlab::new()); + unsafe { enable_nonempty_callback(&queue.queue, &wakers) }; StreamPartitionQueue { queue, wakers, @@ -573,8 +568,12 @@ where /// /// If you want multiple independent views of a Kafka partition, create /// multiple consumers, not multiple partition streams. - pub fn stream(&self) -> MessageStream<'_> { - MessageStream::new(&self.wakers, &self.queue.queue) + pub fn stream(&self) -> MessageStream<'_, C> { + MessageStream::new_with_partition_queue( + &self.wakers, + &self._consumer.base, + &self.queue.queue, + ) } /// Receives the next message from the stream. From 6c8c5f0dc50f347dffc606828b00ece73589627f Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Thu, 26 Oct 2023 15:52:17 -0300 Subject: [PATCH 13/24] Consumer needs to read from earliest otherwise consumer will never read anything --- tests/test_metadata.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_metadata.rs b/tests/test_metadata.rs index 3b2667a9c..e62bee556 100644 --- a/tests/test_metadata.rs +++ b/tests/test_metadata.rs @@ -22,6 +22,7 @@ fn create_consumer(group_id: &str) -> StreamConsumer { .set("session.timeout.ms", "6000") .set("api.version.request", "true") .set("debug", "all") + .set("auto.offset.reset", "earliest") .create() .expect("Failed to create StreamConsumer") } From 54893abffcfe9eb9c471a3150a55d545f3321f27 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Thu, 26 Oct 2023 18:10:32 -0300 Subject: [PATCH 14/24] Poll should not return None if timeout has not been reached If timeout::Never is used, poll should eventually return a Message or Error rather than None when handling other events like stats, rebalances, etc. 
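A sketch of the intended contract after this change (illustrative, not part of the diff): with `Timeout::Never` the call keeps servicing stats, rebalance, and log events internally and only returns once a message or an error is available, while a finite timeout still yields `None` once it has fully elapsed.

    use rdkafka::consumer::BaseConsumer;
    use rdkafka::error::KafkaResult;
    use rdkafka::util::Timeout;
    use rdkafka::Message;

    fn next_message(consumer: &BaseConsumer) -> KafkaResult<()> {
        // Internal events (stats, rebalances, logs) no longer surface as `None`.
        match consumer.poll(Timeout::Never) {
            Some(Ok(msg)) => {
                println!("partition {} offset {}", msg.partition(), msg.offset());
                Ok(())
            }
            Some(Err(err)) => Err(err),
            None => unreachable!("poll(Timeout::Never) does not time out"),
        }
    }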
--- src/consumer/base_consumer.rs | 51 ++++++++++++++++++++++------------- src/consumer/mod.rs | 4 +-- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 4f7d0e399..870f981fc 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -121,27 +121,40 @@ where queue: &NativeQueue, timeout: T, ) -> Option>> { - let event = self.client().poll_event(queue, timeout.into())?; - let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - match evtype { - rdsys::RD_KAFKA_EVENT_FETCH => self.handle_fetch_event(event), - rdsys::RD_KAFKA_EVENT_REBALANCE => { - self.handle_rebalance_event(event); - None - } - rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => { - self.handle_offset_commit_event(event); - None + let mut timeout = timeout.into(); + let min_poll_interval = self.context().main_queue_min_poll_interval(); + loop { + let op_timeout = std::cmp::min(timeout, min_poll_interval); + let maybe_event = self.client().poll_event(queue, op_timeout); + if let Some(event) = maybe_event { + let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + match evtype { + rdsys::RD_KAFKA_EVENT_FETCH => { + if let Some(result) = self.handle_fetch_event(event) { + return Some(result); + } + } + rdsys::RD_KAFKA_EVENT_REBALANCE => { + self.handle_rebalance_event(event); + } + rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => { + self.handle_offset_commit_event(event); + } + _ => { + let buf = unsafe { + let evname = rdsys::rd_kafka_event_name(event.ptr()); + CStr::from_ptr(evname).to_bytes() + }; + let evname = String::from_utf8(buf.to_vec()).unwrap(); + warn!("Ignored event '{}' on consumer poll", evname); + } + } } - _ => { - let buf = unsafe { - let evname = rdsys::rd_kafka_event_name(event.ptr()); - CStr::from_ptr(evname).to_bytes() - }; - let evname = String::from_utf8(buf.to_vec()).unwrap(); - warn!("Ignored event '{}' on consumer poll", evname); - None + + if op_timeout >= timeout { + return None; } + timeout -= op_timeout; } } diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 65bbd215a..fb3ff0460 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -113,12 +113,12 @@ pub trait ConsumerContext: ClientContext { fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {} /// Returns the minimum interval at which to poll the main queue, which - /// services the logging, stats, and error callbacks. + /// services the logging, stats, and error events. /// /// The main queue is polled once whenever [`BaseConsumer::poll`] is called. /// If `poll` is called with a timeout that is larger than this interval, /// then the main queue will be polled at that interval while the consumer - /// queue is blocked. + /// queue is blocked. This allows serving events while there are no messages. /// /// For example, if the main queue's minimum poll interval is 200ms and /// `poll` is called with a timeout of 1s, then `poll` may block for up to From c7f83a8d0d94a88f820abf863e9da6fea24ca671 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Thu, 26 Oct 2023 18:56:16 -0300 Subject: [PATCH 15/24] Cargo clippy --- src/consumer/base_consumer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 870f981fc..6eb14b8fc 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -89,9 +89,9 @@ where // need to listen to the consumer queue to observe events like // rebalancings and stats. 
 unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) };
- let queue = client.consumer_queue().ok_or(KafkaError::ClientCreation(
- "rdkafka consumer queue not available".to_string(),
- ))?;
+ let queue = client.consumer_queue().ok_or_else(|| {
+ KafkaError::ClientCreation("rdkafka consumer queue not available".to_string())
+ })?;
 Ok(BaseConsumer {
 client,
 queue,

From 74ff52a176abaef17b08206e6c1a446dfadd1ada Mon Sep 17 00:00:00 2001
From: Samuel Cantero
Date: Fri, 27 Oct 2023 13:27:12 -0300
Subject: [PATCH 16/24] Propagate errors for the consumer

rdkafka reports consumer errors via RD_KAFKA_EVENT_ERROR, but producer
errors get embedded in the ack returned via RD_KAFKA_EVENT_DR. Hence we
need to return this event for the consumer case so that the error
reaches the user.
---
 src/client.rs | 8 +++++++-
 src/consumer/base_consumer.rs | 23 +++++++++++++++++++++++
 src/message.rs | 2 +-
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/src/client.rs b/src/client.rs
index 7475f287c..1b9f6bd1c 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -285,7 +285,13 @@ impl Client {
 match evtype {
 rdsys::RD_KAFKA_EVENT_LOG => self.handle_log_event(ev.ptr()),
 rdsys::RD_KAFKA_EVENT_STATS => self.handle_stats_event(ev.ptr()),
- rdsys::RD_KAFKA_EVENT_ERROR => self.handle_error_event(ev.ptr()),
+ rdsys::RD_KAFKA_EVENT_ERROR => {
+ // rdkafka reports consumer errors via RD_KAFKA_EVENT_ERROR, but producer errors get
+ // embedded in the ack returned via RD_KAFKA_EVENT_DR. Hence we need to return this event
+ // for the consumer case so that the error reaches the user.
+ self.handle_error_event(ev.ptr());
+ return Some(ev);
+ }
 rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH => {
 if C::ENABLE_REFRESH_OAUTH_TOKEN {
 self.handle_oauth_refresh_event(ev.ptr());
diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs
index 6eb14b8fc..4379500c1 100644
--- a/src/consumer/base_consumer.rs
+++ b/src/consumer/base_consumer.rs
@@ -134,6 +134,11 @@ where
 return Some(result);
 }
 }
+ rdsys::RD_KAFKA_EVENT_ERROR => {
+ if let Some(err) = self.handle_error_event(event) {
+ return Some(Err(err));
+ }
+ }
 rdsys::RD_KAFKA_EVENT_REBALANCE => {
 self.handle_rebalance_event(event);
 }
 rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => {
 self.handle_offset_commit_event(event);
 }
@@ -215,6 +220,24 @@
 }
 }

+ fn handle_error_event(&self, event: NativePtr) -> Option {
+ let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
+ if rdkafka_err.is_error() {
+ let err = match rdkafka_err {
+ rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
+ let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
+ let partition = unsafe { (*tp_ptr).partition };
+ unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
+ KafkaError::PartitionEOF(partition)
+ }
+ e => KafkaError::MessageConsumption(e.into()),
+ };
+ Some(err)
+ } else {
+ None
+ }
+ }
+
 /// Returns an iterator over the available messages.
 ///
 /// It repeatedly calls [`poll`](#method.poll) with no timeout.
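A hedged sketch of what this enables on the consumer side (the loop and the `enable.partition.eof` setting are illustrative assumptions, not part of the patch): client-level errors now come back from `poll` itself rather than only reaching the context's `error` callback.

    use std::time::Duration;

    use rdkafka::consumer::BaseConsumer;
    use rdkafka::error::KafkaError;

    fn drain(consumer: &BaseConsumer) {
        while let Some(item) = consumer.poll(Duration::from_secs(1)) {
            match item {
                Ok(_msg) => { /* process the message */ }
                // Only reported when "enable.partition.eof" is set in the config.
                Err(KafkaError::PartitionEOF(partition)) => {
                    println!("reached the end of partition {}", partition)
                }
                Err(err) => eprintln!("consumer error: {}", err),
            }
        }
    }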
diff --git a/src/message.rs b/src/message.rs index 696a50100..76bac9c39 100644 --- a/src/message.rs +++ b/src/message.rs @@ -346,7 +346,7 @@ impl<'a> BorrowedMessage<'a> { if ptr.err.is_error() { let err = match ptr.err { rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => { - KafkaError::PartitionEOF((*ptr).partition) + KafkaError::PartitionEOF(ptr.partition) } e => KafkaError::MessageConsumption(e.into()), }; From bb2aee0d45b36b1c0e5ab7aca2e108951966355f Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Fri, 27 Oct 2023 13:37:06 -0300 Subject: [PATCH 17/24] Adapt commit_transaction to the event api --- src/producer/base_producer.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 73fafc94d..886d5bdee 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -577,10 +577,17 @@ where } fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { + // rd_kafka_commit_transaction will call flush but the user must call poll in order to + // server the event queue. In order to avoid blocking here forever on the base producer, + // we call Flush that will flush the outstanding messages and serve the event queue. + // https://github.com/confluentinc/librdkafka/blob/95a542c87c61d2c45b445f91c73dd5442eb04f3c/src/rdkafka.h#L10231 + // The recommended timeout here is -1 (never, i.e, infinite). + let timeout = timeout.into(); + self.flush(timeout)?; let ret = unsafe { RDKafkaError::from_ptr(rdsys::rd_kafka_commit_transaction( self.native_ptr(), - timeout.into().as_millis(), + timeout.as_millis(), )) }; if ret.is_error() { From 3b98f95686cd510b40b170e0f17a68c89622b7fa Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Mon, 30 Oct 2023 14:48:50 -0300 Subject: [PATCH 18/24] Adapt consumer close to the event api --- src/consumer/base_consumer.rs | 46 ++++++++--------------------------- 1 file changed, 10 insertions(+), 36 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 4379500c1..fb2188774 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -7,7 +7,7 @@ use std::ptr; use std::sync::Arc; use std::time::Duration; -use log::warn; +use log::{error, warn}; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; @@ -37,7 +37,6 @@ where { client: Client, queue: NativeQueue, - static_member: bool, } impl FromClientConfig for BaseConsumer { @@ -62,12 +61,6 @@ where native_config: NativeClientConfig, context: C, ) -> KafkaResult> { - let mut static_member = false; - if let Some(group_instance_id) = config.get("group.instance.id") { - if !group_instance_id.is_empty() { - static_member = true; - } - } unsafe { rdsys::rd_kafka_conf_set_events( native_config.ptr(), @@ -92,11 +85,7 @@ where let queue = client.consumer_queue().ok_or_else(|| { KafkaError::ClientCreation("rdkafka consumer queue not available".to_string()) })?; - Ok(BaseConsumer { - client, - queue, - static_member, - }) + Ok(BaseConsumer { client, queue }) } /// Polls the consumer for new messages. @@ -692,31 +681,16 @@ where { fn drop(&mut self) { trace!("Destroying consumer: {:?}", self.client.native_ptr()); - // If this consumer is configured for static membership, do not explicitly unsubscribe from - // the group. Note that static members will *not* receive a final revoke event when they - // shutdown. - if !self.static_member { - // We use the Event API rather than the Callback API. 
- // As we don't register a rebalance_cb, rd_kafka_consumer_close() - // will shortcut the rebalance_cb and do a forced unassign. - // This is undesired as the application might need the final - // revoke events before shutting down. Hence, we trigger - // an Unsubscribe() first, wait for that to propagate, and then - // close the consumer. - unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) }; - - // Poll for rebalance events - loop { - self.poll(Duration::from_secs(5)); - let qlen = unsafe { rdsys::rd_kafka_queue_length(self.queue.ptr()) }; - if qlen == 0 { - break; - } - } + let err = unsafe { + rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr()) + }; + if !err.is_null() { + error!("Failed to close the consumer queue on drop"); } - // TODO(sam): do we need to destroy the queue before calling close? - unsafe { rdsys::rd_kafka_consumer_close(self.client.native_ptr()) }; + while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 { + self.poll(Duration::from_millis(100)); + } trace!("Consumer destroyed: {:?}", self.client.native_ptr()); } } From 2af36710b94d1aaabb920014832a876cb6d2ff51 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Mon, 30 Oct 2023 14:58:07 -0300 Subject: [PATCH 19/24] Allow creating a consumer without group.id Currently if a group.id is not specified we allow the use of the consumer for fetching metadata and watermarks. Keeping this behaviour. --- src/consumer/base_consumer.rs | 47 +++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index fb2188774..01f1af462 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -37,6 +37,7 @@ where { client: Client, queue: NativeQueue, + group_id: Option, } impl FromClientConfig for BaseConsumer { @@ -78,14 +79,26 @@ where context, )?; - // Redirect rdkafka's main queue to the consumer queue so that we only - // need to listen to the consumer queue to observe events like - // rebalancings and stats. - unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) }; - let queue = client.consumer_queue().ok_or_else(|| { - KafkaError::ClientCreation("rdkafka consumer queue not available".to_string()) - })?; - Ok(BaseConsumer { client, queue }) + let group_id = config.get("group.id").map(|s| s.to_string()); + // If a group.id is not specified, we won't redirect the main queue to the consumer queue, + // allowing continued use of the consumer for fetching metadata and watermarks without the + // need to specify a group.id + let queue = if group_id.is_some() { + // Redirect rdkafka's main queue to the consumer queue so that we only need to listen + // to the consumer queue to observe events like rebalancings and stats. + unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) }; + client.consumer_queue().ok_or_else(|| { + KafkaError::ClientCreation("rdkafka consumer queue not available".to_string()) + })? + } else { + client.main_queue() + }; + + Ok(BaseConsumer { + client, + queue, + group_id, + }) } /// Polls the consumer for new messages. 
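As a usage sketch of the behaviour being preserved (broker address and topic are placeholders, and the snippet is not part of the diff): a consumer created without `group.id` can still serve metadata and watermark queries.

    use std::time::Duration;

    use rdkafka::config::ClientConfig;
    use rdkafka::consumer::{BaseConsumer, Consumer};

    fn watermarks_without_group() -> rdkafka::error::KafkaResult<(i64, i64)> {
        // No "group.id" here: the client is only used for metadata-style requests.
        let consumer: BaseConsumer = ClientConfig::new()
            .set("bootstrap.servers", "localhost:9092") // placeholder
            .create()?;
        consumer.fetch_watermarks("example-topic", 0, Duration::from_secs(5))
    }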
@@ -681,15 +694,17 @@ where { fn drop(&mut self) { trace!("Destroying consumer: {:?}", self.client.native_ptr()); - let err = unsafe { - rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr()) - }; - if !err.is_null() { - error!("Failed to close the consumer queue on drop"); - } + if self.group_id.is_some() { + let err = unsafe { + rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr()) + }; + if !err.is_null() { + error!("Failed to close the consumer queue on drop"); + } - while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 { - self.poll(Duration::from_millis(100)); + while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 { + self.poll(Duration::from_millis(100)); + } } trace!("Consumer destroyed: {:?}", self.client.native_ptr()); } From 4fb2266bb9c7cbf476bf6e7f6f870cdba54cc150 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Tue, 31 Oct 2023 16:36:04 -0300 Subject: [PATCH 20/24] Do not panic on transient errors on test_consume_partition_order --- tests/test_high_consumers.rs | 33 +++++++++++++++++++++++++++++++-- tests/test_low_consumers.rs | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/tests/test_high_consumers.rs b/tests/test_high_consumers.rs index d139127b0..97ca4f5a0 100644 --- a/tests/test_high_consumers.rs +++ b/tests/test_high_consumers.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use futures::future::{self, FutureExt}; use futures::stream::StreamExt; use maplit::hashmap; +use rdkafka_sys::RDKafkaErrorCode; use tokio::time::{self, Duration}; use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, StreamConsumer}; @@ -546,13 +547,41 @@ async fn test_consume_partition_order() { let mut i = 0; while i < 12 { if let Some(m) = consumer.recv().now_or_never() { - let partition = m.unwrap().partition(); + // retry on transient errors until we get a message + let m = match m { + Err(KafkaError::MessageConsumption( + RDKafkaErrorCode::BrokerTransportFailure, + )) + | Err(KafkaError::MessageConsumption(RDKafkaErrorCode::AllBrokersDown)) + | Err(KafkaError::MessageConsumption(RDKafkaErrorCode::OperationTimedOut)) => { + continue + } + Err(err) => { + panic!("Unexpected error receiving message: {:?}", err); + } + Ok(m) => m, + }; + let partition: i32 = m.partition(); assert!(partition == 0 || partition == 2); i += 1; } if let Some(m) = partition1.recv().now_or_never() { - assert_eq!(m.unwrap().partition(), 1); + // retry on transient errors until we get a message + let m = match m { + Err(KafkaError::MessageConsumption( + RDKafkaErrorCode::BrokerTransportFailure, + )) + | Err(KafkaError::MessageConsumption(RDKafkaErrorCode::AllBrokersDown)) + | Err(KafkaError::MessageConsumption(RDKafkaErrorCode::OperationTimedOut)) => { + continue + } + Err(err) => { + panic!("Unexpected error receiving message: {:?}", err); + } + Ok(m) => m, + }; + assert_eq!(m.partition(), 1); i += 1; } } diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs index e1ce16bdf..c4aa305f7 100644 --- a/tests/test_low_consumers.rs +++ b/tests/test_low_consumers.rs @@ -288,13 +288,41 @@ async fn test_consume_partition_order() { let mut i = 0; while i < 12 { if let Some(m) = consumer.poll(Timeout::After(Duration::from_secs(0))) { - let partition = m.unwrap().partition(); + // retry on transient errors until we get a message + let m = match m { + Err(KafkaError::MessageConsumption( + RDKafkaErrorCode::BrokerTransportFailure, + )) + | 
Err(KafkaError::MessageConsumption(RDKafkaErrorCode::AllBrokersDown)) + | Err(KafkaError::MessageConsumption(RDKafkaErrorCode::OperationTimedOut)) => { + continue + } + Err(err) => { + panic!("Unexpected error receiving message: {:?}", err); + } + Ok(m) => m, + }; + let partition = m.partition(); assert!(partition == 0 || partition == 2); i += 1; } if let Some(m) = partition1.poll(Timeout::After(Duration::from_secs(0))) { - assert_eq!(m.unwrap().partition(), 1); + // retry on transient errors until we get a message + let m = match m { + Err(KafkaError::MessageConsumption( + RDKafkaErrorCode::BrokerTransportFailure, + )) + | Err(KafkaError::MessageConsumption(RDKafkaErrorCode::AllBrokersDown)) + | Err(KafkaError::MessageConsumption(RDKafkaErrorCode::OperationTimedOut)) => { + continue + } + Err(err) => { + panic!("Unexpected error receiving message: {:?}", err); + } + Ok(m) => m, + }; + assert_eq!(m.partition(), 1); i += 1; } } From 34fc3356a9956185f0bb2e44662db617536614f7 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Mon, 30 Oct 2023 15:47:12 -0300 Subject: [PATCH 21/24] Expose a close_queue and closed methods If you have a consumer wrapping this one (FFI cases), the outer consumer must close the queue and serve the events via Poll. Otherwise it will hang forever as prior to calling close there's a rebalance & rdkafka awaits a response before continuing. --- src/consumer/base_consumer.rs | 21 +++++++++++++++++++++ src/error.rs | 8 ++++++++ 2 files changed, 29 insertions(+) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 01f1af462..b2518b73f 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -331,6 +331,27 @@ where PartitionQueue::new(self.clone(), queue) }) } + + /// Close the queue used by a consumer. + /// Only exposed for advanced usage of this API and should not be used under normal circumstances. + pub fn close_queue(&self) -> KafkaResult<()> { + let err = unsafe { + RDKafkaError::from_ptr(rdsys::rd_kafka_consumer_close_queue( + self.client.native_ptr(), + self.queue.ptr(), + )) + }; + if err.is_error() { + Err(KafkaError::ConsumerQueueClose(err.code())) + } else { + Ok(()) + } + } + + /// Returns true if the consumer is closed, else false. + pub fn closed(&self) -> bool { + unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 } + } } impl Consumer for BaseConsumer diff --git a/src/error.rs b/src/error.rs index 72e364479..46a40be54 100644 --- a/src/error.rs +++ b/src/error.rs @@ -147,6 +147,8 @@ pub enum KafkaError { ClientCreation(String), /// Consumer commit failed. ConsumerCommit(RDKafkaErrorCode), + /// Consumer queue close failed. + ConsumerQueueClose(RDKafkaErrorCode), /// Flushing failed Flush(RDKafkaErrorCode), /// Global error. 
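For the wrapping/FFI scenario described in this patch's message, a shutdown sequence using the new methods might look like the following sketch (error handling and the poll interval are illustrative):

    use std::time::Duration;

    use rdkafka::consumer::BaseConsumer;

    // Close the queue explicitly and keep serving events until librdkafka reports
    // the consumer as closed, instead of relying on the Drop impl.
    fn shutdown(consumer: &BaseConsumer) {
        if let Err(err) = consumer.close_queue() {
            eprintln!("failed to close the consumer queue: {}", err);
            return;
        }
        while !consumer.closed() {
            // Serve pending events (rebalances, commits) so the close can complete.
            let _ = consumer.poll(Duration::from_millis(100));
        }
    }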
@@ -204,6 +206,9 @@ impl fmt::Debug for KafkaError {
 KafkaError::ConsumerCommit(err) => {
 write!(f, "KafkaError (Consumer commit error: {})", err)
 }
+ KafkaError::ConsumerQueueClose(err) => {
+ write!(f, "KafkaError (Consumer queue close error: {})", err)
+ }
 KafkaError::Flush(err) => write!(f, "KafkaError (Flush error: {})", err),
 KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
 KafkaError::GroupListFetch(err) => {
@@ -255,6 +260,7 @@ impl fmt::Display for KafkaError {
 }
 KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
 KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
+ KafkaError::ConsumerQueueClose(err) => write!(f, "Consumer queue close error: {}", err),
 KafkaError::Flush(err) => write!(f, "Flush error: {}", err),
 KafkaError::Global(err) => write!(f, "Global error: {}", err),
 KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
@@ -288,6 +294,7 @@ impl Error for KafkaError {
 KafkaError::ClientConfig(..) => None,
 KafkaError::ClientCreation(_) => None,
 KafkaError::ConsumerCommit(err) => Some(err),
+ KafkaError::ConsumerQueueClose(err) => Some(err),
 KafkaError::Flush(err) => Some(err),
 KafkaError::Global(err) => Some(err),
 KafkaError::GroupListFetch(err) => Some(err),
@@ -327,6 +334,7 @@ impl KafkaError {
 KafkaError::ClientConfig(..) => None,
 KafkaError::ClientCreation(_) => None,
 KafkaError::ConsumerCommit(err) => Some(*err),
+ KafkaError::ConsumerQueueClose(err) => Some(*err),
 KafkaError::Flush(err) => Some(*err),
 KafkaError::Global(err) => Some(*err),
 KafkaError::GroupListFetch(err) => Some(*err),

From 7202e7bcc14bda899ca03ccd763aa492c57cfcf6 Mon Sep 17 00:00:00 2001
From: Samuel Cantero
Date: Mon, 6 Nov 2023 10:12:12 -0300
Subject: [PATCH 22/24] Use closed and close_queue methods on drop

---
 src/consumer/base_consumer.rs | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs
index b2518b73f..a6d1e39a1 100644
--- a/src/consumer/base_consumer.rs
+++ b/src/consumer/base_consumer.rs
@@ -716,15 +716,12 @@ where
 fn drop(&mut self) {
 trace!("Destroying consumer: {:?}", self.client.native_ptr());
 if self.group_id.is_some() {
- let err = unsafe {
- rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr())
- };
- if !err.is_null() {
- error!("Failed to close the consumer queue on drop");
- }
-
- while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 {
- self.poll(Duration::from_millis(100));
+ if let Err(err) = self.close_queue() {
+ error!("Failed to close consumer queue on drop: {}", err);
+ } else {
+ while !self.closed() {
+ self.poll(Duration::from_millis(100));
+ }
 }
 }
 trace!("Consumer destroyed: {:?}", self.client.native_ptr());

From 3b1394045021723bf1468a15471e2b819bdf6d4d Mon Sep 17 00:00:00 2001
From: Samuel Cantero
Date: Mon, 6 Nov 2023 10:12:31 -0300
Subject: [PATCH 23/24] Propagate fatal errors

With the Event API we propagate generic client instance-level errors,
such as broker connection failures, authentication issues, etc. However,
fatal errors are also propagated via the Event API. These indicate that
the particular instance of the client (producer/consumer) becomes
non-functional.
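A small sketch of how callers might classify the new variant (the helper name is illustrative): transient errors can be retried, while the fatal variant means the client instance should be torn down and recreated.

    use rdkafka::error::KafkaError;

    /// Illustrative helper: true when the consumer instance is no longer usable.
    fn is_fatal(err: &KafkaError) -> bool {
        matches!(err, KafkaError::MessageConsumptionFatal(_))
    }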
--- src/consumer/base_consumer.rs | 20 ++++++++++---------- src/error.rs | 10 ++++++++++ 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index a6d1e39a1..5fcc42bc5 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -225,16 +225,16 @@ where fn handle_error_event(&self, event: NativePtr) -> Option { let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) }; if rdkafka_err.is_error() { - let err = match rdkafka_err { - rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => { - let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) }; - let partition = unsafe { (*tp_ptr).partition }; - unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) }; - KafkaError::PartitionEOF(partition) - } - e => KafkaError::MessageConsumption(e.into()), - }; - Some(err) + if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF { + let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) }; + let partition = unsafe { (*tp_ptr).partition }; + unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) }; + Some(KafkaError::PartitionEOF(partition)) + } else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 { + Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into())) + } else { + Some(KafkaError::MessageConsumption(rdkafka_err.into())) + } } else { None } diff --git a/src/error.rs b/src/error.rs index 46a40be54..312a6bb65 100644 --- a/src/error.rs +++ b/src/error.rs @@ -157,6 +157,8 @@ pub enum KafkaError { GroupListFetch(RDKafkaErrorCode), /// Message consumption failed. MessageConsumption(RDKafkaErrorCode), + /// Message consumption failed with fatal error. + MessageConsumptionFatal(RDKafkaErrorCode), /// Message production error. MessageProduction(RDKafkaErrorCode), /// Metadata fetch error. 
@@ -217,6 +219,9 @@ impl fmt::Debug for KafkaError { KafkaError::MessageConsumption(err) => { write!(f, "KafkaError (Message consumption error: {})", err) } + KafkaError::MessageConsumptionFatal(err) => { + write!(f, "(Fatal) KafkaError (Message consumption error: {})", err) + } KafkaError::MessageProduction(err) => { write!(f, "KafkaError (Message production error: {})", err) } @@ -265,6 +270,9 @@ impl fmt::Display for KafkaError { KafkaError::Global(err) => write!(f, "Global error: {}", err), KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err), KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err), + KafkaError::MessageConsumptionFatal(err) => { + write!(f, "(Fatal) Message consumption error: {}", err) + } KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err), KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err), KafkaError::NoMessageReceived => { @@ -299,6 +307,7 @@ impl Error for KafkaError { KafkaError::Global(err) => Some(err), KafkaError::GroupListFetch(err) => Some(err), KafkaError::MessageConsumption(err) => Some(err), + KafkaError::MessageConsumptionFatal(err) => Some(err), KafkaError::MessageProduction(err) => Some(err), KafkaError::MetadataFetch(err) => Some(err), KafkaError::NoMessageReceived => None, @@ -339,6 +348,7 @@ impl KafkaError { KafkaError::Global(err) => Some(*err), KafkaError::GroupListFetch(err) => Some(*err), KafkaError::MessageConsumption(err) => Some(*err), + KafkaError::MessageConsumptionFatal(err) => Some(*err), KafkaError::MessageProduction(err) => Some(*err), KafkaError::MetadataFetch(err) => Some(*err), KafkaError::NoMessageReceived => None, From 978c9649be902da859d8837446bf3eea6cc81fb2 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Tue, 7 Nov 2023 14:39:04 -0300 Subject: [PATCH 24/24] Fix op timeout computation logic on poll_queue --- src/consumer/base_consumer.rs | 7 ++++--- src/util.rs | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 5fcc42bc5..08ec51b78 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -5,7 +5,7 @@ use std::mem::ManuallyDrop; use std::os::raw::c_void; use std::ptr; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use log::{error, warn}; use rdkafka_sys as rdsys; @@ -123,6 +123,7 @@ where queue: &NativeQueue, timeout: T, ) -> Option>> { + let now = Instant::now(); let mut timeout = timeout.into(); let min_poll_interval = self.context().main_queue_min_poll_interval(); loop { @@ -158,10 +159,10 @@ where } } - if op_timeout >= timeout { + timeout = timeout.saturating_sub(now.elapsed()); + if timeout.is_zero() { return None; } - timeout -= op_timeout; } } diff --git a/src/util.rs b/src/util.rs index 16b146f58..543481d3f 100644 --- a/src/util.rs +++ b/src/util.rs @@ -48,6 +48,22 @@ impl Timeout { Timeout::Never => -1, } } + + /// Saturating `Duration` subtraction to Timeout. + pub(crate) fn saturating_sub(&self, rhs: Duration) -> Timeout { + match (self, rhs) { + (Timeout::After(lhs), rhs) => Timeout::After(lhs.saturating_sub(rhs)), + (Timeout::Never, _) => Timeout::Never, + } + } + + /// Returns `true` if the timeout is zero. + pub(crate) fn is_zero(&self) -> bool { + match self { + Timeout::After(d) => d.is_zero(), + Timeout::Never => false, + } + } } impl std::ops::SubAssign for Timeout {
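Since `saturating_sub` and `is_zero` are crate-private, the sketch below is written as an in-module unit test (a hypothetical addition to `src/util.rs`, not part of this patch) to illustrate the intended semantics: `Timeout::Never` is never reduced, and a finite timeout bottoms out at zero instead of underflowing.

    #[cfg(test)]
    mod timeout_saturation_tests {
        use std::time::Duration;

        use super::Timeout;

        #[test]
        fn never_is_not_reduced() {
            let t = Timeout::Never.saturating_sub(Duration::from_secs(5));
            assert!(!t.is_zero());
            assert!(matches!(t, Timeout::Never));
        }

        #[test]
        fn finite_timeout_bottoms_out_at_zero() {
            let t = Timeout::After(Duration::from_millis(100));
            assert!(t.saturating_sub(Duration::from_secs(1)).is_zero());
        }
    }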