Skip to content

Commit

Permalink
[fix] data race
Browse files Browse the repository at this point in the history
  • Loading branch information
shabicheng committed Jun 6, 2020
1 parent a9ae3dd commit 02e351d
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 35 deletions.
5 changes: 3 additions & 2 deletions consumer/heart_beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package consumerLibrary

import (
"fmt"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
)

var shardLock sync.RWMutex
Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/cenkalti/backoff v1.0.0
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-kit/kit v0.8.1-0.20190225011659-a8cc1630e08a
github.com/go-logfmt/logfmt v0.4.0 // indirect
github.com/go-stack/stack v1.8.0 // indirect
Expand All @@ -15,9 +14,9 @@ require (
github.com/pierrec/lz4 v2.0.2+incompatible
github.com/pierrec/xxHash v0.0.0-20170714082455-a0006b13c722 // indirect
github.com/pkg/errors v0.8.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.1.5-0.20171018052257-2aa2c176b9da
golang.org/x/net v0.0.0-20160826235738-6250b4127982
github.com/stretchr/testify v1.3.0
go.uber.org/atomic v1.4.0
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3
gopkg.in/yaml.v2 v2.2.2 // indirect
Expand Down
27 changes: 22 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aliyun/aliyun-log-go-sdk v0.0.0-20180712123623-7663ea59fab7/go.mod h1:s3YpfQC+iMbiVR7vog8NoYAbdfWdRxeUG03a6as4U34=
github.com/cenkalti/backoff v1.0.0 h1:2XeuDgvPv/6QDyzIuxb6n36ADVocyqTLlOSpYBGYtvM=
github.com/cenkalti/backoff v1.0.0/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-kit/kit v0.8.1-0.20190225011659-a8cc1630e08a h1:88BHsYIH88xo2+wjDobSFxOReDC8TdbXsg0m1znRvrA=
Expand All @@ -27,12 +28,28 @@ github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.1.5-0.20171018052257-2aa2c176b9da h1:glZmY4mCDpnJuNJ4z+wbu5y2Qir8LgfkvYgv5as+LBY=
github.com/stretchr/testify v1.1.5-0.20171018052257-2aa2c176b9da/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/net v0.0.0-20160826235738-6250b4127982 h1:ovQ1AutJRN5Z9h6qirof5A0+p3nYUYr61mkoytd27rE=
golang.org/x/net v0.0.0-20160826235738-6250b4127982/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 h1:AFxeG48hTWHhDTQDk/m2gorfVHUEa9vo3tp3D7TzwjI=
Expand Down
7 changes: 4 additions & 3 deletions producer/io_thread_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.uber.org/atomic"
)

type IoThreadPool struct {
threadPoolShutDownFlag bool
threadPoolShutDownFlag *atomic.Bool
queue *list.List
lock sync.RWMutex
ioworker *IoWorker
Expand All @@ -19,7 +20,7 @@ type IoThreadPool struct {

func initIoThreadPool(ioworker *IoWorker, logger log.Logger) *IoThreadPool {
return &IoThreadPool{
threadPoolShutDownFlag: false,
threadPoolShutDownFlag: atomic.NewBool(false),
queue: list.New(),
ioworker: ioworker,
logger: logger,
Expand Down Expand Up @@ -56,7 +57,7 @@ func (threadPool *IoThreadPool) start(ioWorkerWaitGroup *sync.WaitGroup, ioThrea
go threadPool.ioworker.sendToServer(threadPool.popTask(), ioWorkerWaitGroup)
}
} else {
if !threadPool.threadPoolShutDownFlag {
if !threadPool.threadPoolShutDownFlag.Load() {
time.Sleep(100 * time.Millisecond)
} else {
level.Info(threadPool.logger).Log("msg", "All cache tasks in the thread pool have been successfully sent")
Expand Down
7 changes: 4 additions & 3 deletions producer/io_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
uberatomic "go.uber.org/atomic"
)

type CallBack interface {
Expand All @@ -20,7 +21,7 @@ type IoWorker struct {
taskCount int64
client sls.ClientInterface
retryQueue *RetryQueue
retryQueueShutDownFlag bool
retryQueueShutDownFlag *uberatomic.Bool
logger log.Logger
maxIoWorker chan int64
noRetryStatusCodeMap map[int]*string
Expand All @@ -31,7 +32,7 @@ func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log
client: client,
retryQueue: retryQueue,
taskCount: 0,
retryQueueShutDownFlag: false,
retryQueueShutDownFlag: uberatomic.NewBool(false),
logger: logger,
maxIoWorker: make(chan int64, maxIoWorkerCount),
noRetryStatusCodeMap: errorStatusMap,
Expand Down Expand Up @@ -63,7 +64,7 @@ func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch, ioWorkerWai
}
}
} else {
if ioWorker.retryQueueShutDownFlag {
if ioWorker.retryQueueShutDownFlag.Load() {
if len(producerBatch.callBackList) > 0 {
for _, callBack := range producerBatch.callBackList {
callBack.Fail(producerBatch.result)
Expand Down
14 changes: 8 additions & 6 deletions producer/log_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ package producer

import (
"errors"
"github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"strings"
"sync"
"sync/atomic"

sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
uberatomic "go.uber.org/atomic"
)

type LogAccumulator struct {
lock sync.RWMutex
logGroupData sync.Map //map[string]*ProducerBatch,
producerConfig *ProducerConfig
ioWorker *IoWorker
shutDownFlag bool
shutDownFlag *uberatomic.Bool
logger log.Logger
threadPool *IoThreadPool
}
Expand All @@ -24,7 +26,7 @@ func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.L
return &LogAccumulator{
producerConfig: config,
ioWorker: ioWorker,
shutDownFlag: false,
shutDownFlag: uberatomic.NewBool(false),
logger: logger,
threadPool: threadPool,
}
Expand Down Expand Up @@ -54,7 +56,7 @@ func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, s
logData interface{}, callback CallBack) error {
defer logAccumulator.lock.Unlock()
logAccumulator.lock.Lock()
if logAccumulator.shutDownFlag {
if logAccumulator.shutDownFlag.Load() {
level.Warn(logAccumulator.logger).Log("msg", "Producer has started and shut down and cannot write to new logs")
return errors.New("Producer has started and shut down and cannot write to new logs")
}
Expand Down
16 changes: 9 additions & 7 deletions producer/mover.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package producer

import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.uber.org/atomic"
)

type Mover struct {
moverShutDownFlag bool
moverShutDownFlag *atomic.Bool
retryQueue *RetryQueue
ioWorker *IoWorker
logAccumulator *LogAccumulator
Expand All @@ -18,7 +20,7 @@ type Mover struct {

func initMover(logAccumulator *LogAccumulator, retryQueue *RetryQueue, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool) *Mover {
mover := &Mover{
moverShutDownFlag: false,
moverShutDownFlag: atomic.NewBool(false),
retryQueue: retryQueue,
ioWorker: ioWorker,
logAccumulator: logAccumulator,
Expand All @@ -43,7 +45,7 @@ func (mover *Mover) sendToServer(key interface{}, batch *ProducerBatch, config *

func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig) {
defer moverWaitGroup.Done()
for !mover.moverShutDownFlag {
for !mover.moverShutDownFlag.Load() {
sleepMs := config.LingerMs
mapCount := 0
mover.logAccumulator.logGroupData.Range(func(key, value interface{}) bool {
Expand All @@ -66,7 +68,7 @@ func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig)
sleepMs = config.LingerMs
}

retryProducerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag)
retryProducerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag.Load())
if retryProducerBatchList == nil {
// If there is nothing to send in the retry queue, just wait for the minimum time that was given to me last time.
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
Expand All @@ -84,7 +86,7 @@ func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig)
return true
})

producerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag)
producerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag.Load())
count := len(producerBatchList)
for i := 0; i < count; i++ {
mover.threadPool.addTask(producerBatchList[i])
Expand Down
10 changes: 5 additions & 5 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (producer *Producer) Close(timeoutMs int64) error {
startCloseTime := time.Now()
producer.sendCloseProdcerSignal()
producer.moverWaitGroup.Wait()
producer.threadPool.threadPoolShutDownFlag = true
producer.threadPool.threadPoolShutDownFlag.Store(true)
for {
if atomic.LoadInt64(&producer.mover.ioWorker.taskCount) == 0 && !producer.threadPool.hasTask() {
level.Info(producer.logger).Log("msg", "All groutines of producer have been shutdown")
Expand All @@ -262,7 +262,7 @@ func (producer *Producer) Close(timeoutMs int64) error {
func (producer *Producer) SafeClose() {
producer.sendCloseProdcerSignal()
producer.moverWaitGroup.Wait()
producer.threadPool.threadPoolShutDownFlag = true
producer.threadPool.threadPoolShutDownFlag.Store(true)
producer.ioThreadPoolWaitGroup.Wait()
producer.ioWorkerWaitGroup.Wait()
level.Info(producer.logger).Log("msg", "Producer close finish")
Expand All @@ -271,9 +271,9 @@ func (producer *Producer) SafeClose() {
func (producer *Producer) sendCloseProdcerSignal() {
level.Info(producer.logger).Log("msg", "producer start closing")
producer.closeStstokenChannel()
producer.mover.moverShutDownFlag = true
producer.logAccumulator.shutDownFlag = true
producer.mover.ioWorker.retryQueueShutDownFlag = true
producer.mover.moverShutDownFlag.Store(true)
producer.logAccumulator.shutDownFlag.Store(true)
producer.mover.ioWorker.retryQueueShutDownFlag.Store(true)
}

func (producer *Producer) closeStstokenChannel() {
Expand Down

0 comments on commit 02e351d

Please sign in to comment.