From 4eacdcbdde79e349b347c0c32ad9f4f7eb630b70 Mon Sep 17 00:00:00 2001 From: wangxye Date: Wed, 6 Dec 2023 14:19:58 +0800 Subject: [PATCH] feat: support trans message e2e test Signed-off-by: wangxye --- .../concurrent/TransactionListenerImpl.java | 1 + .../client/message/NormalMessageSizeTest.java | 2 -- .../transaction/TransactionMessageTest.java | 22 ++++++++++--------- .../client/message/NormalMessageSizeTest.java | 6 ++--- .../broker/server/TransactionMessageTest.java | 15 ------------- .../broker/simple/SimpleTopicTypeTest.java | 2 -- 6 files changed, 15 insertions(+), 33 deletions(-) diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/listener/rmq/concurrent/TransactionListenerImpl.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/listener/rmq/concurrent/TransactionListenerImpl.java index 77008c0..c654ec3 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/listener/rmq/concurrent/TransactionListenerImpl.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/listener/rmq/concurrent/TransactionListenerImpl.java @@ -94,6 +94,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt message) { // } // } // 本地事务已成功则提交消息 + System.out.println("checkLocalTransaction: commit"); return checker; } } diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/client/message/NormalMessageSizeTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/client/message/NormalMessageSizeTest.java index 57d577b..c586b72 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/client/message/NormalMessageSizeTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/client/message/NormalMessageSizeTest.java @@ -176,7 +176,6 @@ public void testDelayMsgSize4M() { } } - @Disabled @Test @DisplayName("Send transaction messages synchronously with the body size of 4M+1, expect send failed") public void testTransMsgSize4MAdd1() { @@ -212,7 +211,6 @@ public Thread newThread(Runnable r) { producer.shutdown(); } - @Disabled @Test @DisplayName("Send transaction messages synchronously with the body size of 4M, expect send success") public void testTransMsgSize4M() { diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java index 65a9430..b64d529 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.server.transaction; +import apache.rocketmq.controller.v1.MessageType; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; @@ -30,7 +31,6 @@ import org.apache.rocketmq.frame.BaseOperate; import org.apache.rocketmq.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.listener.rmq.concurrent.TransactionListenerImpl; -import org.apache.rocketmq.utils.MQAdmin; import org.apache.rocketmq.utils.NameUtils; import org.apache.rocketmq.utils.TestUtils; import org.apache.rocketmq.utils.VerifyUtils; @@ -56,7 +56,6 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; -@Disabled @Tag(TESTSET.TRANSACTION) @Tag(TESTSET.SMOKE) public class TransactionMessageTest extends BaseOperate { @@ -68,16 +67,16 @@ public class TransactionMessageTest extends BaseOperate { @BeforeEach public void setUp() { - topic = NameUtils.getTopicName(); tag = NameUtils.getTagName(); - groupId = NameUtils.getGroupName(); - MQAdmin.createTopic(namesrvAddr, cluster, topic, 8); - logger.info("topic:{}, tag:{}, groupId:{}", topic, tag, groupId); } + @Disabled @Test @DisplayName("Send 10 transaction messages synchronously, expecting all to be consumed") public void testConsumeNormalMessage() { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + String topic = getTopic(MessageType.TRANSACTION, methodName); + String groupId = getGroupId(methodName); RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); consumer.subscribeAndStart(topic, tag, new RMQNormalListener()); @@ -105,7 +104,7 @@ public Thread newThread(Runnable r) { @DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer") public void testTrans_SendRollback_PushConsume() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); + String topic = getTopic(MessageType.TRANSACTION, methodName); String groupId = getGroupId(methodName); RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); @@ -135,12 +134,13 @@ public Thread newThread(Runnable r) { pushConsumer.shutdown(); } + @Disabled @Test @DisplayName("Send 10 transaction messages and COMMIT the transaction by Checker (perform COMMIT), expecting the 10 messages to be consumed by PushConsumer") public void testTrans_SendCheckerCommit_PushConsume() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); + String topic = getTopic(MessageType.TRANSACTION, methodName); String groupId = getGroupId(methodName); RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); @@ -173,7 +173,7 @@ public Thread newThread(Runnable r) { @DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer") public void testTrans_CheckerRollback() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); + String topic = getTopic(MessageType.TRANSACTION, methodName); String groupId = getGroupId(methodName); RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); @@ -201,12 +201,13 @@ public Thread newThread(Runnable r) { pushConsumer.shutdown(); } + @Disabled @Test @DisplayName("Send 10 transactional messages and commit them by checking back (Checker commits for partial messages), and the expected committed messages can be consumed by PushConsumer") public void testTrans_SendCheckerPartionCommit() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); + String topic = getTopic(MessageType.TRANSACTION, methodName); String groupId = getGroupId(methodName); RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); @@ -253,6 +254,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { await().atMost(90, SECONDS).until(new Callable() { @Override public Boolean call() { + System.out.printf("rollbackMsg: %d, commitMsg: %d \n", rollbackMsgNum.get(), commitMsgNum.get()); return rollbackMsgNum.get() == commitMsgNum.get() && commitMsgNum.get() == SEND_NUM / 2; } }); diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/NormalMessageSizeTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/NormalMessageSizeTest.java index 330726c..412c548 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/NormalMessageSizeTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/NormalMessageSizeTest.java @@ -57,14 +57,14 @@ public class NormalMessageSizeTest extends BaseOperate { public static void setUpAll() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); normalTopic = getTopic(TopicMessageType.NORMAL.getValue(), methodName); -// transTopic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName); + transTopic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName); delayTopic = getTopic(TopicMessageType.DELAY.getValue(), methodName); fifoTopic = getTopic(TopicMessageType.FIFO.getValue(), methodName); try { producer = provider.newProducerBuilder() .setTransactionChecker(messageView -> TransactionResolution.COMMIT) .setClientConfiguration(ClientConfigurationFactory.build(account)) - .setTopics(normalTopic, delayTopic, fifoTopic) + .setTopics(normalTopic, delayTopic, transTopic, fifoTopic) .build(); } catch (ClientException e) { Assertions.fail("create producer failed"); @@ -133,7 +133,6 @@ public void testDelayMsgSize4M() { } } - @Disabled @Test @DisplayName("Send transaction messages synchronously with the body size of 4M+1, expect send failed") public void testTransMsgSize4MAdd1() { @@ -147,7 +146,6 @@ public void testTransMsgSize4MAdd1() { }); } - @Disabled @Test @DisplayName("Send transaction messages synchronously with the body size of 4M, expect send success") public void testTransMsgSize4M() { diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java index 6ee3f98..b2e5087 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java @@ -81,7 +81,6 @@ public void tearDown() { } } - @Disabled @Test @DisplayName("Send 10 transaction messages and synchronously commit the transaction (Checker performs rollback), expecting those 10 messages to be consumed via PushConsumer") public void testTrans_SendCommit_PushConsume() { @@ -91,8 +90,6 @@ public void testTrans_SendCommit_PushConsume() { String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.ROLLBACK)); Assertions.assertNotNull(producer); @@ -104,7 +101,6 @@ public void testTrans_SendCommit_PushConsume() { VerifyUtils.verifyNormalMessage(producer.getEnqueueMessages(), pushConsumer.getListener().getDequeueMessages()); } - @Disabled @Test @DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer") public void testTrans_SendRollback_PushConsume() { @@ -113,8 +109,6 @@ public void testTrans_SendRollback_PushConsume() { String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT)); Assertions.assertNotNull(producer); @@ -128,7 +122,6 @@ public void testTrans_SendRollback_PushConsume() { Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize()); } - @Disabled @Test @DisplayName("Send 10 transaction messages and COMMIT the transaction by Checker (perform COMMIT), expecting the 10 messages to be consumed by PushConsumer") public void testTrans_SendCheckerCommit_PushConsume() { @@ -138,8 +131,6 @@ public void testTrans_SendCheckerCommit_PushConsume() { String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT)); Assertions.assertNotNull(producer); @@ -153,7 +144,6 @@ public void testTrans_SendCheckerCommit_PushConsume() { VerifyUtils.verifyNormalMessage(producer.getEnqueueMessages(), pushConsumer.getListener().getDequeueMessages()); } - @Disabled @Test @DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer") public void testTrans_CheckerRollback() { @@ -162,8 +152,6 @@ public void testTrans_CheckerRollback() { String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.ROLLBACK)); Assertions.assertNotNull(producer); @@ -177,7 +165,6 @@ public void testTrans_CheckerRollback() { Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize()); } - @Disabled @Test @DisplayName("Send 10 transactional messages and commit them by checking back (Checker commits for partial messages), and the expected committed messages can be consumed by PushConsumer") public void testTrans_SendCheckerPartionCommit() { @@ -187,8 +174,6 @@ public void testTrans_SendCheckerPartionCommit() { String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); AtomicInteger commitMsgNum = new AtomicInteger(0); AtomicInteger rollbackMsgNum = new AtomicInteger(0); diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleTopicTypeTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleTopicTypeTest.java index df1dcf4..dc8d1f4 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleTopicTypeTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleTopicTypeTest.java @@ -83,7 +83,6 @@ public void testDelay_simple_receive_ack() { VerifyUtils.waitDelayReceiveThenAck(producer, consumer, 1, 10000); } - @Disabled @Test @DisplayName("Send 10 transaction messages synchronously and expect SimpleConsumer to receive() and ack() messages properly") public void testTrans_simple_receive_ackAsync() { @@ -93,7 +92,6 @@ public void testTrans_simple_receive_ackAsync() { String groupId = getGroupId(methodName); SimpleConsumer consumer = ConsumerFactory.getSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(consumer); RMQNormalProducer producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT)); Assertions.assertNotNull(producer, "Get Producer failed"); for (int i = 0; i < SEND_NUM; i++) {