Skip to content

Commit

Permalink
feat: set consume mode for consumer group
Browse files Browse the repository at this point in the history
1. set consume mode for consumer group
  • Loading branch information
TheR1sing3un committed Nov 1, 2023
1 parent e507ea9 commit cd67e89
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.AcceptTypes;
import apache.rocketmq.controller.v1.SubscriptionMode;
import com.automq.rocketmq.controller.metadata.GrpcControllerClient;
import com.automq.rocketmq.cli.CliClientConfig;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -144,15 +145,20 @@ protected static String resetOffsetByTimestamp(String consumerGroup, String topi
//
//The synchronization consumption retry policy is DefaultRetryPolicy
protected static String getGroupId(String methodName) {
return getGroupId(methodName, SubscriptionMode.SUB_MODE_POP);
}

protected static String getGroupId(String methodName, SubscriptionMode mode) {
String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6));
// prepare consumer group
CreateGroupRequest request = CreateGroupRequest.newBuilder()
.setName(groupId)
.setMaxDeliveryAttempt(16)
.setGroupType(GroupType.GROUP_TYPE_STANDARD)
.setSubMode(mode)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply);
log.info("[ConsumerGroupId] groupId:{} , methodName:{} , mode: {} , reply:{}", groupId, methodName, mode, reply);
return groupId;
}

Expand Down

0 comments on commit cd67e89

Please sign in to comment.