Skip to content

Commit

Permalink
fix consumer retry process function
Browse files Browse the repository at this point in the history
  • Loading branch information
xuxiaoahang2018 committed Jul 23, 2019
1 parent 32935d0 commit fa092a6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
2 changes: 2 additions & 0 deletions consumer/shard_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ShardConsumerWorker struct {
logger log.Logger
lastFetchTimeForForceFlushCpt int64
isFlushCheckpointDone bool
rollBackCheckpoint string
}

func (consumer *ShardConsumerWorker) setConsumerStatus(status string) {
Expand Down Expand Up @@ -55,6 +56,7 @@ func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, do fun
isFlushCheckpointDone: true,
logger: logger,
lastFetchTimeForForceFlushCpt: 0,
rollBackCheckpoint: "",
}
return shardConsumeWorker
}
Expand Down
28 changes: 24 additions & 4 deletions consumer/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log/level"
"time"
)

func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) {
Expand Down Expand Up @@ -49,16 +50,35 @@ func (consumer *ShardConsumerWorker) consumerFetchTask() (*sls.LogGroupList, str
}

func (consumer *ShardConsumerWorker) consumerProcessTask() string {
// If the user's consumption function reports a panic error, it will be captured and exited.
rollBackCheckpoint := ""
// If the user's consumption function reports a panic error, it will be captured and retry until sucessed.
defer func() {
if r := recover(); r != nil {
level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r)
for {
if consumer.consumerRetryProcessTask() == true {
break
} else {
time.Sleep(time.Second * 2)
}
}
}
}()
if consumer.lastFetchLogGroupList != nil {
rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList)
consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList)
consumer.consumerCheckPointTracker.flushCheck()
}
return rollBackCheckpoint
return consumer.rollBackCheckpoint
}

func (consumer *ShardConsumerWorker) consumerRetryProcessTask() bool {
level.Info(consumer.logger).Log("msg", "Start retrying the process function")
defer func() {
if r := recover(); r != nil {
level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r)
}
}()
consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList)
consumer.consumerCheckPointTracker.flushCheck()
return true

}

0 comments on commit fa092a6

Please sign in to comment.