Skip to content

Commit

Permalink
Merge pull request #4 from apache/master
Browse files Browse the repository at this point in the history
from apache master
  • Loading branch information
francisoliverlee authored Oct 29, 2023
2 parents 4d54c51 + 49f0448 commit ba17d1f
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 30 deletions.
8 changes: 8 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,14 @@ func (dc *defaultConsumer) doBalance() {
return (mqAll[i].QueueId - mqAll[j].QueueId) < 0
})
allocateResult := dc.allocate(dc.consumerGroup, dc.client.ClientID(), mqAll, cidAll)

// Principle of flow control: pull TPS = 1000ms/PullInterval * BatchSize * len(allocateResult)
if consumeTPS := dc.option.ConsumeTPS.Load(); consumeTPS > 0 && len(allocateResult) > 0 {
pullBatchSize := dc.option.PullBatchSize.Load()
pullTimesPerSecond := float64(consumeTPS) / float64(pullBatchSize*int32(len(allocateResult)))
dc.option.PullInterval.Store(time.Duration(float64(time.Second) / pullTimesPerSecond))
}

changed := dc.updateProcessQueueTable(topic, allocateResult)
if changed {
dc.mqChanged(topic, mqAll, allocateResult)
Expand Down
28 changes: 20 additions & 8 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"
"time"

"go.uber.org/atomic"

"github.com/apache/rocketmq-client-go/v2/hooks"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
Expand Down Expand Up @@ -55,30 +57,33 @@ type consumerOptions struct {

// Flow control threshold on topic level, default value is -1(Unlimited)
//
// The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
// {@code pullThresholdForTopic} if it is't unlimited
// The value of {@code pullThresholdForQueue} will be overwritten and calculated based on
// {@code pullThresholdForTopic} if it isn't unlimited
//
// For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
// then pullThresholdForQueue will be set to 100
PullThresholdForTopic int

// Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
//
// The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
// {@code pullThresholdSizeForTopic} if it is't unlimited
// The value of {@code pullThresholdSizeForQueue} will be overwritten and calculated based on
// {@code pullThresholdSizeForTopic} if it isn't unlimited
//
// For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
// assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
PullThresholdSizeForTopic int

// Message pull Interval
PullInterval time.Duration
PullInterval atomic.Duration

// Message consumer tps
ConsumeTPS atomic.Int32

// Batch consumption size
ConsumeMessageBatchMaxSize int

// Batch pull size
PullBatchSize int32
PullBatchSize atomic.Int32

// Whether update subscription relationship when every pull
PostSubscriptionWhenPull bool
Expand Down Expand Up @@ -283,7 +288,7 @@ func WithStrategy(strategy AllocateStrategy) Option {

func WithPullBatchSize(batchSize int32) Option {
return func(options *consumerOptions) {
options.PullBatchSize = batchSize
options.PullBatchSize.Store(batchSize)
}
}

Expand All @@ -307,7 +312,14 @@ func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option {

func WithPullInterval(interval time.Duration) Option {
return func(options *consumerOptions) {
options.PullInterval = interval
options.PullInterval.Store(interval)
}
}

// WithConsumeTPS set single-machine consumption tps
func WithConsumeTPS(tps int32) Option {
return func(options *consumerOptions) {
options.ConsumeTPS.Store(tps)
}
}

Expand Down
4 changes: 2 additions & 2 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
time.Sleep(sleepTime)
}
// reset time
sleepTime = pc.option.PullInterval
sleepTime = pc.option.PullInterval.Load()
pq.lastPullTime.Store(time.Now())
err := pc.makeSureStateOK()
if err != nil {
Expand Down Expand Up @@ -736,7 +736,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
MaxMsgNums: pc.option.PullBatchSize,
MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: 0,
SubExpression: sd.SubString,
Expand Down
12 changes: 6 additions & 6 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func (pc *pushConsumer) validate() error {
}
}

if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535*time.Millisecond {
if interval := pc.option.PullInterval.Load(); interval < 0 || interval > 65535*time.Millisecond {
return errors.New("option.PullInterval out of range [0, 65535]")
}

Expand All @@ -608,9 +608,9 @@ func (pc *pushConsumer) validate() error {
}
}

if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
if pc.option.PullBatchSize == 0 {
pc.option.PullBatchSize = 32
if pullBatchSize := pc.option.PullBatchSize.Load(); pullBatchSize < 1 || pullBatchSize > 1024 {
if pullBatchSize == 0 {
pc.option.PullBatchSize.Store(32)
} else {
return errors.New("option.PullBatchSize out of range [1, 1024]")
}
Expand Down Expand Up @@ -674,7 +674,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
time.Sleep(sleepTime)
}
// reset time
sleepTime = pc.option.PullInterval
sleepTime = pc.option.PullInterval.Load()
pq.lastPullTime.Store(time.Now())
err := pc.makeSureStateOK()
if err != nil {
Expand Down Expand Up @@ -813,7 +813,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
MaxMsgNums: pc.option.PullBatchSize,
MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: commitOffsetValue,
SubExpression: subExpression,
Expand Down
5 changes: 3 additions & 2 deletions internal/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,9 @@ func (s *namesrvs) routeData2PublishInfo(topic string, data *TopicRouteData) *To
}

qds := data.QueueDataList
sort.Slice(qds, func(i, j int) bool {
return i-j >= 0
sort.SliceStable(qds, func(i, j int) bool {
// sort by increase
return strings.Compare(qds[i].BrokerName, qds[j].BrokerName) < 0
})

for _, qd := range qds {
Expand Down
14 changes: 9 additions & 5 deletions primitive/nsresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package primitive

import (
"crypto/md5"
"encoding/hex"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -139,7 +141,7 @@ func (h *HttpResolver) Resolve() []string {
}

func (h *HttpResolver) Description() string {
return fmt.Sprintf("passthrough resolver of domain:%v instance:%v", h.domain, h.instance)
return fmt.Sprintf("http resolver of domain:%v", h.domain)
}

func (h *HttpResolver) get() []string {
Expand Down Expand Up @@ -177,7 +179,7 @@ func (h *HttpResolver) get() []string {
}

func (h *HttpResolver) saveSnapshot(body []byte) error {
filePath := h.getSnapshotFilePath(h.instance)
filePath := h.getSnapshotFilePath()
err := ioutil.WriteFile(filePath, body, 0644)
if err != nil {
rlog.Error("name server snapshot save failed", map[string]interface{}{
Expand All @@ -194,7 +196,7 @@ func (h *HttpResolver) saveSnapshot(body []byte) error {
}

func (h *HttpResolver) loadSnapshot() []string {
filePath := h.getSnapshotFilePath(h.instance)
filePath := h.getSnapshotFilePath()
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
rlog.Warning("name server snapshot local file not exists", map[string]interface{}{
Expand All @@ -214,7 +216,7 @@ func (h *HttpResolver) loadSnapshot() []string {
return strings.Split(string(bs), ";")
}

func (h *HttpResolver) getSnapshotFilePath(instanceName string) string {
func (h *HttpResolver) getSnapshotFilePath() string {
homeDir := ""
if usr, err := user.Current(); err == nil {
homeDir = usr.HomeDir
Expand All @@ -232,6 +234,8 @@ func (h *HttpResolver) getSnapshotFilePath(instanceName string) string {
})
}
}
filePath := path.Join(storePath, fmt.Sprintf("nameserver_addr-%s", instanceName))
hash := md5.Sum([]byte(h.domain))
domainHash := hex.EncodeToString(hash[:])
filePath := path.Join(storePath, fmt.Sprintf("nameserver_addr-%s", domainHash))
return filePath
}
14 changes: 7 additions & 7 deletions primitive/nsresolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestHttpResolverWithGet(t *testing.T) {
resolver.Resolve()

// check snapshot saved
filePath := resolver.getSnapshotFilePath("DEFAULT")
filePath := resolver.getSnapshotFilePath()
body := strings.Join(srvs, ";")
bs, _ := ioutil.ReadFile(filePath)
So(string(bs), ShouldEqual, body)
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestHttpResolverWithGetUnitName(t *testing.T) {
resolver.Resolve()

// check snapshot saved
filePath := resolver.getSnapshotFilePath("DEFAULT")
filePath := resolver.getSnapshotFilePath()
body := strings.Join(srvs, ";")
bs, _ := ioutil.ReadFile(filePath)
So(string(bs), ShouldEqual, body)
Expand All @@ -133,7 +133,7 @@ func TestHttpResolverWithSnapshotFile(t *testing.T) {

os.Setenv("NAMESRV_ADDR", "") // clear env
// setup local snapshot file
filePath := resolver.getSnapshotFilePath("DEFAULT")
filePath := resolver.getSnapshotFilePath()
body := strings.Join(srvs, ";")
_ = ioutil.WriteFile(filePath, []byte(body), 0644)

Expand All @@ -143,7 +143,7 @@ func TestHttpResolverWithSnapshotFile(t *testing.T) {
})
}

func TesHttpReslverWithSnapshotFileOnce(t *testing.T) {
func TestHttpResolverWithSnapshotFileOnce(t *testing.T) {
Convey("Test UpdateNameServerAddress Load Local Snapshot Once", t, func() {
srvs := []string{
"192.168.100.1",
Expand All @@ -157,18 +157,18 @@ func TesHttpReslverWithSnapshotFileOnce(t *testing.T) {

os.Setenv("NAMESRV_ADDR", "") // clear env
// setup local snapshot file
filePath := resolver.getSnapshotFilePath("DEFAULT")
filePath := resolver.getSnapshotFilePath()
body := strings.Join(srvs, ";")
_ = ioutil.WriteFile(filePath, []byte(body), 0644)
// load local snapshot file first time
addrs1 := resolver.Resolve()

// change the local snapshot file to check load once
// change the local snapshot file
_ = ioutil.WriteFile(filePath, []byte("127.0.0.1;127.0.0.2"), 0644)

addrs2 := resolver.Resolve()

So(Diff(addrs1, addrs2), ShouldBeFalse)
So(Diff(addrs1, addrs2), ShouldBeTrue)
So(Diff(addrs1, srvs), ShouldBeFalse)
})
}
1 change: 1 addition & 0 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
cancel()
if err != nil {
h(ctx, nil, err)
return
}

resp := primitive.NewSendResult()
Expand Down

0 comments on commit ba17d1f

Please sign in to comment.