diff --git a/sea-streamer-socket/src/connect_options.rs b/sea-streamer-socket/src/connect_options.rs index 2eca86f..7d60421 100644 --- a/sea-streamer-socket/src/connect_options.rs +++ b/sea-streamer-socket/src/connect_options.rs @@ -74,7 +74,16 @@ impl ConnectOptions for SeaConnectOptions { type Error = BackendErr; fn timeout(&self) -> SeaResult { - self.stdio.timeout().map_err(map_err) + #![allow(unreachable_code)] + + #[cfg(feature = "backend-kafka")] + return self.kafka.timeout().map_err(map_err); + #[cfg(feature = "backend-redis")] + return self.redis.timeout().map_err(map_err); + #[cfg(feature = "backend-stdio")] + return self.stdio.timeout().map_err(map_err); + #[cfg(feature = "backend-file")] + return self.file.timeout().map_err(map_err); } fn set_timeout(&mut self, d: Duration) -> SeaResult<&mut Self> { diff --git a/sea-streamer-socket/src/consumer_options.rs b/sea-streamer-socket/src/consumer_options.rs index cec5948..547110a 100644 --- a/sea-streamer-socket/src/consumer_options.rs +++ b/sea-streamer-socket/src/consumer_options.rs @@ -117,12 +117,30 @@ impl ConsumerOptions for SeaConsumerOptions { /// Get currently set ConsumerMode fn mode(&self) -> SeaResult<&ConsumerMode> { - self.stdio.mode().map_err(map_err) + #![allow(unreachable_code)] + + #[cfg(feature = "backend-kafka")] + return self.kafka.mode().map_err(map_err); + #[cfg(feature = "backend-redis")] + return self.redis.mode().map_err(map_err); + #[cfg(feature = "backend-stdio")] + return self.stdio.mode().map_err(map_err); + #[cfg(feature = "backend-file")] + return self.file.mode().map_err(map_err); } /// Get currently set consumer group; may return `StreamErr::ConsumerGroupNotSet`. fn consumer_group(&self) -> SeaResult<&ConsumerGroup> { - self.stdio.consumer_group().map_err(map_err) + #![allow(unreachable_code)] + + #[cfg(feature = "backend-kafka")] + return self.kafka.consumer_group().map_err(map_err); + #[cfg(feature = "backend-redis")] + return self.redis.consumer_group().map_err(map_err); + #[cfg(feature = "backend-stdio")] + return self.stdio.consumer_group().map_err(map_err); + #[cfg(feature = "backend-file")] + return self.file.consumer_group().map_err(map_err); } /// Set consumer group for this consumer. Note the semantic is implementation-specific.