From 63c0cace3963213b3198347ec7b25543e93ad511 Mon Sep 17 00:00:00 2001 From: Sonny Scroggin Date: Sun, 19 Nov 2023 05:54:02 -0600 Subject: [PATCH] Support producing a FutureRecord for Kafka (#17) * Support producing a FutureRecord for Kafka * Address feedback --- sea-streamer-kafka/src/producer.rs | 24 ++++++++++++++++++++++-- sea-streamer-kafka/tests/consumer.rs | 23 +++++++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/sea-streamer-kafka/src/producer.rs b/sea-streamer-kafka/src/producer.rs index fc3ff8a..b0ca51e 100644 --- a/sea-streamer-kafka/src/producer.rs +++ b/sea-streamer-kafka/src/producer.rs @@ -4,9 +4,10 @@ use crate::{ cluster::cluster_uri, impl_into_string, stream_err, BaseOptionKey, KafkaConnectOptions, KafkaErr, KafkaResult, DEFAULT_TIMEOUT, }; +pub use rdkafka::producer::FutureRecord; use rdkafka::{ config::ClientConfig, - producer::{DeliveryFuture, FutureRecord as RawPayload, Producer as ProducerTrait}, + producer::{DeliveryFuture, Producer as ProducerTrait}, }; pub use rdkafka::{consumer::ConsumerGroupMetadata, TopicPartitionList}; use sea_streamer_runtime::spawn_blocking; @@ -88,7 +89,7 @@ impl Producer for KafkaProducer { fn send_to(&self, stream: &StreamKey, payload: S) -> KafkaResult { let fut = self .get() - .send_result(RawPayload::::to(stream.name()).payload(payload.as_bytes())) + .send_result(FutureRecord::::to(stream.name()).payload(payload.as_bytes())) .map_err(|(err, _raw)| stream_err(err))?; Ok(SendFuture { @@ -134,6 +135,25 @@ impl KafkaProducer { .expect("Producer is still inside a transaction, please await the future") } + /// Send a `FutureRecord` to a stream + pub fn send_record(&self, record: FutureRecord) -> KafkaResult + where + K: rdkafka::message::ToBytes + ?Sized, + P: rdkafka::message::ToBytes + ?Sized, + { + let stream = StreamKey::new(record.topic.to_owned())?; + + let fut = self + .get() + .send_result(record) + .map_err(|(err, _raw)| stream_err(err))?; + + Ok(SendFuture { + stream_key: Some(stream), + fut, + }) + } + /// Returns the number of messages that are either waiting to be sent or /// are sent but are waiting to be acknowledged. pub fn in_flight_count(&self) -> i32 { diff --git a/sea-streamer-kafka/tests/consumer.rs b/sea-streamer-kafka/tests/consumer.rs index b7582cb..ca8f138 100644 --- a/sea-streamer-kafka/tests/consumer.rs +++ b/sea-streamer-kafka/tests/consumer.rs @@ -42,6 +42,20 @@ async fn main() -> anyhow::Result<()> { producer.send(message)?; } + for i in 10..20 { + use rdkafka::producer::FutureRecord; + let key = i.to_string(); + let payload = format!("{i}"); + let record = FutureRecord::to(topic.name()) + .key(&key) + .partition(0) + .payload(&payload); + let receipt = producer.send_record(record)?.await?; + assert_eq!(receipt.stream_key(), &topic); + assert_eq!(receipt.sequence(), &i); + assert_eq!(receipt.shard_id(), &zero); + } + producer.end().await?; let mut options = KafkaConsumerOptions::new(ConsumerMode::RealTime); @@ -100,6 +114,15 @@ async fn main() -> anyhow::Result<()> { assert_eq!(seq, [7, 8, 9]); println!("Seek stream ... ok"); + let seq = consume(&mut consumer, 10).await; + // this should continue from 10 + assert_eq!(seq, [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]); + println!("Resume ... ok"); + + // commit up to 19 + consumer.commit(&topic, &zero, &19).await?; + println!("Commit ... ok"); + async fn consume(consumer: &mut KafkaConsumer, num: usize) -> Vec { consumer .stream()