diff --git a/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java b/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java index c803a4a..42acfe5 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java @@ -185,8 +185,15 @@ public static void verifyOrderMessage(DataCollector enqueueMessages, public static void verifyDelayMessage(DataCollector enqueueMessages, DataCollector dequeueMessages, int delayTime) { //Check whether the consumption is complete - Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, - (TIMEOUT + delayTime) * 1000L, 1); + Collection unConsumedMessages = null; + if (delayTime == 0) { + unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, + (TIMEOUT / 2 + delayTime) * 1000L, 1); + } else { + unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, + (TIMEOUT + delayTime) * 1000L, 1); + } + if (unConsumedMessages.size() > 0) { Assertions.fail(String.format("The following %s messages are not consumed: %s", unConsumedMessages.size(), unConsumedMessages)); } diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java index 2a941e5..b623fab 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java @@ -146,7 +146,7 @@ public void testDelayTime15SecondsAgo() { producer = ProducerFactory.getRMQProducer(account, topic); Assertions.assertNotNull(producer, "Get Producer failed"); for (int i = 0; i < SEND_NUM; i++) { - Message message = MessageFactory.buildDelayMessage(topic, tag, RandomUtils.getStringByUUID(), System.currentTimeMillis() - 5 * 1000); + Message message = MessageFactory.buildDelayMessage(topic, tag, RandomUtils.getStringByUUID(), System.currentTimeMillis()); producer.send(message); } Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed"); diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleParamTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleParamTest.java index f20ab80..d5acf66 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleParamTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleParamTest.java @@ -131,7 +131,7 @@ public void test_waitAckException_reReceive_ack() { if (messageViews.size() > 0) { for (MessageView messageView : messageViews) { log.info("MessageId:{}, Body:{}, tag:{}, property:{}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()).toString(), messageView.getTag().get(), messageView.getProperties()); - TestUtils.waitForSeconds(11); + TestUtils.waitForSeconds(15); consumer.ack(messageView); Assertions.fail("Calling changeInvisibleDuration after ack fails with an INVALID_RECEIPT_HANDLE error"); }