From a4fd9162ad01b5a46a9e9fc5a85aeea95a0dd827 Mon Sep 17 00:00:00 2001 From: Abyss <45425302+wangxye@users.noreply.github.com> Date: Sun, 5 Nov 2023 21:07:21 -0600 Subject: [PATCH] feat: adaption remoting with RoS (#12) * feat: adaption remoting with RoS Signed-off-by: wangxye * feat: support remoting e2e in RoS Signed-off-by: wangxye --------- Signed-off-by: wangxye --- java/e2e-v4/pom.xml | 29 ++++++- .../apache/rocketmq/frame/BaseOperate.java | 83 ++++++++++++++++--- .../org/apache/rocketmq/utils/MQAdmin.java | 2 +- .../apache/rocketmq/frame/BaseOperate.java | 1 + 4 files changed, 103 insertions(+), 12 deletions(-) diff --git a/java/e2e-v4/pom.xml b/java/e2e-v4/pom.xml index cc40760..63cb423 100644 --- a/java/e2e-v4/pom.xml +++ b/java/e2e-v4/pom.xml @@ -28,7 +28,7 @@ rocketmq-java-e2e-v4 - 4.9.3 + 4.9.5 5.7.2 @@ -43,6 +43,22 @@ org.apache.rocketmq rocketmq-acl ${rocketmq.client.version} + + + protobuf-java + com.google.protobuf + + + protobuf-java-util + com.google.protobuf + + + + + + org.apache.rocketmq + rocketmq-remoting + ${rocketmq.client.version} @@ -88,11 +104,22 @@ logback-core 1.3.0-beta0 + + com.automq.rocketmq + rocketmq-controller + 5.1.3-automq-0-SNAPSHOT + + + com.automq.rocketmq + rocketmq-cli + 5.1.3-automq-0-SNAPSHOT + org.awaitility awaitility 4.0.3 + diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java index d3c8ce9..041faea 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java @@ -17,35 +17,98 @@ package org.apache.rocketmq.frame; +import apache.rocketmq.controller.v1.AcceptTypes; +import apache.rocketmq.controller.v1.CreateGroupReply; +import apache.rocketmq.controller.v1.CreateGroupRequest; +import apache.rocketmq.controller.v1.CreateTopicRequest; +import apache.rocketmq.controller.v1.GroupType; +import apache.rocketmq.controller.v1.MessageType; +import apache.rocketmq.controller.v1.SubscriptionMode; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.controller.client.GrpcControllerClient; import org.apache.rocketmq.utils.MQAdmin; import org.apache.rocketmq.utils.RandomUtils; -import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + public class BaseOperate extends ResourceInit { private static Logger logger = LoggerFactory.getLogger(BaseOperate.class); + protected static GrpcControllerClient client; + static { - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - MQAdmin.mqAdminExt.shutdown(); - logger.info("Shutdown Hook is running !"); - } - }); + try { + client = new GrpcControllerClient(new CliClientConfig()); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + MQAdmin.mqAdminExt.shutdown(); + logger.info("Shutdown Hook is running !"); + } + }); + } catch (Throwable e) { + e.printStackTrace(); + } + } protected static String getTopic(String methodName) { + return getTopic(MessageType.NORMAL, methodName); + } + + protected static String getTopic(MessageType messageType, String methodName) { String topic = String.format("topic_%s_%s", methodName, RandomUtils.getStringWithCharacter(6)); logger.info("[Topic] topic:{}, methodName:{}", topic, methodName); - boolean result = MQAdmin.createTopic(namesrvAddr,cluster, topic, 8); - Assertions.assertTrue(result, String.format("Create topic:%s failed", topic)); + try { + CreateTopicRequest request = CreateTopicRequest.newBuilder() + .setTopic(topic) + .setCount(8) + .setAcceptTypes(convertAcceptTypes(messageType)) + .build(); + Long topicId = client.createTopic(namesrvAddr, request).join(); + logger.info("create topic: {} , topicId:{}", topic, topicId); + return topic; + } catch (Exception e) { + logger.error("create topic error", e); + } return topic; } + private static AcceptTypes convertAcceptTypes(MessageType messageType) { + return AcceptTypes.newBuilder().addTypes(messageType).build(); + } + protected static String getGroupId(String methodName) { + return getGroupId(methodName, SubscriptionMode.SUB_MODE_POP); + } + + protected static String getGroupId(String methodName, SubscriptionMode mode) { String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6)); - logger.info("[ConsumerGroupId] groupId:{}, methodName:{}", groupId, methodName); + // prepare consumer group + CreateGroupRequest request = CreateGroupRequest.newBuilder() + .setName(groupId) + .setMaxDeliveryAttempt(16) + .setGroupType(GroupType.GROUP_TYPE_STANDARD) + .setSubMode(mode) + .build(); + CreateGroupReply reply = createConsumerGroup(request).join(); + logger.info("[ConsumerGroupId] groupId:{} , methodName:{} , mode: {} , reply:{}", groupId, methodName, mode, reply); return groupId; } + + private static CompletableFuture createConsumerGroup(CreateGroupRequest request) { + try { + CompletableFuture groupCf = client.createGroup(namesrvAddr, request); + return groupCf.exceptionally(throwable -> { + logger.error("Create group failed", throwable); + throw new CompletionException(throwable); + }); + } catch (Exception e) { + logger.error("Create group failed", e); + return CompletableFuture.failedFuture(e); + } + } } diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/utils/MQAdmin.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/utils/MQAdmin.java index 1d62eb8..81dcc9d 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/utils/MQAdmin.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/utils/MQAdmin.java @@ -141,7 +141,7 @@ public static void deleteTopic(String nameSrvAddr, String cluster, String topic) try { Set set = new HashSet<>(); set.add(nameSrvAddr); - mqAdminExt.deleteTopicInNameServer(set, topic); + mqAdminExt.deleteTopicInNameServer(set, topic, cluster); boolean isTopicExist = checkTopicExist(mqAdminExt, topic); long startTime = System.currentTimeMillis(); diff --git a/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java b/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java index 197b444..800b67c 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java @@ -169,6 +169,7 @@ protected static String getOrderlyGroupId(String methodName) { .setName(groupId) .setMaxDeliveryAttempt(16) .setGroupType(GroupType.GROUP_TYPE_FIFO) + .setSubMode(SubscriptionMode.SUB_MODE_POP) .build(); CreateGroupReply reply = createConsumerGroup(request).join(); log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply);