Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add middleware for xkafka.BatchConsumer #61

Open
wants to merge 4 commits into
base: xkafka/batch
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions xkafka/batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func (c *BatchConsumer) runAsync(ctx context.Context) error {
st := stream.New().WithMaxGoroutines(c.config.concurrency)
ctx, cancel := context.WithCancelCause(ctx)

defer cancel(nil)

batch := NewBatch()
timer := time.NewTimer(c.config.batchTimeout)

Expand Down
81 changes: 50 additions & 31 deletions xkafka/batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,49 +312,68 @@ func TestBatchConsumer_Async(t *testing.T) {
func TestBatchConsumer_StopOffsetOnError(t *testing.T) {
t.Parallel()

opts := append(defaultOpts,
Concurrency(2),
BatchSize(3),
)
consumer, mockKafka := newTestBatchConsumer(t, opts...)
testcases := []struct {
name string
options []ConsumerOption
}{
{
name: "sequential",
options: []ConsumerOption{
BatchSize(3),
},
},
{
name: "async",
options: []ConsumerOption{
Concurrency(2),
BatchSize(3),
},
},
}

km := newFakeKafkaMessage()
ctx, cancel := context.WithCancel(context.Background())
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
consumer, mockKafka := newTestBatchConsumer(t, append(defaultOpts, tc.options...)...)

count := atomic.Int32{}
km := newFakeKafkaMessage()
ctx, cancel := context.WithCancel(context.Background())

handler := BatchHandlerFunc(func(ctx context.Context, b *Batch) error {
assert.NotNil(t, b)
count := atomic.Int32{}

n := count.Add(1)
handler := BatchHandlerFunc(func(ctx context.Context, b *Batch) error {
assert.NotNil(t, b)

if n > 2 {
err := assert.AnError
cancel()
n := count.Add(1)

return b.AckFail(err)
}
if n > 2 {
err := assert.AnError
cancel()

b.AckSuccess()
return b.AckFail(err)
}

return nil
})
b.AckSuccess()

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)
mockKafka.On("Close").Return(nil)
return nil
})

mockKafka.On("StoreOffsets", mock.Anything).
Return(nil, nil).
Times(2)
mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)
mockKafka.On("Close").Return(nil)

consumer.handler = handler
err := consumer.Run(ctx)
assert.ErrorIs(t, err, assert.AnError)
mockKafka.On("StoreOffsets", mock.Anything).
Return(nil, nil).
Times(2)

mockKafka.AssertExpectations(t)
consumer.handler = handler
err := consumer.Run(ctx)
assert.ErrorIs(t, err, assert.AnError)

mockKafka.AssertExpectations(t)
})
}
}

func TestBatchConsumer_BatchTimeout(t *testing.T) {
Expand Down
18 changes: 18 additions & 0 deletions xkafka/middleware/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,21 @@ func RecoverMiddleware() xkafka.MiddlewareFunc {
})
}
}

// BatchRecoverMiddleware catches panics and prevents the request goroutine from crashing
// for xkafka.BatchConsumer.
//
// On panic, the batch is marked failed via AckFail with the recovered value
// wrapped in an error. Because the recover runs in a deferred function and
// the return value is unnamed, the handler then returns a nil error.
func BatchRecoverMiddleware() xkafka.BatchMiddlewareFunc {
	return func(next xkafka.BatchHandler) xkafka.BatchHandler {
		return xkafka.BatchHandlerFunc(func(ctx context.Context, batch *xkafka.Batch) error {
			defer func() {
				if r := recover(); r != nil {
					// Record the panic on the batch. The AckFail error is
					// deliberately discarded: there is no way to propagate
					// it from a deferred function with an unnamed return.
					_ = batch.AckFail(fmt.Errorf("%+v", r))
				}
			}()

			return next.HandleBatch(ctx, batch)
		})
	}
}
17 changes: 17 additions & 0 deletions xkafka/middleware/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,20 @@ func TestRecoverMiddleware(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, xkafka.Fail, msg.Status)
}

// TestBatchRecoverMiddleware asserts that a panicking batch handler is
// recovered: the call returns no error and the batch is marked Fail.
func TestBatchRecoverMiddleware(t *testing.T) {
	// A handler that always panics, exercising the recovery path.
	panicking := xkafka.BatchHandlerFunc(func(ctx context.Context, batch *xkafka.Batch) error {
		panic("test panic")
	})

	wrapped := middleware.BatchRecoverMiddleware().BatchMiddleware(panicking)

	batch := xkafka.Batch{}
	err := wrapped.HandleBatch(context.Background(), &batch)

	assert.NoError(t, err)
	assert.Equal(t, xkafka.Fail, batch.Status)
}
23 changes: 23 additions & 0 deletions xkafka/middleware/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,26 @@ func ExponentialBackoff(opts ...Option) xkafka.MiddlewareFunc {
})
}
}

// BatchExponentialBackoff is a middleware with exponential backoff retry strategy
// for xkafka.BatchConsumer.
func BatchExponentialBackoff(opts ...Option) xkafka.BatchMiddlewareFunc {
	cfg := newConfig(opts...)

	return func(next xkafka.BatchHandler) xkafka.BatchHandler {
		return xkafka.BatchHandlerFunc(func(ctx context.Context, batch *xkafka.Batch) error {
			// Fresh backoff state per batch so one batch's retries do not
			// influence another's schedule.
			bo := backoff.NewExponentialBackOff()
			bo.InitialInterval = cfg.delay
			bo.Multiplier = cfg.multiplier
			bo.MaxElapsedTime = cfg.maxDuration
			bo.RandomizationFactor = float64(cfg.jitter) / float64(cfg.delay)

			// Cap the attempt count and honor context cancellation.
			retrier := backoff.WithContext(backoff.WithMaxRetries(bo, uint64(cfg.maxRetries)), ctx)

			attempt := func() error {
				return next.HandleBatch(ctx, batch)
			}

			return backoff.Retry(attempt, retrier)
		})
	}
}
40 changes: 40 additions & 0 deletions xkafka/middleware/retry/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,43 @@ func TestExponentialBackoff_PermanentError(t *testing.T) {
err := mw(handler).Handle(context.TODO(), msg)
assert.ErrorIs(t, err, ErrPermanent)
}

// TestBatchExponentialBackoff asserts that a persistently failing handler
// surfaces its error after the retries are exhausted.
func TestBatchExponentialBackoff(t *testing.T) {
	batch := xkafka.NewBatch()
	batch.Messages = append(batch.Messages, &xkafka.Message{
		Topic:     "test-topic",
		Group:     "test-group",
		Partition: 2,
		Key:       []byte("key"),
		Value:     []byte("value"),
	})

	// Handler fails on every attempt.
	failing := xkafka.BatchHandlerFunc(func(ctx context.Context, b *xkafka.Batch) error {
		return assert.AnError
	})

	err := BatchExponentialBackoff(MaxRetries(3))(failing).HandleBatch(context.TODO(), batch)

	assert.ErrorIs(t, err, assert.AnError)
}

// TestBatchExponentialBackoff_PermanentError asserts that a permanent error
// is returned without being masked by the retry wrapper.
func TestBatchExponentialBackoff_PermanentError(t *testing.T) {
	batch := xkafka.NewBatch()
	batch.Messages = append(batch.Messages, &xkafka.Message{
		Topic:     "test-topic",
		Group:     "test-group",
		Partition: 2,
		Key:       []byte("key"),
		Value:     []byte("value"),
	})

	// Handler signals a non-retryable failure.
	permanent := xkafka.BatchHandlerFunc(func(ctx context.Context, b *xkafka.Batch) error {
		return ErrPermanent
	})

	err := BatchExponentialBackoff(MaxRetries(3))(permanent).HandleBatch(context.TODO(), batch)

	assert.ErrorIs(t, err, ErrPermanent)
}
67 changes: 67 additions & 0 deletions xprom/xpromkafka/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,73 @@ func (c *Collector) ConsumerMiddleware(opts ...Option) xkafka.MiddlewareFunc {
}
}

// BatchConsumerMiddleware returns a middleware that instruments xkafka.BatchConsumer.
// Options passed to this function will override the Collector options.
func (c *Collector) BatchConsumerMiddleware(opts ...Option) xkafka.BatchMiddlewareFunc {
	mwopts := &options{
		errFn:   c.opts.errFn,
		address: c.opts.address,
		port:    c.opts.port,
	}

	for _, opt := range opts {
		opt.apply(mwopts)
	}

	// msgLabels builds the per-message label set shared by every metric this
	// middleware emits. Kept in one place so the inflight, duration, and
	// consumed metrics always agree on their label values.
	msgLabels := func(msg *xkafka.Message) prometheus.Labels {
		labels := prometheus.Labels{
			semconv.MessagingOperationName:      semconv.OperationConsume,
			semconv.ServerAddress:               mwopts.address,
			semconv.MessagingKafkaConsumerGroup: msg.Group,
			semconv.MessagingKafkaTopic:         msg.Topic,
			semconv.MessagingKafkaPartition:     fmt.Sprintf("%d", msg.Partition),
		}

		if mwopts.port != 0 {
			labels[semconv.ServerPort] = fmt.Sprintf("%d", mwopts.port)
		}

		return labels
	}

	return func(next xkafka.BatchHandler) xkafka.BatchHandler {
		return xkafka.BatchHandlerFunc(func(ctx context.Context, batch *xkafka.Batch) error {
			start := time.Now()

			for _, msg := range batch.Messages {
				inflight := c.inflight.With(msgLabels(msg))

				inflight.Inc()
				// Deliberately deferred rather than decremented per
				// iteration: each message stays "inflight" until this
				// handler returns, including when next.HandleBatch panics.
				defer inflight.Dec()
			}

			err := next.HandleBatch(ctx, batch)

			for _, msg := range batch.Messages {
				labels := msgLabels(msg)

				labels[semconv.MessagingKafkaMessageStatus] = batch.Status.String()
				labels[semconv.ErrorType] = ""

				// Classify the batch error only when a classifier is set;
				// otherwise the error_type label stays empty.
				if batch.Err() != nil && mwopts.errFn != nil {
					labels[semconv.ErrorType] = mwopts.errFn(batch.Err())
				}

				c.duration.With(labels).Observe(time.Since(start).Seconds())
				c.consumed.With(labels).Inc()
			}

			return err
		})
	}
}

// ProducerMiddleware returns a middleware that instruments xkafka.Producer.
// Options passed to this function will override the Collector options.
func (c *Collector) ProducerMiddleware(opts ...Option) xkafka.MiddlewareFunc {
Expand Down
66 changes: 66 additions & 0 deletions xprom/xpromkafka/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,72 @@ func TestConsumerMiddleware(t *testing.T) {
assert.NoError(t, err)
}

// TestBatchConsumerMiddleware verifies that the batch middleware records
// inflight and consumed metrics per message with the expected label values,
// including the custom error classifier output on a failed batch.
func TestBatchConsumerMiddleware(t *testing.T) {
	// Two messages with distinct topics/groups/partitions so each message
	// produces its own metric series.
	batch := xkafka.NewBatch()
	msg1 := &xkafka.Message{
		Topic:     "test-topic-1",
		Group:     "test-group-1",
		Partition: 12,
		Key:       []byte("key"),
		Value:     []byte("value"),
	}

	msg2 := &xkafka.Message{
		Topic:     "test-topic-2",
		Group:     "test-group-2",
		Partition: 13,
		Key:       []byte("key"),
		Value:     []byte("value"),
	}

	batch.Messages = append(batch.Messages, msg1, msg2)

	// Handler fails the batch so the error-labeling path is exercised.
	handler := xkafka.BatchHandlerFunc(func(ctx context.Context, b *xkafka.Batch) error {
		err := errors.New("some error")

		b.AckFail(err)

		return err
	})

	reg := prometheus.NewRegistry()
	collector := NewCollector(
		LatencyBuckets{0.1, 0.5, 1, 2, 5},
	)

	collector.Register(reg)

	// Per-middleware options override the Collector-level ones; the
	// classifier maps every error to a fixed label value for determinism.
	instrumentedHandler := collector.BatchConsumerMiddleware(
		Address("localhost"),
		Port(9092),
		ErrorClassifer(func(err error) string {
			return "CustomError"
		}),
	).BatchMiddleware(handler)

	err := instrumentedHandler.HandleBatch(context.TODO(), batch)
	assert.Error(t, err)

	// The duration histogram is intentionally excluded: its observed value
	// is timing-dependent. Only deterministic metrics are compared.
	expectedMetrics := []string{
		"messaging_inflight_messages",
		"messaging_client_consumed_messages",
	}

	// Inflight gauges must be back to 0 after HandleBatch returns; consumed
	// counters carry status FAIL and the classifier's error_type.
	expected := `
# HELP messaging_client_consumed_messages Messages consumed.
# TYPE messaging_client_consumed_messages counter
messaging_client_consumed_messages{error_type="CustomError",messaging_consumer_group_name="test-group-1",messaging_destination_name="test-topic-1",messaging_destination_partition_id="12",messaging_kafka_message_status="FAIL",messaging_operation_name="consume",messaging_system="kafka",server_address="localhost",server_port="9092"} 1
messaging_client_consumed_messages{error_type="CustomError",messaging_consumer_group_name="test-group-2",messaging_destination_name="test-topic-2",messaging_destination_partition_id="13",messaging_kafka_message_status="FAIL",messaging_operation_name="consume",messaging_system="kafka",server_address="localhost",server_port="9092"} 1
# HELP messaging_inflight_messages Messages currently being processed.
# TYPE messaging_inflight_messages gauge
messaging_inflight_messages{messaging_consumer_group_name="test-group-1",messaging_destination_name="test-topic-1",messaging_destination_partition_id="12",messaging_operation_name="consume",messaging_system="kafka",server_address="localhost",server_port="9092"} 0
messaging_inflight_messages{messaging_consumer_group_name="test-group-2",messaging_destination_name="test-topic-2",messaging_destination_partition_id="13",messaging_operation_name="consume",messaging_system="kafka",server_address="localhost",server_port="9092"} 0
`

	err = testutil.GatherAndCompare(reg, strings.NewReader(expected), expectedMetrics...)
	assert.NoError(t, err)
}

func TestProducerMiddleware(t *testing.T) {
msg := &xkafka.Message{
Topic: "test-topic",
Expand Down
Loading