Skip to content

Commit

Permalink
fix: adapt to create consumer groups
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye committed Nov 29, 2023
1 parent 8b80fca commit d63719c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.rocketmq.frame;

import apache.rocketmq.controller.v1.AcceptTypes;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.GroupType;
Expand Down Expand Up @@ -94,7 +93,7 @@ protected static String getGroupId(String methodName, SubscriptionMode mode) {
.setGroupType(GroupType.GROUP_TYPE_STANDARD)
.setSubMode(mode)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
Long reply = createConsumerGroup(request).join();
logger.info("[ConsumerGroupId] groupId:{} , methodName:{} , mode: {} , reply:{}", groupId, methodName, mode, reply);
return groupId;
}
Expand All @@ -107,14 +106,14 @@ protected static String getOrderlyGroupId(String methodName, SubscriptionMode mo
.setGroupType(GroupType.GROUP_TYPE_FIFO)
.setSubMode(mode)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
Long reply = createConsumerGroup(request).join();
logger.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply);
return groupId;
}

private static CompletableFuture<CreateGroupReply> createConsumerGroup(CreateGroupRequest request) {
private static CompletableFuture<Long> createConsumerGroup(CreateGroupRequest request) {
try {
CompletableFuture<CreateGroupReply> groupCf = client.createGroup(namesrvAddr, request);
CompletableFuture<Long> groupCf = client.createGroup(namesrvAddr, request);
return groupCf.exceptionally(throwable -> {
logger.error("Create group failed", throwable);
throw new CompletionException(throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.rocketmq.frame;

import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.GroupType;
Expand Down Expand Up @@ -157,7 +156,7 @@ protected static String getGroupId(String methodName, SubscriptionMode mode) {
.setGroupType(GroupType.GROUP_TYPE_STANDARD)
.setSubMode(mode)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
Long reply = createConsumerGroup(request).join();
log.info("[ConsumerGroupId] groupId:{} , methodName:{} , mode: {} , reply:{}", groupId, methodName, mode, reply);
return groupId;
}
Expand All @@ -171,7 +170,7 @@ protected static String getOrderlyGroupId(String methodName) {
.setGroupType(GroupType.GROUP_TYPE_FIFO)
.setSubMode(SubscriptionMode.SUB_MODE_POP)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
Long reply = createConsumerGroup(request).join();
log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply);
return groupId;
}
Expand All @@ -184,14 +183,14 @@ protected static String getOrderlyGroupId(String methodName, SubscriptionMode mo
.setGroupType(GroupType.GROUP_TYPE_FIFO)
.setSubMode(mode)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
Long reply = createConsumerGroup(request).join();
log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply);
return groupId;
}

private static CompletableFuture<CreateGroupReply> createConsumerGroup(CreateGroupRequest request) {
private static CompletableFuture<Long> createConsumerGroup(CreateGroupRequest request) {
try {
CompletableFuture<CreateGroupReply> groupCf = client.createGroup(account.getEndpoint(), request);
CompletableFuture<Long> groupCf = client.createGroup(account.getEndpoint(), request);
return groupCf.exceptionally(throwable -> {
log.error("Create group failed", throwable);
throw new CompletionException(throwable);
Expand Down

0 comments on commit d63719c

Please sign in to comment.