From 02e351d48868d008302e4639e190126eca6a215f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=83=E4=B9=99?= Date: Sat, 6 Jun 2020 18:39:14 +0800 Subject: [PATCH] [fix] data race --- consumer/heart_beat.go | 5 +++-- go.mod | 7 +++---- go.sum | 27 ++++++++++++++++++++++----- producer/io_thread_pool.go | 7 ++++--- producer/io_worker.go | 7 ++++--- producer/log_accumulator.go | 14 ++++++++------ producer/mover.go | 16 +++++++++------- producer/producer.go | 10 +++++----- 8 files changed, 58 insertions(+), 35 deletions(-) diff --git a/consumer/heart_beat.go b/consumer/heart_beat.go index 8f8f2205..1a8a0a48 100644 --- a/consumer/heart_beat.go +++ b/consumer/heart_beat.go @@ -2,10 +2,11 @@ package consumerLibrary import ( "fmt" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "sync" "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" ) var shardLock sync.RWMutex diff --git a/go.mod b/go.mod index 0f6d1edf..b0765a8b 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/BurntSushi/toml v0.3.1 // indirect github.com/cenkalti/backoff v1.0.0 github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 - github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-kit/kit v0.8.1-0.20190225011659-a8cc1630e08a github.com/go-logfmt/logfmt v0.4.0 // indirect github.com/go-stack/stack v1.8.0 // indirect @@ -15,9 +14,9 @@ require ( github.com/pierrec/lz4 v2.0.2+incompatible github.com/pierrec/xxHash v0.0.0-20170714082455-a0006b13c722 // indirect github.com/pkg/errors v0.8.0 - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/testify v1.1.5-0.20171018052257-2aa2c176b9da - golang.org/x/net v0.0.0-20160826235738-6250b4127982 + github.com/stretchr/testify v1.3.0 + go.uber.org/atomic v1.4.0 + golang.org/x/net v0.0.0-20190620200207-3b0461eec859 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 gopkg.in/yaml.v2 v2.2.2 // indirect diff --git a/go.sum b/go.sum index a96c5227..a8c1276e 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,11 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/aliyun/aliyun-log-go-sdk v0.0.0-20180712123623-7663ea59fab7/go.mod h1:s3YpfQC+iMbiVR7vog8NoYAbdfWdRxeUG03a6as4U34= github.com/cenkalti/backoff v1.0.0 h1:2XeuDgvPv/6QDyzIuxb6n36ADVocyqTLlOSpYBGYtvM= github.com/cenkalti/backoff v1.0.0/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-kit/kit v0.8.1-0.20190225011659-a8cc1630e08a h1:88BHsYIH88xo2+wjDobSFxOReDC8TdbXsg0m1znRvrA= @@ -27,12 +28,28 @@ github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.1.5-0.20171018052257-2aa2c176b9da h1:glZmY4mCDpnJuNJ4z+wbu5y2Qir8LgfkvYgv5as+LBY= -github.com/stretchr/testify v1.1.5-0.20171018052257-2aa2c176b9da/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -golang.org/x/net v0.0.0-20160826235738-6250b4127982 h1:ovQ1AutJRN5Z9h6qirof5A0+p3nYUYr61mkoytd27rE= -golang.org/x/net v0.0.0-20160826235738-6250b4127982/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 h1:AFxeG48hTWHhDTQDk/m2gorfVHUEa9vo3tp3D7TzwjI= diff --git a/producer/io_thread_pool.go b/producer/io_thread_pool.go index 2b71265d..24803145 100644 --- a/producer/io_thread_pool.go +++ b/producer/io_thread_pool.go @@ -7,10 +7,11 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "go.uber.org/atomic" ) type IoThreadPool struct { - threadPoolShutDownFlag bool + threadPoolShutDownFlag *atomic.Bool queue *list.List lock sync.RWMutex ioworker *IoWorker @@ -19,7 +20,7 @@ type IoThreadPool struct { func initIoThreadPool(ioworker *IoWorker, logger log.Logger) *IoThreadPool { return &IoThreadPool{ - threadPoolShutDownFlag: false, + threadPoolShutDownFlag: atomic.NewBool(false), queue: list.New(), ioworker: ioworker, logger: logger, @@ -56,7 +57,7 @@ func (threadPool *IoThreadPool) start(ioWorkerWaitGroup *sync.WaitGroup, ioThrea go threadPool.ioworker.sendToServer(threadPool.popTask(), ioWorkerWaitGroup) } } else { - if !threadPool.threadPoolShutDownFlag { + if !threadPool.threadPoolShutDownFlag.Load() { time.Sleep(100 * time.Millisecond) } else { level.Info(threadPool.logger).Log("msg", "All cache tasks in the thread pool have been successfully sent") diff --git a/producer/io_worker.go b/producer/io_worker.go index 7540bd8b..bbf93e07 100644 --- a/producer/io_worker.go +++ b/producer/io_worker.go @@ -9,6 +9,7 @@ import ( sls "github.com/aliyun/aliyun-log-go-sdk" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + uberatomic "go.uber.org/atomic" ) type CallBack interface { @@ -20,7 +21,7 @@ type IoWorker struct { taskCount int64 client sls.ClientInterface retryQueue *RetryQueue - retryQueueShutDownFlag bool + retryQueueShutDownFlag *uberatomic.Bool logger log.Logger maxIoWorker chan int64 noRetryStatusCodeMap map[int]*string @@ -31,7 +32,7 @@ func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log client: client, retryQueue: retryQueue, taskCount: 0, - retryQueueShutDownFlag: false, + retryQueueShutDownFlag: uberatomic.NewBool(false), logger: logger, maxIoWorker: make(chan int64, maxIoWorkerCount), noRetryStatusCodeMap: errorStatusMap, @@ -63,7 +64,7 @@ func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch, ioWorkerWai } } } else { - if ioWorker.retryQueueShutDownFlag { + if ioWorker.retryQueueShutDownFlag.Load() { if len(producerBatch.callBackList) > 0 { for _, callBack := range producerBatch.callBackList { callBack.Fail(producerBatch.result) diff --git a/producer/log_accumulator.go b/producer/log_accumulator.go index 761baf84..338e6968 100644 --- a/producer/log_accumulator.go +++ b/producer/log_accumulator.go @@ -2,12 +2,14 @@ package producer import ( "errors" - "github.com/aliyun/aliyun-log-go-sdk" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "strings" "sync" "sync/atomic" + + sls "github.com/aliyun/aliyun-log-go-sdk" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + uberatomic "go.uber.org/atomic" ) type LogAccumulator struct { @@ -15,7 +17,7 @@ type LogAccumulator struct { logGroupData sync.Map //map[string]*ProducerBatch, producerConfig *ProducerConfig ioWorker *IoWorker - shutDownFlag bool + shutDownFlag *uberatomic.Bool logger log.Logger threadPool *IoThreadPool } @@ -24,7 +26,7 @@ func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.L return &LogAccumulator{ producerConfig: config, ioWorker: ioWorker, - shutDownFlag: false, + shutDownFlag: uberatomic.NewBool(false), logger: logger, threadPool: threadPool, } @@ -54,7 +56,7 @@ func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, s logData interface{}, callback CallBack) error { defer logAccumulator.lock.Unlock() logAccumulator.lock.Lock() - if logAccumulator.shutDownFlag { + if logAccumulator.shutDownFlag.Load() { level.Warn(logAccumulator.logger).Log("msg", "Producer has started and shut down and cannot write to new logs") return errors.New("Producer has started and shut down and cannot write to new logs") } diff --git a/producer/mover.go b/producer/mover.go index bd8227d7..ef4ec597 100644 --- a/producer/mover.go +++ b/producer/mover.go @@ -1,14 +1,16 @@ package producer import ( - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "sync" "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "go.uber.org/atomic" ) type Mover struct { - moverShutDownFlag bool + moverShutDownFlag *atomic.Bool retryQueue *RetryQueue ioWorker *IoWorker logAccumulator *LogAccumulator @@ -18,7 +20,7 @@ type Mover struct { func initMover(logAccumulator *LogAccumulator, retryQueue *RetryQueue, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool) *Mover { mover := &Mover{ - moverShutDownFlag: false, + moverShutDownFlag: atomic.NewBool(false), retryQueue: retryQueue, ioWorker: ioWorker, logAccumulator: logAccumulator, @@ -43,7 +45,7 @@ func (mover *Mover) sendToServer(key interface{}, batch *ProducerBatch, config * func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig) { defer moverWaitGroup.Done() - for !mover.moverShutDownFlag { + for !mover.moverShutDownFlag.Load() { sleepMs := config.LingerMs mapCount := 0 mover.logAccumulator.logGroupData.Range(func(key, value interface{}) bool { @@ -66,7 +68,7 @@ func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig) sleepMs = config.LingerMs } - retryProducerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag) + retryProducerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag.Load()) if retryProducerBatchList == nil { // If there is nothing to send in the retry queue, just wait for the minimum time that was given to me last time. time.Sleep(time.Duration(sleepMs) * time.Millisecond) @@ -84,7 +86,7 @@ func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig) return true }) - producerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag) + producerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag.Load()) count := len(producerBatchList) for i := 0; i < count; i++ { mover.threadPool.addTask(producerBatchList[i]) diff --git a/producer/producer.go b/producer/producer.go index bf716354..9bffbdc7 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -244,7 +244,7 @@ func (producer *Producer) Close(timeoutMs int64) error { startCloseTime := time.Now() producer.sendCloseProdcerSignal() producer.moverWaitGroup.Wait() - producer.threadPool.threadPoolShutDownFlag = true + producer.threadPool.threadPoolShutDownFlag.Store(true) for { if atomic.LoadInt64(&producer.mover.ioWorker.taskCount) == 0 && !producer.threadPool.hasTask() { level.Info(producer.logger).Log("msg", "All groutines of producer have been shutdown") @@ -262,7 +262,7 @@ func (producer *Producer) Close(timeoutMs int64) error { func (producer *Producer) SafeClose() { producer.sendCloseProdcerSignal() producer.moverWaitGroup.Wait() - producer.threadPool.threadPoolShutDownFlag = true + producer.threadPool.threadPoolShutDownFlag.Store(true) producer.ioThreadPoolWaitGroup.Wait() producer.ioWorkerWaitGroup.Wait() level.Info(producer.logger).Log("msg", "Producer close finish") @@ -271,9 +271,9 @@ func (producer *Producer) SafeClose() { func (producer *Producer) sendCloseProdcerSignal() { level.Info(producer.logger).Log("msg", "producer start closing") producer.closeStstokenChannel() - producer.mover.moverShutDownFlag = true - producer.logAccumulator.shutDownFlag = true - producer.mover.ioWorker.retryQueueShutDownFlag = true + producer.mover.moverShutDownFlag.Store(true) + producer.logAccumulator.shutDownFlag.Store(true) + producer.mover.ioWorker.retryQueueShutDownFlag.Store(true) } func (producer *Producer) closeStstokenChannel() {