Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(proxy): proxy message to the node assigned to its queue #812

Merged
merged 2 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
LockService lockService = new LockService(brokerConfig.proxy());

ProducerManager producerManager = new ProducerManager();
MessageServiceImpl messageServiceImpl = new MessageServiceImpl(brokerConfig.proxy(), messageStore, proxyMetadataService, lockService, dlqService, producerManager);
MessageServiceImpl messageServiceImpl = new MessageServiceImpl(brokerConfig, messageStore, proxyMetadataService, lockService, dlqService, producerManager, relayClient);
this.messageService = messageServiceImpl;
this.extendMessageService = messageServiceImpl;
ConsumerManager consumerManager = new ConsumerManager(new DefaultServiceManager.ConsumerIdsChangeListenerImpl(), brokerConfig.proxy().channelExpiredTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.Topic;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.common.system.MessageConstants;
import com.automq.rocketmq.common.trace.TraceContext;
Expand Down Expand Up @@ -66,24 +65,24 @@ public void init(MessageStore messageStore) {

@Override
@WithSpan
public CompletableFuture<Void> send(TraceContext context, long consumerGroupId, FlatMessageExt flatMessageExt) {
public CompletableFuture<Void> send(TraceContext context, long consumerGroupId, FlatMessage message) {
if (messageStore == null) {
return CompletableFuture.failedFuture(new IllegalStateException("Message store is not initialized"));
}

CompletableFuture<Topic> dlqQueryCf = metadataService.consumerGroupOf(consumerGroupId)
.thenComposeAsync(consumerGroup -> {
long deadLetterTopicId = consumerGroup.getDeadLetterTopicId();
long topicId = flatMessageExt.message().topicId();
long topicId = message.topicId();
if (deadLetterTopicId == MessageConstants.UNINITIALIZED_TOPIC_ID) {
// not allow to send to DLQ
LOGGER.warn("Message: {} is dropped because the consumer group: {} doesn't have DLQ topic",
flatMessageExt, consumerGroupId);
message.systemProperties().messageId(), consumerGroupId);
return CompletableFuture.completedFuture(null);
}
if (deadLetterTopicId == topicId) {
LOGGER.error("Message: {} is dropped because the consumer group: {} has the same DLQ topic: {} with original topic",
flatMessageExt, consumerGroupId, topicId);
message.systemProperties().messageId(), consumerGroupId, topicId);
return CompletableFuture.completedFuture(null);
}
// get dlq topic info
Expand All @@ -98,17 +97,16 @@ public CompletableFuture<Void> send(TraceContext context, long consumerGroupId,
if (!(dlqTopic.getAcceptTypes().getTypesList().contains(MessageType.NORMAL)
|| dlqTopic.getAcceptTypes().getTypesList().contains(MessageType.FIFO))) {
LOGGER.error("Message: {} is dropped because the consumer group: {} has invalid DLQ topic: {}",
flatMessageExt, consumerGroupId, dlqTopic);
message.systemProperties().messageId(), consumerGroupId, dlqTopic);
return CompletableFuture.completedFuture(null);
}

FlatMessage message = flatMessageExt.message();
message.mutateTopicId(dlqTopic.getTopicId());

List<MessageQueueAssignment> assignmentsList = new ArrayList<>(dlqTopic.getAssignmentsList());
if (assignmentsList.isEmpty()) {
LOGGER.error("Message: {} is dropped because the consumer group: {} has empty DLQ topic: {}",
flatMessageExt, consumerGroupId, dlqTopic);
message.systemProperties().messageId(), consumerGroupId, dlqTopic);
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.MessageQueueAssignment;
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.SubscriptionMode;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.proxy.v1.QueueStats;
import apache.rocketmq.proxy.v1.StreamStats;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.config.ProxyConfig;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.common.util.CommonUtil;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.proxy.exception.ProxyException;
import com.automq.rocketmq.proxy.grpc.ProxyClient;
import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
import com.automq.rocketmq.proxy.model.VirtualQueue;
Expand Down Expand Up @@ -118,26 +121,30 @@

public class MessageServiceImpl implements MessageService, ExtendMessageService {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageServiceImpl.class);
private final BrokerConfig brokerConfig;
private final ProxyConfig config;
private final ProxyMetadataService metadataService;
private final MessageStore store;
private final LockService lockService;
private final DeadLetterSender deadLetterService;
private final SuspendRequestService suspendRequestService;
private final ProducerManager producerManager;
private final ProxyClient relayClient;
private final ExecutorService executorService = ThreadPoolMonitor.createAndMonitor(2, 5, 100, TimeUnit.SECONDS,
"Transaction-msg-check-thread", 2000);

public MessageServiceImpl(ProxyConfig config, MessageStore store, ProxyMetadataService metadataService,
LockService lockService, DeadLetterSender deadLetterService,
ProducerManager producerManager) throws StoreException {
this.config = config;
public MessageServiceImpl(BrokerConfig config, MessageStore store, ProxyMetadataService metadataService,
LockService lockService, DeadLetterSender deadLetterService, ProducerManager producerManager,
ProxyClient relayClient) throws StoreException {
this.brokerConfig = config;
this.config = config.proxy();
this.store = store;
this.metadataService = metadataService;
this.deadLetterService = deadLetterService;
this.lockService = lockService;
this.suspendRequestService = SuspendRequestService.getInstance();
this.producerManager = producerManager;
this.relayClient = relayClient;
store.registerTransactionCheckHandler(timerTag -> executorService.execute(() -> {
try {
checkTransactionStatus(timerTag);
Expand Down Expand Up @@ -187,20 +194,24 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx,
ProxyContextExt contextExt = (ProxyContextExt) ctx;
FlatMessage flatMessage = FlatMessageUtil.convertTo(contextExt, topic.getTopicId(), virtualQueue.physicalQueueId(), config.hostName(), message);

Optional<MessageQueueAssignment> optional = topic.getAssignmentsList().stream().filter(assignment -> assignment.getQueue().getQueueId() == flatMessage.queueId()).findFirst();
if (optional.isEmpty()) {
LOGGER.error("Message: {} is dropped because the topic: {} doesn't have queue: {}",
messageId, topic.getName(), flatMessage.queueId());
return CompletableFuture.failedFuture(new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Queue " + flatMessage.queueId() + " is not assigned to any node."));
}
MessageQueueAssignment assignment = optional.get();

if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
flatMessage.systemProperties().mutateDeliveryAttempts(requestHeader.getReconsumeTimes() + 1);
if (requestHeader.getReconsumeTimes() > requestHeader.getMaxReconsumeTimes()) {
String groupName = requestHeader.getTopic().replace(MixAll.RETRY_GROUP_TOPIC_PREFIX, "");
FlatMessageExt flatMessageExt = FlatMessageExt.Builder.builder()
.message(flatMessage)
.offset(0)
.build();
contextExt.span().ifPresent(span -> {
span.setAttribute("deadLetter", true);
span.setAttribute("group", groupName);
});
return consumerGroupOf(groupName)
.thenCompose(group -> deadLetterService.send(contextExt, group.getGroupId(), flatMessageExt))
.thenCompose(group -> deadLetterService.send(contextExt, group.getGroupId(), flatMessage))
.thenApply(ignore -> new PutResult(PutResult.Status.PUT_OK, 0));
} else {
String groupName = requestHeader.getTopic().replace(MixAll.RETRY_GROUP_TOPIC_PREFIX, "");
Expand All @@ -210,10 +221,20 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx,
span.setAttribute("reconsumeTimes", requestHeader.getReconsumeTimes());
span.setAttribute("deliveryTimestamp", flatMessage.systemProperties().deliveryTimestamp());
});
if (assignment.getNodeId() != brokerConfig.nodeId()) {
return metadataService.addressOf(assignment.getNodeId())
.thenCompose(address -> relayClient.relayMessage(address, flatMessage))
.thenApply(status -> new PutResult(PutResult.Status.PUT_OK, 0));
}
return store.put(ContextUtil.buildStoreContext(ctx, topic.getName(), groupName), flatMessage);
}
}

if (assignment.getNodeId() != brokerConfig.nodeId()) {
return metadataService.addressOf(assignment.getNodeId())
.thenCompose(address -> relayClient.relayMessage(address, flatMessage))
.thenApply(status -> new PutResult(PutResult.Status.PUT_OK, 0));
}
return store.put(ContextUtil.buildStoreContext(ctx, topic.getName(), ""), flatMessage);
});

Expand Down Expand Up @@ -274,15 +295,16 @@ public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext ctx, Rece
throw new ProxyException(apache.rocketmq.v2.Code.MESSAGE_NOT_FOUND, "Message not found from server.");
}).thenCompose(messageExt -> {
if (messageExt.deliveryAttempts() > group.getMaxDeliveryAttempt()) {
return deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt);
return deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt.message());
}

// Message consume retry strategy
// <0: no retry,put into DLQ directly
// =0: broker control retry frequency
// >0: client control retry frequency
return switch (Integer.compare(delayLevel, 0)) {
case -1 -> deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt);
case -1 ->
deadLetterService.send((ProxyContextExt) ctx, group.getGroupId(), messageExt.message());
case 0 -> topicOf(MixAll.RETRY_GROUP_TOPIC_PREFIX + requestHeader.getGroup())
.thenCompose(retryTopic -> {
// Keep the same logic as apache RocketMQ.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void send_normal() {
return CompletableFuture.completedFuture(new PutResult(PutResult.Status.PUT_OK, 0));
}).when(messageStore).put(Mockito.any(), Mockito.any(FlatMessage.class));

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(1)).put(Mockito.any(), Mockito.any(FlatMessage.class));
}

Expand All @@ -132,7 +132,7 @@ public void send_group_not_allowed_dlq() {

FlatMessageExt msg = MockMessageUtil.buildMessage(TOPIC_ID, QUEUE_ID, "TAG_DLQ");

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class));

// 2. DLQ topic is configured but has no assignment
Expand All @@ -147,7 +147,7 @@ public void send_group_not_allowed_dlq() {
Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup))
.when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID);

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class));

// 3. DLQ topic is the same as original topic
Expand All @@ -163,7 +163,7 @@ public void send_group_not_allowed_dlq() {
Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup))
.when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID);

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class));

// 4. DLQ topic doesn't accept DLQ message
Expand All @@ -187,7 +187,7 @@ public void send_group_not_allowed_dlq() {
Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup))
.when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID);

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class));

// 5. DLQ topic not exist
Expand All @@ -203,7 +203,7 @@ public void send_group_not_allowed_dlq() {
Mockito.doReturn(CompletableFuture.completedFuture(consumerGroup))
.when(metadataService).consumerGroupOf(CONSUMER_GROUP_ID);

dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg).join();
dlqService.send(StoreContext.EMPTY, CONSUMER_GROUP_ID, msg.message()).join();
Mockito.verify(messageStore, Mockito.times(0)).put(Mockito.any(), Mockito.any(FlatMessage.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package com.automq.rocketmq.proxy.service;

import apache.rocketmq.v2.Code;
import com.automq.rocketmq.common.config.ProxyConfig;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.proxy.exception.ProxyException;
import com.automq.rocketmq.proxy.grpc.client.GrpcProxyClient;
import com.automq.rocketmq.proxy.mock.MockMessageStore;
import com.automq.rocketmq.proxy.mock.MockProxyMetadataService;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
Expand Down Expand Up @@ -100,10 +101,10 @@ public static void setUpAll() throws Exception {
public void setUp() throws StoreException {
metadataService = new MockProxyMetadataService();
messageStore = new MockMessageStore();
ProxyConfig config = new ProxyConfig();
BrokerConfig config = new BrokerConfig();
deadLetterSender = Mockito.mock(DeadLetterSender.class);
Mockito.doReturn(CompletableFuture.completedFuture(null)).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any());
messageService = new MessageServiceImpl(config, messageStore, metadataService, new LockService(config), deadLetterSender, new ProducerManager());
messageService = new MessageServiceImpl(config, messageStore, metadataService, new LockService(config.proxy()), deadLetterSender, new ProducerManager(), new GrpcProxyClient(config));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.store.api;

import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.common.trace.TraceContext;
import java.util.concurrent.CompletableFuture;

Expand All @@ -27,7 +27,7 @@ public interface DeadLetterSender {
*
* @param context trace context
* @param consumerGroupId consumer group id
* @param originalFlatMessage original message
* @param message original message
*/
CompletableFuture<Void> send(TraceContext context, long consumerGroupId, FlatMessageExt originalFlatMessage);
CompletableFuture<Void> send(TraceContext context, long consumerGroupId, FlatMessage message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void run() {
if (consumeTimes >= maxDeliveryAttempts) {
messageExt.setDeliveryAttempts(consumeTimes);
// Send to dead letter topic specified in consumer group config.
return deadLetterSender.send(context, consumerGroupId, messageExt)
return deadLetterSender.send(context, consumerGroupId, messageExt.message())
.thenApply(nil -> Pair.of(true, logicQueue));
}
return CompletableFuture.completedFuture(Pair.of(false, logicQueue));
Expand All @@ -203,7 +203,7 @@ public void run() {

if (messageExt.deliveryAttempts() >= maxDeliveryAttempts) {
// Send to dead letter topic specified in consumer group config.
return deadLetterSender.send(context, consumerGroupId, messageExt)
return deadLetterSender.send(context, consumerGroupId, messageExt.message())
.thenApply(nil -> Pair.of(true, logicQueue));
}
messageExt.setOriginalQueueOffset(messageExt.originalOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void setUp() throws Exception {
logicQueueManager = new DefaultLogicQueueManager(config, streamStore, kvService, timerService, metadataService, operationLogService, inflightService, streamReclaimService);
DeadLetterSender deadLetterSender = Mockito.mock(DeadLetterSender.class);
Mockito.doReturn(CompletableFuture.completedFuture(null))
.when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessageExt.class));
.when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessage.class));
MessageArrivalNotificationService messageArrivalNotificationService = new MessageArrivalNotificationService();
reviveService = new ReviveService(KV_NAMESPACE_CHECK_POINT, kvService, timerService, metadataService, messageArrivalNotificationService,
logicQueueManager, deadLetterSender);
Expand Down
Loading