Skip to content

Commit

Permalink
feat: adaption remoting with RoS (#12)
Browse files Browse the repository at this point in the history
* feat: adaption remoting with RoS

Signed-off-by: wangxye <[email protected]>

* feat: support remoting e2e in RoS

Signed-off-by: wangxye <[email protected]>

---------

Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye authored Nov 6, 2023
1 parent 4d9a99d commit a4fd916
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 12 deletions.
29 changes: 28 additions & 1 deletion java/e2e-v4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<artifactId>rocketmq-java-e2e-v4</artifactId>

<properties>
<rocketmq.client.version>4.9.3</rocketmq.client.version>
<rocketmq.client.version>4.9.5</rocketmq.client.version>
<junit.jupiter.version>5.7.2</junit.jupiter.version>
</properties>

Expand All @@ -43,6 +43,22 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>${rocketmq.client.version}</version>
<exclusions>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
<exclusion>
<artifactId>protobuf-java-util</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
<version>${rocketmq.client.version}</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -88,11 +104,22 @@
<artifactId>logback-core</artifactId>
<version>1.3.0-beta0</version>
</dependency>
<dependency>
<groupId>com.automq.rocketmq</groupId>
<artifactId>rocketmq-controller</artifactId>
<version>5.1.3-automq-0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.automq.rocketmq</groupId>
<artifactId>rocketmq-cli</artifactId>
<version>5.1.3-automq-0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.3</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,98 @@

package org.apache.rocketmq.frame;

import apache.rocketmq.controller.v1.AcceptTypes;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import org.apache.rocketmq.utils.MQAdmin;
import org.apache.rocketmq.utils.RandomUtils;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class BaseOperate extends ResourceInit {
private static Logger logger = LoggerFactory.getLogger(BaseOperate.class);

protected static GrpcControllerClient client;

static {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
MQAdmin.mqAdminExt.shutdown();
logger.info("Shutdown Hook is running !");
}
});
try {
client = new GrpcControllerClient(new CliClientConfig());
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
MQAdmin.mqAdminExt.shutdown();
logger.info("Shutdown Hook is running !");
}
});
} catch (Throwable e) {
e.printStackTrace();
}

}

protected static String getTopic(String methodName) {
return getTopic(MessageType.NORMAL, methodName);
}

protected static String getTopic(MessageType messageType, String methodName) {
String topic = String.format("topic_%s_%s", methodName, RandomUtils.getStringWithCharacter(6));
logger.info("[Topic] topic:{}, methodName:{}", topic, methodName);
boolean result = MQAdmin.createTopic(namesrvAddr,cluster, topic, 8);
Assertions.assertTrue(result, String.format("Create topic:%s failed", topic));
try {
CreateTopicRequest request = CreateTopicRequest.newBuilder()
.setTopic(topic)
.setCount(8)
.setAcceptTypes(convertAcceptTypes(messageType))
.build();
Long topicId = client.createTopic(namesrvAddr, request).join();
logger.info("create topic: {} , topicId:{}", topic, topicId);
return topic;
} catch (Exception e) {
logger.error("create topic error", e);
}
return topic;
}

private static AcceptTypes convertAcceptTypes(MessageType messageType) {
return AcceptTypes.newBuilder().addTypes(messageType).build();
}

protected static String getGroupId(String methodName) {
return getGroupId(methodName, SubscriptionMode.SUB_MODE_POP);
}

protected static String getGroupId(String methodName, SubscriptionMode mode) {
String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6));
logger.info("[ConsumerGroupId] groupId:{}, methodName:{}", groupId, methodName);
// prepare consumer group
CreateGroupRequest request = CreateGroupRequest.newBuilder()
.setName(groupId)
.setMaxDeliveryAttempt(16)
.setGroupType(GroupType.GROUP_TYPE_STANDARD)
.setSubMode(mode)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
logger.info("[ConsumerGroupId] groupId:{} , methodName:{} , mode: {} , reply:{}", groupId, methodName, mode, reply);
return groupId;
}

private static CompletableFuture<CreateGroupReply> createConsumerGroup(CreateGroupRequest request) {
try {
CompletableFuture<CreateGroupReply> groupCf = client.createGroup(namesrvAddr, request);
return groupCf.exceptionally(throwable -> {
logger.error("Create group failed", throwable);
throw new CompletionException(throwable);
});
} catch (Exception e) {
logger.error("Create group failed", e);
return CompletableFuture.failedFuture(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public static void deleteTopic(String nameSrvAddr, String cluster, String topic)
try {
Set<String> set = new HashSet<>();
set.add(nameSrvAddr);
mqAdminExt.deleteTopicInNameServer(set, topic);
mqAdminExt.deleteTopicInNameServer(set, topic, cluster);

boolean isTopicExist = checkTopicExist(mqAdminExt, topic);
long startTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ protected static String getOrderlyGroupId(String methodName) {
.setName(groupId)
.setMaxDeliveryAttempt(16)
.setGroupType(GroupType.GROUP_TYPE_FIFO)
.setSubMode(SubscriptionMode.SUB_MODE_POP)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply);
Expand Down

0 comments on commit a4fd916

Please sign in to comment.