Skip to content

Commit

Permalink
Add slog middleware (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnes authored Jun 13, 2024
1 parent 5aeb0ca commit 77b3ba0
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 0 deletions.
82 changes: 82 additions & 0 deletions xkafka/middleware/slog/slog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Package slog provides logging middleware using log/slog.
package slog

import (
"context"
"time"

"log/slog"

"github.com/gojekfarm/xtools/xkafka"
)

// Option is a configuration option for the logging middleware.
type Option interface {
apply(*logOptions)
}

type optionFunc func(*logOptions)

func (f optionFunc) apply(o *logOptions) { f(o) }

// Level sets the log level to be used.
type Level slog.Level

func (l Level) apply(o *logOptions) { o.level = slog.Level(l) }

// Logger sets a custom logger to be used.
// slog.Default() is used by default.
func Logger(logger *slog.Logger) Option {
return optionFunc(func(o *logOptions) {
o.logger = logger
})
}

type logOptions struct {
level slog.Level
logger *slog.Logger
}

func newLogOptions(opts ...Option) *logOptions {
opt := &logOptions{
level: slog.LevelInfo,
logger: slog.Default(),
}

for _, o := range opts {
o.apply(opt)
}

return opt
}

// LoggingMiddleware is a middleware that logs messages using log/slog.
func LoggingMiddleware(opts ...Option) xkafka.MiddlewareFunc {
cfg := newLogOptions(opts...)

return func(next xkafka.Handler) xkafka.Handler {
return xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error {
start := time.Now()
logger := cfg.logger.WithGroup("xkafka")

err := next.Handle(ctx, msg)

args := []slog.Attr{
slog.String("topic", msg.Topic),
slog.Int64("partition", int64(msg.Partition)),
slog.Int64("offset", msg.Offset),
slog.String("key", string(msg.Key)),
slog.String("status", msg.Status.String()),
slog.Duration("duration", time.Since(start)),
}

if err != nil {
logger.LogAttrs(ctx, slog.LevelError, "[xkafka] message processing failed", args...)
} else {
logger.LogAttrs(ctx, cfg.level, "[xkafka] message processed", args...)
}

return err
})
}
}
57 changes: 57 additions & 0 deletions xkafka/middleware/slog/slog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package slog

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"log/slog"

"github.com/gojekfarm/xtools/xkafka"
)

var logger = Logger(slog.Default())

func TestLoggingMiddleware(t *testing.T) {
msg := &xkafka.Message{
Topic: "test-topic",
Partition: 0,
Offset: 0,
Key: []byte("test-key"),
}

loggingMiddleware := LoggingMiddleware(
Level(slog.LevelInfo),
logger,
)
handler := loggingMiddleware(xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error {
msg.AckSuccess()

return nil
}))

err := handler.Handle(context.Background(), msg)
assert.NoError(t, err)
}

func TestLoggingMiddlewareWithError(t *testing.T) {
msg := &xkafka.Message{
Topic: "test-topic",
Partition: 0,
Offset: 0,
Key: []byte("test-key"),
}

loggingMiddleware := LoggingMiddleware(
Level(slog.LevelInfo),
logger,
)
handler := loggingMiddleware(xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error {
msg.AckFail(assert.AnError)

return assert.AnError
}))

err := handler.Handle(context.Background(), msg)
assert.ErrorIs(t, err, assert.AnError)
}

0 comments on commit 77b3ba0

Please sign in to comment.