Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor cmd line in cli #744

Merged
merged 2 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private static Date toDate(Timestamp timestamp) {
return calendar.getTime();
}

static void alignCentral(AT_Row row) {
public static void alignCentral(AT_Row row) {
for (AT_Cell cell : row.getCells()) {
cell.getContext().setTextAlignment(TextAlignment.CENTER);
}
Expand Down
32 changes: 30 additions & 2 deletions cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,27 @@

package com.automq.rocketmq.cli;

import com.automq.rocketmq.cli.broker.DescribeCluster;
import com.automq.rocketmq.cli.broker.TerminateNode;
import com.automq.rocketmq.cli.consumer.ConsumeMessage;
import com.automq.rocketmq.cli.consumer.CreateGroup;
import com.automq.rocketmq.cli.consumer.DeleteGroup;
import com.automq.rocketmq.cli.consumer.DescribeGroup;
import com.automq.rocketmq.cli.consumer.ListGroup;
import com.automq.rocketmq.cli.consumer.UpdateGroup;
import com.automq.rocketmq.cli.producer.ProduceMessage;
import com.automq.rocketmq.cli.stream.DescribeStream;
import com.automq.rocketmq.cli.topic.CreateTopic;
import com.automq.rocketmq.cli.topic.DeleteTopic;
import com.automq.rocketmq.cli.topic.DescribeTopic;
import com.automq.rocketmq.cli.topic.ListTopic;
import com.automq.rocketmq.cli.topic.UpdateTopic;
import picocli.CommandLine;

@CommandLine.Command(name = "mqadmin",
mixinStandardHelpOptions = true,
version = "S3RocketMQ 1.0",
description = "Command line tools for S3RocketMQ",
version = "AutoMQ for RocketMQ 1.0",
description = "Command line tools for AutoMQ for RocketMQ",
showDefaultValues = true,
subcommands = {
DescribeCluster.class,
Expand Down Expand Up @@ -55,6 +70,19 @@ public class MQAdmin implements Runnable {
@CommandLine.Option(names = {"-s", "--secret-key"}, description = "The authentication secret key")
String secretKey = "";

public String getEndpoint() {
return endpoint;
}


public String getAccessKey() {
return accessKey;
}

public String getSecretKey() {
return secretKey;
}

public void run() {
throw new CommandLine.ParameterException(spec.commandLine(), "Missing required subcommand");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.broker;

import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.DescribeClusterRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.ConsoleHelper;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand All @@ -35,7 +38,7 @@ public Void call() throws Exception {
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
DescribeClusterRequest request = DescribeClusterRequest.newBuilder()
.build();
Cluster cluster = client.describeCluster(mqAdmin.endpoint, request).join();
Cluster cluster = client.describeCluster(mqAdmin.getEndpoint(), request).join();
ConsoleHelper.printCluster(cluster);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.broker;

import apache.rocketmq.controller.v1.TerminateNodeReply;
import apache.rocketmq.controller.v1.TerminateNodeRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import io.grpc.stub.StreamObserver;
Expand All @@ -41,7 +43,7 @@ public Void call() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
TerminateNodeRequest request = TerminateNodeRequest.newBuilder().setNodeId(nodeId).build();
client.terminateNode(mqAdmin.endpoint, request, new StreamObserver<>() {
client.terminateNode(mqAdmin.getEndpoint(), request, new StreamObserver<>() {
@Override
public void onNext(TerminateNodeReply value) {
switch (value.getStatus().getCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.cli.tools.CliUtils;
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.common.exception.ControllerException;
Expand Down Expand Up @@ -179,7 +181,7 @@ private void prepareConsumerGroup(String consumerGroup) throws IOException {
.setSubMode(SubscriptionMode.SUB_MODE_POP)
.build();

CompletableFuture<CreateGroupReply> groupCf = client.createGroup(mqAdmin.endpoint, request);
CompletableFuture<CreateGroupReply> groupCf = client.createGroup(mqAdmin.getEndpoint(), request);
groupCf = groupCf.exceptionally(throwable -> {
Throwable t = CliUtils.getRealException(throwable);
if (t instanceof ControllerException controllerException) {
Expand All @@ -200,10 +202,10 @@ private void prepareConsumerGroup(String consumerGroup) throws IOException {
}
private SimpleConsumer prepareConsumer(ClientServiceProvider provider, String consumerGroup) {
StaticSessionCredentialsProvider staticSessionCredentialsProvider =
new StaticSessionCredentialsProvider(mqAdmin.accessKey, mqAdmin.secretKey);
new StaticSessionCredentialsProvider(mqAdmin.getAccessKey(), mqAdmin.getSecretKey());

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(mqAdmin.endpoint)
.setEndpoints(mqAdmin.getEndpoint())
.setCredentialProvider(staticSessionCredentialsProvider)
.setRequestTimeout(Duration.ofSeconds(10))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -53,7 +55,7 @@ public Void call() throws Exception {
.setSubMode(subMode)
.build();

CreateGroupReply groupReply = client.createGroup(mqAdmin.endpoint, request).join();
CreateGroupReply groupReply = client.createGroup(mqAdmin.getEndpoint(), request).join();
System.out.println("Group created: " + groupReply.getGroupId());
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand All @@ -34,7 +36,7 @@ public class DeleteGroup implements Callable<Void> {
@Override
public Void call() throws Exception {
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
client.deleteGroup(mqAdmin.endpoint, id)
client.deleteGroup(mqAdmin.getEndpoint(), id)
.thenRun(() -> {
System.out.println("Deleted group whose group-id=" + id);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.ConsumerGroup;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import de.vandermeer.asciitable.AT_Row;
Expand All @@ -40,7 +42,7 @@ public class DescribeGroup implements Callable<Void> {
@Override
public Void call() throws Exception {
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
ConsumerGroup group = client.describeGroup(mqAdmin.endpoint, groupName)
ConsumerGroup group = client.describeGroup(mqAdmin.getEndpoint(), groupName)
.join();
if (null == group) {
System.err.printf("Group '%s' is not found%n%n", groupName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.ListGroupReply;
import apache.rocketmq.controller.v1.ListGroupRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.ConsoleHelper;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import de.vandermeer.asciitable.AT_Row;
Expand Down Expand Up @@ -49,7 +52,7 @@ public Void call() throws Exception {
ConsoleHelper.alignCentral(row);
listGroups.addRule();

client.listGroups(mqAdmin.endpoint, request, new StreamObserver<>() {
client.listGroups(mqAdmin.getEndpoint(), request, new StreamObserver<>() {
@Override
public void onNext(ListGroupReply value) {
ConsumerGroup group = value.getGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.DescribeClusterRequest;
Expand All @@ -24,6 +24,8 @@
import apache.rocketmq.controller.v1.Node;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient;
import com.google.protobuf.TextFormat;
Expand Down Expand Up @@ -57,9 +59,9 @@ public Void call() throws Exception {
// TODO: support retrying when failed because of cluster's state change at the same time
GrpcControllerClient controllerClient = new GrpcControllerClient(new CliClientConfig());
GrpcProxyClient proxyClient = new GrpcProxyClient(new CliClientConfig());
CompletableFuture<Cluster> clusterCf = controllerClient.describeCluster(mqAdmin.endpoint, DescribeClusterRequest.newBuilder().build());
CompletableFuture<Cluster> clusterCf = controllerClient.describeCluster(mqAdmin.getEndpoint(), DescribeClusterRequest.newBuilder().build());

CompletableFuture<Topic> topicCf = controllerClient.describeTopic(mqAdmin.endpoint, null, topicName);
CompletableFuture<Topic> topicCf = controllerClient.describeTopic(mqAdmin.getEndpoint(), null, topicName);
clusterCf.thenCombine(topicCf, Pair::of)
.thenComposeAsync(pair -> {
Cluster cluster = pair.getLeft();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -66,7 +68,7 @@ public Void call() throws Exception {
builder.setGroupType(groupType);
}

client.updateGroup(mqAdmin.endpoint, builder.build()).join();
client.updateGroup(mqAdmin.getEndpoint(), builder.build()).join();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.producer;

import apache.rocketmq.controller.v1.AcceptTypes;
import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.MessageType;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.cli.tools.CliUtils;
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.common.exception.ControllerException;
Expand Down Expand Up @@ -141,7 +143,7 @@ private void prepareTopics() throws IOException, ControllerException {
.setAcceptTypes(AcceptTypes.newBuilder().addTypes(messageType).build())
.build();

CompletableFuture<Long> topicCf = client.createTopic(mqAdmin.endpoint, request);
CompletableFuture<Long> topicCf = client.createTopic(mqAdmin.getEndpoint(), request);

topicCf = topicCf.exceptionally(throwable -> {
Throwable t = CliUtils.getRealException(throwable);
Expand All @@ -162,10 +164,10 @@ private void prepareTopics() throws IOException, ControllerException {

private Producer prepareProducer(ClientServiceProvider provider) {
StaticSessionCredentialsProvider staticSessionCredentialsProvider =
new StaticSessionCredentialsProvider(mqAdmin.accessKey, mqAdmin.secretKey);
new StaticSessionCredentialsProvider(mqAdmin.getAccessKey(), mqAdmin.getSecretKey());

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(mqAdmin.endpoint)
.setEndpoints(mqAdmin.getEndpoint())
.setCredentialProvider(staticSessionCredentialsProvider)
.setRequestTimeout(Duration.ofSeconds(10))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.stream;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.DescribeStreamReply;
import apache.rocketmq.controller.v1.DescribeStreamRequest;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.ConsoleHelper;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand All @@ -40,7 +43,7 @@ public Void call() throws Exception {
DescribeStreamRequest request = DescribeStreamRequest.newBuilder()
.setStreamId(streamId)
.build();
DescribeStreamReply reply = client.describeStream(mqAdmin.endpoint, request).join();
DescribeStreamReply reply = client.describeStream(mqAdmin.getEndpoint(), request).join();
if (reply.getStatus().getCode() == Code.OK) {
ConsoleHelper.printStream(reply.getStream(), reply.getRangesList());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.topic;

import apache.rocketmq.controller.v1.AcceptTypes;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.MessageType;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.common.util.DurationUtil;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -68,7 +70,7 @@ public Void call() throws Exception {
.setRetentionHours((int) retentionHours)
.build();

Long topicId = client.createTopic(mqAdmin.endpoint, request).join();
Long topicId = client.createTopic(mqAdmin.getEndpoint(), request).join();
System.out.println("Topic created: " + topicId);
client.close();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* limitations under the License.
*/

package com.automq.rocketmq.cli;
package com.automq.rocketmq.cli.topic;

import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.concurrent.Callable;
Expand All @@ -34,7 +36,7 @@ public class DeleteTopic implements Callable<Void> {
@Override
public Void call() throws Exception {
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
client.deleteTopic(mqAdmin.endpoint, id)
client.deleteTopic(mqAdmin.getEndpoint(), id)
.thenRun(() -> {
System.out.println("Deleted topic whose topic-id=" + id);
})
Expand Down
Loading
Loading