Skip to content

Commit

Permalink
reconnect on a AwaitPingResp error, too
Browse files Browse the repository at this point in the history
  • Loading branch information
paultag committed Oct 28, 2024
1 parent 9574d54 commit 788639d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
32 changes: 19 additions & 13 deletions bambulabs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,26 @@ impl Client {
let msg_opt = match ep.poll().await {
Ok(msg_opt) => msg_opt,
Err(err) => {
if let rumqttc::ConnectionError::MqttState(rumqttc::StateError::Io(err)) = err {
tracing::error!("Error polling for message: {:?}", err);
tracing::warn!("Reconnecting...");
// We are in a bad state and should reconnect.
let opts = Self::get_config(&self.ip, &self.access_code)?;
let (client, event_loop) = rumqttc::AsyncClient::new(opts, 25);
drop(ep);
self.client = Arc::new(client);
self.event_loop = Arc::new(Mutex::new(event_loop));
tracing::warn!("Reconnected.");
return Ok(());
match err {
rumqttc::ConnectionError::MqttState(rumqttc::StateError::Io(err)) => {
tracing::error!("Error polling for message: {:?}", err);

Check warning on line 96 in bambulabs/src/client.rs

View check run for this annotation

Codecov / codecov/patch

bambulabs/src/client.rs#L94-L96

Added lines #L94 - L96 were not covered by tests
}
rumqttc::ConnectionError::MqttState(rumqttc::StateError::AwaitPingResp) => {
tracing::error!("Error polling for message: AwaitPingResp");

Check warning on line 99 in bambulabs/src/client.rs

View check run for this annotation

Codecov / codecov/patch

bambulabs/src/client.rs#L99

Added line #L99 was not covered by tests
}
_ => {
tracing::error!("Error polling for message: {:?}; aborting", err);
return Ok(());

Check warning on line 103 in bambulabs/src/client.rs

View check run for this annotation

Codecov / codecov/patch

bambulabs/src/client.rs#L102-L103

Added lines #L102 - L103 were not covered by tests
}
}

tracing::error!("Error polling for message: {:?}", err);
tracing::warn!("Reconnecting...");

Check warning on line 106 in bambulabs/src/client.rs

View check run for this annotation

Codecov / codecov/patch

bambulabs/src/client.rs#L106

Added line #L106 was not covered by tests
// We are in a bad state and should reconnect.
let opts = Self::get_config(&self.ip, &self.access_code)?;
let (client, event_loop) = rumqttc::AsyncClient::new(opts, 25);
drop(ep);
self.client = Arc::new(client);
self.event_loop = Arc::new(Mutex::new(event_loop));
tracing::warn!("Reconnected.");

Check warning on line 113 in bambulabs/src/client.rs

View check run for this annotation

Codecov / codecov/patch

bambulabs/src/client.rs#L108-L113

Added lines #L108 - L113 were not covered by tests
return Ok(());
}
};
Expand Down
1 change: 1 addition & 0 deletions bambulabs/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub(crate) fn parse_message(message: &rumqttc::Event) -> Message {
let payload = publish.payload.clone();

if let Ok(payload) = std::str::from_utf8(&payload) {
tracing::warn!("{}", payload);

Check warning on line 11 in bambulabs/src/parser.rs

View check run for this annotation

Codecov / codecov/patch

bambulabs/src/parser.rs#L11

Added line #L11 was not covered by tests
match serde_json::from_str::<Message>(payload)
.map_err(|err| format_serde_error::SerdeError::new(payload.to_string(), err))
{
Expand Down

0 comments on commit 788639d

Please sign in to comment.