diff --git a/cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumeMessage.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumeMessage.java index ec199ec24..907a7c592 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumeMessage.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/ConsumeMessage.java @@ -18,7 +18,6 @@ package com.automq.rocketmq.cli.consumer; import apache.rocketmq.common.v1.Code; -import apache.rocketmq.controller.v1.CreateGroupReply; import apache.rocketmq.controller.v1.CreateGroupRequest; import apache.rocketmq.controller.v1.GroupType; import apache.rocketmq.controller.v1.SubscriptionMode; @@ -181,7 +180,7 @@ private void prepareConsumerGroup(String consumerGroup) throws IOException { .setSubMode(SubscriptionMode.SUB_MODE_POP) .build(); - CompletableFuture groupCf = client.createGroup(mqAdmin.getEndpoint(), request); + CompletableFuture groupCf = client.createGroup(mqAdmin.getEndpoint(), request); groupCf = groupCf.exceptionally(throwable -> { Throwable t = CliUtils.getRealException(throwable); if (t instanceof ControllerException controllerException) { diff --git a/cli/src/main/java/com/automq/rocketmq/cli/consumer/CreateGroup.java b/cli/src/main/java/com/automq/rocketmq/cli/consumer/CreateGroup.java index 6c00d5769..1ac4a2f2c 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/consumer/CreateGroup.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/consumer/CreateGroup.java @@ -17,7 +17,6 @@ package com.automq.rocketmq.cli.consumer; -import apache.rocketmq.controller.v1.CreateGroupReply; import apache.rocketmq.controller.v1.CreateGroupRequest; import apache.rocketmq.controller.v1.GroupType; import apache.rocketmq.controller.v1.SubscriptionMode; @@ -55,8 +54,8 @@ public Void call() throws Exception { .setSubMode(subMode) .build(); - CreateGroupReply groupReply = client.createGroup(mqAdmin.getEndpoint(), request).join(); - System.out.println("Group created: " + groupReply.getGroupId()); + long groupId = client.createGroup(mqAdmin.getEndpoint(), request).join(); + System.out.println("Group created: " + groupId); } return null; } diff --git a/cli/src/main/java/com/automq/rocketmq/cli/stream/DescribeStream.java b/cli/src/main/java/com/automq/rocketmq/cli/stream/DescribeStream.java index de0ed78d0..722cd2372 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/stream/DescribeStream.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/stream/DescribeStream.java @@ -17,9 +17,8 @@ package com.automq.rocketmq.cli.stream; -import apache.rocketmq.common.v1.Code; -import apache.rocketmq.controller.v1.DescribeStreamReply; import apache.rocketmq.controller.v1.DescribeStreamRequest; +import apache.rocketmq.controller.v1.StreamDescription; import com.automq.rocketmq.cli.CliClientConfig; import com.automq.rocketmq.cli.ConsoleHelper; import com.automq.rocketmq.cli.MQAdmin; @@ -43,12 +42,8 @@ public Void call() throws Exception { DescribeStreamRequest request = DescribeStreamRequest.newBuilder() .setStreamId(streamId) .build(); - DescribeStreamReply reply = client.describeStream(mqAdmin.getEndpoint(), request).join(); - if (reply.getStatus().getCode() == Code.OK) { - ConsoleHelper.printStream(reply.getStream(), reply.getRangesList()); - } else { - System.err.println(reply.getStatus().getMessage()); - } + StreamDescription description = client.describeStream(mqAdmin.getEndpoint(), request).join(); + ConsoleHelper.printStream(description.getStream(), description.getRangesList()); } return null; } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/ControllerClient.java b/controller/src/main/java/com/automq/rocketmq/controller/ControllerClient.java index c958f21ad..1e9444b4b 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/ControllerClient.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/ControllerClient.java @@ -17,23 +17,20 @@ package com.automq.rocketmq.controller; -import apache.rocketmq.controller.v1.CloseStreamReply; import apache.rocketmq.controller.v1.CloseStreamRequest; import apache.rocketmq.controller.v1.Cluster; import apache.rocketmq.controller.v1.ConsumerGroup; -import apache.rocketmq.controller.v1.CreateGroupReply; import apache.rocketmq.controller.v1.CreateGroupRequest; import apache.rocketmq.controller.v1.DescribeClusterRequest; -import apache.rocketmq.controller.v1.DescribeStreamReply; import apache.rocketmq.controller.v1.DescribeStreamRequest; import apache.rocketmq.controller.v1.ListGroupReply; import apache.rocketmq.controller.v1.ListGroupRequest; -import apache.rocketmq.controller.v1.ListOpenStreamsReply; import apache.rocketmq.controller.v1.ListOpenStreamsRequest; import apache.rocketmq.controller.v1.ListTopicsReply; import apache.rocketmq.controller.v1.ListTopicsRequest; -import apache.rocketmq.controller.v1.OpenStreamReply; import apache.rocketmq.controller.v1.OpenStreamRequest; +import apache.rocketmq.controller.v1.StreamDescription; +import apache.rocketmq.controller.v1.StreamMetadata; import apache.rocketmq.controller.v1.TerminateNodeReply; import apache.rocketmq.controller.v1.TerminateNodeRequest; import apache.rocketmq.controller.v1.Topic; @@ -44,6 +41,7 @@ import io.grpc.stub.StreamObserver; import java.io.Closeable; +import java.util.List; import java.util.concurrent.CompletableFuture; public interface ControllerClient extends Closeable { @@ -66,7 +64,7 @@ public interface ControllerClient extends Closeable { CompletableFuture notifyQueueClose(String target, long topicId, int queueId); - CompletableFuture createGroup(String target, CreateGroupRequest request); + CompletableFuture createGroup(String target, CreateGroupRequest request); CompletableFuture describeGroup(String target, String groupName); @@ -78,13 +76,13 @@ public interface ControllerClient extends Closeable { CompletableFuture commitOffset(String target, long groupId, long topicId, int queueId, long offset); - CompletableFuture openStream(String target, OpenStreamRequest request); + CompletableFuture openStream(String target, OpenStreamRequest request); - CompletableFuture closeStream(String target, CloseStreamRequest request); + CompletableFuture closeStream(String target, CloseStreamRequest request); - CompletableFuture listOpenStreams(String target, ListOpenStreamsRequest request); + CompletableFuture> listOpenStreams(String target, ListOpenStreamsRequest request); - CompletableFuture describeStream(String target, DescribeStreamRequest request); + CompletableFuture describeStream(String target, DescribeStreamRequest request); CompletableFuture updateTopic(String target, UpdateTopicRequest request); diff --git a/controller/src/main/java/com/automq/rocketmq/controller/client/GrpcControllerClient.java b/controller/src/main/java/com/automq/rocketmq/controller/client/GrpcControllerClient.java index f478dc209..d29896a53 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/client/GrpcControllerClient.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/client/GrpcControllerClient.java @@ -58,6 +58,8 @@ import apache.rocketmq.controller.v1.OpenStreamRequest; import apache.rocketmq.controller.v1.ReassignMessageQueueReply; import apache.rocketmq.controller.v1.ReassignMessageQueueRequest; +import apache.rocketmq.controller.v1.StreamDescription; +import apache.rocketmq.controller.v1.StreamMetadata; import apache.rocketmq.controller.v1.TerminateNodeReply; import apache.rocketmq.controller.v1.TerminateNodeRequest; import apache.rocketmq.controller.v1.Topic; @@ -82,6 +84,7 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -469,7 +472,7 @@ public void onFailure(@Nonnull Throwable t) { } @Override - public CompletableFuture createGroup(String target, CreateGroupRequest request) { + public CompletableFuture createGroup(String target, CreateGroupRequest request) { ControllerServiceGrpc.ControllerServiceFutureStub stub; try { stub = getOrCreateStubForTarget(target); @@ -477,12 +480,12 @@ public CompletableFuture createGroup(String target, CreateGrou return CompletableFuture.failedFuture(e); } - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); Futures.addCallback(stub.createGroup(request), new FutureCallback<>() { @Override public void onSuccess(CreateGroupReply result) { switch (result.getStatus().getCode()) { - case OK -> future.complete(result); + case OK -> future.complete(result.getGroupId()); case DUPLICATED -> { LOGGER.info("Group name {} has been taken", request.getName()); ControllerException e = new ControllerException(result.getStatus().getCodeValue(), @@ -664,7 +667,7 @@ public void onFailure(@Nonnull Throwable t) { } @Override - public CompletableFuture openStream(String target, + public CompletableFuture openStream(String target, OpenStreamRequest request) { ControllerServiceGrpc.ControllerServiceFutureStub stub; try { @@ -673,11 +676,16 @@ public CompletableFuture openStream(String target, return CompletableFuture.failedFuture(e); } - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); Futures.addCallback(stub.openStream(request), new FutureCallback<>() { @Override public void onSuccess(OpenStreamReply result) { - future.complete(result); + if (result.getStatus().getCode() == Code.OK) { + future.complete(result.getStreamMetadata()); + } else { + future.completeExceptionally(new ControllerException(result.getStatus().getCodeValue(), + result.getStatus().getMessage())); + } } @Override @@ -690,7 +698,7 @@ public void onFailure(@Nonnull Throwable t) { } @Override - public CompletableFuture closeStream(String target, + public CompletableFuture closeStream(String target, CloseStreamRequest request) { ControllerServiceGrpc.ControllerServiceFutureStub stub; try { @@ -698,11 +706,16 @@ public CompletableFuture closeStream(String target, } catch (ControllerException e) { return CompletableFuture.failedFuture(e); } - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); Futures.addCallback(stub.closeStream(request), new FutureCallback<>() { @Override public void onSuccess(CloseStreamReply result) { - future.complete(result); + if (result.getStatus().getCode() == Code.OK) { + future.complete(null); + } else { + future.completeExceptionally(new ControllerException(result.getStatus().getCodeValue(), + result.getStatus().getMessage())); + } } @Override @@ -714,7 +727,7 @@ public void onFailure(@Nonnull Throwable t) { } @Override - public CompletableFuture listOpenStreams(String target, ListOpenStreamsRequest request) { + public CompletableFuture> listOpenStreams(String target, ListOpenStreamsRequest request) { ControllerServiceGrpc.ControllerServiceFutureStub stub; try { stub = getOrCreateStubForTarget(target); @@ -722,11 +735,16 @@ public CompletableFuture listOpenStreams(String target, Li return CompletableFuture.failedFuture(e); } - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture> future = new CompletableFuture<>(); Futures.addCallback(stub.listOpenStreams(request), new FutureCallback<>() { @Override public void onSuccess(ListOpenStreamsReply result) { - future.complete(result); + if (result.getStatus().getCode() == Code.OK) { + future.complete(result.getStreamMetadataList()); + } else { + future.completeExceptionally(new ControllerException(result.getStatus().getCodeValue(), + result.getStatus().getMessage())); + } } @Override @@ -738,18 +756,23 @@ public void onFailure(@Nonnull Throwable t) { } @Override - public CompletableFuture describeStream(String target, DescribeStreamRequest request) { + public CompletableFuture describeStream(String target, DescribeStreamRequest request) { ControllerServiceGrpc.ControllerServiceFutureStub stub; try { stub = getOrCreateStubForTarget(target); } catch (ControllerException e) { return CompletableFuture.failedFuture(e); } - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); Futures.addCallback(stub.describeStream(request), new FutureCallback<>() { @Override public void onSuccess(DescribeStreamReply result) { - future.complete(result); + if (result.getStatus().getCode() == Code.OK) { + future.complete(result.getDescription()); + } else { + future.completeExceptionally(new ControllerException(result.getStatus().getCodeValue(), + result.getStatus().getMessage())); + } } @Override diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/GroupManager.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/GroupManager.java index fae139c7d..c62c915cc 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/GroupManager.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/GroupManager.java @@ -118,16 +118,7 @@ public CompletableFuture createGroup(CreateGroupRequest request) { if (leaderAddress.isEmpty()) { return CompletableFuture.failedFuture(new ControllerException(Code.NO_LEADER_VALUE, "No leader is elected yet")); } - return metadataStore.controllerClient() - .createGroup(leaderAddress.get(), request) - .thenApply(reply -> { - if (reply.getStatus().getCode() == Code.OK) { - return reply.getGroupId(); - } else { - throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(), - reply.getStatus().getMessage())); - } - }); + return metadataStore.controllerClient().createGroup(leaderAddress.get(), request); } } } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java index 741a0d960..4a149a91b 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java @@ -25,6 +25,7 @@ import apache.rocketmq.controller.v1.ListOpenStreamsRequest; import apache.rocketmq.controller.v1.OpenStreamRequest; import apache.rocketmq.controller.v1.Status; +import apache.rocketmq.controller.v1.StreamDescription; import apache.rocketmq.controller.v1.StreamMetadata; import apache.rocketmq.controller.v1.StreamRole; import apache.rocketmq.controller.v1.StreamState; @@ -333,15 +334,7 @@ public CompletableFuture openStream(long streamId, long epoch, i .setStreamEpoch(epoch) .setBrokerId(nodeId) .build(); - return metadataStore.controllerClient() - .openStream(leaderAddress.get(), request) - .thenApply(reply -> { - if (reply.getStatus().getCode() == Code.OK) { - return reply.getStreamMetadata(); - } - throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(), - reply.getStatus().getMessage())); - }); + return metadataStore.controllerClient().openStream(leaderAddress.get(), request); } } } @@ -419,15 +412,7 @@ public CompletableFuture closeStream(long streamId, long streamEpoch, int .setStreamEpoch(streamEpoch) .setBrokerId(nodeId) .build(); - return metadataStore.controllerClient() - .closeStream(leaderAddress.get(), request) - .thenApply(reply -> { - if (reply.getStatus().getCode() == Code.OK) { - return null; - } - throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(), - reply.getStatus().getMessage())); - }); + return metadataStore.controllerClient().closeStream(leaderAddress.get(), request); } } return future; @@ -473,15 +458,7 @@ public CompletableFuture> listOpenStreams(int nodeId) { ListOpenStreamsRequest request = ListOpenStreamsRequest.newBuilder() .setBrokerId(nodeId) .build(); - return metadataStore.controllerClient() - .listOpenStreams(leaderAddress.get(), request) - .thenApply(reply -> { - if (reply.getStatus().getCode() == Code.OK) { - return reply.getStreamMetadataList(); - } - throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(), - reply.getStatus().getMessage())); - }); + return metadataStore.controllerClient().listOpenStreams(leaderAddress.get(), request); } } return future; @@ -527,15 +504,18 @@ public CompletableFuture describeStream(DescribeStreamReque if (endOffset.isPresent()) { streamBuilder.setEndOffset(endOffset.getAsLong()); } - builder.setStream(streamBuilder); - - builder.addAllRanges(ranges.stream().map(r -> apache.rocketmq.controller.v1.Range.newBuilder() - .setStreamId(r.getStreamId()) - .setStartOffset(r.getStartOffset()) - .setEndOffset(r.getEndOffset()) - .setBrokerId(r.getNodeId()) - .setEpoch(r.getEpoch()) - .build()).collect(Collectors.toList())); + + StreamDescription streamDescription = StreamDescription.newBuilder() + .setStream(streamBuilder) + .addAllRanges(ranges.stream().map(r -> apache.rocketmq.controller.v1.Range.newBuilder() + .setStreamId(r.getStreamId()) + .setStartOffset(r.getStartOffset()) + .setEndOffset(r.getEndOffset()) + .setBrokerId(r.getNodeId()) + .setEpoch(r.getEpoch()) + .build()).collect(Collectors.toList())) + .build(); + builder.setDescription(streamDescription); return builder.build(); } else { return DescribeStreamReply.newBuilder() diff --git a/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java b/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java index 0682fa8bc..06efb6447 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java @@ -19,12 +19,10 @@ import apache.rocketmq.controller.v1.AcceptTypes; import apache.rocketmq.controller.v1.AssignmentStatus; -import apache.rocketmq.controller.v1.CloseStreamReply; import apache.rocketmq.controller.v1.CloseStreamRequest; import apache.rocketmq.common.v1.Code; import apache.rocketmq.controller.v1.ConsumerGroup; import apache.rocketmq.controller.v1.ControllerServiceGrpc; -import apache.rocketmq.controller.v1.CreateGroupReply; import apache.rocketmq.controller.v1.CreateGroupRequest; import apache.rocketmq.controller.v1.CreateTopicReply; import apache.rocketmq.controller.v1.CreateTopicRequest; @@ -40,7 +38,6 @@ import apache.rocketmq.controller.v1.MessageType; import apache.rocketmq.controller.v1.NodeRegistrationReply; import apache.rocketmq.controller.v1.NodeRegistrationRequest; -import apache.rocketmq.controller.v1.OpenStreamReply; import apache.rocketmq.controller.v1.OpenStreamRequest; import apache.rocketmq.controller.v1.StreamMetadata; import apache.rocketmq.controller.v1.StreamRole; @@ -448,9 +445,8 @@ public void testCreateGroup() throws IOException, ExecutionException, Interrupte String target = String.format("localhost:%d", port); - CreateGroupReply reply = client.createGroup(target, request).get(); - Assertions.assertEquals(reply.getStatus().getCode(), Code.OK); - Assertions.assertTrue(reply.getGroupId() > 0); + long groupId = client.createGroup(target, request).get(); + Assertions.assertTrue(groupId > 0); // Test duplication client.createGroup(target, request).whenComplete( @@ -716,7 +712,7 @@ public void testOpenStream() throws IOException, ExecutionException, Interrupted } @Test - public void testOpenStream_NotFound() throws IOException, ExecutionException, InterruptedException { + public void testOpenStream_NotFound() throws IOException, InterruptedException { ControllerClient controllerClient = Mockito.mock(ControllerClient.class); long topicId = 1; @@ -769,14 +765,18 @@ public void testOpenStream_NotFound() throws IOException, ExecutionException, In .setStreamEpoch(1) .setBrokerEpoch(1) .build(); - OpenStreamReply reply = client.openStream(String.format("localhost:%d", port), request).get(); - Assertions.assertEquals(Code.NOT_FOUND, reply.getStatus().getCode()); + try { + client.openStream(String.format("localhost:%d", port), request).get(); + } catch (ExecutionException e) { + ControllerException cause = (ControllerException) e.getCause(); + Assertions.assertEquals(Code.NOT_FOUND_VALUE, cause.getErrorCode()); + } } } } @Test - public void testOpenStream_Fenced() throws IOException, ExecutionException, InterruptedException { + public void testOpenStream_Fenced() throws IOException, InterruptedException { ControllerClient controllerClient = Mockito.mock(ControllerClient.class); long topicId = 1; @@ -829,8 +829,12 @@ public void testOpenStream_Fenced() throws IOException, ExecutionException, Inte .setStreamEpoch(0) .setBrokerId(2) .build(); - OpenStreamReply reply = client.openStream(String.format("localhost:%d", port), request).get(); - Assertions.assertEquals(Code.FENCED, reply.getStatus().getCode()); + try { + client.openStream(String.format("localhost:%d", port), request).get(); + } catch (ExecutionException e) { + ControllerException cause = (ControllerException) e.getCause(); + Assertions.assertEquals(Code.FENCED_VALUE, cause.getErrorCode()); + } } } } @@ -907,7 +911,7 @@ public void testCloseStream() throws IOException, ExecutionException, Interrupte } @Test - public void testCloseStream_NotFound() throws IOException, ExecutionException, InterruptedException { + public void testCloseStream_NotFound() throws IOException, InterruptedException { ControllerClient controllerClient = Mockito.mock(ControllerClient.class); long topicId = 1; @@ -961,8 +965,12 @@ public void testCloseStream_NotFound() throws IOException, ExecutionException, I .setStreamEpoch(1) .setBrokerEpoch(1) .build(); - CloseStreamReply reply = client.closeStream(String.format("localhost:%d", port), request).get(); - Assertions.assertEquals(Code.NOT_FOUND, reply.getStatus().getCode()); + try { + client.closeStream(String.format("localhost:%d", port), request).get(); + } catch (ExecutionException e) { + ControllerException cause = (ControllerException) e.getCause(); + Assertions.assertEquals(Code.NOT_FOUND_VALUE, cause.getErrorCode()); + } } } } @@ -1010,7 +1018,7 @@ public void testListOpenStreams() throws IOException, ExecutionException, Interr } @Test - public void testCreateTopic_OpenStream_CloseStream() throws IOException, ExecutionException, InterruptedException, ControllerException { + public void testCreateTopic_OpenStream_CloseStream() throws IOException, ExecutionException, InterruptedException { ControllerClient controllerClient = Mockito.mock(ControllerClient.class); long streamId; @@ -1068,8 +1076,7 @@ public void testCreateTopic_OpenStream_CloseStream() throws IOException, Executi .setBrokerId(nodeId) .build(); - OpenStreamReply reply = client.openStream(String.format("localhost:%d", port), request).get(); - StreamMetadata openStream = reply.getStreamMetadata(); + StreamMetadata openStream = client.openStream(String.format("localhost:%d", port), request).get(); Assertions.assertEquals(0, openStream.getStartOffset()); Assertions.assertEquals(metadata.getEpoch() + 1, openStream.getEpoch()); Assertions.assertEquals(0, openStream.getRangeId()); diff --git a/proto/src/main/proto/controller/controller.proto b/proto/src/main/proto/controller/controller.proto index 71d74e79d..c5bf68d45 100644 --- a/proto/src/main/proto/controller/controller.proto +++ b/proto/src/main/proto/controller/controller.proto @@ -411,10 +411,14 @@ message DescribeStreamRequest { int64 stream_id = 2; } +message StreamDescription { + StreamMetadata stream = 1; + repeated Range ranges = 2; +} + message DescribeStreamReply { Status status = 1; - StreamMetadata stream = 2; - repeated Range ranges = 3; + StreamDescription description = 2; } service ControllerService {