Improve batch consumer error handling
Ravi Atluri committed Nov 14, 2024
1 parent 892741b commit b556d7e
Showing 7 changed files with 153 additions and 40 deletions.
10 changes: 10 additions & 0 deletions Makefile
@@ -52,6 +52,12 @@ test-cov: gocov
test-xml: test-cov gocov-xml
@jq -n '{ Packages: [ inputs.Packages ] | add }' $(shell find . -type f -name 'coverage.json' | sort) | $(GOCOVXML) > coverage.xml

.PHONY: test-html
test-html: test-cov gocov-html
@jq -n '{ Packages: [ inputs.Packages ] | add }' $(shell find . -type f -name 'coverage.json' | sort) | $(GOCOVHTML) -t kit -r > coverage.html
@open coverage.html

.PHONY: check
check: fmt vet imports lint
@git diff --quiet || test $$(git diff --name-only | grep -v -e 'go.mod$$' -e 'go.sum$$' | wc -l) -eq 0 || ( echo "The following changes (result of code generators and code checks) have been detected:" && git --no-pager diff && false ) # fail if Git working tree is dirty
@@ -84,6 +90,10 @@ GOCOVXML = $(BIN_DIR)/gocov-xml
gocov-xml:
$(call go-get-tool,$(GOCOVXML),github.com/AlekSi/[email protected])

GOCOVHTML = $(BIN_DIR)/gocov-html
gocov-html:
$(call go-get-tool,$(GOCOVHTML),github.com/matm/gocov-html/cmd/[email protected])

MOCKERY = $(BIN_DIR)/mockery
mockery:
$(call go-get-tool,$(MOCKERY),github.com/vektra/mockery/[email protected])
8 changes: 4 additions & 4 deletions examples/xkafka/README.md
@@ -15,29 +15,29 @@ $ docker-compose up -d
The basic consumer reads messages from a Kafka topic and prints them to the console. It simulates a process crash by restarting the consumer after a random number of messages have been consumed.

```bash
-$ go run *.go basic --partitions=2 --consumers=2 --messages=10
+go run *.go basic --partitions=2 --consumers=2 --messages=10
```

### Async Consumer

The async consumer reads messages concurrently using a configurable pool of goroutines.

```bash
-$ go run *.go basic --partitions=2 --consumers=2 --messages=10 --concurrency=4
+go run *.go basic --partitions=2 --consumers=2 --messages=10 --concurrency=4
```

### Batch Consumer

The batch consumer reads messages in batches of a configurable size.

```bash
-$ go run *.go batch --partitions=2 --consumers=2 --messages=1000 --batch-size=100
+go run *.go batch --partitions=2 --consumers=2 --messages=20 --batch-size=3
```

### Async Batch Consumer

The async batch consumer processes batches concurrently using a configurable pool of goroutines.

```bash
-$ go run *.go batch --partitions=2 --consumers=2 --messages=1000 --batch-size=100 --concurrency=4
+go run *.go batch --partitions=2 --consumers=2 --messages=20 --batch-size=3 --concurrency=4
```
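Each of the commands above drives a batch handler that the example registers with the consumer. The real handlers live elsewhere in examples/xkafka and are not part of this diff; the sketch below is illustrative, assuming the `xkafka.BatchHandler`/`BatchHandlerFunc` types used later in this commit, and the `Topic`/`Partition` field names on `xkafka.Message` are assumptions.

```go
package example

import (
    "context"
    "log/slog"

    "github.com/gojekfarm/xtools/xkafka"
)

// newExampleBatchHandler returns a handler that logs and acknowledges each batch.
// Illustrative only; not the handler shipped in examples/xkafka.
func newExampleBatchHandler() xkafka.BatchHandler {
    return xkafka.BatchHandlerFunc(func(ctx context.Context, b *xkafka.Batch) error {
        for _, msg := range b.Messages {
            // Topic and Partition are assumed field names on xkafka.Message.
            slog.Info("processing message", "topic", msg.Topic, "partition", msg.Partition)
        }

        // Mark the whole batch as processed so its offsets can be committed.
        b.AckSuccess()

        return nil
    })
}
```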
6 changes: 5 additions & 1 deletion examples/xkafka/batch.go
@@ -2,10 +2,12 @@ package main

import (
    "context"
+    "time"

-    "github.com/urfave/cli/v2"
    "log/slog"

+    "github.com/urfave/cli/v2"
+
    "github.com/gojekfarm/xrun"
    "github.com/gojekfarm/xtools/xkafka"
    slogmw "github.com/gojekfarm/xtools/xkafka/middleware/slog"
@@ -38,8 +40,10 @@ func runBatch(c *cli.Context) error {
xkafka.ConfigMap{
"auto.offset.reset": "earliest",
},
xkafka.PollTimeout(10 * time.Second),
xkafka.Concurrency(concurrency),
xkafka.BatchSize(size),
xkafka.BatchTimeout(10 * time.Second),
xkafka.ErrorHandler(func(err error) error {
// return error to stop consumer
return err
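The options added above control polling and batch flushing for the example. A rough sketch of how they might be wired into the constructor follows; it assumes `xkafka.NewBatchConsumer(name, handler, opts...)` as the constructor signature, uses a hypothetical consumer name, and omits broker/topic options since they are not part of this hunk.

```go
package example

import (
    "time"

    "github.com/gojekfarm/xtools/xkafka"
)

// buildBatchConsumer sketches how the example wires the new options together.
// Only the options visible in the diff above are taken from the commit; the
// rest (name, omitted broker/topic options) is illustrative.
func buildBatchConsumer(handler xkafka.BatchHandler, concurrency, size int) (*xkafka.BatchConsumer, error) {
    return xkafka.NewBatchConsumer(
        "xkafka-example-batch", // hypothetical consumer name
        handler,
        xkafka.ConfigMap{
            "auto.offset.reset": "earliest",
        },
        xkafka.PollTimeout(10*time.Second),  // max wait for each poll of the underlying consumer
        xkafka.Concurrency(concurrency),     // >1 makes the consumer process batches concurrently
        xkafka.BatchSize(size),              // flush once this many messages are buffered...
        xkafka.BatchTimeout(10*time.Second), // ...or after this much time, whichever comes first
        xkafka.ErrorHandler(func(err error) error {
            // Per the example above, returning the error stops the consumer.
            return err
        }),
    )
}
```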
2 changes: 2 additions & 0 deletions examples/xkafka/state.go
@@ -66,6 +66,8 @@ func (t *Tracker) CancelIfDone() {
defer t.mu.Unlock()

if len(t.received) == len(t.expect) {
slog.Info("[TRACKER] all messages received, cancelling context")

t.cancel()
}
}
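Only the log line is new here, but for context: the example's Tracker cancels a shared context once every expected message has been seen, which is how the example consumers know when to stop. The sketch below is a hypothetical reconstruction of that type; only the body of CancelIfDone appears in this diff, so the field names and the Ack helper are assumptions.

```go
package example

import (
    "context"
    "log/slog"
    "sync"
)

// Tracker is a hypothetical reconstruction; the real type lives in examples/xkafka/state.go.
type Tracker struct {
    mu       sync.Mutex
    expect   map[string]struct{}
    received map[string]struct{}
    cancel   context.CancelFunc
}

// Ack records one received message (assumed helper).
func (t *Tracker) Ack(id string) {
    t.mu.Lock()
    defer t.mu.Unlock()

    t.received[id] = struct{}{}
}

// CancelIfDone cancels the shared context once everything expected has arrived.
func (t *Tracker) CancelIfDone() {
    t.mu.Lock()
    defer t.mu.Unlock()

    if len(t.received) == len(t.expect) {
        slog.Info("[TRACKER] all messages received, cancelling context")

        t.cancel()
    }
}
```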
4 changes: 3 additions & 1 deletion xkafka/batch.go
@@ -34,12 +34,14 @@ func (b *Batch) AckSuccess() {
}

// AckFail marks the batch as failed to process.
-func (b *Batch) AckFail(err error) {
+func (b *Batch) AckFail(err error) error {
    b.lock.Lock()
    defer b.lock.Unlock()

    b.Status = Fail
    b.err = err
+
+    return err
}

// AckSkip marks the batch as skipped.
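With AckFail now returning the error it records, a handler can mark the batch failed and propagate the failure to the consumer in one statement, which is exactly what the new test later in this commit does. A hedged usage sketch, where processBatch is a hypothetical stand-in for real work:

```go
package example

import (
    "context"
    "errors"

    "github.com/gojekfarm/xtools/xkafka"
)

// processBatch is a hypothetical stand-in for real batch processing.
func processBatch(_ context.Context, msgs []*xkafka.Message) error {
    if len(msgs) == 0 {
        return errors.New("empty batch")
    }

    return nil
}

// exampleHandler shows the pattern enabled by the new AckFail signature.
var exampleHandler = xkafka.BatchHandlerFunc(func(ctx context.Context, b *xkafka.Batch) error {
    if err := processBatch(ctx, b.Messages); err != nil {
        // Marks the batch as Fail, records err on it, and returns err to the consumer.
        return b.AckFail(err)
    }

    b.AckSuccess()

    return nil
})
```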
111 changes: 77 additions & 34 deletions xkafka/batch_consumer.go
@@ -9,7 +9,6 @@ import (
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/sourcegraph/conc/pool"
"github.com/sourcegraph/conc/stream"
)

@@ -66,16 +65,19 @@ func (c *BatchConsumer) Use(mws ...BatchMiddlewarer) {
}

// Run starts the consumer and blocks until context is cancelled.
-func (c *BatchConsumer) Run(ctx context.Context) error {
+func (c *BatchConsumer) Run(ctx context.Context) (err error) {
+    defer func() {
+        cerr := c.close()
+        err = errors.Join(err, cerr)
+    }()
+
    if err := c.subscribe(); err != nil {
        return err
    }

-    if err := c.start(ctx); err != nil {
-        return err
-    }
+    err = c.start(ctx)

-    return c.close()
+    return err
}
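Run now uses a named error return and a deferred close, combining the two with errors.Join (Go 1.20+) so a close error no longer masks the run error and vice versa. The same shape in isolation, with illustrative names:

```go
package example

import (
    "context"
    "errors"
)

// runAndClose mirrors the structure of BatchConsumer.Run after this change:
// the deferred cleanup error is joined with the main error instead of replacing it.
func runAndClose(ctx context.Context, start func(context.Context) error, closeFn func() error) (err error) {
    defer func() {
        err = errors.Join(err, closeFn())
    }()

    return start(ctx)
}
```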

// Start subscribes to the configured topics and starts consuming messages.
@@ -110,24 +112,54 @@ func (c *BatchConsumer) Close() {
func (c *BatchConsumer) start(ctx context.Context) error {
    c.handler = c.concatMiddlewares(c.handler)

-    pool := pool.New().
-        WithContext(ctx).
-        WithMaxGoroutines(2).
-        WithCancelOnError()
+    // Create a context that can be cancelled with cause
+    ctx, cancel := context.WithCancelCause(ctx)
+    defer func() {
+        cancel(nil)
+        c.batch.Stop()
+    }()

+    errChan := make(chan error, 2)
+    var wg sync.WaitGroup
+    wg.Add(2)
+
-    pool.Go(func(ctx context.Context) error {
+    // Start process goroutine
+    go func() {
+        defer wg.Done()
+        var err error
        if c.config.concurrency > 1 {
-            return c.processAsync(ctx)
+            err = c.processAsync(ctx)
+        } else {
+            err = c.process(ctx)
        }
+        if err != nil {
+            cancel(err)
+            errChan <- err
+        }
+    }()

-        return c.process(ctx)
-    })
+    // Start consume goroutine
+    go func() {
+        defer wg.Done()
+        err := c.consume(ctx)
+        if err != nil {
+            cancel(err)
+            errChan <- err
+        }
+    }()

-    pool.Go(func(ctx context.Context) error {
-        return c.consume(ctx)
-    })
+    // Wait for completion and collect errors
+    go func() {
+        wg.Wait()
+        close(errChan)
+    }()

-    return pool.Wait()
+    // Return the first error that occurred
+    for err := range errChan {
+        return err
+    }
+
+    return context.Cause(ctx)
}
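The rewritten start replaces the conc pool with explicit goroutines: a context.WithCancelCause context, a WaitGroup, and a small buffered error channel. The first goroutine to fail cancels its sibling with that error as the cause, and start returns the first collected error, falling back to context.Cause if none was recorded. A standalone sketch of the same coordination pattern, with illustrative names:

```go
package example

import (
    "context"
    "sync"
)

// runWorkers coordinates goroutines the way the new start() does: the first
// failure cancels the siblings via the context cause and is returned first.
func runWorkers(ctx context.Context, workers ...func(context.Context) error) error {
    ctx, cancel := context.WithCancelCause(ctx)
    defer cancel(nil)

    errChan := make(chan error, len(workers))

    var wg sync.WaitGroup
    wg.Add(len(workers))

    for _, worker := range workers {
        worker := worker // capture for Go versions before 1.22
        go func() {
            defer wg.Done()

            if err := worker(ctx); err != nil {
                cancel(err)    // stop the other workers
                errChan <- err // record the failure
            }
        }()
    }

    // Close errChan once every worker has returned.
    go func() {
        wg.Wait()
        close(errChan)
    }()

    // The first recorded error wins; otherwise report the cancellation cause, if any.
    for err := range errChan {
        return err
    }

    return context.Cause(ctx)
}
```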

func (c *BatchConsumer) process(ctx context.Context) error {
@@ -157,7 +189,6 @@ func (c *BatchConsumer) processAsync(ctx context.Context) error {
select {
case <-ctx.Done():
st.Wait()

err := context.Cause(ctx)
if errors.Is(err, context.Canceled) {
return nil
@@ -197,7 +228,7 @@ func (c *BatchConsumer) consume(ctx context.Context) (err error) {
for {
select {
case <-ctx.Done():
-return err
+return context.Cause(ctx)
default:
km, err := c.kafka.ReadMessage(c.config.pollTimeout)
if err != nil {
@@ -216,7 +247,6 @@
}

msg := newMessage(c.name, km)

c.batch.Add(msg)
}
}
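consume now returns context.Cause(ctx) rather than the named err return, which is typically still nil at that point; with a cause-aware context this surfaces the failure that actually triggered cancellation instead of the generic context.Canceled. A tiny runnable demonstration:

```go
package main

import (
    "context"
    "errors"
    "fmt"
)

func main() {
    ctx, cancel := context.WithCancelCause(context.Background())
    cancel(errors.New("handler failed")) // hypothetical failure recorded as the cause

    fmt.Println(ctx.Err())          // context canceled
    fmt.Println(context.Cause(ctx)) // handler failed
}
```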
@@ -274,18 +304,18 @@ type BatchManager struct {
    batch     *Batch
    mutex     *sync.RWMutex
    flushChan chan *Batch
+    done      chan struct{}
}

// NewBatchManager creates a new BatchManager.
func NewBatchManager(size int, timeout time.Duration) *BatchManager {
    b := &BatchManager{
-        size:    size,
-        timeout: timeout,
-        mutex:   &sync.RWMutex{},
-        batch: &Batch{
-            Messages: make([]*Message, 0, size),
-        },
+        size:      size,
+        timeout:   timeout,
+        mutex:     &sync.RWMutex{},
+        batch:     NewBatch(),
        flushChan: make(chan *Batch),
+        done:      make(chan struct{}),
    }

    go b.runFlushByTime()
Expand All @@ -312,11 +342,21 @@ func (b *BatchManager) Receive() <-chan *Batch {

func (b *BatchManager) runFlushByTime() {
    t := time.NewTicker(b.timeout)
+    defer t.Stop()

-    for range t.C {
-        b.mutex.Lock()
-        b.flush()
-        b.mutex.Unlock()
+    for {
+        select {
+        case <-b.done:
+            b.mutex.Lock()
+            b.flush()
+            close(b.flushChan)
+            b.mutex.Unlock()
+            return
+        case <-t.C:
+            b.mutex.Lock()
+            b.flush()
+            b.mutex.Unlock()
+        }
    }
}

@@ -330,7 +370,10 @@ func (b *BatchManager) flush() {

    b.flushChan <- b.batch

-    b.batch = &Batch{
-        Messages: make([]*Message, 0, b.size),
-    }
+    b.batch = NewBatch()
}
+
+// Stop signals the batch manager to stop and clean up
+func (b *BatchManager) Stop() {
+    close(b.done)
+}
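Stop gives the BatchManager an explicit shutdown path: closing done makes runFlushByTime flush whatever is pending under the lock, close the flush channel, and return, so a receiver draining Receive terminates cleanly instead of blocking forever. A usage sketch under those assumptions (the message literal is illustrative):

```go
package main

import (
    "time"

    "github.com/gojekfarm/xtools/xkafka"
)

func main() {
    // Flush after 100 messages or 1 second, whichever comes first.
    bm := xkafka.NewBatchManager(100, time.Second)

    done := make(chan struct{})

    go func() {
        defer close(done)

        // Receive() is closed after Stop() flushes the final partial batch.
        for batch := range bm.Receive() {
            _ = batch // handle the batch
        }
    }()

    bm.Add(&xkafka.Message{}) // hypothetical message; real ones come from Kafka
    bm.Stop()                 // flush pending messages and close the Receive channel

    <-done
}
```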
52 changes: 52 additions & 0 deletions xkafka/batch_consumer_test.go
@@ -138,6 +138,58 @@ func TestBatchConsumer_HandleBatch(t *testing.T) {
mockKafka.AssertExpectations(t)
}

func TestBatchConsumer_HandleBatchError(t *testing.T) {
t.Parallel()

testcases := []struct {
name string
options []ConsumerOption
}{
{
name: "sequential",
options: []ConsumerOption{
BatchSize(10),
BatchTimeout(testTimeout),
},
},
{
name: "async",
options: []ConsumerOption{
Concurrency(2),
BatchSize(10),
BatchTimeout(testTimeout),
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
consumer, mockKafka := newTestBatchConsumer(t, append(defaultOpts, tc.options...)...)

km := newFakeKafkaMessage()
ctx := context.Background()

handler := BatchHandlerFunc(func(ctx context.Context, b *Batch) error {
err := assert.AnError

return b.AckFail(err)
})

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)
mockKafka.On("Close").Return(nil)

consumer.handler = handler
err := consumer.Run(ctx)
assert.ErrorIs(t, err, assert.AnError)

mockKafka.AssertExpectations(t)
})
}
}

func TestBatchConsumer_Async(t *testing.T) {
t.Parallel()

