Skip to content

Commit

Permalink
feat: support trans message e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye committed Dec 6, 2023
1 parent 809375b commit 4eacdcb
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt message) {
// }
// }
// 本地事务已成功则提交消息
System.out.println("checkLocalTransaction: commit");
return checker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ public void testDelayMsgSize4M() {
}
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M+1, expect send failed")
public void testTransMsgSize4MAdd1() {
Expand Down Expand Up @@ -212,7 +211,6 @@ public Thread newThread(Runnable r) {
producer.shutdown();
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M, expect send success")
public void testTransMsgSize4M() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.server.transaction;

import apache.rocketmq.controller.v1.MessageType;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
Expand All @@ -30,7 +31,6 @@
import org.apache.rocketmq.frame.BaseOperate;
import org.apache.rocketmq.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.listener.rmq.concurrent.TransactionListenerImpl;
import org.apache.rocketmq.utils.MQAdmin;
import org.apache.rocketmq.utils.NameUtils;
import org.apache.rocketmq.utils.TestUtils;
import org.apache.rocketmq.utils.VerifyUtils;
Expand All @@ -56,7 +56,6 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

@Disabled
@Tag(TESTSET.TRANSACTION)
@Tag(TESTSET.SMOKE)
public class TransactionMessageTest extends BaseOperate {
Expand All @@ -68,16 +67,16 @@ public class TransactionMessageTest extends BaseOperate {

@BeforeEach
public void setUp() {
topic = NameUtils.getTopicName();
tag = NameUtils.getTagName();
groupId = NameUtils.getGroupName();
MQAdmin.createTopic(namesrvAddr, cluster, topic, 8);
logger.info("topic:{}, tag:{}, groupId:{}", topic, tag, groupId);
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages synchronously, expecting all to be consumed")
public void testConsumeNormalMessage() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);
RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
consumer.subscribeAndStart(topic, tag, new RMQNormalListener());

Expand Down Expand Up @@ -105,7 +104,7 @@ public Thread newThread(Runnable r) {
@DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer")
public void testTrans_SendRollback_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
Expand Down Expand Up @@ -135,12 +134,13 @@ public Thread newThread(Runnable r) {
pushConsumer.shutdown();
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and COMMIT the transaction by Checker (perform COMMIT), expecting the 10 messages to be consumed by PushConsumer")
public void testTrans_SendCheckerCommit_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
Expand Down Expand Up @@ -173,7 +173,7 @@ public Thread newThread(Runnable r) {
@DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer")
public void testTrans_CheckerRollback() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
Expand Down Expand Up @@ -201,12 +201,13 @@ public Thread newThread(Runnable r) {
pushConsumer.shutdown();
}

@Disabled
@Test
@DisplayName("Send 10 transactional messages and commit them by checking back (Checker commits for partial messages), and the expected committed messages can be consumed by PushConsumer")
public void testTrans_SendCheckerPartionCommit() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
Expand Down Expand Up @@ -253,6 +254,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
await().atMost(90, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() {
System.out.printf("rollbackMsg: %d, commitMsg: %d \n", rollbackMsgNum.get(), commitMsgNum.get());
return rollbackMsgNum.get() == commitMsgNum.get() && commitMsgNum.get() == SEND_NUM / 2;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ public class NormalMessageSizeTest extends BaseOperate {
public static void setUpAll() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
normalTopic = getTopic(TopicMessageType.NORMAL.getValue(), methodName);
// transTopic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
transTopic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
delayTopic = getTopic(TopicMessageType.DELAY.getValue(), methodName);
fifoTopic = getTopic(TopicMessageType.FIFO.getValue(), methodName);
try {
producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> TransactionResolution.COMMIT)
.setClientConfiguration(ClientConfigurationFactory.build(account))
.setTopics(normalTopic, delayTopic, fifoTopic)
.setTopics(normalTopic, delayTopic, transTopic, fifoTopic)
.build();
} catch (ClientException e) {
Assertions.fail("create producer failed");
Expand Down Expand Up @@ -133,7 +133,6 @@ public void testDelayMsgSize4M() {
}
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M+1, expect send failed")
public void testTransMsgSize4MAdd1() {
Expand All @@ -147,7 +146,6 @@ public void testTransMsgSize4MAdd1() {
});
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M, expect send success")
public void testTransMsgSize4M() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public void tearDown() {
}
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and synchronously commit the transaction (Checker performs rollback), expecting those 10 messages to be consumed via PushConsumer")
public void testTrans_SendCommit_PushConsume() {
Expand All @@ -91,8 +90,6 @@ public void testTrans_SendCommit_PushConsume() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.ROLLBACK));
Assertions.assertNotNull(producer);
Expand All @@ -104,7 +101,6 @@ public void testTrans_SendCommit_PushConsume() {
VerifyUtils.verifyNormalMessage(producer.getEnqueueMessages(), pushConsumer.getListener().getDequeueMessages());
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer")
public void testTrans_SendRollback_PushConsume() {
Expand All @@ -113,8 +109,6 @@ public void testTrans_SendRollback_PushConsume() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT));
Assertions.assertNotNull(producer);
Expand All @@ -128,7 +122,6 @@ public void testTrans_SendRollback_PushConsume() {
Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize());
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and COMMIT the transaction by Checker (perform COMMIT), expecting the 10 messages to be consumed by PushConsumer")
public void testTrans_SendCheckerCommit_PushConsume() {
Expand All @@ -138,8 +131,6 @@ public void testTrans_SendCheckerCommit_PushConsume() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT));
Assertions.assertNotNull(producer);
Expand All @@ -153,7 +144,6 @@ public void testTrans_SendCheckerCommit_PushConsume() {
VerifyUtils.verifyNormalMessage(producer.getEnqueueMessages(), pushConsumer.getListener().getDequeueMessages());
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer")
public void testTrans_CheckerRollback() {
Expand All @@ -162,8 +152,6 @@ public void testTrans_CheckerRollback() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.ROLLBACK));
Assertions.assertNotNull(producer);
Expand All @@ -177,7 +165,6 @@ public void testTrans_CheckerRollback() {
Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize());
}

@Disabled
@Test
@DisplayName("Send 10 transactional messages and commit them by checking back (Checker commits for partial messages), and the expected committed messages can be consumed by PushConsumer")
public void testTrans_SendCheckerPartionCommit() {
Expand All @@ -187,8 +174,6 @@ public void testTrans_SendCheckerPartionCommit() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

AtomicInteger commitMsgNum = new AtomicInteger(0);
AtomicInteger rollbackMsgNum = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public void testDelay_simple_receive_ack() {
VerifyUtils.waitDelayReceiveThenAck(producer, consumer, 1, 10000);
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages synchronously and expect SimpleConsumer to receive() and ack() messages properly")
public void testTrans_simple_receive_ackAsync() {
Expand All @@ -93,7 +92,6 @@ public void testTrans_simple_receive_ackAsync() {
String groupId = getGroupId(methodName);

SimpleConsumer consumer = ConsumerFactory.getSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(consumer);
RMQNormalProducer producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT));
Assertions.assertNotNull(producer, "Get Producer failed");
for (int i = 0; i < SEND_NUM; i++) {
Expand Down

0 comments on commit 4eacdcb

Please sign in to comment.