Skip to content

Commit

Permalink
Make sure unsettled message goes back to queue on connection closing
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Sep 27, 2024
1 parent 7891645 commit c9501a8
Showing 1 changed file with 57 additions and 5 deletions.
62 changes: 57 additions & 5 deletions src/test/java/com/rabbitmq/client/amqp/impl/AmqpConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
// [email protected].
package com.rabbitmq.client.amqp.impl;

import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM;
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
import static com.rabbitmq.client.amqp.Resource.State.OPEN;
import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
import static com.rabbitmq.client.amqp.Resource.State.*;
import static com.rabbitmq.client.amqp.impl.Assertions.*;
import static com.rabbitmq.client.amqp.impl.Cli.closeConnection;
import static com.rabbitmq.client.amqp.impl.TestUtils.name;
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
import static org.assertj.core.api.Assertions.*;

import com.rabbitmq.client.amqp.*;
Expand All @@ -41,6 +40,7 @@
@ExtendWith(AmqpTestInfrastructureExtension.class)
public class AmqpConsumerTest {

// used by the test extension
BackOffDelayPolicy backOffDelayPolicy = BackOffDelayPolicy.fixed(Duration.ofMillis(100));
Environment environment;
Connection connection;
Expand All @@ -50,7 +50,6 @@ public class AmqpConsumerTest {
@BeforeEach
void init(TestInfo info) {
this.q = name(info);
connection.management().queue(this.q).type(STREAM).declare();
this.connectionName = ((AmqpConnection) connection).name();
}

Expand All @@ -61,6 +60,7 @@ void tearDown() {

@Test
void subscriptionListenerShouldBeCalledOnRecovery() {
connection.management().queue(this.q).type(STREAM).declare();
Sync subscriptionSync = sync();
Sync recoveredSync = sync();
connection
Expand All @@ -81,6 +81,7 @@ void subscriptionListenerShouldBeCalledOnRecovery() {

@Test
void streamConsumerRestartsWhereItLeftOff() {
connection.management().queue(this.q).type(STREAM).declare();
Connection publisherConnection = environment.connectionBuilder().build();
Publisher publisher = publisherConnection.publisherBuilder().queue(this.q).build();
int messageCount = 100;
Expand Down Expand Up @@ -159,6 +160,57 @@ void streamConsumerRestartsWhereItLeftOff() {
assertThat(lastOffsetProcessed).hasValueGreaterThan(offsetAfterClosing);
}

@Test
void unsettledMessageShouldGoBackToQueueIfConnectionIsClosed(TestInfo testInfo) {
String cName = name(testInfo);
connection.management().queue(this.q).type(QUORUM).declare();
Sync connectionRecoveredSync = sync();
Connection c =
((AmqpConnectionBuilder) environment.connectionBuilder())
.name(cName)
.recovery()
.backOffDelayPolicy(backOffDelayPolicy)
.connectionBuilder()
.listeners(recoveredListener(connectionRecoveredSync))
.build();
Publisher publisher = c.publisherBuilder().queue(this.q).build();

Sync deliveredSync = sync(2);
Sync consumerClosedSync = sync();
AtomicInteger deliveryCount = new AtomicInteger();
c.consumerBuilder()
.listeners(
ctx -> {
if (ctx.currentState() == CLOSED) {
consumerClosedSync.down();
}
})
.queue(this.q)
.messageHandler(
(ctx, msg) -> {
if (deliveryCount.incrementAndGet() == 1) {
closeConnection(cName);
}
deliveredSync.down();
})
.build();

publisher.publish(publisher.message(), ctx -> {});

assertThat(deliveredSync).completes();
assertThat(deliveryCount).hasValue(2);
assertThat(connectionRecoveredSync).completes();
assertThat(consumerClosedSync).hasNotCompleted();
c.close();
assertThat(consumerClosedSync).completes();

waitAtMost(
() -> {
Management.QueueInfo info = connection.management().queueInfo(this.q);
return info.messageCount() == 1 && info.consumerCount() == 0;
});
}

private static Resource.StateListener recoveredListener(Sync sync) {
return context -> {
if (context.previousState() == RECOVERING && context.currentState() == OPEN) {
Expand Down

0 comments on commit c9501a8

Please sign in to comment.