Skip to content

Commit

Permalink
refactor: Remove the Option inside RDKafkaError
Browse files Browse the repository at this point in the history
  • Loading branch information
Marwes committed Mar 19, 2024
1 parent e69c2aa commit 33bc42d
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 28 deletions.
8 changes: 4 additions & 4 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ where
self.queue.ptr(),
))
};
if err.is_error() {
if let Some(err) = err {
Err(KafkaError::ConsumerQueueClose(err.code()))
} else {
Ok(())
Expand Down Expand Up @@ -423,7 +423,7 @@ where
assignment.ptr(),
))
};
if ret.is_error() {
if let Some(ret) = ret {
let error = ret.name();
return Err(KafkaError::Subscription(error));
};
Expand All @@ -437,7 +437,7 @@ where
assignment.ptr(),
))
};
if ret.is_error() {
if let Some(ret) = ret {
let error = ret.name();
return Err(KafkaError::Subscription(error));
};
Expand Down Expand Up @@ -477,7 +477,7 @@ where
timeout.into().as_millis(),
))
};
if ret.is_error() {
if let Some(ret) = ret {
let error = ret.name();
return Err(KafkaError::Seek(error));
}
Expand Down
24 changes: 13 additions & 11 deletions src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,12 @@ impl<'a, C: ConsumerContext> MessageStream<'a, C> {
self.consumer.poll(Duration::ZERO)
}
}
}

impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> {
type Item = KafkaResult<BorrowedMessage<'a>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next_item(&self, cx: &mut Context<'_>) -> Poll<KafkaResult<BorrowedMessage<'a>>> {
// If there is a message ready, yield it immediately to avoid the
// taking the lock in `self.set_waker`.
if let Some(message) = self.poll() {
return Poll::Ready(Some(message));
return Poll::Ready(message);
}

// Otherwise, we need to wait for a message to become available. Store
Expand All @@ -153,11 +149,19 @@ impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> {
// installed the waker.
match self.poll() {
None => Poll::Pending,
Some(message) => Poll::Ready(Some(message)),
Some(message) => Poll::Ready(message),
}
}
}

impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> {
type Item = KafkaResult<BorrowedMessage<'a>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next_item(cx).map(Some)
}
}

impl<'a, C: ConsumerContext> Drop for MessageStream<'a, C> {
fn drop(&mut self) {
self.wakers.unregister(self.slot);
Expand Down Expand Up @@ -297,10 +301,8 @@ where
///
/// [cancellation safe]: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
pub async fn recv(&self) -> Result<BorrowedMessage<'_>, KafkaError> {
self.stream()
.next()
.await
.expect("kafka streams never terminate")
let stream = self.stream();
futures_util::future::poll_fn(move |cx| stream.poll_next_item(cx)).await
}

/// Splits messages for the specified partition into their own stream.
Expand Down
13 changes: 5 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ impl IsError for RDKafkaConfRes {

impl IsError for RDKafkaError {
fn is_error(&self) -> bool {
self.0.is_some()
true
}
}

/// Native rdkafka error.
#[derive(Clone)]
pub struct RDKafkaError(Option<Arc<NativePtr<rdsys::rd_kafka_error_t>>>);
pub struct RDKafkaError(Arc<NativePtr<rdsys::rd_kafka_error_t>>);

unsafe impl KafkaDrop for rdsys::rd_kafka_error_t {
const TYPE: &'static str = "error";
Expand All @@ -56,15 +56,12 @@ unsafe impl Send for RDKafkaError {}
unsafe impl Sync for RDKafkaError {}

impl RDKafkaError {
pub(crate) unsafe fn from_ptr(ptr: *mut rdsys::rd_kafka_error_t) -> RDKafkaError {
RDKafkaError(NativePtr::from_ptr(ptr).map(Arc::new))
pub(crate) unsafe fn from_ptr(ptr: *mut rdsys::rd_kafka_error_t) -> Option<RDKafkaError> {
NativePtr::from_ptr(ptr).map(|p| RDKafkaError(Arc::new(p)))
}

fn ptr(&self) -> *const rdsys::rd_kafka_error_t {
match &self.0 {
None => ptr::null(),
Some(p) => p.ptr(),
}
self.0.ptr()
}

/// Returns the error code or [`RDKafkaErrorCode::NoError`] if the error is
Expand Down
10 changes: 5 additions & 5 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ where
timeout.into().as_millis(),
))
};
if ret.is_error() {
if let Some(ret) = ret {
Err(KafkaError::Transaction(ret))
} else {
Ok(())
Expand All @@ -547,7 +547,7 @@ where
fn begin_transaction(&self) -> KafkaResult<()> {
let ret =
unsafe { RDKafkaError::from_ptr(rdsys::rd_kafka_begin_transaction(self.native_ptr())) };
if ret.is_error() {
if let Some(ret) = ret {
Err(KafkaError::Transaction(ret))
} else {
Ok(())
Expand All @@ -568,7 +568,7 @@ where
timeout.into().as_millis(),
))
};
if ret.is_error() {
if let Some(ret) = ret {
Err(KafkaError::Transaction(ret))
} else {
Ok(())
Expand All @@ -589,7 +589,7 @@ where
timeout.as_millis(),
))
};
if ret.is_error() {
if let Some(ret) = ret {
Err(KafkaError::Transaction(ret))
} else {
Ok(())
Expand All @@ -603,7 +603,7 @@ where
timeout.into().as_millis(),
))
};
if ret.is_error() {
if let Some(ret) = ret {
Err(KafkaError::Transaction(ret))
} else {
Ok(())
Expand Down

0 comments on commit 33bc42d

Please sign in to comment.