diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/client/callback/RMQSendCallBack.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/client/callback/RMQSendCallBack.java index 310ea79..370a7c4 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/client/callback/RMQSendCallBack.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/client/callback/RMQSendCallBack.java @@ -49,6 +49,7 @@ public void onSuccess(SendResult sendResult) { public void onException(Throwable e) { //logger.warn("{} callback message failed: {} exception: {}", context.getTopic(), context.getMessageId(), context.getException()); bFailResponse = true; + logger.warn("callback message failed: {}", e); } public void waitResponse() { diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/client/rmq/RMQNormalProducer.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/client/rmq/RMQNormalProducer.java index 59ec091..aeef5c7 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/client/rmq/RMQNormalProducer.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/client/rmq/RMQNormalProducer.java @@ -40,6 +40,7 @@ public class RMQNormalProducer extends AbstractMQProducer { private static Logger logger = LoggerFactory.getLogger(RMQNormalProducer.class); private DefaultMQProducer producer; + static final String PROPERTY_SHARDING_KEY = "__SHARDINGKEY"; public RMQNormalProducer(DefaultMQProducer producer) { this.producer = producer; } @@ -115,9 +116,11 @@ public void sendWithQueue(List mqs, int messageNum) { */ public void sendWithQueue(List mqs, String tag, int messageNum) { logger.info("Producer start to send messages"); + String orderId = "biz_" + 0; for (MessageQueue mq : mqs) { for (int i = 0; i < messageNum; i++) { Message message = MessageFactory.buildOneMessageWithTagAndBody(mq.getTopic(), tag, String.valueOf(i)); + message.putUserProperty(PROPERTY_SHARDING_KEY, orderId); try { SendResult sendResult = producer.send(message, mq); MessageExt messageExt = new MessageExt(); @@ -141,9 +144,11 @@ public void sendWithQueue(List mqs, String tag, int messageNum) { */ public void sendWithQueue(List mqs, int messageNum, String tag) { logger.info("Producer start to send messages"); + String orderId = "biz_" + 0; for (MessageQueue mq : mqs) { for (int i = 0; i < messageNum; i++) { Message message = MessageFactory.buildOneMessageWithTagAndBody(mq.getTopic(), tag, String.valueOf(i)); + message.putUserProperty(PROPERTY_SHARDING_KEY, orderId); try { SendResult sendResult = producer.send(message, mq); MessageExt messageExt = new MessageExt(); @@ -152,6 +157,7 @@ public void sendWithQueue(List mqs, int messageNum, String tag) { logger.info("{}, index: {}, tag: {}", sendResult, i, tag); this.enqueueMessages.addData(messageExt); } catch (Exception e) { + System.out.printf("send message failed %s", e); logger.error("DefaultMQProducer send message failed"); } } diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java index 58abf41..a89c31a 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java @@ -99,6 +99,19 @@ protected static String getGroupId(String methodName, SubscriptionMode mode) { return groupId; } + protected static String getOrderlyGroupId(String methodName, SubscriptionMode mode) { + String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6)); + CreateGroupRequest request = CreateGroupRequest.newBuilder() + .setName(groupId) + .setMaxDeliveryAttempt(16) + .setGroupType(GroupType.GROUP_TYPE_FIFO) + .setSubMode(mode) + .build(); + CreateGroupReply reply = createConsumerGroup(request).join(); + logger.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply); + return groupId; + } + private static CompletableFuture createConsumerGroup(CreateGroupRequest request) { try { CompletableFuture groupCf = client.createGroup(namesrvAddr, request); diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/cluster/ClusterTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/cluster/ClusterTest.java index 7bfa574..3550f33 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/cluster/ClusterTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/cluster/ClusterTest.java @@ -17,13 +17,9 @@ package org.apache.rocketmq.cluster; -import apache.rocketmq.controller.v1.SubscriptionMode; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.client.rmq.RMQNormalProducer; import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.enums.TESTSET; import org.apache.rocketmq.factory.ConsumerFactory; import org.apache.rocketmq.factory.ProducerFactory; @@ -32,14 +28,10 @@ import org.apache.rocketmq.utils.NameUtils; import org.apache.rocketmq.utils.RandomUtils; import org.apache.rocketmq.utils.VerifyUtils; -import org.awaitility.Awaitility; import org.junit.jupiter.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Field; -import java.util.Set; -import java.util.concurrent.TimeUnit; @Tag(TESTSET.MODEL) public class ClusterTest extends BaseOperate { diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullOrderParamTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullOrderParamTest.java index bdea36a..6e54f1c 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullOrderParamTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullOrderParamTest.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.pull; +import apache.rocketmq.controller.v1.MessageType; +import apache.rocketmq.controller.v1.SubscriptionMode; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -68,8 +70,8 @@ public void tearDown() { @DisplayName("When sending 20 sequential messages synchronously using the same MessageQueue, PullConsumer normally receives messages, but does not ack messages, and keeps the sequence; the messages are stuck at the first") public void testFIFO_pull_receive_nack() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); - String groupId = getGroupId(methodName); + String topic = getTopic(MessageType.FIFO, methodName); + String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL); RMQNormalConsumer consumer = ConsumerFactory.getRMQPullConsumer(namesrvAddr, groupId, rpcHook); consumer.startDefaultPull(); diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullOrderTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullOrderTest.java index 5d25fca..6bf1b40 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullOrderTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullOrderTest.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.pull; +import apache.rocketmq.controller.v1.MessageType; +import apache.rocketmq.controller.v1.SubscriptionMode; import org.apache.rocketmq.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.client.rmq.RMQNormalProducer; import org.apache.rocketmq.common.message.MessageQueue; @@ -54,9 +56,8 @@ public void setUp() { @DisplayName("Send 20 sequential messages synchronously, and expect PullConsumer to receive and ack messages properly and maintain the sequence") public void testFIFO_simple_receive_ack() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - - String topic = getTopic(methodName); - String groupId = getGroupId(methodName); + String topic = getTopic(MessageType.FIFO, methodName); + String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL); RMQNormalConsumer consumer = ConsumerFactory.getRMQPullConsumer(namesrvAddr, groupId, rpcHook); consumer.startDefaultPull(); diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/abnormal/PushConsumerRetryTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/abnormal/PushConsumerRetryTest.java index 9e3010a..57a7918 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/abnormal/PushConsumerRetryTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/abnormal/PushConsumerRetryTest.java @@ -33,6 +33,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import apache.rocketmq.controller.v1.MessageType; +import apache.rocketmq.controller.v1.SubscriptionMode; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -54,6 +56,7 @@ import org.apache.rocketmq.utils.NameUtils; import org.apache.rocketmq.utils.RandomUtils; import org.apache.rocketmq.utils.TestUtils; +import org.apache.rocketmq.utils.VerifyUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -67,7 +70,6 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; -@Disabled @Tag(TESTSET.RETRY) public class PushConsumerRetryTest extends BaseOperate { private final Logger log = LoggerFactory.getLogger(PushConsumerRetryTest.class); @@ -79,6 +81,7 @@ public void setUp() { tag = NameUtils.getRandomTagName(); } + @Disabled @Test @Timeout(value = 180, unit = TimeUnit.SECONDS) @DisplayName("Send normal messages, set the maximum number of retries and set the received messages to RECONSUME_LATER. The expected retry time is about 10s for the first time and about 30s for the second time") @@ -104,7 +107,7 @@ public void testNormalMessageReconsumeTime() { pushConsumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { if (msg.getReconsumeTimes() == 0) { msgsReConsumeTime.putIfAbsent(msg.getMsgId(), Instant.now()); @@ -141,6 +144,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } + log.info(String.format("recv msgid(success) %s ", msgs.get(0).getMsgId())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } @@ -151,13 +155,16 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, } Assertions.assertNotNull(producer, "Get Producer Failed"); - for (int i = 0; i < SEND_NUM; i++) { - Message message = MessageFactory.buildNormalMessage(topic, tag, String.valueOf(i)); - producer.send(message); - } +// for (int i = 0; i < SEND_NUM; i++) { +// Message message = MessageFactory.buildNormalMessage(topic, tag, String.valueOf(i)); +// producer.send(message); +// System.out.printf("send message %s", message); +// } + producer.send(topic, tag, SEND_NUM); + Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed"); - await().atMost(120, SECONDS).until(new Callable() { + await().atMost(240, SECONDS).until(new Callable() { @Override public Boolean call() throws Exception { boolean flag = true; @@ -180,8 +187,8 @@ public Boolean call() throws Exception { @DisplayName("Send order messages, set the maximum number of retries and set the received messages to SUSPEND_CURRENT_QUEUE_A_MOMENT. The expected retry time is about 1s") public void testOrderMessageReconsumeTime() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); - String groupId = getGroupId(methodName); + String topic = getTopic(MessageType.FIFO, methodName); + String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL); RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); Map msgsReConsumeTime = new ConcurrentHashMap<>(); @@ -247,6 +254,7 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, } Assertions.assertNotNull(producer, "Get Producer Failed"); + List mqs = producer.fetchPublishMessageQueues(topic); List sendMqs = new ArrayList<>(); sendMqs.add(mqs.get(0)); @@ -271,6 +279,7 @@ public Boolean call() throws Exception { pushConsumer.shutdown(); } + @Disabled @Test @Timeout(value = 180, unit = TimeUnit.SECONDS) @DisplayName("Send normal messages, set the maximum consumption to 0(The first retry is 10 seconds, and the setting is 15 seconds. Then check whether the retry occurs), and set the message reception to RECONUME_LATER, expecting that the retry will not occur") @@ -313,10 +322,11 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, } Assertions.assertNotNull(producer, "Get Producer Failed"); - for (int i = 0; i < SEND_NUM; i++) { - Message message = MessageFactory.buildNormalMessage(topic, tag, String.valueOf(i)); - producer.send(message); - } +// for (int i = 0; i < SEND_NUM; i++) { +// Message message = MessageFactory.buildNormalMessage(topic, tag, String.valueOf(i)); +// producer.send(message); +// } + producer.send(topic, tag, SEND_NUM); Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed"); TestUtils.waitForSeconds(15); @@ -333,8 +343,8 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, @DisplayName("Send one order message, set the maximum consumption to 0(The retry time of each message is 1s. Then check whether the retry occurs), and set the message reception to SUSPEND_CURRENT_QUEUE_A_MOMENT, expecting that the retry will not occur") public void testOrderMessageRetryTimesSetting() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); - String groupId = getGroupId(methodName); + String topic = getTopic(MessageType.FIFO, methodName); + String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL); RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); Map msgsReConsumeTimes = new ConcurrentHashMap<>(); @@ -392,8 +402,8 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, @DisplayName("Send order messages, set the maximum consumption to 30, and set the message reception to SUSPEND_CURRENT_QUEUE_A_MOMENT, expecting that the received messages's reconsume time will be equal to 30 in 2 minutes") public void testOrderMessageRetryTimesWith30() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); - String groupId = getGroupId(methodName); + String topic = getTopic(MessageType.FIFO, methodName); + String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL); RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); Map msgsReConsumeTimes = new ConcurrentHashMap<>(); @@ -448,8 +458,8 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, @DisplayName("To send order messages, set SUSPEND_CURRENT_QUEUE_A_MOMENT for the first message. The next sequential message will not be consumed until the first message is in the dead letter queue. All messages except the first message are expected to be received") public void testOrderMessageRetryTimesWithMaxReconsumeimes() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); - String groupId = getGroupId(methodName); + String topic = getTopic(MessageType.FIFO, methodName); + String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL); RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); Map msgsReConsumeTimes = new ConcurrentHashMap<>(); @@ -502,6 +512,7 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, pushConsumer.shutdown(); } + @Disabled @Test @Timeout(value = 180, unit = TimeUnit.SECONDS) @DisplayName("Simulate pushconsumer consumption fail, expect that the original message was not received, and capture all messages after message retry") @@ -566,6 +577,7 @@ public Boolean call() throws Exception { pushConsumer.shutdown(); } + @Disabled @Test @Timeout(value = 180, unit = TimeUnit.SECONDS) @DisplayName("Simulate pushconsumer consumption return null, expect that the original message was not received, and capture all messages after message retry") @@ -591,7 +603,7 @@ public void testNullConsumption() { public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { - if (msg.getReconsumeTimes() == 2) { + if (msg.getReconsumeTimes() == 1) { retryMsgs.putIfAbsent(msg.getMsgId(), msg); log.info("consume success: {}", msg); } else { @@ -599,7 +611,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, log.info("{}", "Simulate consuming operations return null"); firstMsgs.putIfAbsent(msg.getMsgId(), msg); log.info(String.format("recv msg(null) %s ", msg)); - return null; + return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } @@ -608,18 +620,21 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, }); pushConsumer.start(); } catch (MQClientException e) { + System.out.printf("exception: %s%n", e); Assertions.fail(e.getMessage()); } Assertions.assertNotNull(producer, "Get Producer Failed"); for (int i = 0; i < SEND_NUM; i++) { Message message = MessageFactory.buildNormalMessage(topic, tag, String.valueOf(i)); producer.send(message); + System.out.printf("send message: %s%n", message); } Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed"); await().atMost(120, SECONDS).until(new Callable() { @Override public Boolean call() throws Exception { + System.out.printf("retryMsgs size: %s; firstMsgs size: %s", retryMsgs.size(), firstMsgs.size()); return retryMsgs.size() == SEND_NUM && firstMsgs.size() == SEND_NUM; } }); @@ -693,6 +708,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, pushConsumer.shutdown(); } + @Disabled @Test @Timeout(value = 300, unit = TimeUnit.SECONDS) @DisplayName("The normal message is sent, and after the PushConsumer retry, the retry message is expected to be consumed") @@ -759,9 +775,8 @@ public Boolean call() throws Exception { @DisplayName("The send order message, after the PushConsumer retry, is expected to consume the retry message, and the message consumption order and send order") public void testFiFoTopicPushConsumerRetry() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); - String groupId = getGroupId(methodName); - + String topic = getTopic(MessageType.FIFO, methodName); + String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL); RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); Assertions.assertNotNull(producer, "Get producer failed"); Vector recvMessages = new Vector<>(); diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/batch/BatchProducerTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/batch/BatchProducerTest.java index 9bc0d40..3609c0c 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/batch/BatchProducerTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/batch/BatchProducerTest.java @@ -80,6 +80,7 @@ public void testBatchProducer() { try { producer.getProducer().send(messages); } catch (Exception e) { + System.out.printf("send message error: %s", e.getMessage()); Assertions.fail(e.getMessage()); } @@ -114,6 +115,7 @@ public void testBatchProducer_queue() { try { producer.getProducer().send(messages, msgQueues.get(0)); } catch (Exception e) { + System.out.printf("send message error: %s", e.getMessage()); Assertions.fail(e.getMessage()); } diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/delay/DelayMessageTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/delay/DelayMessageTest.java index 03f63a1..d5a0ed6 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/delay/DelayMessageTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/delay/DelayMessageTest.java @@ -45,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; -@Disabled @Tag(TESTSET.SMOKE) @Tag(TESTSET.DELAY) public class DelayMessageTest extends BaseOperate { @@ -61,7 +60,9 @@ public void setUp() { topic = NameUtils.getTopicName(); tag = NameUtils.getTagName(); groupId = NameUtils.getGroupName(); - MQAdmin.createTopic(namesrvAddr, cluster, topic, 8); +// MQAdmin.createTopic(namesrvAddr, cluster, topic, 8); + getTopic(topic); + getGroupId(groupId); logger.info("topic:{}, tag:{}, groupId:{}", topic, tag, groupId); } @@ -102,7 +103,6 @@ public void testDelayLevel4() { consumer.shutdown(); } - @Disabled @Test @DisplayName("Send one delay message and set the delay test negative delay level, expecting message building wrong") public void testNegativeDelayLevel() { @@ -119,7 +119,6 @@ public void testNegativeDelayLevel() { producer.shutdown(); } - @Disabled @Test @DisplayName("Send one delay message and set the delay test delay level=19, expecting message building wrong") public void testDelayLevelWith19() { diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/normal/NormalMessageTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/normal/NormalMessageTest.java index 2445780..487099b 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/normal/NormalMessageTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/normal/NormalMessageTest.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.server.normal; +import apache.rocketmq.controller.v1.MessageType; +import apache.rocketmq.controller.v1.SubscriptionMode; import org.apache.rocketmq.client.callback.RMQSendCallBack; import org.apache.rocketmq.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.client.rmq.RMQNormalProducer; @@ -37,7 +39,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Disabled @Tag(TESTSET.NORMAL) @Tag(TESTSET.SMOKE) public class NormalMessageTest extends BaseOperate { @@ -49,17 +50,22 @@ public class NormalMessageTest extends BaseOperate { @BeforeEach public void setUp() { - topic = NameUtils.getTopicName(); +// topic = NameUtils.getTopicName(); +// groupId = NameUtils.getGroupName(); +// MQAdmin.createTopic(namesrvAddr, cluster, topic, 8); +// MQAdmin.createSub(namesrvAddr, cluster, groupId); tag = NameUtils.getTagName(); - groupId = NameUtils.getGroupName(); - MQAdmin.createTopic(namesrvAddr, cluster, topic, 8); - MQAdmin.createSub(namesrvAddr, cluster, groupId); logger.info("topic:{}, tag:{}, groupId:{}", topic, tag, groupId); } @Test @DisplayName("Send 10 normal messages synchronously, expecting all to be consumed") public void testConsumeNormalMessage() { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + + String topic = getTopic(MessageType.NORMAL, methodName); + String groupId = getGroupId(methodName); + RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); consumer.subscribeAndStart(topic, tag, new RMQNormalListener()); RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr,rpcHook); @@ -73,6 +79,11 @@ public void testConsumeNormalMessage() { @Test @DisplayName("Send 10 normal messages asynchronously, expecting all to be consumed") public void testConsumeNormalMessageAndSendWithAsync() { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + + String topic = getTopic(MessageType.NORMAL, methodName); + String groupId = getGroupId(methodName); + RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); consumer.subscribeAndStart(topic, tag, new RMQNormalListener()); RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr,rpcHook); @@ -86,6 +97,11 @@ public void testConsumeNormalMessageAndSendWithAsync() { @Test @DisplayName("Send 10 normal messages in OneWay, expecting all to be consumed") public void testConsumeNormalMessageAndSendWithOneWay() { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + + String topic = getTopic(MessageType.NORMAL, methodName); + String groupId = getGroupId(methodName); + RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); consumer.subscribeAndStart(topic, tag, new RMQNormalListener()); RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr,rpcHook); diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/order/OrderMessageTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/order/OrderMessageTest.java index 18cd827..f53a9d7 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/order/OrderMessageTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/order/OrderMessageTest.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.server.order; +import apache.rocketmq.controller.v1.MessageType; +import apache.rocketmq.controller.v1.SubscriptionMode; import org.apache.rocketmq.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.client.rmq.RMQNormalProducer; import org.apache.rocketmq.common.message.MessageQueue; @@ -39,7 +41,6 @@ import java.util.List; -@Disabled @Tag(TESTSET.SMOKE) @Tag(TESTSET.ORDER) public class OrderMessageTest extends BaseOperate { @@ -51,10 +52,10 @@ public class OrderMessageTest extends BaseOperate { @BeforeEach public void setUp() { - topic = NameUtils.getTopicName(); +// topic = NameUtils.getTopicName(); +// groupId = NameUtils.getGroupName(); +// MQAdmin.createTopic(namesrvAddr, cluster, topic, 8); tag = NameUtils.getTagName(); - groupId = NameUtils.getGroupName(); - MQAdmin.createTopic(namesrvAddr, cluster, topic, 8); logger.info("topic:{}, tag:{}, groupId:{}", topic, tag, groupId); } @@ -66,6 +67,11 @@ public void tearDown() { @Test @DisplayName("Thirty messages are sent to each of the eight queues in a topic, with the expectation that the sequential consumption client will consume the messages in each queue in order") public void testConsumePartitionOrderMessage() { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + + String topic = getTopic(MessageType.FIFO, methodName); + String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_POP); + RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); consumer.subscribeAndStart(topic, "*", new RMQOrderListener()); RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); @@ -80,6 +86,11 @@ public void testConsumePartitionOrderMessage() { @Test @DisplayName("100 messages are sent to a queue for a topic, with the expectation that the sequential consuming client will consume the messages in the queue in order") public void testConsumeGlobalOrderMessage() { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + + String topic = getTopic(MessageType.FIFO, methodName); + String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_POP); + RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); consumer.subscribeAndStart(topic, "*", new RMQOrderListener()); RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java index 7f6aeee..65a9430 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java @@ -26,7 +26,6 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.enums.TESTSET; import org.apache.rocketmq.factory.ConsumerFactory; -import org.apache.rocketmq.factory.MessageFactory; import org.apache.rocketmq.factory.ProducerFactory; import org.apache.rocketmq.frame.BaseOperate; import org.apache.rocketmq.listener.rmq.concurrent.RMQNormalListener; @@ -46,7 +45,6 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService;