Skip to content

Commit

Permalink
Support producing a FutureRecord for Kafka (#17)
Browse files Browse the repository at this point in the history
* Support producing a FutureRecord for Kafka

* Address feedback
  • Loading branch information
scrogson authored Nov 19, 2023
1 parent 874a465 commit 63c0cac
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
24 changes: 22 additions & 2 deletions sea-streamer-kafka/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use crate::{
cluster::cluster_uri, impl_into_string, stream_err, BaseOptionKey, KafkaConnectOptions,
KafkaErr, KafkaResult, DEFAULT_TIMEOUT,
};
pub use rdkafka::producer::FutureRecord;
use rdkafka::{
config::ClientConfig,
producer::{DeliveryFuture, FutureRecord as RawPayload, Producer as ProducerTrait},
producer::{DeliveryFuture, Producer as ProducerTrait},
};
pub use rdkafka::{consumer::ConsumerGroupMetadata, TopicPartitionList};
use sea_streamer_runtime::spawn_blocking;
Expand Down Expand Up @@ -88,7 +89,7 @@ impl Producer for KafkaProducer {
fn send_to<S: Buffer>(&self, stream: &StreamKey, payload: S) -> KafkaResult<Self::SendFuture> {
let fut = self
.get()
.send_result(RawPayload::<str, [u8]>::to(stream.name()).payload(payload.as_bytes()))
.send_result(FutureRecord::<str, [u8]>::to(stream.name()).payload(payload.as_bytes()))
.map_err(|(err, _raw)| stream_err(err))?;

Ok(SendFuture {
Expand Down Expand Up @@ -134,6 +135,25 @@ impl KafkaProducer {
.expect("Producer is still inside a transaction, please await the future")
}

/// Send a `FutureRecord` to a stream
pub fn send_record<K, P>(&self, record: FutureRecord<K, P>) -> KafkaResult<SendFuture>
where
K: rdkafka::message::ToBytes + ?Sized,
P: rdkafka::message::ToBytes + ?Sized,
{
let stream = StreamKey::new(record.topic.to_owned())?;

let fut = self
.get()
.send_result(record)
.map_err(|(err, _raw)| stream_err(err))?;

Ok(SendFuture {
stream_key: Some(stream),
fut,
})
}

/// Returns the number of messages that are either waiting to be sent or
/// are sent but are waiting to be acknowledged.
pub fn in_flight_count(&self) -> i32 {
Expand Down
23 changes: 23 additions & 0 deletions sea-streamer-kafka/tests/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ async fn main() -> anyhow::Result<()> {
producer.send(message)?;
}

for i in 10..20 {
use rdkafka::producer::FutureRecord;
let key = i.to_string();
let payload = format!("{i}");
let record = FutureRecord::to(topic.name())
.key(&key)
.partition(0)
.payload(&payload);
let receipt = producer.send_record(record)?.await?;
assert_eq!(receipt.stream_key(), &topic);
assert_eq!(receipt.sequence(), &i);
assert_eq!(receipt.shard_id(), &zero);
}

producer.end().await?;

let mut options = KafkaConsumerOptions::new(ConsumerMode::RealTime);
Expand Down Expand Up @@ -100,6 +114,15 @@ async fn main() -> anyhow::Result<()> {
assert_eq!(seq, [7, 8, 9]);
println!("Seek stream ... ok");

let seq = consume(&mut consumer, 10).await;
// this should continue from 10
assert_eq!(seq, [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
println!("Resume ... ok");

// commit up to 19
consumer.commit(&topic, &zero, &19).await?;
println!("Commit ... ok");

async fn consume(consumer: &mut KafkaConsumer, num: usize) -> Vec<usize> {
consumer
.stream()
Expand Down

0 comments on commit 63c0cac

Please sign in to comment.