diff --git a/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java b/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java index 4ca76f5..a91927e 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java @@ -23,6 +23,7 @@ import apache.rocketmq.controller.v1.GroupType; import apache.rocketmq.controller.v1.MessageType; import apache.rocketmq.controller.v1.AcceptTypes; +import apache.rocketmq.controller.v1.SubscriptionMode; import com.automq.rocketmq.controller.metadata.GrpcControllerClient; import com.automq.rocketmq.cli.CliClientConfig; import java.util.concurrent.CompletableFuture; @@ -144,15 +145,20 @@ protected static String resetOffsetByTimestamp(String consumerGroup, String topi // //The synchronization consumption retry policy is DefaultRetryPolicy protected static String getGroupId(String methodName) { + return getGroupId(methodName, SubscriptionMode.SUB_MODE_POP); + } + + protected static String getGroupId(String methodName, SubscriptionMode mode) { String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6)); // prepare consumer group CreateGroupRequest request = CreateGroupRequest.newBuilder() .setName(groupId) .setMaxDeliveryAttempt(16) .setGroupType(GroupType.GROUP_TYPE_STANDARD) + .setSubMode(mode) .build(); CreateGroupReply reply = createConsumerGroup(request).join(); - log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply); + log.info("[ConsumerGroupId] groupId:{} , methodName:{} , mode: {} , reply:{}", groupId, methodName, mode, reply); return groupId; }