Skip to content

Commit

Permalink
feat: clog update
Browse files Browse the repository at this point in the history
  • Loading branch information
adyusupov committed May 27, 2024
1 parent dd7cff7 commit ed4a873
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 48 deletions.
19 changes: 14 additions & 5 deletions clog/attrs.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package clog

import "log/slog"
import (
"log/slog"
"reflect"
)

type fieldKey string

type fields map[fieldKey]interface{}
type Fields map[fieldKey]interface{}

// convertToAttrs converts a map of custom fields to a slice of slog.Attr
func convertToAttrs(fields fields) []any {
// ConvertToAttrs converts a map of custom fields to a slice of slog.Attr
func ConvertToAttrs(fields Fields) []any {
var attrs []any
for k, v := range fields {
attrs = append(attrs, slog.Any(string(k), v))
if v != nil && !isZeroValue(v) {
attrs = append(attrs, slog.Any(string(k), v))
}
}
return attrs
}

func isZeroValue(v interface{}) bool {
return v == reflect.Zero(reflect.TypeOf(v)).Interface()
}
45 changes: 21 additions & 24 deletions clog/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ import (
"sync"
)

func NewCustomLogger(writer io.Writer, level slog.Level, addSource bool) *CustomLogger {
func NewCustomLogger(dest io.Writer, level slog.Level, addSource bool) *CustomLogger {
return &CustomLogger{
Logger: slog.New(
slog.NewJSONHandler(
writer,
&slog.HandlerOptions{
AddSource: addSource,
Level: level,
},
),
),
Logger: slog.New(slog.NewJSONHandler(
dest,
&slog.HandlerOptions{
AddSource: addSource,
Level: level,
})),
ctxKeys: []fieldKey{},
}
}
Expand All @@ -30,24 +27,24 @@ type CustomLogger struct {
ctxKeys []fieldKey
}

// ErrorfCtx logs an error message with fmt.SprintF()
func (l *CustomLogger) ErrorfCtx(ctx context.Context, err error, msg string, args ...any) {
l.With(convertToAttrs(l.fromCtx(ctx))...).With(slog.String("error", err.Error())).ErrorContext(ctx, fmt.Sprintf(msg, args...))
// ErrorCtx logs an error message with fmt.SprintF()
func (l *CustomLogger) ErrorCtx(ctx context.Context, err error, msg string, args ...any) {
l.With(ConvertToAttrs(l.fromCtx(ctx))...).With(slog.String("error", err.Error())).ErrorContext(ctx, fmt.Sprintf(msg, args...))
}

// InfofCtx logs an informational message with fmt.SprintF()
func (l *CustomLogger) InfofCtx(ctx context.Context, msg string, args ...any) {
l.With(convertToAttrs(l.fromCtx(ctx))...).InfoContext(ctx, fmt.Sprintf(msg, args...))
// InfoCtx logs an informational message with fmt.SprintF()
func (l *CustomLogger) InfoCtx(ctx context.Context, msg string, args ...any) {
l.With(ConvertToAttrs(l.fromCtx(ctx))...).InfoContext(ctx, fmt.Sprintf(msg, args...))
}

// DebugfCtx logs a debug message with fmt.SprintF()
func (l *CustomLogger) DebugfCtx(ctx context.Context, msg string, args ...any) {
l.With(convertToAttrs(l.fromCtx(ctx))...).DebugContext(ctx, fmt.Sprintf(msg, args...))
// DebugCtx logs a debug message with fmt.SprintF()
func (l *CustomLogger) DebugCtx(ctx context.Context, msg string, args ...any) {
l.With(ConvertToAttrs(l.fromCtx(ctx))...).DebugContext(ctx, fmt.Sprintf(msg, args...))
}

// WarnfCtx logs a debug message with fmt.SprintF()
func (l *CustomLogger) WarnfCtx(ctx context.Context, msg string, args ...any) {
l.With(convertToAttrs(l.fromCtx(ctx))...).WarnContext(ctx, fmt.Sprintf(msg, args...))
// WarnCtx logs a debug message with fmt.SprintF()
func (l *CustomLogger) WarnCtx(ctx context.Context, msg string, args ...any) {
l.With(ConvertToAttrs(l.fromCtx(ctx))...).WarnContext(ctx, fmt.Sprintf(msg, args...))
}

func (l *CustomLogger) AddKeysValuesToCtx(ctx context.Context, kv map[string]interface{}) context.Context {
Expand All @@ -62,11 +59,11 @@ func (l *CustomLogger) AddKeysValuesToCtx(ctx context.Context, kv map[string]int
return ctx
}

func (l *CustomLogger) fromCtx(ctx context.Context) fields {
func (l *CustomLogger) fromCtx(ctx context.Context) Fields {
l.mu.Lock()
defer l.mu.Unlock()

f := make(fields)
f := make(Fields)
for _, ctxKey := range l.ctxKeys {
f[ctxKey] = ctx.Value(ctxKey)
}
Expand Down
25 changes: 14 additions & 11 deletions clog/clog_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
package clog
package clog_test

import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/require"
"log/slog"
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/gateway-fm/scriptorium/clog"
)

const msgKey = "msg"

func TestCustomLogger(t *testing.T) {
var buf bytes.Buffer

logger := NewCustomLogger(&buf, slog.LevelDebug, true)
logger := clog.NewCustomLogger(&buf, slog.LevelDebug, true)

ctx := context.Background()
ctx = logger.AddKeysValuesToCtx(ctx, map[string]interface{}{"user": "testUser"})
Expand All @@ -30,29 +33,29 @@ func TestCustomLogger(t *testing.T) {
{
name: "ErrorfCtx",
logFunc: func(ctx context.Context, msg string, args ...any) {
logger.ErrorfCtx(ctx, fmt.Errorf("test error"), msg, args...)
logger.ErrorCtx(ctx, fmt.Errorf("test error"), msg, args...)
},
expected: map[string]interface{}{"level": "ERROR", "user": "testUser", "error": "test error", msgKey: "an error occurred"},
errorInput: fmt.Errorf("test error"),
},
{
name: "InfofCtx",
logFunc: func(ctx context.Context, msg string, args ...any) {
logger.InfofCtx(ctx, msg, args...)
logger.InfoCtx(ctx, msg, args...)
},
expected: map[string]interface{}{"level": "INFO", "user": "testUser", msgKey: "informational message"},
},
{
name: "DebugfCtx",
logFunc: func(ctx context.Context, msg string, args ...any) {
logger.DebugfCtx(ctx, msg, args...)
logger.DebugCtx(ctx, msg, args...)
},
expected: map[string]interface{}{"level": "DEBUG", "user": "testUser", msgKey: "debugging message"},
},
{
name: "WarnfCtx",
logFunc: func(ctx context.Context, msg string, args ...any) {
logger.WarnfCtx(ctx, msg, args...)
logger.WarnCtx(ctx, msg, args...)
},
expected: map[string]interface{}{"level": "WARN", "user": "testUser", msgKey: "warning message"},
},
Expand Down Expand Up @@ -80,7 +83,7 @@ func TestCustomLogger(t *testing.T) {
func TestCustomLogger_Level(t *testing.T) {
var buf bytes.Buffer

logger := NewCustomLogger(&buf, slog.LevelInfo, true)
logger := clog.NewCustomLogger(&buf, slog.LevelInfo, true)

ctx := context.Background()
ctx = logger.AddKeysValuesToCtx(ctx, map[string]interface{}{"user": "testUser"})
Expand All @@ -94,7 +97,7 @@ func TestCustomLogger_Level(t *testing.T) {
{
name: "DebugfCtx",
logFunc: func(ctx context.Context, msg string, args ...any) {
logger.DebugfCtx(ctx, msg, args...)
logger.DebugCtx(ctx, msg, args...)
},
expected: map[string]interface{}{"level": "DEBUG", "user": "testUser", msgKey: "debugging message"},
},
Expand All @@ -112,7 +115,7 @@ func TestCustomLogger_Level(t *testing.T) {
}

func TestConvertToAttrsConcurrentAccess(t *testing.T) {
testFields := fields{
testFields := clog.Fields{
"user": "testUser",
"session": "xyz123",
"role": "admin",
Expand All @@ -126,7 +129,7 @@ func TestConvertToAttrsConcurrentAccess(t *testing.T) {
for i := 0; i < repeat; i++ {
go func() {
defer wg.Done()
_ = convertToAttrs(testFields)
_ = clog.ConvertToAttrs(testFields)
}()
}

Expand Down
21 changes: 13 additions & 8 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Event struct {
}

// EventHandler is a function type that processes an Event and returns an error if the processing fails.
type EventHandler func(ctx context.Context, event Event) AckStatus
type EventHandler func(ctx context.Context, event *Event) AckStatus

// eventBus implements the EventBus interface with support for topic-based subscriptions and event retries.
type eventBus struct {
Expand All @@ -34,7 +34,7 @@ type eventBus struct {
log *clog.CustomLogger // log is a custom logger for logging information about event processing.
handlers map[string][]EventHandler // handlers store a slice of event handlers for each topic.
delay map[string][]time.Duration // delay specifies the retry delays for each topic.
queue chan Event // queue is the channel through which events are published and processed.
queue chan *Event // queue is the channel through which events are published and processed.
lock sync.RWMutex // lock is used to synchronize access to handlers and delays.
}

Expand All @@ -47,7 +47,7 @@ func NewEventBus(ctx context.Context, size int) EventBus {
cf: cf,
handlers: make(map[string][]EventHandler),
delay: make(map[string][]time.Duration),
queue: make(chan Event, size),
queue: make(chan *Event, size),
}
}

Expand Down Expand Up @@ -77,7 +77,7 @@ func (bus *eventBus) Subscribe(

// Publish sends an event with the specified data to the specified topic.
func (bus *eventBus) Publish(topic string, data []byte) {
bus.queue <- Event{Data: data, Topic: topic, Retry: 0, NextRetry: 0}
bus.queue <- &Event{Data: data, Topic: topic, Retry: 0, NextRetry: 0}
}

// StartProcessing begins processing events from the queue. It listens for cancellation via the provided context to gracefully stop processing.
Expand All @@ -98,7 +98,7 @@ func (bus *eventBus) StartProcessing(ctx context.Context) {
}

// processEvent handles the processing of a single event, including retry logic and error handling.
func processEvent(ctx context.Context, bus *eventBus, event Event) {
func processEvent(ctx context.Context, bus *eventBus, event *Event) {
handlers, ok := bus.handlers[event.Topic]
if !ok {
return
Expand All @@ -113,15 +113,20 @@ func processEvent(ctx context.Context, bus *eventBus, event Event) {

go bus.retryEvent(ctx, event)
case event.Retry >= maxRetries:
bus.log.DebugfCtx(ctx, "Max retries for event: %+v\n", event)
bus.log.DebugfCtx(ctx, "Message %s from topic %s reached max retries, this was %d retry", event)
event.AckStatus = ACK
case status == ACK:
bus.log.DebugfCtx(ctx, "Message read: %+v\n", event)
bus.log.DebugfCtx(ctx, "Message %s from topic %s read, this was %d retry",
string(event.Data),
event.Topic,
event.Retry,
)
}
}
}

// retryEvent attempts to re-enqueue an event for processing after a delay, respecting the provided context.
func (bus *eventBus) retryEvent(ctx context.Context, event Event) {
func (bus *eventBus) retryEvent(ctx context.Context, event *Event) {
select {
case <-ctx.Done():
bus.log.DebugfCtx(ctx, "Retry canceled due to context cancellation for event: %+v\n", event)
Expand Down

0 comments on commit ed4a873

Please sign in to comment.