diff --git a/cli/src/main/java/com/automq/rocketmq/cli/ConsoleHelper.java b/cli/src/main/java/com/automq/rocketmq/cli/ConsoleHelper.java index 710991a9d..cadb9f958 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/ConsoleHelper.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/ConsoleHelper.java @@ -45,7 +45,7 @@ private static Date toDate(Timestamp timestamp) { return calendar.getTime(); } - static void alignCentral(AT_Row row) { + public static void alignCentral(AT_Row row) { for (AT_Cell cell : row.getCells()) { cell.getContext().setTextAlignment(TextAlignment.CENTER); } diff --git a/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java b/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java index 0b27c0356..934ac36f2 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java @@ -17,12 +17,27 @@ package com.automq.rocketmq.cli; +import com.automq.rocketmq.cli.broker.DescribeCluster; +import com.automq.rocketmq.cli.broker.TerminateNode; +import com.automq.rocketmq.cli.consumer.ConsumeMessage; +import com.automq.rocketmq.cli.consumer.CreateGroup; +import com.automq.rocketmq.cli.consumer.DeleteGroup; +import com.automq.rocketmq.cli.consumer.DescribeGroup; +import com.automq.rocketmq.cli.consumer.ListGroup; +import com.automq.rocketmq.cli.consumer.UpdateGroup; +import com.automq.rocketmq.cli.producer.ProduceMessage; +import com.automq.rocketmq.cli.stream.DescribeStream; +import com.automq.rocketmq.cli.topic.CreateTopic; +import com.automq.rocketmq.cli.topic.DeleteTopic; +import com.automq.rocketmq.cli.topic.DescribeTopic; +import com.automq.rocketmq.cli.topic.ListTopic; +import com.automq.rocketmq.cli.topic.UpdateTopic; import picocli.CommandLine; @CommandLine.Command(name = "mqadmin", mixinStandardHelpOptions = true, - version = "S3RocketMQ 1.0", - description = "Command line tools for S3RocketMQ", + version = "AutoMQ for RocketMQ 1.0", + description = "Command line tools for AutoMQ for RocketMQ", showDefaultValues = true, subcommands = { DescribeCluster.class, @@ -55,6 +70,19 @@ public class MQAdmin implements Runnable { @CommandLine.Option(names = {"-s", "--secret-key"}, description = "The authentication secret key") String secretKey = ""; + public String getEndpoint() { + return endpoint; + } + + + public String getAccessKey() { + return accessKey; + } + + public String getSecretKey() { + return secretKey; + } + public void run() { throw new CommandLine.ParameterException(spec.commandLine(), "Missing required subcommand"); } diff --git a/cli/src/main/java/com/automq/rocketmq/cli/DescribeCluster.java b/cli/src/main/java/com/automq/rocketmq/cli/broker/DescribeCluster.java similarity index 86% rename from cli/src/main/java/com/automq/rocketmq/cli/DescribeCluster.java rename to cli/src/main/java/com/automq/rocketmq/cli/broker/DescribeCluster.java index 568af55bf..81b826981 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/DescribeCluster.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/broker/DescribeCluster.java @@ -15,10 +15,13 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.broker; import apache.rocketmq.controller.v1.Cluster; import apache.rocketmq.controller.v1.DescribeClusterRequest; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.ConsoleHelper; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import java.util.concurrent.Callable; @@ -35,7 +38,7 @@ public Void call() throws Exception { try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) { DescribeClusterRequest request = DescribeClusterRequest.newBuilder() .build(); - Cluster cluster = client.describeCluster(mqAdmin.endpoint, request).join(); + Cluster cluster = client.describeCluster(mqAdmin.getEndpoint(), request).join(); ConsoleHelper.printCluster(cluster); } return null; diff --git a/cli/src/main/java/com/automq/rocketmq/cli/TerminateNode.java b/cli/src/main/java/com/automq/rocketmq/cli/broker/TerminateNode.java similarity index 93% rename from cli/src/main/java/com/automq/rocketmq/cli/TerminateNode.java rename to cli/src/main/java/com/automq/rocketmq/cli/broker/TerminateNode.java index 5f085591e..de7790371 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/TerminateNode.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/broker/TerminateNode.java @@ -15,10 +15,12 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.broker; import apache.rocketmq.controller.v1.TerminateNodeReply; import apache.rocketmq.controller.v1.TerminateNodeRequest; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import io.grpc.stub.StreamObserver; @@ -41,7 +43,7 @@ public Void call() throws Exception { final CountDownLatch latch = new CountDownLatch(1); try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) { TerminateNodeRequest request = TerminateNodeRequest.newBuilder().setNodeId(nodeId).build(); - client.terminateNode(mqAdmin.endpoint, request, new StreamObserver<>() { + client.terminateNode(mqAdmin.getEndpoint(), request, new StreamObserver<>() { @Override public void onNext(TerminateNodeReply value) { switch (value.getStatus().getCode()) { diff --git a/cli/src/main/java/com/automq/rocketmq/cli/ConsumeMessage.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumeMessage.java similarity index 96% rename from cli/src/main/java/com/automq/rocketmq/cli/ConsumeMessage.java rename to cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumeMessage.java index fa9f1db03..ec199ec24 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/ConsumeMessage.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumeMessage.java @@ -15,13 +15,15 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.consumer; import apache.rocketmq.common.v1.Code; import apache.rocketmq.controller.v1.CreateGroupReply; import apache.rocketmq.controller.v1.CreateGroupRequest; import apache.rocketmq.controller.v1.GroupType; import apache.rocketmq.controller.v1.SubscriptionMode; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.cli.tools.CliUtils; import com.automq.rocketmq.common.PrefixThreadFactory; import com.automq.rocketmq.common.exception.ControllerException; @@ -179,7 +181,7 @@ private void prepareConsumerGroup(String consumerGroup) throws IOException { .setSubMode(SubscriptionMode.SUB_MODE_POP) .build(); - CompletableFuture groupCf = client.createGroup(mqAdmin.endpoint, request); + CompletableFuture groupCf = client.createGroup(mqAdmin.getEndpoint(), request); groupCf = groupCf.exceptionally(throwable -> { Throwable t = CliUtils.getRealException(throwable); if (t instanceof ControllerException controllerException) { @@ -200,10 +202,10 @@ private void prepareConsumerGroup(String consumerGroup) throws IOException { } private SimpleConsumer prepareConsumer(ClientServiceProvider provider, String consumerGroup) { StaticSessionCredentialsProvider staticSessionCredentialsProvider = - new StaticSessionCredentialsProvider(mqAdmin.accessKey, mqAdmin.secretKey); + new StaticSessionCredentialsProvider(mqAdmin.getAccessKey(), mqAdmin.getSecretKey()); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() - .setEndpoints(mqAdmin.endpoint) + .setEndpoints(mqAdmin.getEndpoint()) .setCredentialProvider(staticSessionCredentialsProvider) .setRequestTimeout(Duration.ofSeconds(10)) .build(); diff --git a/cli/src/main/java/com/automq/rocketmq/cli/CreateGroup.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/CreateGroup.java similarity index 93% rename from cli/src/main/java/com/automq/rocketmq/cli/CreateGroup.java rename to cli/src/main/java/com/automq/rocketmq/cli/consumer/CreateGroup.java index ca9178be2..6c00d5769 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/CreateGroup.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/CreateGroup.java @@ -15,12 +15,14 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.consumer; import apache.rocketmq.controller.v1.CreateGroupReply; import apache.rocketmq.controller.v1.CreateGroupRequest; import apache.rocketmq.controller.v1.GroupType; import apache.rocketmq.controller.v1.SubscriptionMode; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import java.util.concurrent.Callable; @@ -53,7 +55,7 @@ public Void call() throws Exception { .setSubMode(subMode) .build(); - CreateGroupReply groupReply = client.createGroup(mqAdmin.endpoint, request).join(); + CreateGroupReply groupReply = client.createGroup(mqAdmin.getEndpoint(), request).join(); System.out.println("Group created: " + groupReply.getGroupId()); } return null; diff --git a/cli/src/main/java/com/automq/rocketmq/cli/DeleteGroup.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/DeleteGroup.java similarity index 89% rename from cli/src/main/java/com/automq/rocketmq/cli/DeleteGroup.java rename to cli/src/main/java/com/automq/rocketmq/cli/consumer/DeleteGroup.java index c0f3e3eae..7a641846d 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/DeleteGroup.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/DeleteGroup.java @@ -15,8 +15,10 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.consumer; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import java.util.concurrent.Callable; @@ -34,7 +36,7 @@ public class DeleteGroup implements Callable { @Override public Void call() throws Exception { try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) { - client.deleteGroup(mqAdmin.endpoint, id) + client.deleteGroup(mqAdmin.getEndpoint(), id) .thenRun(() -> { System.out.println("Deleted group whose group-id=" + id); }) diff --git a/cli/src/main/java/com/automq/rocketmq/cli/DescribeGroup.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/DescribeGroup.java similarity index 94% rename from cli/src/main/java/com/automq/rocketmq/cli/DescribeGroup.java rename to cli/src/main/java/com/automq/rocketmq/cli/consumer/DescribeGroup.java index 8cd8b5fc3..0255d8c9e 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/DescribeGroup.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/DescribeGroup.java @@ -15,9 +15,11 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.consumer; import apache.rocketmq.controller.v1.ConsumerGroup; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import de.vandermeer.asciitable.AT_Row; @@ -40,7 +42,7 @@ public class DescribeGroup implements Callable { @Override public Void call() throws Exception { try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) { - ConsumerGroup group = client.describeGroup(mqAdmin.endpoint, groupName) + ConsumerGroup group = client.describeGroup(mqAdmin.getEndpoint(), groupName) .join(); if (null == group) { System.err.printf("Group '%s' is not found%n%n", groupName); diff --git a/cli/src/main/java/com/automq/rocketmq/cli/ListGroup.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/ListGroup.java similarity index 92% rename from cli/src/main/java/com/automq/rocketmq/cli/ListGroup.java rename to cli/src/main/java/com/automq/rocketmq/cli/consumer/ListGroup.java index f4e11e3b3..df2dc6ea3 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/ListGroup.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/ListGroup.java @@ -15,11 +15,14 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.consumer; import apache.rocketmq.controller.v1.ConsumerGroup; import apache.rocketmq.controller.v1.ListGroupReply; import apache.rocketmq.controller.v1.ListGroupRequest; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.ConsoleHelper; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import de.vandermeer.asciitable.AT_Row; @@ -49,7 +52,7 @@ public Void call() throws Exception { ConsoleHelper.alignCentral(row); listGroups.addRule(); - client.listGroups(mqAdmin.endpoint, request, new StreamObserver<>() { + client.listGroups(mqAdmin.getEndpoint(), request, new StreamObserver<>() { @Override public void onNext(ListGroupReply value) { ConsumerGroup group = value.getGroup(); diff --git a/cli/src/main/java/com/automq/rocketmq/cli/ResetConsumeOffset.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/ResetConsumeOffset.java similarity index 95% rename from cli/src/main/java/com/automq/rocketmq/cli/ResetConsumeOffset.java rename to cli/src/main/java/com/automq/rocketmq/cli/consumer/ResetConsumeOffset.java index 1147cce62..0270bebc8 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/ResetConsumeOffset.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/ResetConsumeOffset.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.consumer; import apache.rocketmq.controller.v1.Cluster; import apache.rocketmq.controller.v1.DescribeClusterRequest; @@ -24,6 +24,8 @@ import apache.rocketmq.controller.v1.Node; import apache.rocketmq.controller.v1.Topic; import apache.rocketmq.proxy.v1.ResetConsumeOffsetRequest; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.client.GrpcControllerClient; import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient; import com.google.protobuf.TextFormat; @@ -57,9 +59,9 @@ public Void call() throws Exception { // TODO: support retrying when failed because of cluster's state change at the same time GrpcControllerClient controllerClient = new GrpcControllerClient(new CliClientConfig()); GrpcProxyClient proxyClient = new GrpcProxyClient(new CliClientConfig()); - CompletableFuture clusterCf = controllerClient.describeCluster(mqAdmin.endpoint, DescribeClusterRequest.newBuilder().build()); + CompletableFuture clusterCf = controllerClient.describeCluster(mqAdmin.getEndpoint(), DescribeClusterRequest.newBuilder().build()); - CompletableFuture topicCf = controllerClient.describeTopic(mqAdmin.endpoint, null, topicName); + CompletableFuture topicCf = controllerClient.describeTopic(mqAdmin.getEndpoint(), null, topicName); clusterCf.thenCombine(topicCf, Pair::of) .thenComposeAsync(pair -> { Cluster cluster = pair.getLeft(); diff --git a/cli/src/main/java/com/automq/rocketmq/cli/UpdateGroup.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/UpdateGroup.java similarity index 92% rename from cli/src/main/java/com/automq/rocketmq/cli/UpdateGroup.java rename to cli/src/main/java/com/automq/rocketmq/cli/consumer/UpdateGroup.java index b04b819ee..178cd8ba4 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/UpdateGroup.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/UpdateGroup.java @@ -15,10 +15,12 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.consumer; import apache.rocketmq.controller.v1.GroupType; import apache.rocketmq.controller.v1.UpdateGroupRequest; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import com.google.common.base.Strings; @@ -66,7 +68,7 @@ public Void call() throws Exception { builder.setGroupType(groupType); } - client.updateGroup(mqAdmin.endpoint, builder.build()).join(); + client.updateGroup(mqAdmin.getEndpoint(), builder.build()).join(); } return null; } diff --git a/cli/src/main/java/com/automq/rocketmq/cli/ProduceMessage.java b/cli/src/main/java/com/automq/rocketmq/cli/producer/ProduceMessage.java similarity index 96% rename from cli/src/main/java/com/automq/rocketmq/cli/ProduceMessage.java rename to cli/src/main/java/com/automq/rocketmq/cli/producer/ProduceMessage.java index c6a15c339..442e1096d 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/ProduceMessage.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/producer/ProduceMessage.java @@ -15,12 +15,14 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.producer; import apache.rocketmq.controller.v1.AcceptTypes; import apache.rocketmq.common.v1.Code; import apache.rocketmq.controller.v1.CreateTopicRequest; import apache.rocketmq.controller.v1.MessageType; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.cli.tools.CliUtils; import com.automq.rocketmq.common.PrefixThreadFactory; import com.automq.rocketmq.common.exception.ControllerException; @@ -141,7 +143,7 @@ private void prepareTopics() throws IOException, ControllerException { .setAcceptTypes(AcceptTypes.newBuilder().addTypes(messageType).build()) .build(); - CompletableFuture topicCf = client.createTopic(mqAdmin.endpoint, request); + CompletableFuture topicCf = client.createTopic(mqAdmin.getEndpoint(), request); topicCf = topicCf.exceptionally(throwable -> { Throwable t = CliUtils.getRealException(throwable); @@ -162,10 +164,10 @@ private void prepareTopics() throws IOException, ControllerException { private Producer prepareProducer(ClientServiceProvider provider) { StaticSessionCredentialsProvider staticSessionCredentialsProvider = - new StaticSessionCredentialsProvider(mqAdmin.accessKey, mqAdmin.secretKey); + new StaticSessionCredentialsProvider(mqAdmin.getAccessKey(), mqAdmin.getSecretKey()); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() - .setEndpoints(mqAdmin.endpoint) + .setEndpoints(mqAdmin.getEndpoint()) .setCredentialProvider(staticSessionCredentialsProvider) .setRequestTimeout(Duration.ofSeconds(10)) .build(); diff --git a/cli/src/main/java/com/automq/rocketmq/cli/DescribeStream.java b/cli/src/main/java/com/automq/rocketmq/cli/stream/DescribeStream.java similarity index 90% rename from cli/src/main/java/com/automq/rocketmq/cli/DescribeStream.java rename to cli/src/main/java/com/automq/rocketmq/cli/stream/DescribeStream.java index 4f1e3dfde..de0ed78d0 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/DescribeStream.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/stream/DescribeStream.java @@ -15,11 +15,14 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.stream; import apache.rocketmq.common.v1.Code; import apache.rocketmq.controller.v1.DescribeStreamReply; import apache.rocketmq.controller.v1.DescribeStreamRequest; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.ConsoleHelper; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import java.util.concurrent.Callable; @@ -40,7 +43,7 @@ public Void call() throws Exception { DescribeStreamRequest request = DescribeStreamRequest.newBuilder() .setStreamId(streamId) .build(); - DescribeStreamReply reply = client.describeStream(mqAdmin.endpoint, request).join(); + DescribeStreamReply reply = client.describeStream(mqAdmin.getEndpoint(), request).join(); if (reply.getStatus().getCode() == Code.OK) { ConsoleHelper.printStream(reply.getStream(), reply.getRangesList()); } else { diff --git a/cli/src/main/java/com/automq/rocketmq/cli/CreateTopic.java b/cli/src/main/java/com/automq/rocketmq/cli/topic/CreateTopic.java similarity index 93% rename from cli/src/main/java/com/automq/rocketmq/cli/CreateTopic.java rename to cli/src/main/java/com/automq/rocketmq/cli/topic/CreateTopic.java index 2dd0779bb..f42f1668d 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/CreateTopic.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/topic/CreateTopic.java @@ -15,11 +15,13 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.topic; import apache.rocketmq.controller.v1.AcceptTypes; import apache.rocketmq.controller.v1.CreateTopicRequest; import apache.rocketmq.controller.v1.MessageType; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.common.util.DurationUtil; import com.automq.rocketmq.controller.client.GrpcControllerClient; import java.util.concurrent.Callable; @@ -68,7 +70,7 @@ public Void call() throws Exception { .setRetentionHours((int) retentionHours) .build(); - Long topicId = client.createTopic(mqAdmin.endpoint, request).join(); + Long topicId = client.createTopic(mqAdmin.getEndpoint(), request).join(); System.out.println("Topic created: " + topicId); client.close(); return null; diff --git a/cli/src/main/java/com/automq/rocketmq/cli/DeleteTopic.java b/cli/src/main/java/com/automq/rocketmq/cli/topic/DeleteTopic.java similarity index 89% rename from cli/src/main/java/com/automq/rocketmq/cli/DeleteTopic.java rename to cli/src/main/java/com/automq/rocketmq/cli/topic/DeleteTopic.java index 1b228975f..2f7e579b4 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/DeleteTopic.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/topic/DeleteTopic.java @@ -15,8 +15,10 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.topic; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import java.util.concurrent.Callable; @@ -34,7 +36,7 @@ public class DeleteTopic implements Callable { @Override public Void call() throws Exception { try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) { - client.deleteTopic(mqAdmin.endpoint, id) + client.deleteTopic(mqAdmin.getEndpoint(), id) .thenRun(() -> { System.out.println("Deleted topic whose topic-id=" + id); }) diff --git a/cli/src/main/java/com/automq/rocketmq/cli/DescribeTopic.java b/cli/src/main/java/com/automq/rocketmq/cli/topic/DescribeTopic.java similarity index 87% rename from cli/src/main/java/com/automq/rocketmq/cli/DescribeTopic.java rename to cli/src/main/java/com/automq/rocketmq/cli/topic/DescribeTopic.java index 5e339203a..da6c5c8b0 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/DescribeTopic.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/topic/DescribeTopic.java @@ -15,9 +15,12 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.topic; import apache.rocketmq.controller.v1.Topic; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.ConsoleHelper; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import java.util.concurrent.Callable; @@ -35,7 +38,7 @@ public class DescribeTopic implements Callable { @Override public Void call() throws Exception { try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) { - Topic topic = client.describeTopic(mqAdmin.endpoint, null, topicName) + Topic topic = client.describeTopic(mqAdmin.getEndpoint(), null, topicName) .join(); if (null == topic) { System.err.printf("Topic '%s' is not found%n%n", topicName); diff --git a/cli/src/main/java/com/automq/rocketmq/cli/ListTopic.java b/cli/src/main/java/com/automq/rocketmq/cli/topic/ListTopic.java similarity index 93% rename from cli/src/main/java/com/automq/rocketmq/cli/ListTopic.java rename to cli/src/main/java/com/automq/rocketmq/cli/topic/ListTopic.java index 5f0a80f8a..de6661201 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/ListTopic.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/topic/ListTopic.java @@ -15,12 +15,15 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.topic; import apache.rocketmq.controller.v1.ListTopicsReply; import apache.rocketmq.controller.v1.ListTopicsRequest; import apache.rocketmq.controller.v1.MessageType; import apache.rocketmq.controller.v1.Topic; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.ConsoleHelper; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import de.vandermeer.asciitable.AT_Row; @@ -51,7 +54,7 @@ public Void call() throws Exception { CWC_LongestLine cwc = new CWC_LongestLine(); - client.listTopics(mqAdmin.endpoint, request, new StreamObserver<>() { + client.listTopics(mqAdmin.getEndpoint(), request, new StreamObserver<>() { @Override public void onNext(ListTopicsReply reply) { Topic topic = reply.getTopic(); diff --git a/cli/src/main/java/com/automq/rocketmq/cli/UpdateTopic.java b/cli/src/main/java/com/automq/rocketmq/cli/topic/UpdateTopic.java similarity index 85% rename from cli/src/main/java/com/automq/rocketmq/cli/UpdateTopic.java rename to cli/src/main/java/com/automq/rocketmq/cli/topic/UpdateTopic.java index 22ed334fd..99062e496 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/UpdateTopic.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/topic/UpdateTopic.java @@ -15,10 +15,13 @@ * limitations under the License. */ -package com.automq.rocketmq.cli; +package com.automq.rocketmq.cli.topic; import apache.rocketmq.controller.v1.Topic; import apache.rocketmq.controller.v1.UpdateTopicRequest; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.ConsoleHelper; +import com.automq.rocketmq.cli.MQAdmin; import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.client.GrpcControllerClient; import com.google.common.base.Strings; @@ -43,7 +46,7 @@ public class UpdateTopic implements Callable { @Override public Void call() throws Exception { try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) { - Topic topic = client.describeTopic(mqAdmin.endpoint, topicId, null) + Topic topic = client.describeTopic(mqAdmin.getEndpoint(), topicId, null) .join(); if (null == topic) { System.err.printf("Topic '%s' is not found%n%n", topicName); @@ -59,8 +62,8 @@ public Void call() throws Exception { if (!Strings.isNullOrEmpty(topicName)) { builder.setName(topicName); } - client.updateTopic(mqAdmin.endpoint, builder.build()).join(); - topic = client.describeTopic(mqAdmin.endpoint, topicId, null).join(); + client.updateTopic(mqAdmin.getEndpoint(), builder.build()).join(); + topic = client.describeTopic(mqAdmin.getEndpoint(), topicId, null).join(); ConsoleHelper.printTable(topic); } return null;