From 87b3a59f9efd8d69ef38e5110a155b1d2cd3c0db Mon Sep 17 00:00:00 2001 From: wangxye Date: Mon, 13 Nov 2023 19:21:24 +0800 Subject: [PATCH] fix: fix the fifo message of tag and order test Signed-off-by: wangxye --- .../rocketmq/broker/filter/push/TagFilterTest.java | 3 +++ .../rocketmq/broker/simple/SimpleOrderParamTest.java | 9 ++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/filter/push/TagFilterTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/filter/push/TagFilterTest.java index 467f0a1..0469ba0 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/filter/push/TagFilterTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/filter/push/TagFilterTest.java @@ -31,12 +31,14 @@ import org.apache.rocketmq.util.NameUtils; import org.apache.rocketmq.util.TestUtils; import org.apache.rocketmq.util.VerifyUtils; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +104,7 @@ public void testSndTagATagB_SubTagATagB() { // simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(receiveTag), Duration.ofSeconds(10)); // VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); producer = ProducerFactory.getRMQProducer(account, topic); + TestUtils.waitForSeconds(1); Assertions.assertNotNull(producer); producer.send(topic, sendTagA, SEND_NUM); producer.send(topic, sendTagB, SEND_NUM); diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java index 0366ddc..880acb6 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java @@ -59,6 +59,7 @@ public class SimpleOrderParamTest extends BaseOperate { private String tag; private String groupId; private final static int SEND_NUM = 20; + static final String PROPERTY_SHARDING_KEY = "__SHARDINGKEY"; @BeforeEach public void setUp() { @@ -82,10 +83,11 @@ public void testFIFO_simple_receive_nack() { VerifyUtils.tryReceiveOnce(consumer); RMQNormalProducer producer = ProducerFactory.getRMQProducer(account, topic); Assertions.assertNotNull(producer, "Get Producer failed"); - + String orderId = RandomUtils.getStringByUUID(); String messageGroup = RandomUtils.getStringByUUID(); for (int i = 0; i < SEND_NUM; i++) { Message message = MessageFactory.buildOrderMessage(topic, tag, String.valueOf(i), messageGroup); + message.getProperties().put(PROPERTY_SHARDING_KEY, orderId); producer.send(message); } TestUtils.waitForSeconds(1); @@ -193,14 +195,15 @@ public void testFIFO_simple_receive_multi_nack() { String groupId = getOrderlyGroupId(methodName); SimpleConsumer consumer = ConsumerFactory.getSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(consumer); +// VerifyUtils.tryReceiveOnce(consumer); RMQNormalProducer producer = ProducerFactory.getRMQProducer(account, topic); Assertions.assertNotNull(producer, "Get Producer failed"); - + String orderId = RandomUtils.getStringByUUID(); String messageGroup = RandomUtils.getStringByUUID(); for (int i = 0; i < SEND_NUM; i++) { System.out.printf("Producer send message %s%n", i); Message message = MessageFactory.buildOrderMessage(topic, tag, String.valueOf(i), messageGroup); + message.getProperties().put(PROPERTY_SHARDING_KEY, orderId); producer.send(message); } TestUtils.waitForSeconds(1);