diff --git a/sea-streamer-types/Cargo.toml b/sea-streamer-types/Cargo.toml index 1c91ca2..249db5b 100644 --- a/sea-streamer-types/Cargo.toml +++ b/sea-streamer-types/Cargo.toml @@ -20,9 +20,9 @@ futures = { version = "0.3", default-features = false, features = ["std", "alloc thiserror = { version = "1", default-features = false } time = { version = "0.3", default-features = false, features = ["std", "macros", "formatting"] } url = { version = "2.2", default-features = false } -serde = { version = "1", default-features = false, optional = true, features = ["derive"] } +serde = { version = "1", default-features = false, optional = true } serde_json = { version = "1", optional = true } [features] -json = ["serde", "serde_json"] +json = ["serde/derive", "serde_json"] wide-seq-no = [] \ No newline at end of file diff --git a/sea-streamer-types/src/message.rs b/sea-streamer-types/src/message.rs index 15b9759..fd2fb97 100644 --- a/sea-streamer-types/src/message.rs +++ b/sea-streamer-types/src/message.rs @@ -47,15 +47,6 @@ pub struct MessageHeader { timestamp: Timestamp, } -#[cfg(feature = "serde")] -#[derive(serde::Serialize)] -struct HeaderJson<'a> { - stream_key: &'a str, - shard_id: u64, - sequence: u64, - timestamp: String, -} - /// Common interface of byte containers. pub trait Buffer { fn size(&self) -> usize; @@ -367,12 +358,20 @@ impl<'a> IntoBytesOrStr<'a> for &'a [u8] { } } -#[cfg(feature = "serde")] +#[cfg(feature = "json")] impl serde::Serialize for MessageHeader { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { + #[derive(serde::Serialize)] + struct HeaderJson<'a> { + stream_key: &'a str, + shard_id: u64, + sequence: SeqNo, + timestamp: String, + } + HeaderJson { timestamp: self .timestamp diff --git a/sea-streamer-types/src/stream.rs b/sea-streamer-types/src/stream.rs index 0d0d1ac..084151d 100644 --- a/sea-streamer-types/src/stream.rs +++ b/sea-streamer-types/src/stream.rs @@ -95,3 +95,27 @@ pub fn is_valid_stream_key_char(c: char) -> bool { // https://stackoverflow.com/questions/37062904/what-are-apache-kafka-topic-name-limitations c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-') } + +#[cfg(feature = "json")] +mod impl_serde { + use super::StreamKey; + + impl<'de> serde::Deserialize<'de> for StreamKey { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = <&str>::deserialize(deserializer)?; + s.parse().map_err(serde::de::Error::custom) + } + } + + impl serde::Serialize for StreamKey { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(self.name()) + } + } +}