Skip to content

Commit

Permalink
fix: unify controller error handling while forwarding requests to lea…
Browse files Browse the repository at this point in the history
…der node (#758)

Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Nov 29, 2023
1 parent ba18c3c commit 3954f7a
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.SubscriptionMode;
Expand Down Expand Up @@ -181,7 +180,7 @@ private void prepareConsumerGroup(String consumerGroup) throws IOException {
.setSubMode(SubscriptionMode.SUB_MODE_POP)
.build();

CompletableFuture<CreateGroupReply> groupCf = client.createGroup(mqAdmin.getEndpoint(), request);
CompletableFuture<Long> groupCf = client.createGroup(mqAdmin.getEndpoint(), request);
groupCf = groupCf.exceptionally(throwable -> {
Throwable t = CliUtils.getRealException(throwable);
if (t instanceof ControllerException controllerException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.automq.rocketmq.cli.consumer;

import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.SubscriptionMode;
Expand Down Expand Up @@ -55,8 +54,8 @@ public Void call() throws Exception {
.setSubMode(subMode)
.build();

CreateGroupReply groupReply = client.createGroup(mqAdmin.getEndpoint(), request).join();
System.out.println("Group created: " + groupReply.getGroupId());
long groupId = client.createGroup(mqAdmin.getEndpoint(), request).join();
System.out.println("Group created: " + groupId);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

package com.automq.rocketmq.cli.stream;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.DescribeStreamReply;
import apache.rocketmq.controller.v1.DescribeStreamRequest;
import apache.rocketmq.controller.v1.StreamDescription;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.ConsoleHelper;
import com.automq.rocketmq.cli.MQAdmin;
Expand All @@ -43,12 +42,8 @@ public Void call() throws Exception {
DescribeStreamRequest request = DescribeStreamRequest.newBuilder()
.setStreamId(streamId)
.build();
DescribeStreamReply reply = client.describeStream(mqAdmin.getEndpoint(), request).join();
if (reply.getStatus().getCode() == Code.OK) {
ConsoleHelper.printStream(reply.getStream(), reply.getRangesList());
} else {
System.err.println(reply.getStatus().getMessage());
}
StreamDescription description = client.describeStream(mqAdmin.getEndpoint(), request).join();
ConsoleHelper.printStream(description.getStream(), description.getRangesList());
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,20 @@

package com.automq.rocketmq.controller;

import apache.rocketmq.controller.v1.CloseStreamReply;
import apache.rocketmq.controller.v1.CloseStreamRequest;
import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.DescribeClusterRequest;
import apache.rocketmq.controller.v1.DescribeStreamReply;
import apache.rocketmq.controller.v1.DescribeStreamRequest;
import apache.rocketmq.controller.v1.ListGroupReply;
import apache.rocketmq.controller.v1.ListGroupRequest;
import apache.rocketmq.controller.v1.ListOpenStreamsReply;
import apache.rocketmq.controller.v1.ListOpenStreamsRequest;
import apache.rocketmq.controller.v1.ListTopicsReply;
import apache.rocketmq.controller.v1.ListTopicsRequest;
import apache.rocketmq.controller.v1.OpenStreamReply;
import apache.rocketmq.controller.v1.OpenStreamRequest;
import apache.rocketmq.controller.v1.StreamDescription;
import apache.rocketmq.controller.v1.StreamMetadata;
import apache.rocketmq.controller.v1.TerminateNodeReply;
import apache.rocketmq.controller.v1.TerminateNodeRequest;
import apache.rocketmq.controller.v1.Topic;
Expand All @@ -44,6 +41,7 @@

import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public interface ControllerClient extends Closeable {
Expand All @@ -66,7 +64,7 @@ public interface ControllerClient extends Closeable {

CompletableFuture<Void> notifyQueueClose(String target, long topicId, int queueId);

CompletableFuture<CreateGroupReply> createGroup(String target, CreateGroupRequest request);
CompletableFuture<Long> createGroup(String target, CreateGroupRequest request);

CompletableFuture<ConsumerGroup> describeGroup(String target, String groupName);

Expand All @@ -78,13 +76,13 @@ public interface ControllerClient extends Closeable {

CompletableFuture<Void> commitOffset(String target, long groupId, long topicId, int queueId, long offset);

CompletableFuture<OpenStreamReply> openStream(String target, OpenStreamRequest request);
CompletableFuture<StreamMetadata> openStream(String target, OpenStreamRequest request);

CompletableFuture<CloseStreamReply> closeStream(String target, CloseStreamRequest request);
CompletableFuture<Void> closeStream(String target, CloseStreamRequest request);

CompletableFuture<ListOpenStreamsReply> listOpenStreams(String target, ListOpenStreamsRequest request);
CompletableFuture<List<StreamMetadata>> listOpenStreams(String target, ListOpenStreamsRequest request);

CompletableFuture<DescribeStreamReply> describeStream(String target, DescribeStreamRequest request);
CompletableFuture<StreamDescription> describeStream(String target, DescribeStreamRequest request);

CompletableFuture<Topic> updateTopic(String target, UpdateTopicRequest request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import apache.rocketmq.controller.v1.OpenStreamRequest;
import apache.rocketmq.controller.v1.ReassignMessageQueueReply;
import apache.rocketmq.controller.v1.ReassignMessageQueueRequest;
import apache.rocketmq.controller.v1.StreamDescription;
import apache.rocketmq.controller.v1.StreamMetadata;
import apache.rocketmq.controller.v1.TerminateNodeReply;
import apache.rocketmq.controller.v1.TerminateNodeRequest;
import apache.rocketmq.controller.v1.Topic;
Expand All @@ -82,6 +84,7 @@

import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -469,20 +472,20 @@ public void onFailure(@Nonnull Throwable t) {
}

@Override
public CompletableFuture<CreateGroupReply> createGroup(String target, CreateGroupRequest request) {
public CompletableFuture<Long> createGroup(String target, CreateGroupRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}

CompletableFuture<CreateGroupReply> future = new CompletableFuture<>();
CompletableFuture<Long> future = new CompletableFuture<>();
Futures.addCallback(stub.createGroup(request), new FutureCallback<>() {
@Override
public void onSuccess(CreateGroupReply result) {
switch (result.getStatus().getCode()) {
case OK -> future.complete(result);
case OK -> future.complete(result.getGroupId());
case DUPLICATED -> {
LOGGER.info("Group name {} has been taken", request.getName());
ControllerException e = new ControllerException(result.getStatus().getCodeValue(),
Expand Down Expand Up @@ -664,7 +667,7 @@ public void onFailure(@Nonnull Throwable t) {
}

@Override
public CompletableFuture<OpenStreamReply> openStream(String target,
public CompletableFuture<StreamMetadata> openStream(String target,
OpenStreamRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
Expand All @@ -673,11 +676,16 @@ public CompletableFuture<OpenStreamReply> openStream(String target,
return CompletableFuture.failedFuture(e);
}

CompletableFuture<OpenStreamReply> future = new CompletableFuture<>();
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
Futures.addCallback(stub.openStream(request), new FutureCallback<>() {
@Override
public void onSuccess(OpenStreamReply result) {
future.complete(result);
if (result.getStatus().getCode() == Code.OK) {
future.complete(result.getStreamMetadata());
} else {
future.completeExceptionally(new ControllerException(result.getStatus().getCodeValue(),
result.getStatus().getMessage()));
}
}

@Override
Expand All @@ -690,19 +698,24 @@ public void onFailure(@Nonnull Throwable t) {
}

@Override
public CompletableFuture<CloseStreamReply> closeStream(String target,
public CompletableFuture<Void> closeStream(String target,
CloseStreamRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}
CompletableFuture<CloseStreamReply> future = new CompletableFuture<>();
CompletableFuture<Void> future = new CompletableFuture<>();
Futures.addCallback(stub.closeStream(request), new FutureCallback<>() {
@Override
public void onSuccess(CloseStreamReply result) {
future.complete(result);
if (result.getStatus().getCode() == Code.OK) {
future.complete(null);
} else {
future.completeExceptionally(new ControllerException(result.getStatus().getCodeValue(),
result.getStatus().getMessage()));
}
}

@Override
Expand All @@ -714,19 +727,24 @@ public void onFailure(@Nonnull Throwable t) {
}

@Override
public CompletableFuture<ListOpenStreamsReply> listOpenStreams(String target, ListOpenStreamsRequest request) {
public CompletableFuture<List<StreamMetadata>> listOpenStreams(String target, ListOpenStreamsRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}

CompletableFuture<ListOpenStreamsReply> future = new CompletableFuture<>();
CompletableFuture<List<StreamMetadata>> future = new CompletableFuture<>();
Futures.addCallback(stub.listOpenStreams(request), new FutureCallback<>() {
@Override
public void onSuccess(ListOpenStreamsReply result) {
future.complete(result);
if (result.getStatus().getCode() == Code.OK) {
future.complete(result.getStreamMetadataList());
} else {
future.completeExceptionally(new ControllerException(result.getStatus().getCodeValue(),
result.getStatus().getMessage()));
}
}

@Override
Expand All @@ -738,18 +756,23 @@ public void onFailure(@Nonnull Throwable t) {
}

@Override
public CompletableFuture<DescribeStreamReply> describeStream(String target, DescribeStreamRequest request) {
public CompletableFuture<StreamDescription> describeStream(String target, DescribeStreamRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}
CompletableFuture<DescribeStreamReply> future = new CompletableFuture<>();
CompletableFuture<StreamDescription> future = new CompletableFuture<>();
Futures.addCallback(stub.describeStream(request), new FutureCallback<>() {
@Override
public void onSuccess(DescribeStreamReply result) {
future.complete(result);
if (result.getStatus().getCode() == Code.OK) {
future.complete(result.getDescription());
} else {
future.completeExceptionally(new ControllerException(result.getStatus().getCodeValue(),
result.getStatus().getMessage()));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,7 @@ public CompletableFuture<Long> createGroup(CreateGroupRequest request) {
if (leaderAddress.isEmpty()) {
return CompletableFuture.failedFuture(new ControllerException(Code.NO_LEADER_VALUE, "No leader is elected yet"));
}
return metadataStore.controllerClient()
.createGroup(leaderAddress.get(), request)
.thenApply(reply -> {
if (reply.getStatus().getCode() == Code.OK) {
return reply.getGroupId();
} else {
throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(),
reply.getStatus().getMessage()));
}
});
return metadataStore.controllerClient().createGroup(leaderAddress.get(), request);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import apache.rocketmq.controller.v1.ListOpenStreamsRequest;
import apache.rocketmq.controller.v1.OpenStreamRequest;
import apache.rocketmq.controller.v1.Status;
import apache.rocketmq.controller.v1.StreamDescription;
import apache.rocketmq.controller.v1.StreamMetadata;
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.StreamState;
Expand Down Expand Up @@ -333,15 +334,7 @@ public CompletableFuture<StreamMetadata> openStream(long streamId, long epoch, i
.setStreamEpoch(epoch)
.setBrokerId(nodeId)
.build();
return metadataStore.controllerClient()
.openStream(leaderAddress.get(), request)
.thenApply(reply -> {
if (reply.getStatus().getCode() == Code.OK) {
return reply.getStreamMetadata();
}
throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(),
reply.getStatus().getMessage()));
});
return metadataStore.controllerClient().openStream(leaderAddress.get(), request);
}
}
}
Expand Down Expand Up @@ -419,15 +412,7 @@ public CompletableFuture<Void> closeStream(long streamId, long streamEpoch, int
.setStreamEpoch(streamEpoch)
.setBrokerId(nodeId)
.build();
return metadataStore.controllerClient()
.closeStream(leaderAddress.get(), request)
.thenApply(reply -> {
if (reply.getStatus().getCode() == Code.OK) {
return null;
}
throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(),
reply.getStatus().getMessage()));
});
return metadataStore.controllerClient().closeStream(leaderAddress.get(), request);
}
}
return future;
Expand Down Expand Up @@ -473,15 +458,7 @@ public CompletableFuture<List<StreamMetadata>> listOpenStreams(int nodeId) {
ListOpenStreamsRequest request = ListOpenStreamsRequest.newBuilder()
.setBrokerId(nodeId)
.build();
return metadataStore.controllerClient()
.listOpenStreams(leaderAddress.get(), request)
.thenApply(reply -> {
if (reply.getStatus().getCode() == Code.OK) {
return reply.getStreamMetadataList();
}
throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(),
reply.getStatus().getMessage()));
});
return metadataStore.controllerClient().listOpenStreams(leaderAddress.get(), request);
}
}
return future;
Expand Down Expand Up @@ -527,15 +504,18 @@ public CompletableFuture<DescribeStreamReply> describeStream(DescribeStreamReque
if (endOffset.isPresent()) {
streamBuilder.setEndOffset(endOffset.getAsLong());
}
builder.setStream(streamBuilder);

builder.addAllRanges(ranges.stream().map(r -> apache.rocketmq.controller.v1.Range.newBuilder()
.setStreamId(r.getStreamId())
.setStartOffset(r.getStartOffset())
.setEndOffset(r.getEndOffset())
.setBrokerId(r.getNodeId())
.setEpoch(r.getEpoch())
.build()).collect(Collectors.toList()));

StreamDescription streamDescription = StreamDescription.newBuilder()
.setStream(streamBuilder)
.addAllRanges(ranges.stream().map(r -> apache.rocketmq.controller.v1.Range.newBuilder()
.setStreamId(r.getStreamId())
.setStartOffset(r.getStartOffset())
.setEndOffset(r.getEndOffset())
.setBrokerId(r.getNodeId())
.setEpoch(r.getEpoch())
.build()).collect(Collectors.toList()))
.build();
builder.setDescription(streamDescription);
return builder.build();
} else {
return DescribeStreamReply.newBuilder()
Expand Down
Loading

0 comments on commit 3954f7a

Please sign in to comment.