Skip to content

Commit

Permalink
fix: unify the time wait for consumer in delay
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye committed Nov 13, 2023
1 parent 3ece6b6 commit f546ba9
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
11 changes: 9 additions & 2 deletions java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,15 @@ public static void verifyOrderMessage(DataCollector<Object> enqueueMessages,
public static void verifyDelayMessage(DataCollector<Object> enqueueMessages,
DataCollector<Object> dequeueMessages, int delayTime) {
//Check whether the consumption is complete
Collection<Object> unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages,
(TIMEOUT + delayTime) * 1000L, 1);
Collection<Object> unConsumedMessages = null;
if (delayTime == 0) {
unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages,
(TIMEOUT / 2 + delayTime) * 1000L, 1);
} else {
unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages,
(TIMEOUT + delayTime) * 1000L, 1);
}

if (unConsumedMessages.size() > 0) {
Assertions.fail(String.format("The following %s messages are not consumed: %s", unConsumedMessages.size(), unConsumedMessages));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void testDelayTime15SecondsAgo() {
producer = ProducerFactory.getRMQProducer(account, topic);
Assertions.assertNotNull(producer, "Get Producer failed");
for (int i = 0; i < SEND_NUM; i++) {
Message message = MessageFactory.buildDelayMessage(topic, tag, RandomUtils.getStringByUUID(), System.currentTimeMillis() - 5 * 1000);
Message message = MessageFactory.buildDelayMessage(topic, tag, RandomUtils.getStringByUUID(), System.currentTimeMillis());
producer.send(message);
}
Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void test_waitAckException_reReceive_ack() {
if (messageViews.size() > 0) {
for (MessageView messageView : messageViews) {
log.info("MessageId:{}, Body:{}, tag:{}, property:{}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()).toString(), messageView.getTag().get(), messageView.getProperties());
TestUtils.waitForSeconds(11);
TestUtils.waitForSeconds(15);
consumer.ack(messageView);
Assertions.fail("Calling changeInvisibleDuration after ack fails with an INVALID_RECEIPT_HANDLE error");
}
Expand Down

0 comments on commit f546ba9

Please sign in to comment.