From d63719c43f7ec2cec2b6f176ba2d3ec4a5854b84 Mon Sep 17 00:00:00 2001 From: wangxye Date: Wed, 29 Nov 2023 11:05:54 +0800 Subject: [PATCH] fix: adapt to create consumer groups Signed-off-by: wangxye --- .../java/org/apache/rocketmq/frame/BaseOperate.java | 9 ++++----- .../java/org/apache/rocketmq/frame/BaseOperate.java | 11 +++++------ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java index a89c31a..c106a7c 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.frame; import apache.rocketmq.controller.v1.AcceptTypes; -import apache.rocketmq.controller.v1.CreateGroupReply; import apache.rocketmq.controller.v1.CreateGroupRequest; import apache.rocketmq.controller.v1.CreateTopicRequest; import apache.rocketmq.controller.v1.GroupType; @@ -94,7 +93,7 @@ protected static String getGroupId(String methodName, SubscriptionMode mode) { .setGroupType(GroupType.GROUP_TYPE_STANDARD) .setSubMode(mode) .build(); - CreateGroupReply reply = createConsumerGroup(request).join(); + Long reply = createConsumerGroup(request).join(); logger.info("[ConsumerGroupId] groupId:{} , methodName:{} , mode: {} , reply:{}", groupId, methodName, mode, reply); return groupId; } @@ -107,14 +106,14 @@ protected static String getOrderlyGroupId(String methodName, SubscriptionMode mo .setGroupType(GroupType.GROUP_TYPE_FIFO) .setSubMode(mode) .build(); - CreateGroupReply reply = createConsumerGroup(request).join(); + Long reply = createConsumerGroup(request).join(); logger.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply); return groupId; } - private static CompletableFuture createConsumerGroup(CreateGroupRequest request) { + private static CompletableFuture createConsumerGroup(CreateGroupRequest request) { try { - CompletableFuture groupCf = client.createGroup(namesrvAddr, request); + CompletableFuture groupCf = client.createGroup(namesrvAddr, request); return groupCf.exceptionally(throwable -> { logger.error("Create group failed", throwable); throw new CompletionException(throwable); diff --git a/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java b/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java index 54b9778..3b2b255 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.frame; -import apache.rocketmq.controller.v1.CreateGroupReply; import apache.rocketmq.controller.v1.CreateGroupRequest; import apache.rocketmq.controller.v1.CreateTopicRequest; import apache.rocketmq.controller.v1.GroupType; @@ -157,7 +156,7 @@ protected static String getGroupId(String methodName, SubscriptionMode mode) { .setGroupType(GroupType.GROUP_TYPE_STANDARD) .setSubMode(mode) .build(); - CreateGroupReply reply = createConsumerGroup(request).join(); + Long reply = createConsumerGroup(request).join(); log.info("[ConsumerGroupId] groupId:{} , methodName:{} , mode: {} , reply:{}", groupId, methodName, mode, reply); return groupId; } @@ -171,7 +170,7 @@ protected static String getOrderlyGroupId(String methodName) { .setGroupType(GroupType.GROUP_TYPE_FIFO) .setSubMode(SubscriptionMode.SUB_MODE_POP) .build(); - CreateGroupReply reply = createConsumerGroup(request).join(); + Long reply = createConsumerGroup(request).join(); log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply); return groupId; } @@ -184,14 +183,14 @@ protected static String getOrderlyGroupId(String methodName, SubscriptionMode mo .setGroupType(GroupType.GROUP_TYPE_FIFO) .setSubMode(mode) .build(); - CreateGroupReply reply = createConsumerGroup(request).join(); + Long reply = createConsumerGroup(request).join(); log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply); return groupId; } - private static CompletableFuture createConsumerGroup(CreateGroupRequest request) { + private static CompletableFuture createConsumerGroup(CreateGroupRequest request) { try { - CompletableFuture groupCf = client.createGroup(account.getEndpoint(), request); + CompletableFuture groupCf = client.createGroup(account.getEndpoint(), request); return groupCf.exceptionally(throwable -> { log.error("Create group failed", throwable); throw new CompletionException(throwable);