diff --git a/proto/src/main/proto/proxy/proxy.proto b/proto/src/main/proto/proxy/proxy.proto index 743b43272..00a7979ea 100644 --- a/proto/src/main/proto/proxy/proxy.proto +++ b/proto/src/main/proto/proxy/proxy.proto @@ -136,7 +136,7 @@ message ConsumerClientConnection { message ConsumerClientConnectionReply { Status status = 1; - // Producer client connection + // consumer client connection repeated ConsumerClientConnection connection = 2; } @@ -165,6 +165,38 @@ message RelayReply { Status status = 1; } + + +message ConsumerConnectionRequest { + ProxyRequestContext context = 1; + // Consumer group name + string group = 2; +} + + +message ConsumerSubInfo { + // topic + string topic = 1; + // subExpression + string sub_expression = 2; +} + +message ConsumerGroupCliInfo { + string consume_type = 1; + string message_model = 2; + string consume_from_where = 3; + // consumer client connection + repeated ConsumerClientConnection connection = 4; + // consumer subscription info + repeated ConsumerSubInfo consumer_sub_info = 5; +} +message ConsumerConnectionReply { + Status status = 1; + // consumer group cli info + ConsumerGroupCliInfo consumer_group_cli_info = 2; +} + + service ProxyService { rpc resetConsumeOffset(ResetConsumeOffsetRequest) returns (ResetConsumeOffsetReply) {} rpc resetConsumeOffsetByTimestamp(ResetConsumeOffsetByTimestampRequest) returns (ResetConsumeOffsetReply) {} @@ -172,4 +204,5 @@ service ProxyService { rpc producerClientConnection(ProducerClientConnectionRequest) returns (ProducerClientConnectionReply) {} rpc consumerClientConnection(ConsumerClientConnectionRequest) returns (ConsumerClientConnectionReply) {} rpc relay(RelayRequest) returns (RelayReply) {} + rpc consumerConnection(ConsumerConnectionRequest) returns (ConsumerConnectionReply) {} } \ No newline at end of file diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java index 97365c9a8..5a52e2d24 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java @@ -21,6 +21,10 @@ import apache.rocketmq.proxy.v1.ConsumerClientConnection; import apache.rocketmq.proxy.v1.ConsumerClientConnectionReply; import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest; +import apache.rocketmq.proxy.v1.ConsumerConnectionReply; +import apache.rocketmq.proxy.v1.ConsumerConnectionRequest; +import apache.rocketmq.proxy.v1.ConsumerGroupCliInfo; +import apache.rocketmq.proxy.v1.ConsumerSubInfo; import apache.rocketmq.proxy.v1.ProducerClientConnection; import apache.rocketmq.proxy.v1.ProducerClientConnectionReply; import apache.rocketmq.proxy.v1.ProducerClientConnectionRequest; @@ -52,6 +56,7 @@ import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.slf4j.Logger; public class ProxyServiceImpl extends ProxyServiceGrpc.ProxyServiceImplBase { @@ -241,4 +246,56 @@ public void relay(RelayRequest request, StreamObserver responseObser } } } + + @Override + public void consumerConnection(ConsumerConnectionRequest request, + StreamObserver responseObserver) { + ConsumerGroupInfo groupInfo = consumerManager.getConsumerGroupInfo(request.getGroup(), true); + if (groupInfo == null) { + responseObserver.onNext(ConsumerConnectionReply.newBuilder() + .setStatus(Status + .newBuilder() + .setCode(Code.BAD_REQUEST) + .setMessage("Consumer group not found: " + request.getGroup()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + ConsumerGroupCliInfo.Builder consumerBuilder = ConsumerGroupCliInfo.newBuilder(); + + consumerBuilder + .setConsumeType(groupInfo.getConsumeType().getTypeCN()) + .setMessageModel(groupInfo.getMessageModel().getModeCN()) + .setConsumeFromWhere(groupInfo.getConsumeFromWhere().name()); + + for (ClientChannelInfo info : groupInfo.getChannelInfoTable().values()) { + String protocolType = ChannelProtocolType.REMOTING.name(); + if (info.getChannel() instanceof GrpcClientChannel) { + protocolType = ChannelProtocolType.GRPC_V2.name(); + } + consumerBuilder.addConnection(ConsumerClientConnection.newBuilder() + .setClientId(info.getClientId()) + .setProtocol(protocolType) + .setAddress(NetworkUtil.socketAddress2String(info.getChannel().remoteAddress())) + .setLanguage(info.getLanguage().name()) + .setVersion(MQVersion.getVersionDesc(info.getVersion())) + .setLastUpdateTime(info.getLastUpdateTimestamp()) + .build()); + } + + for (SubscriptionData data : groupInfo.getSubscriptionTable().values()) { + consumerBuilder.addConsumerSubInfo(ConsumerSubInfo.newBuilder() + .setTopic(data.getTopic()) + .setSubExpression(data.getSubString()) + .build()); + } + ConsumerConnectionReply.Builder builder = ConsumerConnectionReply.newBuilder() + .setConsumerGroupCliInfo(consumerBuilder.build()); + + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); + } + } diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java index 942202a2a..5b0f1322d 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java @@ -18,6 +18,8 @@ package com.automq.rocketmq.proxy.grpc; import apache.rocketmq.common.v1.Code; +import apache.rocketmq.proxy.v1.ConsumerClientConnection; +import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest; import apache.rocketmq.proxy.v1.ProxyServiceGrpc; import apache.rocketmq.proxy.v1.Status; import com.automq.rocketmq.common.config.BrokerConfig; @@ -28,6 +30,7 @@ import com.automq.rocketmq.store.api.MessageStore; import com.automq.rocketmq.store.model.message.PutResult; import java.lang.reflect.Field; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.client.ConsumerManager; @@ -75,4 +78,14 @@ void relay() { Status status = proxyClient.relayMessage(TARGET, messageExt.message()).join(); assertEquals(Code.OK, status.getCode()); } + + + @Test + void ConsumerClientConnection() { + final String groupName = ""; + + ConsumerClientConnectionRequest request = ConsumerClientConnectionRequest.newBuilder().setGroup(groupName).build(); + List list = proxyClient.consumerClientConnection(TARGET, request).join(); + + } } \ No newline at end of file