diff --git a/context_test.go b/context_test.go
index 764d6f3f..973c173b 100644
--- a/context_test.go
+++ b/context_test.go
@@ -327,8 +327,7 @@ func TestContext_Set(t *testing.T) {
 			st: &storageProxy{
 				Storage: st,
 			},
-			stats:       newTableStats(),
-			updateStats: make(chan func(), 10),
+			stats: newTableStats(),
 		}
 	)
 	st.EXPECT().Set(key, []byte(value)).Return(nil)
@@ -376,9 +375,8 @@ func TestContext_GetSetStateful(t *testing.T) {
 			st: &storageProxy{
 				Storage: st,
 			},
-			state:       newPartitionTableState().SetState(State(PartitionRunning)),
-			stats:       newTableStats(),
-			updateStats: make(chan func(), 10),
+			state: newPartitionTableState().SetState(State(PartitionRunning)),
+			stats: newTableStats(),
 		}
 	)
 
diff --git a/partition_processor.go b/partition_processor.go
index d994f846..c9b19afe 100644
--- a/partition_processor.go
+++ b/partition_processor.go
@@ -80,11 +80,8 @@ type PartitionProcessor struct {
 	consumer sarama.Consumer
 	tmgr     TopicManager
 
-	stats           *PartitionProcStats
-	requestStats    chan bool
-	responseStats   chan *PartitionProcStats
-	updateStats     chan func()
-	cancelStatsLoop context.CancelFunc
+	mStats sync.RWMutex
+	stats  *PartitionProcStats
 
 	commit   commitCallback
 	producer Producer
@@ -137,38 +134,32 @@ func newPartitionProcessor(partition int32,
 
 	log := logger.Prefix(fmt.Sprintf("PartitionProcessor (%d)", partition))
 
-	statsLoopCtx, cancel := context.WithCancel(context.Background())
-
 	for _, v := range graph.visitors {
 		visitCallbacks[v.(*visitor).name] = v.(*visitor).cb
 	}
 
 	partProc := &PartitionProcessor{
-		log:             log,
-		opts:            opts,
-		partition:       partition,
-		state:           NewSignal(PPStateIdle, PPStateRecovering, PPStateRunning, PPStateStopping, PPStateStopped).SetState(PPStateIdle),
-		callbacks:       callbacks,
-		lookups:         lookupTables,
-		consumer:        consumer,
-		producer:        producer,
-		tmgr:            tmgr,
-		joins:           make(map[string]*PartitionTable),
-		input:           make(chan *message, opts.partitionChannelSize),
-		inputTopics:     topicList,
-		visitInput:      make(chan *visit, defaultPPVisitChannelSize),
-		visitCallbacks:  visitCallbacks,
-		graph:           graph,
-		stats:           newPartitionProcStats(topicList, outputList),
-		requestStats:    make(chan bool),
-		responseStats:   make(chan *PartitionProcStats, 1),
-		updateStats:     make(chan func(), 10),
-		cancelStatsLoop: cancel,
-		commit:          commit,
-		runMode:         runMode,
-	}
-
-	go partProc.runStatsLoop(statsLoopCtx)
+		log:            log,
+		opts:           opts,
+		partition:      partition,
+		state:          NewSignal(PPStateIdle, PPStateRecovering, PPStateRunning, PPStateStopping, PPStateStopped).SetState(PPStateIdle),
+		callbacks:      callbacks,
+		lookups:        lookupTables,
+		consumer:       consumer,
+		producer:       producer,
+		tmgr:           tmgr,
+		joins:          make(map[string]*PartitionTable),
+		input:          make(chan *message, opts.partitionChannelSize),
+		inputTopics:    topicList,
+		visitInput:     make(chan *visit, defaultPPVisitChannelSize),
+		visitCallbacks: visitCallbacks,
+		graph:          graph,
+
+		stats: newPartitionProcStats(topicList, outputList),
+
+		commit:  commit,
+		runMode: runMode,
+	}
 
 	if graph.GroupTable() != nil {
 		partProc.table = newPartitionTable(graph.GroupTable().Topic(),
@@ -217,7 +208,6 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
 	defer pp.state.SetState(PPStateRunning)
 
 	if pp.table != nil {
-		go pp.table.RunStatsLoop(runnerCtx)
 		setupErrg.Go(func() error {
 			pp.log.Debugf("catching up table")
 			defer pp.log.Debugf("catching up table done")
@@ -238,7 +228,6 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
 		)
 		pp.joins[join.Topic()] = table
 
-		go table.RunStatsLoop(runnerCtx)
 		setupErrg.Go(func() error {
 			return table.SetupAndRecover(setupCtx, false)
 		})
@@ -316,9 +305,6 @@ func (pp *PartitionProcessor) Stop() error {
 		pp.cancelRunnerGroup()
 	}
 
-	// stop the stats updating/serving loop
-	pp.cancelStatsLoop()
-
 	// wait for the runner to be done
 	runningErrs := multierror.Append(pp.runnerGroup.Wait().ErrorOrNil())
 
@@ -413,6 +399,9 @@ func (pp *PartitionProcessor) run(ctx context.Context) (rerr error) {
 		}
 	}()
 
+	updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
+	defer updateHwmStatsTicker.Stop()
+
 	for {
 		select {
 		case ev, isOpen := <-pp.input:
@@ -425,7 +414,15 @@ func (pp *PartitionProcessor) run(ctx context.Context) (rerr error) {
 				return newErrProcessing(pp.partition, err)
 			}
 
-			pp.enqueueStatsUpdate(ctx, func() { pp.updateStatsWithMessage(ev) })
+			pp.updateStats(func(stats *PartitionProcStats) {
+				ip := stats.Input[ev.topic]
+				ip.Bytes += len(ev.value)
+				ip.LastOffset = ev.offset
+				if !ev.timestamp.IsZero() {
+					ip.Delay = time.Since(ev.timestamp)
+				}
+				ip.Count++
+			})
 		case <-ctx.Done():
 			pp.log.Debugf("exiting, context is cancelled")
 			return
@@ -440,60 +437,23 @@ func (pp *PartitionProcessor) run(ctx context.Context) (rerr error) {
 		case <-asyncErrs:
 			pp.log.Debugf("Errors occurred asynchronously. Will exit partition processor")
 			return
-		}
-	}
-}
-
-func (pp *PartitionProcessor) enqueueStatsUpdate(ctx context.Context, updater func()) {
-	select {
-	case pp.updateStats <- updater:
-	case <-ctx.Done():
-	default:
-		// going to default indicates the updateStats channel is not read, so so the stats
-		// loop is not actually running.
-		// We must not block here, so we'll skip the update
-	}
-}
-
-func (pp *PartitionProcessor) runStatsLoop(ctx context.Context) {
-	updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
-	defer updateHwmStatsTicker.Stop()
-	for {
-		select {
-		case <-pp.requestStats:
-			stats := pp.collectStats(ctx)
-			select {
-			case pp.responseStats <- stats:
-			case <-ctx.Done():
-				pp.log.Debugf("exiting, context is cancelled")
-				return
-			}
-		case update := <-pp.updateStats:
-			update()
 		case <-updateHwmStatsTicker.C:
-			pp.updateHwmStats()
-		case <-ctx.Done():
-			return
+			pp.updateStats(pp.updateHwmStats)
 		}
 	}
 }
 
-// updateStatsWithMessage updates the stats with a received message
-func (pp *PartitionProcessor) updateStatsWithMessage(ev *message) {
-	ip := pp.stats.Input[ev.topic]
-	ip.Bytes += len(ev.value)
-	ip.LastOffset = ev.offset
-	if !ev.timestamp.IsZero() {
-		ip.Delay = time.Since(ev.timestamp)
-	}
-	ip.Count++
+func (pp *PartitionProcessor) updateStats(updater func(stats *PartitionProcStats)) {
+	pp.mStats.Lock()
+	defer pp.mStats.Unlock()
+	updater(pp.stats)
 }
 
 // updateHwmStats updates the offset lag for all input topics based on the
 // highwatermarks obtained by the consumer.
-func (pp *PartitionProcessor) updateHwmStats() {
+func (pp *PartitionProcessor) updateHwmStats(stats *PartitionProcStats) {
 	hwms := pp.consumer.HighWaterMarks()
-	for input, inputStats := range pp.stats.Input {
+	for input, inputStats := range stats.Input {
 		hwm := hwms[input][pp.partition]
 		if hwm != 0 && inputStats.LastOffset != 0 {
 			inputStats.OffsetLag = hwm - inputStats.LastOffset
@@ -501,14 +461,18 @@
 	}
 }
 
-func (pp *PartitionProcessor) collectStats(ctx context.Context) *PartitionProcStats {
-	var (
-		stats = pp.stats.clone()
-		m     sync.Mutex
-	)
+func (pp *PartitionProcessor) fetchStats(ctx context.Context) *PartitionProcStats {
+	pp.mStats.RLock()
+	stats := pp.stats.clone()
+	pp.mStats.RUnlock()
+
+	// mutex for the local stats-clone so the
+	// error group below doesn't get a concurrent-map-access error
+	var m sync.Mutex
 
 	errg, ctx := multierr.NewErrGroup(ctx)
 
+	// fetch join table stats
 	for topic, join := range pp.joins {
 		topic, join := topic, join
 		errg.Go(func() error {
@@ -523,6 +487,7 @@ func (pp *PartitionProcessor) collectStats(ctx context.Context) *PartitionProcSt
 		})
 	}
 
+	// if we have processor state, get those stats
 	if pp.table != nil {
 		errg.Go(func() error {
 			stats.TableStats = pp.table.fetchStats(ctx)
@@ -541,31 +506,9 @@ func (pp *PartitionProcessor) collectStats(ctx context.Context) *PartitionProcSt
 
 	return stats
 }
 
-func (pp *PartitionProcessor) fetchStats(ctx context.Context) *PartitionProcStats {
-	select {
-	case <-ctx.Done():
-		return nil
-	case <-time.After(fetchStatsTimeout):
-		pp.log.Printf("requesting stats timed out")
-		return nil
-	case pp.requestStats <- true:
-	}
-
-	// retrieve from response-channel
-	select {
-	case <-ctx.Done():
-		return nil
-	case <-time.After(fetchStatsTimeout):
-		pp.log.Printf("Fetching stats timed out")
-		return nil
-	case stats := <-pp.responseStats:
-		return stats
-	}
-}
-
 func (pp *PartitionProcessor) enqueueTrackOutputStats(ctx context.Context, topic string, size int) {
-	pp.enqueueStatsUpdate(ctx, func() {
-		pp.stats.trackOutput(topic, size)
+	pp.updateStats(func(stats *PartitionProcStats) {
+		stats.trackOutput(topic, size)
 	})
 }
 
diff --git a/partition_table.go b/partition_table.go
index 2c97b460..48bc7b6b 100644
--- a/partition_table.go
+++ b/partition_table.go
@@ -42,14 +42,8 @@ type PartitionTable struct {
 	tmgr           TopicManager
 	updateCallback UpdateCallback
 
-	stats         *TableStats
-	requestStats  chan bool
-	responseStats chan *TableStats
-	updateStats   chan func()
-
-	// current offset
-	offset int64
-	hwm    int64
+	mStats sync.RWMutex
+	stats  *TableStats
 
 	// stall config
 	stallPeriod    time.Duration
@@ -92,10 +86,7 @@ func newPartitionTable(topic string,
 		stallPeriod:    defaultStallPeriod,
 		stalledTimeout: defaultStalledTimeout,
 
-		stats:         newTableStats(),
-		requestStats:  make(chan bool),
-		responseStats: make(chan *TableStats, 1),
-		updateStats:   make(chan func(), 10),
+		stats: newTableStats(),
 
 		backoff:             backoff,
 		backoffResetTimeout: backoffResetTimeout,
@@ -315,10 +306,10 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 	// initialize recovery stats here, in case we don't do the recovery because
 	// we're up to date already
 	if stopAfterCatchup {
-		p.enqueueStatsUpdate(ctx, func() {
-			p.stats.Recovery.StartTime = time.Now()
-			p.stats.Recovery.Hwm = hwm
-			p.stats.Recovery.Offset = loadOffset
+		p.updateStats(func(stats *TableStats) {
+			stats.Recovery.StartTime = time.Now()
+			stats.Recovery.Hwm = hwm
+			stats.Recovery.Offset = loadOffset
 		})
 	}
 
@@ -363,7 +354,7 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 
 	if stopAfterCatchup {
 		err := p.markRecovered(ctx)
-		p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.RecoveryTime = time.Now() })
+		p.updateStats(func(stats *TableStats) { stats.Recovery.RecoveryTime = time.Now() })
 		return err
 	}
 	return
@@ -383,7 +374,7 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error {
 	p.state.SetState(State(PartitionPreparing))
 
 	now := time.Now()
-	p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.RecoveryTime = now })
+	p.updateStats(func(stats *TableStats) { stats.Recovery.RecoveryTime = now })
 
 	go func() {
 		defer close(done)
@@ -440,6 +431,9 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
 	messages := cons.Messages()
 	errors := cons.Errors()
 
+	updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
+	defer updateHwmStatsTicker.Stop()
+
 	for {
 		select {
 		case err, ok := <-errors:
@@ -473,10 +467,19 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
 			}
 
 			if stopAfterCatchup {
-				p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.Offset = msg.Offset })
+				p.updateStats(func(stats *TableStats) { stats.Recovery.Offset = msg.Offset })
 			}
 
-			p.enqueueStatsUpdate(ctx, func() { p.trackIncomingMessageStats(msg) })
+			p.updateStats(func(stats *TableStats) {
+				ip := stats.Input
+				ip.Bytes += len(msg.Value)
+				ip.LastOffset = msg.Offset
+				if !msg.Timestamp.IsZero() {
+					ip.Delay = time.Since(msg.Timestamp)
+				}
+				ip.Count++
+				stats.Stalled = false
+			})
 
 			if stopAfterCatchup && msg.Offset >= partitionHwm-1 {
 				return nil
@@ -486,8 +489,10 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
 			// only set to stalled, if the last message was earlier
 			// than the stalled timeout
 			if now.Sub(lastMessage) > p.stalledTimeout {
-				p.enqueueStatsUpdate(ctx, func() { p.stats.Stalled = true })
+				p.updateStats(func(stats *TableStats) { stats.Stalled = true })
 			}
 
+		case <-updateHwmStatsTicker.C:
+			p.updateHwmStats()
 		case <-ctx.Done():
 			return nil
@@ -495,96 +500,40 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
 	}
 }
 
-func (p *PartitionTable) enqueueStatsUpdate(ctx context.Context, updater func()) {
-	select {
-	case p.updateStats <- updater:
-	case <-ctx.Done():
-	default:
-		// going to default indicates the updateStats channel is not read, so so the stats
-		// loop is not actually running.
-		// We must not block here, so we'll skip the update
-	}
+func (p *PartitionTable) updateStats(updater func(stats *TableStats)) {
+	p.mStats.Lock()
+	defer p.mStats.Unlock()
+	updater(p.stats)
 }
 
-// RunStatsLoop starts the handler for stats requests. This loop runs detached from the
-// recover/catchup mechanism so clients can always request stats even if the partition table is not
-// running (like a processor table after it's recovered).
-func (p *PartitionTable) RunStatsLoop(ctx context.Context) {
-	updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
-	defer updateHwmStatsTicker.Stop()
-	for {
-		select {
-		case <-p.requestStats:
-			p.handleStatsRequest(ctx)
-		case update := <-p.updateStats:
-			update()
-		case <-updateHwmStatsTicker.C:
-			p.updateHwmStats()
-		case <-ctx.Done():
-			return
-		}
-	}
-}
+func (p *PartitionTable) fetchStats(ctx context.Context) *TableStats {
+	p.mStats.RLock()
+	defer p.mStats.RUnlock()
 
-func (p *PartitionTable) handleStatsRequest(ctx context.Context) {
 	stats := p.stats.clone()
 	stats.Status = PartitionStatus(p.state.State())
-	select {
-	case p.responseStats <- stats:
-	case <-ctx.Done():
-		p.log.Debugf("exiting, context is cancelled")
-	}
-}
 
-func (p *PartitionTable) fetchStats(ctx context.Context) *TableStats {
-	select {
-	case <-ctx.Done():
-		return nil
-	case <-time.After(fetchStatsTimeout):
-		p.log.Printf("requesting stats timed out")
-		return nil
-	case p.requestStats <- true:
-	}
-
-	// retrieve from response-channel
-	select {
-	case <-ctx.Done():
-		return nil
-	case <-time.After(fetchStatsTimeout):
-		p.log.Printf("fetching stats timed out")
-		return nil
-	case stats := <-p.responseStats:
-		return stats
-	}
+	return stats
 }
 
-func (p *PartitionTable) trackIncomingMessageStats(msg *sarama.ConsumerMessage) {
-	ip := p.stats.Input
-	ip.Bytes += len(msg.Value)
-	ip.LastOffset = msg.Offset
-	if !msg.Timestamp.IsZero() {
-		ip.Delay = time.Since(msg.Timestamp)
+func (p *PartitionTable) updateHwmStats() {
	hwms := p.consumer.HighWaterMarks()
+	hwm := hwms[p.topic][p.partition]
+	if hwm != 0 {
+		p.updateStats(func(stats *TableStats) {
+			stats.Input.OffsetLag = hwm - stats.Input.LastOffset
+		})
 	}
-	ip.Count++
-	p.stats.Stalled = false
 }
 
 // TrackMessageWrite updates the write stats to passed length
 func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int) {
-	p.enqueueStatsUpdate(ctx, func() {
-		p.stats.Writes.Bytes += length
-		p.stats.Writes.Count++
+	p.updateStats(func(stats *TableStats) {
+		stats.Writes.Bytes += length
+		stats.Writes.Count++
 	})
 }
 
-func (p *PartitionTable) updateHwmStats() {
-	hwms := p.consumer.HighWaterMarks()
-	hwm := hwms[p.topic][p.partition]
-	if hwm != 0 {
-		p.stats.Input.OffsetLag = hwm - p.stats.Input.LastOffset
-	}
-}
-
 func (p *PartitionTable) storeEvent(key string, value []byte, offset int64, headers []*sarama.RecordHeader) error {
 	err := p.st.Update(&DefaultUpdateContext{
 		topic:  p.st.topic,
diff --git a/partition_table_test.go b/partition_table_test.go
index ac1a4fef..94064480 100644
--- a/partition_table_test.go
+++ b/partition_table_test.go
@@ -631,6 +631,7 @@ func TestPT_loadMessages(t *testing.T) {
 		topic     = "some-topic"
 		partition int32
 		consumer  = defaultSaramaAutoConsumerMock(t)
+		ctx       = context.Background()
 	)
 	pt, bm, ctrl := defaultPT(
 		t,
@@ -656,7 +657,8 @@ func TestPT_loadMessages(t *testing.T) {
 				return
 			default:
 			}
-			if pt.stats.Stalled {
+
+			if pt.fetchStats(ctx).Stalled {
 				return
 			}
 		}
diff --git a/stats.go b/stats.go
index 846a9ccc..61bc2486 100644
--- a/stats.go
+++ b/stats.go
@@ -28,7 +28,7 @@ const (
 )
 
 const (
-	statsHwmUpdateInterval = 5 * time.Second
+	statsHwmUpdateInterval = 60 * time.Second
 	fetchStatsTimeout      = 10 * time.Second
 )
 
@@ -93,20 +93,22 @@ func newOutputStats() *OutputStats {
 }
 
 func (is *InputStats) clone() *InputStats {
-	var clone = *is
+	clone := *is
 	return &clone
 }
 
 func (os *OutputStats) clone() *OutputStats {
-	var clone = *os
+	clone := *os
 	return &clone
 }
 
-type inputStatsMap map[string]*InputStats
-type outputStatsMap map[string]*OutputStats
+type (
+	inputStatsMap  map[string]*InputStats
+	outputStatsMap map[string]*OutputStats
+)
 
 func (isp inputStatsMap) clone() map[string]*InputStats {
-	var c = map[string]*InputStats{}
+	c := map[string]*InputStats{}
 	if isp == nil {
 		return c
 	}
@@ -117,7 +119,7 @@ func (isp inputStatsMap) clone() map[string]*InputStats {
 }
 
 func (osp outputStatsMap) clone() map[string]*OutputStats {
-	var c = map[string]*OutputStats{}
+	c := map[string]*OutputStats{}
 	if osp == nil {
 		return c
 	}
@@ -132,7 +134,7 @@ func newRecoveryStats() *RecoveryStats {
 }
 
 func (rs *RecoveryStats) clone() *RecoveryStats {
-	var rsCopy = *rs
+	rsCopy := *rs
 	return &rsCopy
 }
 
diff --git a/view.go b/view.go
index d76b3b3a..a614a219 100644
--- a/view.go
+++ b/view.go
@@ -149,7 +149,6 @@ func (v *View) createPartitions(brokers []string) (rerr error) {
 }
 func (v *View) runStateMerger(ctx context.Context) {
-
 	var (
 		states = make(map[int]PartitionStatus)
 		m      sync.Mutex
 	)
@@ -164,14 +163,14 @@ func (v *View) runStateMerger(ctx context.Context) {
 			defer m.Unlock()
 			states[idx] = PartitionStatus(state)
 
-			var lowestState = PartitionStatus(-1)
+			lowestState := PartitionStatus(-1)
 			for _, partitionState := range states {
 				if lowestState == -1 || partitionState < lowestState {
 					lowestState = partitionState
 				}
 			}
 
-			var newState = ViewState(-1)
+			newState := ViewState(-1)
 			switch lowestState {
 			case PartitionStopped:
 				newState = ViewStateIdle
@@ -249,7 +248,6 @@ func (v *View) Run(ctx context.Context) (rerr error) {
 	for _, partition := range v.partitions {
 		partition := partition
 
-		go partition.RunStatsLoop(ctx)
 		recoverErrg.Go(func() error {
 			return partition.SetupAndRecover(recoverCtx, v.opts.autoreconnect)
 		})
@@ -333,7 +331,6 @@ func (v *View) Topic() string {
 // Get can be called by multiple goroutines concurrently.
 // Get can only be called after Recovered returns true.
 func (v *View) Get(key string) (interface{}, error) {
-
 	if v.state.IsState(State(ViewStateIdle)) || v.state.IsState(State(ViewStateInitializing)) {
 		return nil, fmt.Errorf("View is either not running, not correctly initialized or stopped again. It's not safe to retrieve values")
 	}
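The change above swaps the per-partition stats goroutines (the requestStats/responseStats/updateStats channels, RunStatsLoop, and runStatsLoop) for a sync.RWMutex guarding the stats struct: writers mutate it through updateStats under the write lock, readers take a clone under the read lock via fetchStats. Below is a minimal, self-contained sketch of that locking pattern; Stats, Recorder, and NewRecorder are simplified stand-ins for illustration, not goka's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// Stats is a simplified stand-in for goka's PartitionProcStats/TableStats.
type Stats struct {
	Count int
	Bytes int
}

// clone returns a copy so callers can read it without holding the lock.
func (s *Stats) clone() *Stats {
	c := *s
	return &c
}

// Recorder guards a Stats value with an RWMutex, mirroring the
// updateStats/fetchStats pattern used in the diff.
type Recorder struct {
	mStats sync.RWMutex
	stats  *Stats
}

func NewRecorder() *Recorder {
	return &Recorder{stats: &Stats{}}
}

// updateStats applies a mutation while holding the write lock.
func (r *Recorder) updateStats(updater func(stats *Stats)) {
	r.mStats.Lock()
	defer r.mStats.Unlock()
	updater(r.stats)
}

// fetchStats returns a consistent snapshot taken under the read lock.
func (r *Recorder) fetchStats() *Stats {
	r.mStats.RLock()
	defer r.mStats.RUnlock()
	return r.stats.clone()
}

func main() {
	r := NewRecorder()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				r.updateStats(func(stats *Stats) {
					stats.Count++
					stats.Bytes += 42
				})
			}
		}()
	}
	wg.Wait()

	snapshot := r.fetchStats()
	fmt.Println(snapshot.Count, snapshot.Bytes) // 4000 168000
}
```

Cloning under the read lock keeps the critical section small: callers inspect the snapshot without holding the lock, which is why the diff can drop the dedicated stats goroutine and its request/response channels entirely.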