diff --git a/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java b/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java index 3b2b255..10f4747 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java @@ -97,6 +97,24 @@ protected static String getTopic(String messageType, String methodName) { return null; } + protected static String getTransTopic(String messageType, String methodName) { + String topic = String.format("topic_%s_%s_%s", messageType, methodName, RandomUtils.getStringWithCharacter(6)); + log.info("[Topic] topic:{}, messageType:{}, methodName:{}", topic, messageType, methodName); + try { + CreateTopicRequest request = CreateTopicRequest.newBuilder() + .setTopic(topic) + .setCount(1) + .setAcceptTypes(convertAcceptTypes(messageType)) + .build(); + Long topicId = client.createTopic(endPoint, request).join(); + log.info("create topic: {} , topicId:{}", topic, topicId); + return topic; + } catch (Exception e) { + log.error("create topic error", e); + } + return null; + } + private static AcceptTypes convertAcceptTypes(String typeStr) { switch (typeStr) { case "NORMAL": diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java index b2e5087..dbc8704 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java @@ -86,7 +86,7 @@ public void tearDown() { public void testTrans_SendCommit_PushConsume() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName); + String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName); String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); @@ -105,7 +105,7 @@ public void testTrans_SendCommit_PushConsume() { @DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer") public void testTrans_SendRollback_PushConsume() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName); + String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName); String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); @@ -127,7 +127,7 @@ public void testTrans_SendRollback_PushConsume() { public void testTrans_SendCheckerCommit_PushConsume() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName); + String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName); String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); @@ -148,7 +148,7 @@ public void testTrans_SendCheckerCommit_PushConsume() { @DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer") public void testTrans_CheckerRollback() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName); + String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName); String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); @@ -170,7 +170,7 @@ public void testTrans_CheckerRollback() { public void testTrans_SendCheckerPartionCommit() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName); + String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName); String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); @@ -195,7 +195,7 @@ public TransactionResolution check(MessageView messageView) { Message message = MessageFactory.buildMessage(topic, tag, String.valueOf(i)); producer.sendTrans(message, null); } - await().atMost(90, SECONDS).until(new Callable() { + await().atMost(120, SECONDS).until(new Callable() { @Override public Boolean call() { return rollbackMsgNum.get() == commitMsgNum.get() && commitMsgNum.get() == SEND_NUM / 2;