Skip to content

Commit

Permalink
Merge pull request #5 from apache/master
Browse files Browse the repository at this point in the history
from apache
  • Loading branch information
francisoliverlee authored Sep 4, 2024
2 parents ba17d1f + 447cf73 commit 6afaab0
Show file tree
Hide file tree
Showing 24 changed files with 515 additions and 82 deletions.
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache RocketMQ
Copyright 2016-2022 The Apache Software Foundation
Copyright 2016-2024 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
6 changes: 6 additions & 0 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func WithNamespace(namespace string) AdminOption {
}
}

func WithTls(useTls bool) AdminOption {
return func(options *adminOptions) {
options.ClientOptions.RemotingClientConfig.UseTls = useTls
}
}

type admin struct {
cli internal.RMQClient

Expand Down
110 changes: 105 additions & 5 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package consumer
import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -57,6 +58,9 @@ const (

// Offset persistent interval for consumer
_PersistConsumerOffsetInterval = 5 * time.Second

// Timeout for sending message to retry topic
_SendMessageBackAsNormalTimeout = 3 * time.Second
)

type ConsumeType string
Expand All @@ -66,6 +70,8 @@ const (
_PushConsume = ConsumeType("CONSUME_PASSIVELY")

_SubAll = "*"

_ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER"
)

// Message model defines the way how messages are delivered to each consumer clients.
Expand Down Expand Up @@ -883,6 +889,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa
SubExpression: data.SubString,
// TODO: add subversion
ExpressionType: string(data.ExpType),
BrokerName: queue.BrokerName,
}

if data.ExpType == string(TAG) {
Expand Down Expand Up @@ -993,8 +1000,9 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er
}

request := &internal.GetMaxOffsetRequestHeader{
Topic: mq.Topic,
QueueId: mq.QueueId,
Topic: mq.Topic,
QueueId: mq.QueueId,
BrokerName: mq.BrokerName,
}

cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
Expand Down Expand Up @@ -1023,9 +1031,10 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t
}

request := &internal.SearchOffsetRequestHeader{
Topic: mq.Topic,
QueueId: mq.QueueId,
Timestamp: timestamp,
Topic: mq.Topic,
QueueId: mq.QueueId,
Timestamp: timestamp,
BrokerName: mq.BrokerName,
}

cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
Expand All @@ -1037,6 +1046,97 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t
return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
}

func (dc *defaultConsumer) sendMessageBackAsNormal(msg *primitive.MessageExt, maxReconsumeTimes int32) bool {
retryTopic := internal.GetRetryTopic(dc.consumerGroup)
normalMsg := &primitive.Message{
Topic: retryTopic,
Body: msg.Body,
Flag: msg.Flag,
}
normalMsg.WithProperties(msg.GetProperties())
originMsgId := msg.GetProperty(primitive.PropertyOriginMessageId)
if len(originMsgId) == 0 {
originMsgId = msg.MsgId
}
normalMsg.WithProperty(primitive.PropertyOriginMessageId, originMsgId)
normalMsg.WithProperty(primitive.PropertyRetryTopic, msg.Topic)
normalMsg.RemoveProperty(primitive.PropertyTransactionPrepared)
normalMsg.WithDelayTimeLevel(int(3 + msg.ReconsumeTimes))

mq, err := dc.findPublishMessageQueue(retryTopic)
if err != nil {
rlog.Warning("sendMessageBackAsNormal find publish message queue error", map[string]interface{}{
rlog.LogKeyTopic: retryTopic,
rlog.LogKeyMessageId: msg.MsgId,
rlog.LogKeyUnderlayError: err.Error(),
})
return false
}

brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if len(brokerAddr) == 0 {
rlog.Warning("sendMessageBackAsNormal cannot find broker address", map[string]interface{}{
rlog.LogKeyMessageId: msg.MsgId,
rlog.LogKeyBroker: mq.BrokerName,
rlog.LogKeyUnderlayError: err.Error(),
})
return false
}

request := buildSendToRetryRequest(mq, normalMsg, msg.ReconsumeTimes+1, maxReconsumeTimes)
resp, err := dc.client.InvokeSync(context.Background(), brokerAddr, request, _SendMessageBackAsNormalTimeout)
if err != nil {
rlog.Warning("sendMessageBackAsNormal failed to invoke", map[string]interface{}{
rlog.LogKeyTopic: retryTopic,
rlog.LogKeyMessageId: msg.MsgId,
rlog.LogKeyBroker: brokerAddr,
rlog.LogKeyUnderlayError: err.Error(),
})
return false
}
if resp.Code != internal.ResSuccess {
rlog.Warning("sendMessageBackAsNormal failed to send", map[string]interface{}{
rlog.LogKeyTopic: retryTopic,
rlog.LogKeyMessageId: msg.MsgId,
rlog.LogKeyBroker: brokerAddr,
rlog.LogKeyUnderlayError: fmt.Errorf("CODE: %d, DESC: %s", resp.Code, resp.Remark),
})
return false
}

return true
}

func (dc *defaultConsumer) findPublishMessageQueue(topic string) (*primitive.MessageQueue, error) {
mqs, err := dc.client.GetNameSrv().FetchPublishMessageQueues(topic)
if err != nil {
return nil, err
}

if len(mqs) <= 0 {
return nil, fmt.Errorf("no writable queues")
}

return mqs[rand.Intn(len(mqs))], nil
}

func buildSendToRetryRequest(mq *primitive.MessageQueue, msg *primitive.Message, reconsumeTimes,
maxReconsumeTimes int32) *remote.RemotingCommand {
req := &internal.SendMessageRequestHeader{
ProducerGroup: _ClientInnerProducerGroup,
Topic: mq.Topic,
QueueId: mq.QueueId,
BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
Flag: msg.Flag,
Properties: msg.MarshallProperties(),
ReconsumeTimes: int(reconsumeTimes),
MaxReconsumeTimes: int(maxReconsumeTimes),
BrokerName: mq.BrokerName,
}

return remote.NewRemotingCommand(internal.ReqSendMessage, req, msg.Body)
}

func buildSubscriptionData(topic string, selector MessageSelector) *internal.SubscriptionData {
subData := &internal.SubscriptionData{
Topic: topic,
Expand Down
2 changes: 2 additions & 0 deletions consumer/offset_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq
ConsumerGroup: group,
Topic: mq.Topic,
QueueId: mq.QueueId,
BrokerName: mq.BrokerName,
}
cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
Expand Down Expand Up @@ -429,6 +430,7 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq p
Topic: mq.Topic,
QueueId: mq.QueueId,
CommitOffset: off,
BrokerName: mq.BrokerName,
}
cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
Expand Down
27 changes: 21 additions & 6 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ type consumerOptions struct {
// Concurrently max span offset.it has no effect on sequential consumption
ConsumeConcurrentlyMaxSpan int

// Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
// Flow control threshold on queue level, each message queue will cache at most 1024 messages by default,
// Consider the {PullBatchSize}, the instantaneous value may exceed the limit
PullThresholdForQueue int64
PullThresholdForQueue atomic.Int64

// Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
// Limit the cached message size on queue level, each message queue will cache at most 512 MiB messages by default,
// Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
//
// The size of a message only measured by message body, so it's not accurate
PullThresholdSizeForQueue int
PullThresholdSizeForQueue atomic.Int32

// Flow control threshold on topic level, default value is -1(Unlimited)
//
Expand Down Expand Up @@ -198,13 +198,13 @@ func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option {

func WithPullThresholdForQueue(pullThresholdForQueue int64) Option {
return func(options *consumerOptions) {
options.PullThresholdForQueue = pullThresholdForQueue
options.PullThresholdForQueue.Store(pullThresholdForQueue)
}
}

func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option {
return func(options *consumerOptions) {
options.PullThresholdSizeForQueue = pullThresholdSizeForQueue
options.PullThresholdSizeForQueue.Store(int32(pullThresholdSizeForQueue))
}
}

Expand Down Expand Up @@ -381,3 +381,18 @@ func WithLimiter(limiter Limiter) Option {
opts.Limiter = limiter
}
}

// WithRemotingTimeout set remote client timeout options
func WithRemotingTimeout(connectionTimeout, readTimeout, writeTimeout time.Duration) Option {
return func(opts *consumerOptions) {
opts.ClientOptions.RemotingClientConfig.ConnectionTimeout = connectionTimeout
opts.ClientOptions.RemotingClientConfig.ReadTimeout = readTimeout
opts.ClientOptions.RemotingClientConfig.WriteTimeout = writeTimeout
}
}

func WithTls(useTls bool) Option {
return func(opts *consumerOptions) {
opts.ClientOptions.RemotingClientConfig.UseTls = useTls
}
}
15 changes: 15 additions & 0 deletions consumer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consumer
import (
"reflect"
"testing"
"time"
)

func getFieldString(obj interface{}, field string) string {
Expand All @@ -12,6 +13,20 @@ func getFieldString(obj interface{}, field string) string {
}).String()
}

func TestWithRemotingTimeout(t *testing.T) {
opt := defaultPushConsumerOptions()
WithRemotingTimeout(3*time.Second, 4*time.Second, 5*time.Second)(&opt)
if timeout := opt.RemotingClientConfig.ConnectionTimeout; timeout != 3*time.Second {
t.Errorf("consumer option WithRemotingTimeout connectionTimeout. want:%s, got=%s", 3*time.Second, timeout)
}
if timeout := opt.RemotingClientConfig.ReadTimeout; timeout != 4*time.Second {
t.Errorf("consumer option WithRemotingTimeout readTimeout. want:%s, got=%s", 4*time.Second, timeout)
}
if timeout := opt.RemotingClientConfig.WriteTimeout; timeout != 5*time.Second {
t.Errorf("consumer option WithRemotingTimeout writeTimeout. want:%s, got=%s", 5*time.Second, timeout)
}
}

func TestWithUnitName(t *testing.T) {
opt := defaultPushConsumerOptions()
unitName := "unsh"
Expand Down
2 changes: 1 addition & 1 deletion consumer/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
rlog.LogKeyQueueOffset: msg.QueueOffset,
})
pq.mutex.RUnlock()
if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
if !pc.sendMessageBack(msg.Queue.BrokerName, msg, int(3+msg.ReconsumeTimes)) {
rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
Expand Down
10 changes: 8 additions & 2 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,12 @@ func (pc *defaultPullConsumer) sendMessageBack(brokerName string, msg *primitive
} else {
brokerAddr = msg.StoreHost
}
_, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
return err == nil
resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
if err != nil || resp.Code != internal.ResSuccess {
// send back as a normal message
return pc.defaultConsumer.sendMessageBackAsNormal(msg, pc.getMaxReconsumeTimes())
}
return true
}

func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLevel int) *remote.RemotingCommand {
Expand All @@ -640,6 +644,7 @@ func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, d
DelayLevel: delayLevel,
OriginMsgId: msg.MsgId,
MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
BrokerName: msg.Queue.BrokerName,
}

return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil)
Expand Down Expand Up @@ -742,6 +747,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
SubExpression: sd.SubString,
ExpressionType: string(TAG),
SuspendTimeoutMillis: 20 * time.Second,
BrokerName: request.mq.BrokerName,
}

brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
Expand Down
Loading

0 comments on commit 6afaab0

Please sign in to comment.