Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1178] feat:pull consumer support Assign subscription type #1179

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,18 @@ func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) {
type PullConsumer interface {
// Start the PullConsumer for consuming message
Start() error
// GetTopicRouteInfo get topic route info
GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error)

// Subscribe a topic for consuming
Subscribe(topic string, selector consumer.MessageSelector) error

// Unsubscribe a topic
Unsubscribe(topic string) error

// Assign assign message queue to consumer
Assign(topic string, mqs []*primitive.MessageQueue) error

// Shutdown the PullConsumer, all offset of MessageQueue will be commit to broker before process exit
Shutdown() error

Expand All @@ -104,6 +109,12 @@ type PullConsumer interface {
// PullFrom pull messages of queue from the offset to offset + numbers
PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

// SeekOffset seek offset for specific queue
SeekOffset(queue *primitive.MessageQueue, offset int64)

// OffsetForTimestamp get offset of specific queue with timestamp
OffsetForTimestamp(queue *primitive.MessageQueue, timestamp int64) (int64, error)

// UpdateOffset updateOffset update offset of queue in mem
UpdateOffset(queue *primitive.MessageQueue, offset int64) error

Expand Down
8 changes: 8 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,14 @@ func (dc *defaultConsumer) shutdown() error {
return nil
}

func (dc *defaultConsumer) isRunning() bool {
return atomic.LoadInt32(&dc.state) == int32(internal.StateRunning)
}

func (dc *defaultConsumer) isStopped() bool {
return atomic.LoadInt32(&dc.state) == int32(internal.StateShutdown)
}

func (dc *defaultConsumer) persistConsumerOffset() error {
err := dc.makeSureStateOK()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions consumer/mock_offset_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

125 changes: 122 additions & 3 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,26 @@ func (cr *ConsumeRequest) GetPQ() *processQueue {
return cr.processQueue
}

type SubscriptionType int

const (
None SubscriptionType = iota
Subscribe
Assign
)

type defaultPullConsumer struct {
*defaultConsumer

topic string
selector MessageSelector
GroupName string
Model MessageModel
SubType SubscriptionType
UnitMode bool
nextQueueSequence int64
allocateQueues []*primitive.MessageQueue
mq2seekOffset sync.Map // key:primitive.MessageQueue,value:seekOffset

done chan struct{}
closeOnce sync.Once
Expand Down Expand Up @@ -116,18 +126,40 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
defaultConsumer: dc,
done: make(chan struct{}, 1),
consumeRequestCache: make(chan *ConsumeRequest, 4),
GroupName: dc.option.GroupName,
}
dc.mqChanged = c.messageQueueChanged
c.submitToConsume = c.consumeMessageConcurrently
c.interceptor = primitive.ChainInterceptors(c.option.Interceptors...)
return c, nil
}

func (pc *defaultPullConsumer) GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error) {
topicWithNs := utils.WrapNamespace(pc.option.Namespace, topic)
value, exist := pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs)
if exist {
return value.([]*primitive.MessageQueue), nil
}
pc.client.UpdateTopicRouteInfo()
value, exist = pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs)
if !exist {
return nil, errors2.ErrRouteNotFound
}
return value.([]*primitive.MessageQueue), nil
}

func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector) error {
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
return errors2.ErrStartTopic
}
if pc.SubType == Assign {
return errors2.ErrSubscriptionType
}

if pc.SubType == None {
pc.SubType = Subscribe
}
topic = utils.WrapNamespace(pc.option.Namespace, topic)

data := buildSubscriptionData(topic, selector)
Expand All @@ -139,11 +171,53 @@ func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector)
}

func (pc *defaultPullConsumer) Unsubscribe(topic string) error {
if pc.SubType == Assign {
return errors2.ErrSubscriptionType
}
topic = utils.WrapNamespace(pc.option.Namespace, topic)
pc.subscriptionDataTable.Delete(topic)
return nil
}

func (pc *defaultPullConsumer) Assign(topic string, mqs []*primitive.MessageQueue) error {
if pc.SubType == Subscribe {
return errors2.ErrSubscriptionType
}
if pc.SubType == None {
pc.SubType = Assign
}
topic = utils.WrapNamespace(pc.option.Namespace, topic)
data := buildSubscriptionData(topic, MessageSelector{TAG, _SubAll})
pc.topic = topic
pc.subscriptionDataTable.Store(topic, data)
oldQueues := pc.allocateQueues
pc.allocateQueues = mqs
rlog.Info("pull consumer assign new mqs", map[string]interface{}{
"topic": topic,
"group": pc.GroupName,
"oldMqs": oldQueues,
"newMqs": mqs,
})
if pc.isRunning() {
pc.Rebalance()
}
return nil
}

func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, originOffset int64) int64 {
if pc.SubType != Assign {
return originOffset
}
value, exist := pc.mq2seekOffset.LoadAndDelete(mq)
if !exist {
return originOffset
} else {
nextOffset := value.(int64)
_ = pc.updateOffset(mq, nextOffset)
return nextOffset
}
}

func (pc *defaultPullConsumer) Start() error {
var err error
pc.once.Do(func() {
Expand Down Expand Up @@ -546,11 +620,34 @@ func (pc *defaultPullConsumer) GetWhere() string {
}

func (pc *defaultPullConsumer) Rebalance() {
pc.defaultConsumer.doBalance()
switch pc.SubType {
case Assign:
pc.RebalanceViaTopic()
break
case Subscribe:
pc.defaultConsumer.doBalance()
break
}
}

func (pc *defaultPullConsumer) RebalanceIfNotPaused() {
pc.defaultConsumer.doBalanceIfNotPaused()
switch pc.SubType {
case Assign:
pc.RebalanceViaTopic()
break
case Subscribe:
pc.defaultConsumer.doBalanceIfNotPaused()
break
}
}

func (pc *defaultPullConsumer) RebalanceViaTopic() {
changed := pc.defaultConsumer.updateProcessQueueTable(pc.topic, pc.allocateQueues)
if changed {
rlog.Info("PullConsumer rebalance result changed ", map[string]interface{}{
rlog.LogKeyAllocateMessageQueue: pc.allocateQueues,
})
}
}

func (pc *defaultPullConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo {
Expand Down Expand Up @@ -613,7 +710,23 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.Mes

}

func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) {
pc.mq2seekOffset.Store(mq, offset)
rlog.Info("pull consumer seek offset", map[string]interface{}{
"mq": mq,
"offset": offset,
})
}

func (pc *defaultPullConsumer) OffsetForTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) {
return pc.searchOffsetByTimestamp(mq, timestamp)
}

func (pc *defaultPullConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
if pc.SubType == Assign {
return
}

var allocateQueues []*primitive.MessageQueue
pc.defaultConsumer.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
Expand Down Expand Up @@ -734,6 +847,8 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
sleepTime = _PullDelayTimeWhenError
goto NEXT
}

nextOffset := pc.nextPullOffset(request.mq, request.nextOffset)
beginTime := time.Now()
sd := v.(*internal.SubscriptionData)

Expand All @@ -743,7 +858,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
ConsumerGroup: pc.consumerGroup,
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
QueueOffset: nextOffset,
MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: 0,
Expand Down Expand Up @@ -880,5 +995,9 @@ func (pc *defaultPullConsumer) validate() error {
return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup)
}

if pc.SubType == None {
return errors2.ErrBlankSubType
}

return nil
}
4 changes: 2 additions & 2 deletions consumer/statistics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ func TestNewStatsManager(t *testing.T) {
stats := NewStatsManager()

st := time.Now()
for {
for {
stats.increasePullTPS("rocketmq", "default", 1)
time.Sleep(500*time.Millisecond)
time.Sleep(500 * time.Millisecond)
if time.Now().Sub(st) > 5*time.Minute {
break
}
Expand Down
3 changes: 3 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ var (
ErrCreated = errors.New("consumer group has been created")
ErrBrokerNotFound = errors.New("broker can not found")
ErrStartTopic = errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
ErrSubscriptionType = errors.New("subscribe type is not matched")
ErrBlankSubType = errors.New("subscribe type should not be blank")
ErrResponse = errors.New("response error")
ErrCompressLevel = errors.New("unsupported compress level")
ErrUnknownIP = errors.New("unknown IP address")
ErrService = errors.New("service close is not running, please check")
ErrTopicNotExist = errors.New("topic not exist")
ErrRouteNotFound = errors.New("topic route not found")
ErrNotExisted = errors.New("not existed")
ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
ErrMultiIP = errors.New("multiple IP addr does not support")
Expand Down
Loading
Loading