Skip to content

Commit

Permalink
From Redis construct lossy
Browse files Browse the repository at this point in the history
  • Loading branch information
aamalev committed May 9, 2024
1 parent 6d0f90c commit db1a5ff
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions src/redis/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,26 +178,26 @@ impl RedisStream {
let mut headers: HashMap<String, Vec<u8>> =
redis::FromRedisValue::from_redis_value(&m).unwrap_or_default();
if let Some(topic) = headers.remove("topic") {
msg.topic = String::from_utf8(topic).unwrap_or_default();
msg.topic = String::from_utf8_lossy(&topic).to_string();
}
if let Some(key) = headers.remove("key") {
msg.key = String::from_utf8(key).unwrap_or_default();
msg.key = String::from_utf8_lossy(&key).to_string();
}
let mut data = prost_types::Any::default();
if let Some(ct) = headers.remove("content-type") {
data.type_url = String::from_utf8(ct).unwrap_or_default();
data.type_url = String::from_utf8_lossy(&ct).to_string();
}
if let Some(body) = headers.remove(&self.cfg.body_fieldname) {
data.value = body;
}
msg.data = Some(data);
for (k, v) in headers.into_iter() {
msg.headers
.insert(k, String::from_utf8(v).unwrap_or_default());
.insert(k, String::from_utf8_lossy(&v).to_string());
}
}
if let Some(redis::Value::Data(id)) = b.pop() {
msg_id = String::from_utf8(id).unwrap_or_default();
msg_id = String::from_utf8_lossy(&id).to_string();
self.order_id = msg_id.clone();
self.fail = 0;
}
Expand Down Expand Up @@ -363,8 +363,7 @@ impl RedisStreamQueue {
for stream_cfg in cfg.streams.iter() {
match stream_cfg {
Stream::String(s) => {
let stream =
RedisStream::new(connection.clone(), cfg.clone(), s.clone());
let stream = RedisStream::new(connection.clone(), cfg.clone(), s.clone());
ackers.insert(s.clone(), stream.clone());
writers.push_back(stream);
unack.insert(s.clone(), UnAck::new(name.clone(), ack_timeout.clone()));
Expand Down

0 comments on commit db1a5ff

Please sign in to comment.