Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use one group in trans topic for grpc e2e test #20

Merged
merged 1 commit into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,24 @@ protected static String getTopic(String messageType, String methodName) {
return null;
}

protected static String getTransTopic(String messageType, String methodName) {
String topic = String.format("topic_%s_%s_%s", messageType, methodName, RandomUtils.getStringWithCharacter(6));
log.info("[Topic] topic:{}, messageType:{}, methodName:{}", topic, messageType, methodName);
try {
CreateTopicRequest request = CreateTopicRequest.newBuilder()
.setTopic(topic)
.setCount(1)
.setAcceptTypes(convertAcceptTypes(messageType))
.build();
Long topicId = client.createTopic(endPoint, request).join();
log.info("create topic: {} , topicId:{}", topic, topicId);
return topic;
} catch (Exception e) {
log.error("create topic error", e);
}
return null;
}

private static AcceptTypes convertAcceptTypes(String typeStr) {
switch (typeStr) {
case "NORMAL":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void tearDown() {
public void testTrans_SendCommit_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
Expand All @@ -105,7 +105,7 @@ public void testTrans_SendCommit_PushConsume() {
@DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer")
public void testTrans_SendRollback_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
Expand All @@ -127,7 +127,7 @@ public void testTrans_SendRollback_PushConsume() {
public void testTrans_SendCheckerCommit_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
Expand All @@ -148,7 +148,7 @@ public void testTrans_SendCheckerCommit_PushConsume() {
@DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer")
public void testTrans_CheckerRollback() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
Expand All @@ -170,7 +170,7 @@ public void testTrans_CheckerRollback() {
public void testTrans_SendCheckerPartionCommit() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
Expand All @@ -195,7 +195,7 @@ public TransactionResolution check(MessageView messageView) {
Message message = MessageFactory.buildMessage(topic, tag, String.valueOf(i));
producer.sendTrans(message, null);
}
await().atMost(90, SECONDS).until(new Callable<Boolean>() {
await().atMost(120, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() {
return rollbackMsgNum.get() == commitMsgNum.get() && commitMsgNum.get() == SEND_NUM / 2;
Expand Down