Bugfix: stuck fetchStats on slow views/processors #439

Merged · 1 commit · Oct 30, 2023
context_test.go (8 changes: 3 additions & 5 deletions)
@@ -327,8 +327,7 @@ func TestContext_Set(t *testing.T) {
 			st: &storageProxy{
 				Storage: st,
 			},
-			stats:       newTableStats(),
-			updateStats: make(chan func(), 10),
+			stats: newTableStats(),
 		}
 	)
 	st.EXPECT().Set(key, []byte(value)).Return(nil)
@@ -376,9 +375,8 @@ func TestContext_GetSetStateful(t *testing.T) {
 			st: &storageProxy{
 				Storage: st,
 			},
-			state:       newPartitionTableState().SetState(State(PartitionRunning)),
-			stats:       newTableStats(),
-			updateStats: make(chan func(), 10),
+			state: newPartitionTableState().SetState(State(PartitionRunning)),
+			stats: newTableStats(),
 		}
 	)
 
partition_processor.go (165 changes: 54 additions & 111 deletions)
@@ -80,11 +80,8 @@ type PartitionProcessor struct {
 	consumer sarama.Consumer
 	tmgr     TopicManager
 
-	stats           *PartitionProcStats
-	requestStats    chan bool
-	responseStats   chan *PartitionProcStats
-	updateStats     chan func()
-	cancelStatsLoop context.CancelFunc
+	mStats sync.RWMutex
+	stats  *PartitionProcStats
 
 	commit   commitCallback
 	producer Producer
@@ -137,38 +134,32 @@ func newPartitionProcessor(partition int32,
 
 	log := logger.Prefix(fmt.Sprintf("PartitionProcessor (%d)", partition))
 
-	statsLoopCtx, cancel := context.WithCancel(context.Background())
-
 	for _, v := range graph.visitors {
 		visitCallbacks[v.(*visitor).name] = v.(*visitor).cb
 	}
 
 	partProc := &PartitionProcessor{
-		log:             log,
-		opts:            opts,
-		partition:       partition,
-		state:           NewSignal(PPStateIdle, PPStateRecovering, PPStateRunning, PPStateStopping, PPStateStopped).SetState(PPStateIdle),
-		callbacks:       callbacks,
-		lookups:         lookupTables,
-		consumer:        consumer,
-		producer:        producer,
-		tmgr:            tmgr,
-		joins:           make(map[string]*PartitionTable),
-		input:           make(chan *message, opts.partitionChannelSize),
-		inputTopics:     topicList,
-		visitInput:      make(chan *visit, defaultPPVisitChannelSize),
-		visitCallbacks:  visitCallbacks,
-		graph:           graph,
-		stats:           newPartitionProcStats(topicList, outputList),
-		requestStats:    make(chan bool),
-		responseStats:   make(chan *PartitionProcStats, 1),
-		updateStats:     make(chan func(), 10),
-		cancelStatsLoop: cancel,
-		commit:          commit,
-		runMode:         runMode,
-	}
-
-	go partProc.runStatsLoop(statsLoopCtx)
+		log:            log,
+		opts:           opts,
+		partition:      partition,
+		state:          NewSignal(PPStateIdle, PPStateRecovering, PPStateRunning, PPStateStopping, PPStateStopped).SetState(PPStateIdle),
+		callbacks:      callbacks,
+		lookups:        lookupTables,
+		consumer:       consumer,
+		producer:       producer,
+		tmgr:           tmgr,
+		joins:          make(map[string]*PartitionTable),
+		input:          make(chan *message, opts.partitionChannelSize),
+		inputTopics:    topicList,
+		visitInput:     make(chan *visit, defaultPPVisitChannelSize),
+		visitCallbacks: visitCallbacks,
+		graph:          graph,
+
+		stats: newPartitionProcStats(topicList, outputList),
+
+		commit:  commit,
+		runMode: runMode,
+	}
 
 	if graph.GroupTable() != nil {
 		partProc.table = newPartitionTable(graph.GroupTable().Topic(),
@@ -217,7 +208,6 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
 	defer pp.state.SetState(PPStateRunning)
 
 	if pp.table != nil {
-		go pp.table.RunStatsLoop(runnerCtx)
 		setupErrg.Go(func() error {
 			pp.log.Debugf("catching up table")
 			defer pp.log.Debugf("catching up table done")
@@ -238,7 +228,6 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
 		)
 		pp.joins[join.Topic()] = table
 
-		go table.RunStatsLoop(runnerCtx)
 		setupErrg.Go(func() error {
 			return table.SetupAndRecover(setupCtx, false)
 		})
@@ -316,9 +305,6 @@ func (pp *PartitionProcessor) Stop() error {
 		pp.cancelRunnerGroup()
 	}
 
-	// stop the stats updating/serving loop
-	pp.cancelStatsLoop()
-
 	// wait for the runner to be done
 	runningErrs := multierror.Append(pp.runnerGroup.Wait().ErrorOrNil())
 
@@ -413,6 +399,9 @@ func (pp *PartitionProcessor) run(ctx context.Context) (rerr error) {
 		}
 	}()
 
+	updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
+	defer updateHwmStatsTicker.Stop()
+
 	for {
 		select {
 		case ev, isOpen := <-pp.input:
@@ -425,7 +414,15 @@ func (pp *PartitionProcessor) run(ctx context.Context) (rerr error) {
 				return newErrProcessing(pp.partition, err)
 			}
 
-			pp.enqueueStatsUpdate(ctx, func() { pp.updateStatsWithMessage(ev) })
+			pp.updateStats(func(stats *PartitionProcStats) {
+				ip := stats.Input[ev.topic]
+				ip.Bytes += len(ev.value)
+				ip.LastOffset = ev.offset
+				if !ev.timestamp.IsZero() {
+					ip.Delay = time.Since(ev.timestamp)
+				}
+				ip.Count++
+			})
 		case <-ctx.Done():
 			pp.log.Debugf("exiting, context is cancelled")
 			return
@@ -440,75 +437,42 @@ func (pp *PartitionProcessor) run(ctx context.Context) (rerr error) {
 		case <-asyncErrs:
 			pp.log.Debugf("Errors occurred asynchronously. Will exit partition processor")
 			return
-		}
-	}
-}
-
-func (pp *PartitionProcessor) enqueueStatsUpdate(ctx context.Context, updater func()) {
-	select {
-	case pp.updateStats <- updater:
-	case <-ctx.Done():
-	default:
-		// going to default indicates the updateStats channel is not read, so so the stats
-		// loop is not actually running.
-		// We must not block here, so we'll skip the update
-	}
-}
-
-func (pp *PartitionProcessor) runStatsLoop(ctx context.Context) {
-	updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
-	defer updateHwmStatsTicker.Stop()
-	for {
-		select {
-		case <-pp.requestStats:
-			stats := pp.collectStats(ctx)
-			select {
-			case pp.responseStats <- stats:
-			case <-ctx.Done():
-				pp.log.Debugf("exiting, context is cancelled")
-				return
-			}
-		case update := <-pp.updateStats:
-			update()
 		case <-updateHwmStatsTicker.C:
-			pp.updateHwmStats()
-		case <-ctx.Done():
-			return
+			pp.updateStats(pp.updateHwmStats)
 		}
 	}
 }
 
-// updateStatsWithMessage updates the stats with a received message
-func (pp *PartitionProcessor) updateStatsWithMessage(ev *message) {
-	ip := pp.stats.Input[ev.topic]
-	ip.Bytes += len(ev.value)
-	ip.LastOffset = ev.offset
-	if !ev.timestamp.IsZero() {
-		ip.Delay = time.Since(ev.timestamp)
-	}
-	ip.Count++
+func (pp *PartitionProcessor) updateStats(updater func(stats *PartitionProcStats)) {
+	pp.mStats.Lock()
+	defer pp.mStats.Unlock()
+	updater(pp.stats)
 }
 
 // updateHwmStats updates the offset lag for all input topics based on the
 // highwatermarks obtained by the consumer.
-func (pp *PartitionProcessor) updateHwmStats() {
+func (pp *PartitionProcessor) updateHwmStats(stats *PartitionProcStats) {
 	hwms := pp.consumer.HighWaterMarks()
-	for input, inputStats := range pp.stats.Input {
+	for input, inputStats := range stats.Input {
 		hwm := hwms[input][pp.partition]
 		if hwm != 0 && inputStats.LastOffset != 0 {
			inputStats.OffsetLag = hwm - inputStats.LastOffset
 		}
 	}
 }
 
-func (pp *PartitionProcessor) collectStats(ctx context.Context) *PartitionProcStats {
-	var (
-		stats = pp.stats.clone()
-		m     sync.Mutex
-	)
+func (pp *PartitionProcessor) fetchStats(ctx context.Context) *PartitionProcStats {
+	pp.mStats.RLock()
+	stats := pp.stats.clone()
+	pp.mStats.RUnlock()
+
+	// mutex for the local stats-clone so the
+	// error group below doesn't get a concurrent-map-access error
+	var m sync.Mutex
 
 	errg, ctx := multierr.NewErrGroup(ctx)
 
+	// fetch join table stats
 	for topic, join := range pp.joins {
 		topic, join := topic, join
 		errg.Go(func() error {
@@ -523,6 +487,7 @@ func (pp *PartitionProcessor) collectStats(ctx context.Context) *PartitionProcStats {
 		})
 	}
 
+	// if we have processor state, get those stats
 	if pp.table != nil {
 		errg.Go(func() error {
 			stats.TableStats = pp.table.fetchStats(ctx)
@@ -541,31 +506,9 @@ func (pp *PartitionProcessor) collectStats(ctx context.Context) *PartitionProcStats {
 	return stats
 }
 
-func (pp *PartitionProcessor) fetchStats(ctx context.Context) *PartitionProcStats {
-	select {
-	case <-ctx.Done():
-		return nil
-	case <-time.After(fetchStatsTimeout):
-		pp.log.Printf("requesting stats timed out")
-		return nil
-	case pp.requestStats <- true:
-	}
-
-	// retrieve from response-channel
-	select {
-	case <-ctx.Done():
-		return nil
-	case <-time.After(fetchStatsTimeout):
-		pp.log.Printf("Fetching stats timed out")
-		return nil
-	case stats := <-pp.responseStats:
-		return stats
-	}
-}
-
 func (pp *PartitionProcessor) enqueueTrackOutputStats(ctx context.Context, topic string, size int) {
-	pp.enqueueStatsUpdate(ctx, func() {
-		pp.stats.trackOutput(topic, size)
+	pp.updateStats(func(stats *PartitionProcStats) {
+		stats.trackOutput(topic, size)
 	})
 }
 
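The pattern the diff moves to — a sync.RWMutex guarding the stats struct, with writers taking a short write lock and fetchStats cloning a snapshot under a read lock instead of doing a channel handshake with a dedicated stats goroutine — can be sketched standalone as follows. This is a minimal illustration with made-up types (procStats, processor), not Goka's actual API:

package main

import (
	"fmt"
	"sync"
)

// procStats is a stand-in for a per-partition stats struct.
type procStats struct {
	Count int
	Bytes int
}

// processor is a stand-in for a partition processor: stats are guarded by an
// RWMutex instead of being owned by a stats goroutine fed through channels.
type processor struct {
	mStats sync.RWMutex
	stats  *procStats
}

// updateStats applies a mutation under the write lock; it never blocks on a
// channel, so a slow stats reader cannot stall the hot path.
func (p *processor) updateStats(update func(*procStats)) {
	p.mStats.Lock()
	defer p.mStats.Unlock()
	update(p.stats)
}

// fetchStats returns a copy taken under the read lock, so callers can inspect
// the snapshot without holding any lock afterwards.
func (p *processor) fetchStats() procStats {
	p.mStats.RLock()
	defer p.mStats.RUnlock()
	return *p.stats
}

func main() {
	p := &processor{stats: &procStats{}}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				p.updateStats(func(s *procStats) {
					s.Count++
					s.Bytes += 128
				})
			}
		}()
	}
	wg.Wait()

	fmt.Printf("%+v\n", p.fetchStats()) // {Count:4000 Bytes:512000}
}

With this layout a reader that is slow to consume the snapshot only delays itself; message processing never waits on an unread stats channel, which is the stall on slow views/processors that the PR title refers to.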