Skip to content

Commit

Permalink
Move logs to slog (#6)
Browse files Browse the repository at this point in the history
* Move logs to slog

While less performant, its in the stdlib which is probably more suitable
for an SDK. Users can register a slog handler to intercept logs

* Warn when client goes away
  • Loading branch information
jackkleeman authored Jul 15, 2024
1 parent df3a918 commit 96a7bd9
Show file tree
Hide file tree
Showing 15 changed files with 312 additions and 166 deletions.
12 changes: 5 additions & 7 deletions example/checkout.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package main

import (
"context"
"fmt"
"math/rand"

"github.com/google/uuid"
restate "github.com/restatedev/sdk-go"
"github.com/rs/zerolog/log"
)

type PaymentRequest struct {
Expand All @@ -29,7 +27,7 @@ func (c *checkout) Name() string {
const CheckoutServiceName = "Checkout"

func (c *checkout) Payment(ctx restate.Context, request PaymentRequest) (response PaymentResponse, err error) {
uuid, err := restate.RunAs(ctx, func(ctx context.Context) (string, error) {
uuid, err := restate.RunAs(ctx, func(ctx restate.RunContext) (string, error) {
uuid := uuid.New()
return uuid.String(), nil
})
Expand All @@ -45,13 +43,13 @@ func (c *checkout) Payment(ctx restate.Context, request PaymentRequest) (respons
price := len(request.Tickets) * 30

response.Price = price
_, err = restate.RunAs(ctx, func(ctx context.Context) (bool, error) {
log := log.With().Str("uuid", uuid).Int("price", price).Logger()
_, err = restate.RunAs(ctx, func(ctx restate.RunContext) (bool, error) {
log := ctx.Log().With("uuid", uuid, "price", price)
if rand.Float64() < 0.5 {
log.Info().Msg("payment succeeded")
log.Info("payment succeeded")
return true, nil
} else {
log.Error().Msg("payment failed")
log.Error("payment failed")
return false, fmt.Errorf("failed to pay")
}
})
Expand Down
8 changes: 2 additions & 6 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,22 @@ package main

import (
"context"
"log/slog"
"os"

restate "github.com/restatedev/sdk-go"
"github.com/restatedev/sdk-go/server"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

func main() {

log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
zerolog.SetGlobalLevel(zerolog.InfoLevel)

server := server.NewRestate().
Bind(restate.Object(&userSession{})).
Bind(restate.Object(&ticketService{})).
Bind(restate.Service(&checkout{}))

if err := server.Start(context.Background(), ":9080"); err != nil {
log.Error().Err(err).Msg("application exited unexpectedly")
slog.Error("application exited unexpectedly", "err", err.Error())
os.Exit(1)
}
}
5 changes: 2 additions & 3 deletions example/ticket_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"

restate "github.com/restatedev/sdk-go"
"github.com/rs/zerolog/log"
)

type TicketStatus int
Expand Down Expand Up @@ -36,7 +35,7 @@ func (t *ticketService) Reserve(ctx restate.ObjectContext, _ restate.Void) (bool

func (t *ticketService) Unreserve(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) {
ticketId := ctx.Key()
log.Info().Str("ticket", ticketId).Msg("un-reserving ticket")
ctx.Log().Info("un-reserving ticket", "ticket", ticketId)
status, err := restate.GetAs[TicketStatus](ctx, "status")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return void, err
Expand All @@ -52,7 +51,7 @@ func (t *ticketService) Unreserve(ctx restate.ObjectContext, _ restate.Void) (vo

func (t *ticketService) MarkAsSold(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) {
ticketId := ctx.Key()
log.Info().Str("ticket", ticketId).Msg("mark ticket as sold")
ctx.Log().Info("mark ticket as sold", "ticket", ticketId)

status, err := restate.GetAs[TicketStatus](ctx, "status")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
Expand Down
5 changes: 2 additions & 3 deletions example/user_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

restate "github.com/restatedev/sdk-go"
"github.com/rs/zerolog/log"
)

const UserSessionServiceName = "UserSession"
Expand Down Expand Up @@ -81,7 +80,7 @@ func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool,
return false, err
}

log.Info().Strs("tickets", tickets).Msg("tickets in basket")
ctx.Log().Info("tickets in basket", "tickets", tickets)

if len(tickets) == 0 {
return false, nil
Expand All @@ -95,7 +94,7 @@ func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool,
return false, err
}

log.Info().Str("id", response.ID).Int("price", response.Price).Msg("payment details")
ctx.Log().Info("payment details", "id", response.ID, "price", response.Price)

for _, ticket := range tickets {
call := ctx.ObjectSend(TicketServiceName, ticket, 0).Method("MarkAsSold")
Expand Down
4 changes: 0 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.22.0
require (
github.com/google/uuid v1.6.0
github.com/posener/h2conn v0.0.0-20231204025407-3997deeca0f0
github.com/rs/zerolog v1.32.0
github.com/stretchr/testify v1.9.0
github.com/vmihailenco/msgpack/v5 v5.4.1
golang.org/x/net v0.21.0
Expand All @@ -14,11 +13,8 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
16 changes: 0 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,24 +1,13 @@
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/h2conn v0.0.0-20231204025407-3997deeca0f0 h1:zZg03nifrj6ayWNaDO8tNj57tqrOIKDwiBaLkhtK7Kk=
github.com/posener/h2conn v0.0.0-20231204025407-3997deeca0f0/go.mod h1:bblJa8QcHntareAJYfLJUzLj42sUFBKCBeTDK5LyUrw=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0=
github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
Expand All @@ -27,11 +16,6 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
82 changes: 82 additions & 0 deletions internal/log/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package log

import (
"context"
"fmt"
"log/slog"
"reflect"
"sync/atomic"

"github.com/restatedev/sdk-go/rcontext"
)

const (
LevelTrace slog.Level = -8
)

type typeValue struct{ inner any }

func (t typeValue) LogValue() slog.Value {
return slog.StringValue(reflect.TypeOf(t.inner).String())
}

func Type(key string, value any) slog.Attr {
return slog.Any(key, typeValue{value})
}

type stringerValue[T fmt.Stringer] struct{ inner T }

func (t stringerValue[T]) LogValue() slog.Value {
return slog.StringValue(t.inner.String())
}

func Stringer[T fmt.Stringer](key string, value T) slog.Attr {
return slog.Any(key, slog.AnyValue(stringerValue[T]{value}))
}

func Error(err error) slog.Attr {
return slog.String("err", err.Error())
}

type contextInjectingHandler struct {
logContext *atomic.Pointer[rcontext.LogContext]
dropReplay bool
inner slog.Handler
}

func NewUserContextHandler(logContext *atomic.Pointer[rcontext.LogContext], dropReplay bool, inner slog.Handler) slog.Handler {
return &contextInjectingHandler{logContext, dropReplay, inner}
}

func NewRestateContextHandler(inner slog.Handler) slog.Handler {
logContext := atomic.Pointer[rcontext.LogContext]{}
logContext.Store(&rcontext.LogContext{Source: rcontext.LogSourceRestate, IsReplaying: false})
return &contextInjectingHandler{&logContext, false, inner}
}

func (d *contextInjectingHandler) Enabled(ctx context.Context, l slog.Level) bool {
lc := d.logContext.Load()
if d.dropReplay && lc.IsReplaying {
return false
}
return d.inner.Enabled(rcontext.WithLogContext(ctx, lc), l)
}

func (d *contextInjectingHandler) Handle(ctx context.Context, record slog.Record) error {
return d.inner.Handle(rcontext.WithLogContext(ctx, d.logContext.Load()), record)
}

func (d *contextInjectingHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
return &contextInjectingHandler{d.logContext, d.dropReplay, d.inner.WithAttrs(attrs)}
}

func (d *contextInjectingHandler) WithGroup(name string) slog.Handler {
return &contextInjectingHandler{d.logContext, d.dropReplay, d.inner.WithGroup(name)}
}

var _ slog.Handler = &contextInjectingHandler{}

type dropReplayHandler struct {
isReplaying func() bool
inner slog.Handler
}
4 changes: 0 additions & 4 deletions internal/state/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ func (m *Machine) doDynCall(service, key, method string, input any) (*wire.CallE
}

func (m *Machine) doCall(service, key, method string, params []byte) (*wire.CallEntryMessage, uint32) {
m.log.Debug().Str("service", service).Str("method", method).Str("key", key).Msg("executing sync call")

entry, entryIndex := replayOrNew(
m,
func(entry *wire.CallEntryMessage) *wire.CallEntryMessage {
Expand Down Expand Up @@ -132,8 +130,6 @@ func (m *Machine) _doCall(service, key, method string, params []byte) *wire.Call
}

func (m *Machine) sendCall(service, key, method string, body any, delay time.Duration) error {
m.log.Debug().Str("service", service).Str("method", method).Str("key", key).Msg("executing async call")

params, err := json.Marshal(body)
if err != nil {
return err
Expand Down
31 changes: 18 additions & 13 deletions internal/state/completion.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package state

import (
"errors"
"io"
"log/slog"

"github.com/restatedev/sdk-go/internal/log"
"github.com/restatedev/sdk-go/internal/wire"
)

Expand Down Expand Up @@ -40,7 +40,9 @@ func (m *Machine) Write(message wire.Message) {
m.pendingAcks[m.entryIndex] = message
m.pendingMutex.Unlock()
}
if err := m.protocol.Write(message); err != nil {
typ := wire.MessageType(message)
m.log.LogAttrs(m.ctx, log.LevelTrace, "Sending message to runtime", log.Stringer("type", typ))
if err := m.protocol.Write(typ, message); err != nil {
panic(m.newWriteError(message, err))
}
}
Expand All @@ -59,33 +61,36 @@ func (m *Machine) newWriteError(entry wire.Message, err error) *writeError {

func (m *Machine) handleCompletionsAcks() {
for {
msg, err := m.protocol.Read()
msg, _, err := m.protocol.Read()
if err != nil {
if errors.Is(err, io.EOF) {
m.log.Trace().Err(err).Msg("request body closed; next blocking operation will suspend")
if m.ctx.Err() == nil {
m.log.LogAttrs(m.ctx, log.LevelTrace, "Request body closed; next blocking operation will suspend")
m.suspend(err)
}
m.suspend(err)
return
}
switch msg := msg.(type) {
case *wire.CompletionMessage:
completable := m.completable(msg.EntryIndex)
if completable == nil {
m.log.Error().Uint32("index", msg.EntryIndex).Msg("failed to find pending completion at index")
m.log.LogAttrs(m.ctx, slog.LevelError, "Failed to find pending completion at index", slog.Uint64("index", uint64(msg.EntryIndex)))
continue
}
completable.Complete(&msg.CompletionMessage)
m.log.Debug().Uint32("index", msg.EntryIndex).Msg("processed completion")
if err := completable.Complete(&msg.CompletionMessage); err != nil {
m.log.LogAttrs(m.ctx, slog.LevelError, "Failed to process completion", log.Error(err), slog.Uint64("index", uint64(msg.EntryIndex)))
} else {
m.log.LogAttrs(m.ctx, slog.LevelDebug, "Processed completion", slog.Uint64("index", uint64(msg.EntryIndex)))
}
case *wire.EntryAckMessage:
ackable := m.ackable(msg.EntryIndex)
if ackable == nil {
m.log.Error().Uint32("index", msg.EntryIndex).Msg("failed to find pending ack at index")
m.log.LogAttrs(m.ctx, slog.LevelError, "Failed to find pending ack at index", slog.Uint64("index", uint64(msg.EntryIndex)))
continue
}
ackable.Ack()
m.log.Debug().Uint32("index", msg.EntryIndex).Msg("processed ack")
m.log.LogAttrs(m.ctx, slog.LevelDebug, "Processed ack", slog.Uint64("index", uint64(msg.EntryIndex)))
default:
m.log.Error().Type("type", msg).Msg("unexpected non-completion non-ack message during invocation")
m.log.LogAttrs(m.ctx, slog.LevelError, "Unexpected non-completion non-ack message during invocation", log.Type("type", msg))
continue
}
}
Expand Down
Loading

0 comments on commit 96a7bd9

Please sign in to comment.