Skip to content

Commit

Permalink
assorted cleanups and fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <[email protected]>
  • Loading branch information
Keruspe committed Sep 5, 2024
1 parent 91d2865 commit ab38c3d
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 12 deletions.
8 changes: 4 additions & 4 deletions src/acknowledgement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ impl Acknowledgements {
self.0.lock().on_channel_error(error);
}

pub(crate) fn reset(&self) {
self.0.lock().reset();
pub(crate) fn reset(&self, error: Error) {
self.0.lock().reset(error);
}
}

Expand Down Expand Up @@ -179,8 +179,8 @@ impl Inner {
}
}

fn reset(&mut self) {
// FIXME(recovery): handle pendings ??
fn reset(&mut self, error: Error) {
self.delivery_tag = IdSequence::new(false);
self.on_channel_error(error);
}
}
16 changes: 9 additions & 7 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,6 @@ impl Channel {
self.error_handler.set_handler(handler);
}

pub(crate) fn reset(&self) {
// FIXME
}

pub(crate) async fn restore(
&self,
ch: &ChannelDefinitionInternal,
Expand Down Expand Up @@ -576,7 +572,9 @@ impl Channel {
}

fn on_channel_close_ok_sent(&self, error: Option<Error>) {
if !self.recovery_config.auto_recover_channels || !error.as_ref().map_or(false, Error::is_amqp_soft_error) {
if !self.recovery_config.auto_recover_channels
|| !error.as_ref().map_or(false, Error::is_amqp_soft_error)
{
self.set_closed(
error
.clone()
Expand Down Expand Up @@ -870,8 +868,9 @@ impl Channel {
if self.recovery_config.auto_recover_channels {
self.status.update_recovery_context(|ctx| {
ctx.set_expected_replies(self.frames.take_expected_replies(self.id));
self.acknowledgements.reset(ctx.cause());
self.consumers.error(ctx.cause());
});
self.acknowledgements.reset();
if !self.status.confirm() {
self.status.finalize_recovery();
}
Expand Down Expand Up @@ -913,7 +912,10 @@ impl Channel {
);
Error::ProtocolError(error)
});
match (self.recovery_config.auto_recover_channels, error.clone().ok()) {
match (
self.recovery_config.auto_recover_channels,
error.clone().ok(),
) {
(true, Some(error)) if error.is_amqp_soft_error() => {
self.status.set_reconnecting(error)
}
Expand Down
4 changes: 4 additions & 0 deletions src/channel_recovery_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ impl ChannelRecoveryContext {
}
}

pub(crate) fn cause(&self) -> Error {
self.cause.clone()
}

pub(crate) fn notifier(&self) -> Notifier {
self.notifier.clone()
}
Expand Down
1 change: 0 additions & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ impl Connection {
.channels
.push(RestoredChannel::new(if let Some(c) = c.channel.clone() {
let channel = c.clone();
c.reset();
c.channel_open(channel).await?
} else {
self.create_channel().await?
Expand Down

0 comments on commit ab38c3d

Please sign in to comment.