Skip to content

Commit

Permalink
Add xpromkafka.BatchConsumerMiddleware and corresponding tests for ba…
Browse files Browse the repository at this point in the history
…tch processing metrics and error handling
  • Loading branch information
Ravi Atluri committed Jan 8, 2025
1 parent 5d90ea1 commit beaea1e
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 0 deletions.
67 changes: 67 additions & 0 deletions xprom/xpromkafka/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,73 @@ func (c *Collector) ConsumerMiddleware(opts ...Option) xkafka.MiddlewareFunc {
}
}

// BatchConsumerMiddleware returns a middleware that instruments xkafka.BatchConsumer.
// Options passed to this function will override the Collector options.
func (c *Collector) BatchConsumerMiddleware(opts ...Option) xkafka.BatchMiddlewareFunc {
mwopts := &options{
errFn: c.opts.errFn,
address: c.opts.address,
port: c.opts.port,
}

for _, opt := range opts {
opt.apply(mwopts)
}

return func(next xkafka.BatchHandler) xkafka.BatchHandler {
return xkafka.BatchHandlerFunc(func(ctx context.Context, batch *xkafka.Batch) error {
start := time.Now()

for _, msg := range batch.Messages {
labels := prometheus.Labels{
semconv.MessagingOperationName: semconv.OperationConsume,
semconv.ServerAddress: mwopts.address,
semconv.MessagingKafkaConsumerGroup: msg.Group,
semconv.MessagingKafkaTopic: msg.Topic,
semconv.MessagingKafkaPartition: fmt.Sprintf("%d", msg.Partition),
}

if mwopts.port != 0 {
labels[semconv.ServerPort] = fmt.Sprintf("%d", mwopts.port)
}

inflight := c.inflight.With(labels)

inflight.Inc()
defer inflight.Dec()
}

err := next.HandleBatch(ctx, batch)

for _, msg := range batch.Messages {
labels := prometheus.Labels{
semconv.MessagingOperationName: semconv.OperationConsume,
semconv.ServerAddress: mwopts.address,
semconv.MessagingKafkaConsumerGroup: msg.Group,
semconv.MessagingKafkaTopic: msg.Topic,
semconv.MessagingKafkaPartition: fmt.Sprintf("%d", msg.Partition),
}

if mwopts.port != 0 {
labels[semconv.ServerPort] = fmt.Sprintf("%d", mwopts.port)
}

labels[semconv.MessagingKafkaMessageStatus] = batch.Status.String()
labels[semconv.ErrorType] = ""

if batch.Err() != nil && mwopts.errFn != nil {
labels[semconv.ErrorType] = mwopts.errFn(batch.Err())
}

c.duration.With(labels).Observe(time.Since(start).Seconds())
c.consumed.With(labels).Inc()
}

return err
})
}
}

// ProducerMiddleware returns a middleware that instruments xkafka.Producer.
// Options passed to this function will override the Collector options.
func (c *Collector) ProducerMiddleware(opts ...Option) xkafka.MiddlewareFunc {
Expand Down
66 changes: 66 additions & 0 deletions xprom/xpromkafka/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,72 @@ func TestConsumerMiddleware(t *testing.T) {
assert.NoError(t, err)
}

func TestBatchConsumerMiddleware(t *testing.T) {
batch := xkafka.NewBatch()
msg1 := &xkafka.Message{
Topic: "test-topic-1",
Group: "test-group-1",
Partition: 12,
Key: []byte("key"),
Value: []byte("value"),
}

msg2 := &xkafka.Message{
Topic: "test-topic-2",
Group: "test-group-2",
Partition: 13,
Key: []byte("key"),
Value: []byte("value"),
}

batch.Messages = append(batch.Messages, msg1, msg2)

handler := xkafka.BatchHandlerFunc(func(ctx context.Context, b *xkafka.Batch) error {
err := errors.New("some error")

b.AckFail(err)

return err
})

reg := prometheus.NewRegistry()
collector := NewCollector(
LatencyBuckets{0.1, 0.5, 1, 2, 5},
)

collector.Register(reg)

instrumentedHandler := collector.BatchConsumerMiddleware(
Address("localhost"),
Port(9092),
ErrorClassifer(func(err error) string {
return "CustomError"
}),
).BatchMiddleware(handler)

err := instrumentedHandler.HandleBatch(context.TODO(), batch)
assert.Error(t, err)

expectedMetrics := []string{
"messaging_inflight_messages",
"messaging_client_consumed_messages",
}

expected := `
# HELP messaging_client_consumed_messages Messages consumed.
# TYPE messaging_client_consumed_messages counter
messaging_client_consumed_messages{error_type="CustomError",messaging_consumer_group_name="test-group-1",messaging_destination_name="test-topic-1",messaging_destination_partition_id="12",messaging_kafka_message_status="FAIL",messaging_operation_name="consume",messaging_system="kafka",server_address="localhost",server_port="9092"} 1
messaging_client_consumed_messages{error_type="CustomError",messaging_consumer_group_name="test-group-2",messaging_destination_name="test-topic-2",messaging_destination_partition_id="13",messaging_kafka_message_status="FAIL",messaging_operation_name="consume",messaging_system="kafka",server_address="localhost",server_port="9092"} 1
# HELP messaging_inflight_messages Messages currently being processed.
# TYPE messaging_inflight_messages gauge
messaging_inflight_messages{messaging_consumer_group_name="test-group-1",messaging_destination_name="test-topic-1",messaging_destination_partition_id="12",messaging_operation_name="consume",messaging_system="kafka",server_address="localhost",server_port="9092"} 0
messaging_inflight_messages{messaging_consumer_group_name="test-group-2",messaging_destination_name="test-topic-2",messaging_destination_partition_id="13",messaging_operation_name="consume",messaging_system="kafka",server_address="localhost",server_port="9092"} 0
`

err = testutil.GatherAndCompare(reg, strings.NewReader(expected), expectedMetrics...)
assert.NoError(t, err)
}

func TestProducerMiddleware(t *testing.T) {
msg := &xkafka.Message{
Topic: "test-topic",
Expand Down

0 comments on commit beaea1e

Please sign in to comment.