Skip to content

Commit

Permalink
feat: clog update
Browse files Browse the repository at this point in the history
  • Loading branch information
adyusupov committed May 29, 2024
1 parent dd7cff7 commit 995c552
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 75 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
version: latest

- name: Test
run: go test ./...
run: go clean --testcache && go test ./...

- name: Clean workspace
uses: AutoModality/[email protected]
19 changes: 14 additions & 5 deletions clog/attrs.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package clog

import "log/slog"
import (
"log/slog"
"reflect"
)

type fieldKey string

type fields map[fieldKey]interface{}
type Fields map[fieldKey]interface{}

// convertToAttrs converts a map of custom fields to a slice of slog.Attr
func convertToAttrs(fields fields) []any {
// ConvertToAttrs converts a map of custom fields to a slice of slog.Attr
func ConvertToAttrs(fields Fields) []any {
var attrs []any
for k, v := range fields {
attrs = append(attrs, slog.Any(string(k), v))
if v != nil && !isZeroValue(v) {
attrs = append(attrs, slog.Any(string(k), v))
}
}
return attrs
}

func isZeroValue(v interface{}) bool {
return v == reflect.Zero(reflect.TypeOf(v)).Interface()
}
45 changes: 21 additions & 24 deletions clog/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ import (
"sync"
)

func NewCustomLogger(writer io.Writer, level slog.Level, addSource bool) *CustomLogger {
func NewCustomLogger(dest io.Writer, level slog.Level, addSource bool) *CustomLogger {
return &CustomLogger{
Logger: slog.New(
slog.NewJSONHandler(
writer,
&slog.HandlerOptions{
AddSource: addSource,
Level: level,
},
),
),
Logger: slog.New(slog.NewJSONHandler(
dest,
&slog.HandlerOptions{
AddSource: addSource,
Level: level,
})),
ctxKeys: []fieldKey{},
}
}
Expand All @@ -30,24 +27,24 @@ type CustomLogger struct {
ctxKeys []fieldKey
}

// ErrorfCtx logs an error message with fmt.SprintF()
func (l *CustomLogger) ErrorfCtx(ctx context.Context, err error, msg string, args ...any) {
l.With(convertToAttrs(l.fromCtx(ctx))...).With(slog.String("error", err.Error())).ErrorContext(ctx, fmt.Sprintf(msg, args...))
// ErrorCtx logs an error message with fmt.SprintF()
func (l *CustomLogger) ErrorCtx(ctx context.Context, err error, msg string, args ...any) {
l.With(ConvertToAttrs(l.fromCtx(ctx))...).With(slog.String("error", err.Error())).ErrorContext(ctx, fmt.Sprintf(msg, args...))
}

// InfofCtx logs an informational message with fmt.SprintF()
func (l *CustomLogger) InfofCtx(ctx context.Context, msg string, args ...any) {
l.With(convertToAttrs(l.fromCtx(ctx))...).InfoContext(ctx, fmt.Sprintf(msg, args...))
// InfoCtx logs an informational message with fmt.SprintF()
func (l *CustomLogger) InfoCtx(ctx context.Context, msg string, args ...any) {
l.With(ConvertToAttrs(l.fromCtx(ctx))...).InfoContext(ctx, fmt.Sprintf(msg, args...))
}

// DebugfCtx logs a debug message with fmt.SprintF()
func (l *CustomLogger) DebugfCtx(ctx context.Context, msg string, args ...any) {
l.With(convertToAttrs(l.fromCtx(ctx))...).DebugContext(ctx, fmt.Sprintf(msg, args...))
// DebugCtx logs a debug message with fmt.SprintF()
func (l *CustomLogger) DebugCtx(ctx context.Context, msg string, args ...any) {
l.With(ConvertToAttrs(l.fromCtx(ctx))...).DebugContext(ctx, fmt.Sprintf(msg, args...))
}

// WarnfCtx logs a debug message with fmt.SprintF()
func (l *CustomLogger) WarnfCtx(ctx context.Context, msg string, args ...any) {
l.With(convertToAttrs(l.fromCtx(ctx))...).WarnContext(ctx, fmt.Sprintf(msg, args...))
// WarnCtx logs a debug message with fmt.SprintF()
func (l *CustomLogger) WarnCtx(ctx context.Context, msg string, args ...any) {
l.With(ConvertToAttrs(l.fromCtx(ctx))...).WarnContext(ctx, fmt.Sprintf(msg, args...))
}

func (l *CustomLogger) AddKeysValuesToCtx(ctx context.Context, kv map[string]interface{}) context.Context {
Expand All @@ -62,11 +59,11 @@ func (l *CustomLogger) AddKeysValuesToCtx(ctx context.Context, kv map[string]int
return ctx
}

func (l *CustomLogger) fromCtx(ctx context.Context) fields {
func (l *CustomLogger) fromCtx(ctx context.Context) Fields {
l.mu.Lock()
defer l.mu.Unlock()

f := make(fields)
f := make(Fields)
for _, ctxKey := range l.ctxKeys {
f[ctxKey] = ctx.Value(ctxKey)
}
Expand Down
25 changes: 14 additions & 11 deletions clog/clog_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
package clog
package clog_test

import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/require"
"log/slog"
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/gateway-fm/scriptorium/clog"
)

const msgKey = "msg"

func TestCustomLogger(t *testing.T) {
var buf bytes.Buffer

logger := NewCustomLogger(&buf, slog.LevelDebug, true)
logger := clog.NewCustomLogger(&buf, slog.LevelDebug, true)

ctx := context.Background()
ctx = logger.AddKeysValuesToCtx(ctx, map[string]interface{}{"user": "testUser"})
Expand All @@ -30,29 +33,29 @@ func TestCustomLogger(t *testing.T) {
{
name: "ErrorfCtx",
logFunc: func(ctx context.Context, msg string, args ...any) {
logger.ErrorfCtx(ctx, fmt.Errorf("test error"), msg, args...)
logger.ErrorCtx(ctx, fmt.Errorf("test error"), msg, args...)
},
expected: map[string]interface{}{"level": "ERROR", "user": "testUser", "error": "test error", msgKey: "an error occurred"},
errorInput: fmt.Errorf("test error"),
},
{
name: "InfofCtx",
logFunc: func(ctx context.Context, msg string, args ...any) {
logger.InfofCtx(ctx, msg, args...)
logger.InfoCtx(ctx, msg, args...)
},
expected: map[string]interface{}{"level": "INFO", "user": "testUser", msgKey: "informational message"},
},
{
name: "DebugfCtx",
logFunc: func(ctx context.Context, msg string, args ...any) {
logger.DebugfCtx(ctx, msg, args...)
logger.DebugCtx(ctx, msg, args...)
},
expected: map[string]interface{}{"level": "DEBUG", "user": "testUser", msgKey: "debugging message"},
},
{
name: "WarnfCtx",
logFunc: func(ctx context.Context, msg string, args ...any) {
logger.WarnfCtx(ctx, msg, args...)
logger.WarnCtx(ctx, msg, args...)
},
expected: map[string]interface{}{"level": "WARN", "user": "testUser", msgKey: "warning message"},
},
Expand Down Expand Up @@ -80,7 +83,7 @@ func TestCustomLogger(t *testing.T) {
func TestCustomLogger_Level(t *testing.T) {
var buf bytes.Buffer

logger := NewCustomLogger(&buf, slog.LevelInfo, true)
logger := clog.NewCustomLogger(&buf, slog.LevelInfo, true)

ctx := context.Background()
ctx = logger.AddKeysValuesToCtx(ctx, map[string]interface{}{"user": "testUser"})
Expand All @@ -94,7 +97,7 @@ func TestCustomLogger_Level(t *testing.T) {
{
name: "DebugfCtx",
logFunc: func(ctx context.Context, msg string, args ...any) {
logger.DebugfCtx(ctx, msg, args...)
logger.DebugCtx(ctx, msg, args...)
},
expected: map[string]interface{}{"level": "DEBUG", "user": "testUser", msgKey: "debugging message"},
},
Expand All @@ -112,7 +115,7 @@ func TestCustomLogger_Level(t *testing.T) {
}

func TestConvertToAttrsConcurrentAccess(t *testing.T) {
testFields := fields{
testFields := clog.Fields{
"user": "testUser",
"session": "xyz123",
"role": "admin",
Expand All @@ -126,7 +129,7 @@ func TestConvertToAttrsConcurrentAccess(t *testing.T) {
for i := 0; i < repeat; i++ {
go func() {
defer wg.Done()
_ = convertToAttrs(testFields)
_ = clog.ConvertToAttrs(testFields)
}()
}

Expand Down
8 changes: 6 additions & 2 deletions queue/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import (
"context"
"time"

"github.com/gateway-fm/scriptorium/transactions"

"github.com/gateway-fm/scriptorium/clog"
)

// EventBus defines an interface for subscribing to topics, publishing events, and managing event processing.
type EventBus interface {
Subscribe(topic string, handler EventHandler, delays []int, durationType time.Duration)
Publish(topic string, data []byte)
StartProcessing(ctx context.Context)
StartProcessing(ctx context.Context) error
Stop()
ReachedMaxRetries(event Event) bool
ExceededMaxRetries(event *Event) bool
SetLogger(log *clog.CustomLogger)
AddEventToCtx(ctx context.Context, event *Event) context.Context
WithOutbox(factory transactions.TransactionFactory)
}
45 changes: 45 additions & 0 deletions queue/outbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package queue

import "context"

// loadEventsFromOutbox loads events from the outbox table into the in-memory queue.
func (bus *eventBus) loadEventsFromOutbox(ctx context.Context) error {
if bus.outbox == nil {
return nil
}

events, err := bus.outbox.LoadPendingEvents(ctx)
if err != nil {
bus.log.ErrorCtx(ctx, err, "Failed to load events from outbox")
return err
}

for _, outboxEvent := range events {
bus.queue <- convertOutboxEventToEvent(outboxEvent)
}

return nil
}

// updateEventStatus updates the status and retry count of an event in the outbox table.
func (bus *eventBus) updateEventStatus(ctx context.Context, event *Event) {
outboxEvent := convertEventToOutboxEvent(event)

if err := bus.outbox.UpdateEventStatus(ctx, outboxEvent); err != nil {
bus.log.ErrorCtx(ctx, err, "Failed to update event status in outbox")
}
}

// markEventAsProcessed marks an event as processed in the outbox table.
func (bus *eventBus) markEventAsProcessed(ctx context.Context, eventID int) {
if err := bus.outbox.MarkEventAsProcessed(ctx, eventID); err != nil {
bus.log.ErrorCtx(ctx, err, "Failed to mark event as processed in outbox")
}
}

// markEventAsFailed marks an event as failed in the outbox table.
func (bus *eventBus) markEventAsFailed(ctx context.Context, eventID int) {
if err := bus.outbox.MarkEventAsFailed(ctx, eventID); err != nil {
bus.log.ErrorCtx(ctx, err, "Failed to mark event as failed in outbox")
}
}
96 changes: 96 additions & 0 deletions queue/outbox_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package queue

import (
"context"
"fmt"

"github.com/gateway-fm/scriptorium/transactions"
)

// OutboxEvent represents an event stored in the outbox table.
type OutboxEvent struct {
ID int `pg:",pk"` // Primary key
Data []byte `pg:"data"` // Data column
Topic string `pg:"topic"` // Topic column
Retry int `pg:"retry"` // Retry column
NextRetry uint `pg:"next_retry"` // NextRetry column as minutes
AckStatus AckStatus `pg:"ack_status"` // AckStatus column
CreatedAt int64 `pg:"created_at"` // CreatedAt column as Unix timestamp
UpdatedAt int64 `pg:"updated_at"` // UpdatedAt column as Unix timestamp
}

// OutboxRepository provides methods to interact with the outbox table.
type OutboxRepository struct {
transactionFactory transactions.TransactionFactory
}

// NewOutboxRepository creates a new OutboxRepository.
func NewOutboxRepository(transactionFactory transactions.TransactionFactory) *OutboxRepository {
return &OutboxRepository{transactionFactory: transactionFactory}
}

// InsertEvent inserts a new event into the outbox table or updates it if it already exists.
func (r *OutboxRepository) InsertEvent(ctx context.Context, event *OutboxEvent) error {
_, err := r.transactionFactory.Transaction(ctx).
Model(event).
OnConflict("(data, topic) DO UPDATE").
Set("retry = EXCLUDED.retry, next_retry = EXCLUDED.next_retry, ack_status = EXCLUDED.ack_status, updated_at = EXCLUDED.updated_at").
Returning("*").
Insert()
if err != nil {
return fmt.Errorf("insert event into outbox: %w", err)
}
return nil
}

// LoadPendingEvents loads all pending events from the outbox table.
func (r *OutboxRepository) LoadPendingEvents(ctx context.Context) ([]*OutboxEvent, error) {
var events []*OutboxEvent
query := r.transactionFactory.Transaction(ctx).
Model(&events).
Where("ack_status = ?", NACK)

if err := query.Select(); err != nil {
return nil, fmt.Errorf("load pending events from outbox: %w", err)
}
return events, nil
}

// UpdateEventStatus updates the status and retry count of an event in the outbox table.
func (r *OutboxRepository) UpdateEventStatus(ctx context.Context, event *OutboxEvent) error {
_, err := r.transactionFactory.Transaction(ctx).
Model(event).
Column("retry", "next_retry", "ack_status").
Where("id = ?", event.ID).
Update()
if err != nil {
return fmt.Errorf("update event status in outbox: %w", err)
}
return nil
}

// MarkEventAsProcessed marks an event as processed in the outbox table.
func (r *OutboxRepository) MarkEventAsProcessed(ctx context.Context, eventID int) error {
_, err := r.transactionFactory.Transaction(ctx).
Model(&OutboxEvent{}).
Set("ack_status = ?", ACK).
Where("id = ?", eventID).
Update()
if err != nil {
return fmt.Errorf("mark event as processed in outbox: %w", err)
}
return nil
}

// MarkEventAsFailed marks an event as failed in the outbox table.
func (r *OutboxRepository) MarkEventAsFailed(ctx context.Context, eventID int) error {
_, err := r.transactionFactory.Transaction(ctx).
Model(&OutboxEvent{}).
Set("ack_status = ?", NACK).
Where("id = ?", eventID).
Update()
if err != nil {
return fmt.Errorf("mark event as failed in outbox: %w", err)
}
return nil
}
Loading

0 comments on commit 995c552

Please sign in to comment.