Skip to content

Commit

Permalink
Check message too large goes back to queue
Browse files Browse the repository at this point in the history
In case manual acknowledgement is used.
  • Loading branch information
acogoluegnes committed Jan 10, 2024
1 parent 7d25259 commit adb9091
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 5 deletions.
2 changes: 0 additions & 2 deletions src/test/java/com/rabbitmq/client/test/BrokerTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ protected boolean isAutomaticRecoveryEnabled() {

@BeforeEach
public void setUp(TestInfo testInfo) throws IOException, TimeoutException {


Assumptions.assumeTrue(shouldRun());
this.testInfo = testInfo;
openConnection();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Copyright (c) 2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom
// Inc.
// and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand All @@ -20,11 +22,13 @@

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

public class MaxInboundMessageSizeTest extends BrokerTestCase {

Expand Down Expand Up @@ -64,6 +68,60 @@ void maxInboundMessageSizeMustBeEnforced(int maxMessageSize, int frameMax, boole
byte[] body = new byte[maxMessageSize * 2];
ch.basicPublish("", q, null, body);
ch.waitForConfirmsOrDie();
AtomicReference<Throwable> channelException = new AtomicReference<>();
CountDownLatch channelErrorLatch = new CountDownLatch(1);
ch.addShutdownListener(
cause -> {
channelException.set(cause.getCause());
channelErrorLatch.countDown();
});
AtomicReference<Throwable> connectionException = new AtomicReference<>();
CountDownLatch connectionErrorLatch = new CountDownLatch(1);
c.addShutdownListener(
cause -> {
connectionException.set(cause.getCause());
connectionErrorLatch.countDown();
});
if (basicGet) {
try {
ch.basicGet(q, true);
} catch (Exception e) {
// OK for basicGet
}
} else {
ch.basicConsume(q, new DefaultConsumer(ch));
}
assertThat(channelErrorLatch).is(completed());
assertThat(channelException.get())
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Message body is too large");
assertThat(connectionErrorLatch).is(completed());
assertThat(connectionException.get())
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Message body is too large");
} finally {
safeClose(c);
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void largeMessageShouldGoBackToQueue(boolean basicGet) throws Exception {
int maxMessageSize = 5_000;
int maxFrameSize = maxMessageSize * 4;
ConnectionFactory cf = newConnectionFactory();
cf.setMaxInboundMessageBodySize(maxMessageSize);
cf.setRequestedFrameMax(maxFrameSize);
String messageId = UUID.randomUUID().toString();
Connection c = cf.newConnection();
try {
Channel ch = c.createChannel();
ch.confirmSelect();
AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
propsBuilder.messageId(messageId);
byte[] body = new byte[maxMessageSize * 2];
ch.basicPublish("", q, propsBuilder.build(), body);
ch.waitForConfirmsOrDie();
AtomicReference<Throwable> exception = new AtomicReference<>();
CountDownLatch errorLatch = new CountDownLatch(1);
ch.addShutdownListener(
Expand All @@ -73,12 +131,12 @@ void maxInboundMessageSizeMustBeEnforced(int maxMessageSize, int frameMax, boole
});
if (basicGet) {
try {
ch.basicGet(q, true);
ch.basicGet(q, false);
} catch (Exception e) {
// OK for basicGet
}
} else {
ch.basicConsume(q, new DefaultConsumer(ch));
ch.basicConsume(q, false, new DefaultConsumer(ch));
}
assertThat(errorLatch).is(completed());
assertThat(exception.get())
Expand All @@ -87,6 +145,26 @@ void maxInboundMessageSizeMustBeEnforced(int maxMessageSize, int frameMax, boole
} finally {
safeClose(c);
}

cf = newConnectionFactory();
cf.setMaxInboundMessageBodySize(maxMessageSize * 3);
cf.setRequestedFrameMax(maxFrameSize * 3);
try (Connection conn = cf.newConnection()) {
AtomicReference<String> receivedMessageId = new AtomicReference<>();
Channel ch = conn.createChannel();
CountDownLatch consumeLatch = new CountDownLatch(1);
ch.basicConsume(
q,
true,
(consumerTag, message) -> {
receivedMessageId.set(message.getProperties().getMessageId());
consumeLatch.countDown();
},
consumerTag -> {});

assertThat(consumeLatch).is(completed());
assertThat(receivedMessageId).hasValue(messageId);
}
}

@Override
Expand Down

0 comments on commit adb9091

Please sign in to comment.