Skip to content

Commit

Permalink
chore(benchmark): use periodic package for stats loop
Browse files Browse the repository at this point in the history
  • Loading branch information
ooesili committed Nov 5, 2024
1 parent 71155f4 commit 747a1c1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 33 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/Jeffail/shutdown v1.0.0
github.com/OneOfOne/xxhash v1.2.8
github.com/cenkalti/backoff/v4 v4.3.0
github.com/dustin/go-humanize v1.0.1
github.com/fatih/color v1.17.0
github.com/fsnotify/fsnotify v1.7.0
github.com/gofrs/uuid v4.4.0+incompatible
Expand Down Expand Up @@ -52,7 +53,6 @@ require (
github.com/cockroachdb/apd/v3 v3.2.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
63 changes: 31 additions & 32 deletions internal/impl/pure/processor_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/dustin/go-humanize"
"github.com/redpanda-data/benthos/v4/internal/periodic"
"github.com/redpanda-data/benthos/v4/public/service"
)

Expand Down Expand Up @@ -39,31 +40,18 @@ func newBenchmarkProcFromConfig(conf *service.ParsedConfig, mgr *service.Resourc
return nil, err
}

done := make(chan struct{})
b := &benchmarkProc{
startTime: time.Now(),
rollingInterval: interval,
logger: mgr.Logger(),
done: done,
}

if interval.String() != "0s" {
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
defer b.wg.Done()

for {
select {
case <-done:
break

case <-ticker.C:
stats := b.sampleRolling()
b.printStats("rolling", stats, b.rollingInterval)
}
}
}()
b.periodic = periodic.New(interval, func() {
stats := b.sampleRolling()
b.printStats("rolling", stats, b.rollingInterval)
})
b.periodic.Start()
}

return b, nil
Expand All @@ -77,9 +65,9 @@ type benchmarkProc struct {
lock sync.Mutex
rollingStats benchmarkStats
totalStats benchmarkStats
closed bool

wg sync.WaitGroup
done chan<- struct{}
periodic *periodic.Periodic
}

func (b *benchmarkProc) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
Expand All @@ -99,15 +87,19 @@ func (b *benchmarkProc) Process(ctx context.Context, msg *service.Message) (serv
}

func (b *benchmarkProc) Close(ctx context.Context) error {
if b.done == nil {
// 2024-11-05: We have to guard against Close being from multiple goroutines
// at the same time.
b.lock.Lock()
defer b.lock.Unlock()

if b.closed {
return nil
}

close(b.done)
b.wg.Wait()
b.done = nil

if b.periodic != nil {
b.periodic.Stop()
}
b.printStats("total", b.totalStats, time.Since(b.startTime))
b.closed = true
return nil
}

Expand All @@ -123,12 +115,19 @@ func (b *benchmarkProc) sampleRolling() benchmarkStats {

func (b *benchmarkProc) printStats(window string, stats benchmarkStats, interval time.Duration) {
secs := interval.Seconds()
b.logger.Infof(
"%s stats: %s msg/sec, %s/sec",
window,
humanize.Ftoa(stats.msgCount/secs),
humanize.Bytes(uint64(stats.msgBytesCount/secs)),
)
msgsPerSec := stats.msgCount / secs
bytesPerSec := stats.msgBytesCount / secs
b.logger.
With(
"msg/sec", msgsPerSec,
"bytes/sec", bytesPerSec,
).
Infof(
"%s stats: %s msg/sec, %s/sec",
window,
humanize.Ftoa(msgsPerSec),
humanize.Bytes(uint64(bytesPerSec)),
)
}

type benchmarkStats struct {
Expand Down

0 comments on commit 747a1c1

Please sign in to comment.