From 36530cf5dcb14e456e5ee7c28ac3ba830cb53b0e Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Thu, 10 Oct 2024 22:42:58 +0100 Subject: [PATCH] Support nanosecond timestamp in Redis --- sea-streamer-file/src/file.rs | 2 +- sea-streamer-file/src/format.rs | 8 ++-- sea-streamer-file/src/messages.rs | 22 +++++------ sea-streamer-file/src/producer/backend.rs | 4 +- sea-streamer-file/tests/consumer.rs | 33 +++++++++------- sea-streamer-kafka/src/producer.rs | 4 +- sea-streamer-redis/Cargo.toml | 1 + sea-streamer-redis/src/consumer/mod.rs | 3 +- sea-streamer-redis/src/consumer/node.rs | 27 +++++++------ sea-streamer-redis/src/consumer/options.rs | 3 +- sea-streamer-redis/src/message.rs | 46 ++++++++++++---------- sea-streamer-redis/src/producer.rs | 19 +++++++-- sea-streamer-redis/src/streamer.rs | 25 +++++++++++- sea-streamer-redis/tests/realtime.rs | 2 + sea-streamer-stdio/src/producer.rs | 2 +- sea-streamer-types/Cargo.toml | 3 +- sea-streamer-types/src/stream.rs | 3 ++ 17 files changed, 131 insertions(+), 76 deletions(-) diff --git a/sea-streamer-file/src/file.rs b/sea-streamer-file/src/file.rs index bf3a7ce..dbc5df3 100644 --- a/sea-streamer-file/src/file.rs +++ b/sea-streamer-file/src/file.rs @@ -224,7 +224,7 @@ impl AsyncFile { .seek(match to { SeqPos::Beginning => SeekFrom::Start(0), SeqPos::End => SeekFrom::End(0), - SeqPos::At(to) => SeekFrom::Start(to), + SeqPos::At(to) => SeekFrom::Start(to.try_into().expect("SeqNo out of range")), }) .await .map_err(FileErr::IoError)?; diff --git a/sea-streamer-file/src/format.rs b/sea-streamer-file/src/format.rs index b28db14..d51171c 100644 --- a/sea-streamer-file/src/format.rs +++ b/sea-streamer-file/src/format.rs @@ -63,7 +63,8 @@ use crate::{ ByteSink, ByteSource, Bytes, FileErr, }; use sea_streamer_types::{ - Buffer, Message as MessageTrait, OwnedMessage, ShardId, StreamKey, StreamKeyErr, Timestamp, + Buffer, Message as MessageTrait, OwnedMessage, SeqNo, ShardId, StreamKey, StreamKeyErr, + Timestamp, }; #[cfg(feature = 
"serde")] use serde::Serialize; @@ -353,7 +354,7 @@ impl MessageHeader { use sea_streamer_types::MessageHeader as Header; let stream_key = StreamKey::new(ShortString::read_from(file).await?.string())?; let shard_id = ShardId::new(U64::read_from(file).await?.0); - let sequence = U64::read_from(file).await?.0; + let sequence = U64::read_from(file).await?.0 as SeqNo; let timestamp = UnixTimestamp::read_from(file).await?.0; Ok(Self(Header::new(stream_key, shard_id, sequence, timestamp))) } @@ -363,7 +364,8 @@ impl MessageHeader { let h = self.0; sum += ShortString::new(h.stream_key().name().to_owned())?.write_to(sink)?; sum += U64(h.shard_id().id()).write_to(sink)?; - sum += U64(*h.sequence()).write_to(sink)?; + sum += U64(TryInto::::try_into(*h.sequence()).expect("SeqNo out of range")) + .write_to(sink)?; sum += UnixTimestamp(*h.timestamp()).write_to(sink)?; Ok(sum) } diff --git a/sea-streamer-file/src/messages.rs b/sea-streamer-file/src/messages.rs index 267129c..53c38d9 100644 --- a/sea-streamer-file/src/messages.rs +++ b/sea-streamer-file/src/messages.rs @@ -109,12 +109,12 @@ impl MessageSource { /// Warning: This future must not be canceled. 
pub async fn rewind(&mut self, target: SeqPos) -> Result { let pos = match target { - SeqPos::Beginning | SeqPos::At(0) => SeqPos::At(Header::size() as u64), + SeqPos::Beginning | SeqPos::At(0) => SeqPos::At(Header::size() as SeqNo), SeqPos::End => SeqPos::End, SeqPos::At(nth) => { - let at = nth * self.beacon_interval(); + let at = nth as u64 * self.beacon_interval(); if at < self.known_size() { - SeqPos::At(at) + SeqPos::At(at as SeqNo) } else { SeqPos::End } @@ -130,7 +130,7 @@ impl MessageSource { SeqPos::Beginning | SeqPos::At(0) => unreachable!(), SeqPos::End => max, SeqPos::At(nth) => { - let at = nth * self.beacon_interval(); + let at = nth as u64 * self.beacon_interval(); if at < self.known_size() { at } else { @@ -139,7 +139,7 @@ impl MessageSource { } }; if self.offset != pos { - self.offset = self.source.seek(SeqPos::At(pos)).await?; + self.offset = self.source.seek(SeqPos::At(pos as SeqNo)).await?; } } @@ -175,7 +175,7 @@ impl MessageSource { while let Ok(message) = Message::read_from(&mut buffer).await { next += message.size() as u64; } - self.offset = self.source.seek(SeqPos::At(next)).await?; + self.offset = self.source.seek(SeqPos::At(next as SeqNo)).await?; } Ok((self.offset / self.beacon_interval()) as u32) @@ -226,7 +226,7 @@ impl MessageSource { } }; // now we know roughly where's the message - match self.rewind(SeqPos::At(pos as u64)).await { + match self.rewind(SeqPos::At(pos as SeqNo)).await { Ok(_) => (), Err(e) => { break 'outer match e { @@ -260,7 +260,7 @@ impl MessageSource { self.source = source.switch_to(source_type).await?; if res.is_err() { - self.source.seek(SeqPos::At(savepoint)).await?; + self.source.seek(SeqPos::At(savepoint as SeqNo)).await?; self.buffer.clear(); self.pending.take(); } @@ -399,7 +399,7 @@ impl BeaconReader for MessageSource { fn survey(&mut self, at: NonZeroU32) -> Self::Future<'_> { async move { let at = at.get() as u64 * self.beacon_interval(); - let offset = self.source.seek(SeqPos::At(at)).await?; + let 
offset = self.source.seek(SeqPos::At(at as SeqNo)).await?; if at == offset { let beacon = Beacon::read_from(&mut self.source).await?; Ok(beacon) @@ -462,7 +462,7 @@ impl MessageSink { if nth > 0 { // we need to rewind further backwards nth -= 1; - source.rewind(SeqPos::At(nth as u64)).await?; + source.rewind(SeqPos::At(nth as SeqNo)).await?; } else { // we reached the start now break; @@ -492,7 +492,7 @@ impl MessageSink { let has_beacon = source.has_beacon(offset).is_some(); if let DynFileSource::FileReader(reader) = source.source { let (mut file, _, _) = reader.end(); - assert_eq!(offset, file.seek(SeqPos::At(offset)).await?); + assert_eq!(offset, file.seek(SeqPos::At(offset as SeqNo)).await?); let mut sink = FileSink::new(file, limit)?; if has_beacon { diff --git a/sea-streamer-file/src/producer/backend.rs b/sea-streamer-file/src/producer/backend.rs index 82b2638..7e766ff 100644 --- a/sea-streamer-file/src/producer/backend.rs +++ b/sea-streamer-file/src/producer/backend.rs @@ -204,7 +204,7 @@ impl Writer { n -= 1; } // 2. 
go forward from there and read all messages up to started_from, recording the latest messages - source.rewind(SeqPos::At(n as u64)).await?; + source.rewind(SeqPos::At(n as SeqNo)).await?; while source.offset() < sink.started_from() { match source.next().await { Ok(msg) => { @@ -234,7 +234,7 @@ impl Writer { _ => panic!("Must be FileReader"), }; let (mut file, _, _) = reader.end(); - file.seek(SeqPos::At(sink.offset())).await?; // restore offset + file.seek(SeqPos::At(sink.offset() as SeqNo)).await?; // restore offset sink.use_file(FileSink::new(file, file_size_limit)?); // now we've gone through the stream, we can safely assume the stream state let entry = streams.entry(key.clone()).or_default(); diff --git a/sea-streamer-file/tests/consumer.rs b/sea-streamer-file/tests/consumer.rs index bca5304..e11fb3c 100644 --- a/sea-streamer-file/tests/consumer.rs +++ b/sea-streamer-file/tests/consumer.rs @@ -15,7 +15,7 @@ async fn consumer() -> anyhow::Result<()> { }; use sea_streamer_types::{ export::futures::TryStreamExt, Buffer, Consumer, Message, MessageHeader, OwnedMessage, - ShardId, SharedMessage, StreamErr, StreamKey, Streamer, Timestamp, + SeqNo, ShardId, SharedMessage, StreamErr, StreamKey, Streamer, Timestamp, }; const TEST: &str = "consumer"; @@ -33,13 +33,14 @@ async fn consumer() -> anyhow::Result<()> { let shard = ShardId::new(1); let message = |i: u64| -> OwnedMessage { - let header = MessageHeader::new(stream_key.clone(), shard, i, Timestamp::now_utc()); + let header = + MessageHeader::new(stream_key.clone(), shard, i as SeqNo, Timestamp::now_utc()); OwnedMessage::new(header, format!("{}-{}", stream_key.name(), i).into_bytes()) }; let check = |i: u64, mess: SharedMessage| { assert_eq!(mess.header().stream_key(), &stream_key); assert_eq!(mess.header().shard_id(), &shard); - assert_eq!(mess.header().sequence(), &i); + assert_eq!(*mess.header().sequence() as u64, i); assert_eq!( mess.message().as_str().unwrap(), format!("{}-{}", stream_key.name(), i) @@ -119,8 
+120,8 @@ async fn demux() -> anyhow::Result<()> { DEFAULT_FILE_SIZE_LIMIT, }; use sea_streamer_types::{ - Buffer, Consumer, Message, MessageHeader, OwnedMessage, ShardId, SharedMessage, StreamKey, - Streamer, Timestamp, + Buffer, Consumer, Message, MessageHeader, OwnedMessage, SeqNo, ShardId, SharedMessage, + StreamKey, Streamer, Timestamp, }; const TEST: &str = "demux"; @@ -139,16 +140,18 @@ async fn demux() -> anyhow::Result<()> { let shard = ShardId::new(1); let cat = |i: u64| -> OwnedMessage { - let header = MessageHeader::new(cat_key.clone(), shard, i, Timestamp::now_utc()); + let header = + MessageHeader::new(cat_key.clone(), shard, i as SeqNo, Timestamp::now_utc()); OwnedMessage::new(header, format!("{}", i).into_bytes()) }; let dog = |i: u64| -> OwnedMessage { - let header = MessageHeader::new(dog_key.clone(), shard, i, Timestamp::now_utc()); + let header = + MessageHeader::new(dog_key.clone(), shard, i as SeqNo, Timestamp::now_utc()); OwnedMessage::new(header, format!("{}", i).into_bytes()) }; let check = |i: u64, mess: &SharedMessage| { assert_eq!(mess.header().shard_id(), &shard); - assert_eq!(mess.header().sequence(), &i); + assert_eq!(*mess.header().sequence() as u64, i); assert_eq!(mess.message().as_str().unwrap(), format!("{}", i)); }; let is_cat = |i: u64, m: SharedMessage| { @@ -217,7 +220,7 @@ async fn group() -> anyhow::Result<()> { }; use sea_streamer_types::{ Buffer, Consumer, ConsumerGroup, ConsumerMode, ConsumerOptions, Message, MessageHeader, - OwnedMessage, ShardId, SharedMessage, StreamKey, Streamer, Timestamp, + OwnedMessage, SeqNo, ShardId, SharedMessage, StreamKey, Streamer, Timestamp, }; const TEST: &str = "group"; @@ -236,13 +239,14 @@ async fn group() -> anyhow::Result<()> { let group = ConsumerGroup::new("friends"); let message = |i: u64| -> OwnedMessage { - let header = MessageHeader::new(stream_key.clone(), shard, i, Timestamp::now_utc()); + let header = + MessageHeader::new(stream_key.clone(), shard, i as SeqNo, 
Timestamp::now_utc()); OwnedMessage::new(header, format!("{}-{}", stream_key.name(), i).into_bytes()) }; let check = |i: u64, mess: SharedMessage| { assert_eq!(mess.header().stream_key(), &stream_key); assert_eq!(mess.header().shard_id(), &shard); - assert_eq!(mess.header().sequence(), &i); + assert_eq!(*mess.header().sequence() as u64, i); assert_eq!( mess.message().as_str().unwrap(), format!("{}-{}", stream_key.name(), i) @@ -325,7 +329,7 @@ async fn seek() -> anyhow::Result<()> { }; use sea_streamer_types::{ Buffer, Consumer, ConsumerGroup, ConsumerMode, ConsumerOptions, Message, MessageHeader, - OwnedMessage, SeqPos, ShardId, SharedMessage, StreamKey, Streamer, Timestamp, + OwnedMessage, SeqNo, SeqPos, ShardId, SharedMessage, StreamKey, Streamer, Timestamp, }; const TEST: &str = "seek"; @@ -344,13 +348,14 @@ async fn seek() -> anyhow::Result<()> { let group = ConsumerGroup::new("group"); let message = |i: u64| -> OwnedMessage { - let header = MessageHeader::new(stream_key.clone(), shard, i, Timestamp::now_utc()); + let header = + MessageHeader::new(stream_key.clone(), shard, i as SeqNo, Timestamp::now_utc()); OwnedMessage::new(header, format!("{}-{}", stream_key.name(), i).into_bytes()) }; let check = |i: u64, mess: SharedMessage| { assert_eq!(mess.header().stream_key(), &stream_key); assert_eq!(mess.header().shard_id(), &shard); - assert_eq!(mess.header().sequence(), &i); + assert_eq!(*mess.header().sequence() as u64, i); assert_eq!( mess.message().as_str().unwrap(), format!("{}-{}", stream_key.name(), i) diff --git a/sea-streamer-kafka/src/producer.rs b/sea-streamer-kafka/src/producer.rs index 5ddb792..6e3fbd9 100644 --- a/sea-streamer-kafka/src/producer.rs +++ b/sea-streamer-kafka/src/producer.rs @@ -12,7 +12,7 @@ pub use rdkafka::{consumer::ConsumerGroupMetadata, producer::FutureRecord, Topic use sea_streamer_runtime::spawn_blocking; use sea_streamer_types::{ export::futures::FutureExt, runtime_error, Buffer, MessageHeader, Producer, ProducerOptions, - 
ShardId, StreamErr, StreamKey, StreamResult, StreamerUri, Timestamp, + SeqNo, ShardId, StreamErr, StreamKey, StreamResult, StreamerUri, Timestamp, }; #[derive(Clone)] @@ -373,7 +373,7 @@ impl Future for SendFuture { Ok((part, offset)) => Ok(MessageHeader::new( self.stream_key.take().expect("Must have stream_key"), ShardId::new(part as u64), - offset as u64, + offset as SeqNo, Timestamp::now_utc(), )), Err((err, _)) => Err(stream_err(err)), diff --git a/sea-streamer-redis/Cargo.toml b/sea-streamer-redis/Cargo.toml index eb90be5..007a659 100644 --- a/sea-streamer-redis/Cargo.toml +++ b/sea-streamer-redis/Cargo.toml @@ -40,6 +40,7 @@ runtime-async-std = ["async-std", "redis/async-std-comp", "sea-streamer-runtime/ runtime-tokio = ["tokio", "redis/tokio-comp", "sea-streamer-runtime/runtime-tokio"] runtime-async-std-native-tls = ["runtime-async-std", "redis/async-std-native-tls-comp"] runtime-tokio-native-tls = ["runtime-tokio", "redis/tokio-native-tls-comp"] +nanosecond-timestamp = ["sea-streamer-types/wide-seq-no"] [[bin]] name = "consumer" diff --git a/sea-streamer-redis/src/consumer/mod.rs b/sea-streamer-redis/src/consumer/mod.rs index 468b79a..465dbad 100644 --- a/sea-streamer-redis/src/consumer/mod.rs +++ b/sea-streamer-redis/src/consumer/mod.rs @@ -16,7 +16,7 @@ use std::{fmt::Debug, future::Future, sync::Arc, time::Duration}; use crate::{ from_seq_no, get_message_id, host_id, MessageId, RedisCluster, RedisErr, RedisResult, - DEFAULT_TIMEOUT, MAX_MSG_ID, + TimestampFormat, DEFAULT_TIMEOUT, MAX_MSG_ID, }; use sea_streamer_runtime::{spawn_task, timeout}; use sea_streamer_types::{ @@ -50,6 +50,7 @@ pub struct RedisConsumerOptions { batch_size: usize, shard_ownership: ShardOwnership, mkstream: bool, + pub(crate) timestamp_format: TimestampFormat, } #[derive(Debug)] diff --git a/sea-streamer-redis/src/consumer/node.rs b/sea-streamer-redis/src/consumer/node.rs index e0f29c5..c711f43 100644 --- a/sea-streamer-redis/src/consumer/node.rs +++ 
b/sea-streamer-redis/src/consumer/node.rs @@ -544,20 +544,22 @@ impl Node { log::trace!("XREAD ..."); assert!(self.buffer.is_empty()); match conn.req_packed_command(&cmd).await { - Ok(value) => match StreamReadReply::from_redis_value(value) { - Ok(StreamReadReply(mut mess)) => { - log::trace!("Read {} messages", mess.len()); - if mess.is_empty() { - // If we receive an empty reply, it means if we were reading the pending list - // then the list is now empty - self.group.pending_state = false; + Ok(value) => { + match StreamReadReply::from_redis_value(value, self.options.timestamp_format) { + Ok(StreamReadReply(mut mess)) => { + log::trace!("Read {} messages", mess.len()); + if mess.is_empty() { + // If we receive an empty reply, it means if we were reading the pending list + // then the list is now empty + self.group.pending_state = false; + } + mess.reverse(); + self.buffer = mess; + Ok(ReadResult::Msg(self.buffer.len())) } - mess.reverse(); - self.buffer = mess; - Ok(ReadResult::Msg(self.buffer.len())) + Err(err) => self.send_error(err).await, } - Err(err) => self.send_error(err).await, - }, + } Err(err) => { let kind = err.kind(); if kind == ErrorKind::Moved { @@ -675,6 +677,7 @@ impl Node { value, claiming.stream.0.clone(), claiming.stream.1, + self.options.timestamp_format, ) { Ok(AutoClaimReply(mut mess)) => { log::trace!( diff --git a/sea-streamer-redis/src/consumer/options.rs b/sea-streamer-redis/src/consumer/options.rs index cc01999..9d6558a 100644 --- a/sea-streamer-redis/src/consumer/options.rs +++ b/sea-streamer-redis/src/consumer/options.rs @@ -1,5 +1,5 @@ use super::{constants::*, ConsumerConfig, RedisConsumerOptions}; -use crate::{RedisErr, RedisResult}; +use crate::{RedisErr, RedisResult, TimestampFormat}; use sea_streamer_types::{ConsumerGroup, ConsumerId, ConsumerMode, ConsumerOptions, StreamErr}; use std::time::Duration; @@ -83,6 +83,7 @@ impl ConsumerOptions for RedisConsumerOptions { }, shard_ownership: ShardOwnership::Shared, mkstream: false, 
+ timestamp_format: TimestampFormat::default(), } } diff --git a/sea-streamer-redis/src/message.rs b/sea-streamer-redis/src/message.rs index 86b144d..97466c0 100644 --- a/sea-streamer-redis/src/message.rs +++ b/sea-streamer-redis/src/message.rs @@ -1,4 +1,4 @@ -use crate::{RedisErr, RedisResult, MSG, ZERO}; +use crate::{RedisErr, RedisResult, TimestampFormat as TsFmt, MSG, ZERO}; use redis::Value; use sea_streamer_types::{ MessageHeader, SeqNo, ShardId, SharedMessage, StreamErr, StreamKey, Timestamp, @@ -24,8 +24,12 @@ pub(crate) struct AutoClaimReply(pub(crate) Vec); /// we only allocate 48 bit to the timestamp, and the remaining 16 bit to the sub-sequence number. /// /// This limits the number of messages per millisecond to 65536, -/// and the maximum timestamp to 10889-08-02T05:31:50. -pub fn parse_message_id(id: &str) -> RedisResult<(Timestamp, SeqNo)> { +/// and the maximum timestamp to `10889-08-02T05:31:50`. +/// +/// This limit can be lifted with the `nanosecond-timestamp` feature flag, which widens +/// SeaStreamer's SeqNo to be u128. But squeezing nanosecond timestamps into Redis still +/// limits it to `2554-07-21T23:34:33`. 
+pub fn parse_message_id(ts_fmt: TsFmt, id: &str) -> RedisResult<(Timestamp, SeqNo)> { if let Some((timestamp, seq_no)) = id.split_once('-') { if let Ok(timestamp) = timestamp.parse::() { if let Ok(seq_no) = seq_no.parse::() { @@ -34,14 +38,20 @@ pub fn parse_message_id(id: &str) -> RedisResult<(Timestamp, SeqNo)> { "Sequence number out of range: {seq_no}" )))); } + #[cfg(not(feature = "nanosecond-timestamp"))] if timestamp > 0xFFFFFFFFFFFF { return Err(StreamErr::Backend(RedisErr::MessageId(format!( "Timestamp out of range: {timestamp}" )))); } + let nano = match ts_fmt { + TsFmt::UnixTimestampMillis => timestamp as i128 * 1_000_000, + #[cfg(feature = "nanosecond-timestamp")] + TsFmt::UnixTimestampNanos => timestamp as i128, + }; return Ok(( - Timestamp::from_unix_timestamp_nanos(timestamp as i128 * 1_000_000).unwrap(), - timestamp << 16 | seq_no, + Timestamp::from_unix_timestamp_nanos(nano).unwrap(), + (timestamp as SeqNo) << 16 | seq_no as SeqNo, )); } } @@ -49,22 +59,14 @@ pub fn parse_message_id(id: &str) -> RedisResult<(Timestamp, SeqNo)> { Err(StreamErr::Backend(RedisErr::MessageId(id.to_owned()))) } +#[inline] pub(crate) fn get_message_id(header: &MessageHeader) -> MessageId { - ( - (header.timestamp().unix_timestamp_nanos() / 1_000_000) - .try_into() - .expect("RedisConsumer: timestamp out of range"), - (header.sequence() & 0xFFFF) - .try_into() - .expect("Never fails"), - ) + from_seq_no(*header.sequence()) } +#[inline] pub(crate) fn from_seq_no(seq_no: SeqNo) -> MessageId { - ( - (seq_no >> 16), - (seq_no & 0xFFFF).try_into().expect("Never fails"), - ) + ((seq_no >> 16) as u64, (seq_no & 0xFFFF) as u16) } /// A trait that adds some methods to [`RedisMessage`]. @@ -83,7 +85,7 @@ impl RedisMessageId for RedisMessage { // LOL such nesting. This is still undesirable, as there are 5 layers of nested Vec. But at least we don't have to copy the bytes again. impl StreamReadReply { /// Like [`redis::FromRedisValue`], but taking ownership instead of copying. 
- pub(crate) fn from_redis_value(value: Value) -> RedisResult { + pub(crate) fn from_redis_value(value: Value, ts_fmt: TsFmt) -> RedisResult { let mut messages = Vec::new(); if let Value::Bulk(values) = value { @@ -111,7 +113,7 @@ impl StreamReadReply { }; let stream_key = StreamKey::new(stream_key)?; if let Value::Bulk(values) = value_1 { - parse_messages(values, stream_key, shard, &mut messages)?; + parse_messages(values, stream_key, shard, &mut messages, ts_fmt)?; } } } @@ -126,6 +128,7 @@ impl AutoClaimReply { value: Value, stream_key: StreamKey, shard: ShardId, + ts_fmt: TsFmt, ) -> RedisResult { let mut messages = Vec::new(); if let Value::Bulk(values) = value { @@ -136,7 +139,7 @@ impl AutoClaimReply { _ = values.next().unwrap(); let value = values.next().unwrap(); if let Value::Bulk(values) = value { - parse_messages(values, stream_key, shard, &mut messages)?; + parse_messages(values, stream_key, shard, &mut messages, ts_fmt)?; } else { return Err(err(value)); } @@ -150,6 +153,7 @@ fn parse_messages( stream: StreamKey, shard: ShardId, messages: &mut Vec, + ts_fmt: TsFmt, ) -> RedisResult<()> { for value in values { if let Value::Bulk(values) = value { @@ -160,7 +164,7 @@ fn parse_messages( let value_0 = values.next().unwrap(); let value_1 = values.next().unwrap(); let id = string_from_redis_value(value_0)?; - let (timestamp, sequence) = parse_message_id(&id)?; + let (timestamp, sequence) = parse_message_id(ts_fmt, &id)?; if let Value::Bulk(values) = value_1 { assert!(values.len() % 2 == 0); let pairs = values.len() / 2; diff --git a/sea-streamer-redis/src/producer.rs b/sea-streamer-redis/src/producer.rs index e769390..53fa9ef 100644 --- a/sea-streamer-redis/src/producer.rs +++ b/sea-streamer-redis/src/producer.rs @@ -3,8 +3,8 @@ use redis::{aio::ConnectionLike, cmd as command, ErrorKind, Pipeline}; use std::{fmt::Debug, future::Future, sync::Arc, time::Duration}; use crate::{ - map_err, parse_message_id, string_from_redis_value, RedisCluster, RedisErr, 
RedisResult, MSG, - ZERO, + map_err, parse_message_id, string_from_redis_value, RedisCluster, RedisErr, RedisResult, + TimestampFormat, MSG, ZERO, }; use sea_streamer_runtime::{sleep, spawn_task}; use sea_streamer_types::{ @@ -25,6 +25,7 @@ pub struct RedisProducer { /// Options for Producers, including sharding. pub struct RedisProducerOptions { sharder: Option>, + pub(crate) timestamp_format: TimestampFormat, } impl Debug for RedisProducerOptions { @@ -188,6 +189,7 @@ pub(crate) async fn create_producer( cluster.reconnect_all().await?; // init connections let (sender, receiver) = unbounded(); let mut sharder = options.sharder.take().map(|a| a.init()); + let timestamp_format = options.timestamp_format; // Redis commands are exclusive (`&mut self`), so we need a producer task spawn_task(async move { @@ -227,7 +229,15 @@ pub(crate) async fn create_producer( }; let mut cmd = command("XADD"); cmd.arg(redis_key); - cmd.arg("*"); + match timestamp_format { + TimestampFormat::UnixTimestampMillis => cmd.arg("*"), + #[cfg(feature = "nanosecond-timestamp")] + TimestampFormat::UnixTimestampNanos => { + let ts = + format!("{}-*", Timestamp::now_utc().unix_timestamp_nanos()); + cmd.arg(&ts) + } + }; let msg = [(MSG, bytes)]; cmd.arg(&msg); let command = (redis_key.to_owned(), stream_key, shard, receipt); @@ -275,7 +285,8 @@ pub(crate) async fn create_producer( .zip(batch.0.iter()) .map(|(id, (_, stream_key, shard, _))| { match string_from_redis_value(id) { - Ok(id) => match parse_message_id(&id) { + Ok(id) => match parse_message_id(timestamp_format, &id) + { Ok((timestamp, sequence)) => { Ok(MessageHeader::new( stream_key.clone(), diff --git a/sea-streamer-redis/src/streamer.rs b/sea-streamer-redis/src/streamer.rs index 4e7862d..2c87fc9 100644 --- a/sea-streamer-redis/src/streamer.rs +++ b/sea-streamer-redis/src/streamer.rs @@ -24,6 +24,15 @@ pub struct RedisConnectOptions { timeout: Option, enable_cluster: bool, disable_hostname_verification: bool, + timestamp_format: 
TimestampFormat, +} + +#[derive(Debug, Default, Clone, Copy)] +pub enum TimestampFormat { + #[default] + UnixTimestampMillis, + #[cfg(feature = "nanosecond-timestamp")] + UnixTimestampNanos, } impl Streamer for RedisStreamer { @@ -63,8 +72,9 @@ impl Streamer for RedisStreamer { async fn create_generic_producer( &self, - options: Self::ProducerOptions, + mut options: Self::ProducerOptions, ) -> RedisResult { + options.timestamp_format = self.options.timestamp_format; let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?; create_producer(cluster, options).await } @@ -72,8 +82,9 @@ impl Streamer for RedisStreamer { async fn create_consumer( &self, streams: &[StreamKey], - options: Self::ConsumerOptions, + mut options: Self::ConsumerOptions, ) -> RedisResult { + options.timestamp_format = self.options.timestamp_format; let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?; create_consumer(cluster, options, streams.to_vec()).await } @@ -139,4 +150,14 @@ impl RedisConnectOptions { self.disable_hostname_verification = bool; self } + + pub fn timestamp_format(&self) -> TimestampFormat { + self.timestamp_format + } + /// Set timestamp format. The default timestamp format is milliseconds, + /// which is recommended by Redis. 
+ pub fn set_timestamp_format(&mut self, fmt: TimestampFormat) -> &mut Self { + self.timestamp_format = fmt; + self + } } diff --git a/sea-streamer-redis/tests/realtime.rs b/sea-streamer-redis/tests/realtime.rs index d1442cd..371509b 100644 --- a/sea-streamer-redis/tests/realtime.rs +++ b/sea-streamer-redis/tests/realtime.rs @@ -26,6 +26,8 @@ async fn main() -> anyhow::Result<()> { let mut options = RedisConnectOptions::default(); options.set_enable_cluster(enable_cluster); + #[cfg(feature = "nanosecond-timestamp")] + options.set_timestamp_format(sea_streamer_redis::TimestampFormat::UnixTimestampNanos); let streamer = RedisStreamer::connect( std::env::var("BROKERS_URL") .unwrap_or_else(|_| "redis://localhost".to_owned()) diff --git a/sea-streamer-stdio/src/producer.rs b/sea-streamer-stdio/src/producer.rs index efc1661..81bc0d6 100644 --- a/sea-streamer-stdio/src/producer.rs +++ b/sea-streamer-stdio/src/producer.rs @@ -188,7 +188,7 @@ impl ProducerTrait for StdioProducer { MessageHeader::new( stream.to_owned(), ShardId::new(ZERO), - ZERO, + ZERO as SeqNo, Timestamp::now_utc(), ), payload.into_bytes(), diff --git a/sea-streamer-types/Cargo.toml b/sea-streamer-types/Cargo.toml index 2c5ac7f..1c91ca2 100644 --- a/sea-streamer-types/Cargo.toml +++ b/sea-streamer-types/Cargo.toml @@ -24,4 +24,5 @@ serde = { version = "1", default-features = false, optional = true, features = [ serde_json = { version = "1", optional = true } [features] -json = ["serde", "serde_json"] \ No newline at end of file +json = ["serde", "serde_json"] +wide-seq-no = [] \ No newline at end of file diff --git a/sea-streamer-types/src/stream.rs b/sea-streamer-types/src/stream.rs index 0bc15ab..0d0d1ac 100644 --- a/sea-streamer-types/src/stream.rs +++ b/sea-streamer-types/src/stream.rs @@ -26,7 +26,10 @@ pub struct ShardId { } /// The tuple (StreamKey, ShardId, SeqNo) uniquely identifies a message. Aka. offset. 
+#[cfg(not(feature = "wide-seq-no"))] pub type SeqNo = u64; +#[cfg(feature = "wide-seq-no")] +pub type SeqNo = u128; #[derive(Debug, Clone, Copy, PartialEq, Eq)] /// Identifies a position in a stream.