Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] slog poc #57

Merged
merged 8 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
package flowstate

type Command interface {
setSessID(id int64)
SessID() int64
cmd()
}

type command struct {
sessID int64
}

func (_ *command) cmd() {}

func (cmd *command) setSessID(doID int64) {
cmd.sessID = doID
}

func (cmd *command) SessID() int64 {
return cmd.sessID
}
7 changes: 7 additions & 0 deletions cmd_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ type CommitCommand struct {
Commands []Command
}

func (cmd *CommitCommand) setSessID(id int64) {
cmd.command.setSessID(id)
for _, subCmd := range cmd.Commands {
subCmd.setSessID(id)
}
}

func CommitStateCtx(stateCtx *StateCtx) *CommitStateCtxCommand {
return &CommitStateCtxCommand{
StateCtx: stateCtx,
Expand Down
49 changes: 44 additions & 5 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,26 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"sync"
"sync/atomic"
)

var ErrFlowNotFound = errors.New("flow not found")
var sessIDS = &atomic.Int64{}

type Engine struct {
d Doer
l *slog.Logger

wg *sync.WaitGroup
doneCh chan struct{}
}

func NewEngine(d Doer) (*Engine, error) {
func NewEngine(d Doer, l *slog.Logger) (*Engine, error) {
e := &Engine{
d: d,
l: l,

wg: &sync.WaitGroup{},
doneCh: make(chan struct{}),
Expand All @@ -43,6 +47,8 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
defer e.wg.Done()
}

sessID := sessIDS.Add(1)
stateCtx.sessID = sessID
stateCtx.e = e

if stateCtx.Current.ID == `` {
Expand All @@ -65,19 +71,27 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
return err
}

logExecute(stateCtx, e.l)
cmd0, err := f.Execute(stateCtx, e)
if err != nil {
return err
}

cmd0.setSessID(sessID)

if cmd, ok := cmd0.(*ExecuteCommand); ok {
cmd.sync = true
}

conflictErr := &ErrCommitConflict{}

if err = e.do(cmd0); errors.As(err, conflictErr) {
log.Printf("INFO: engine: execute: %s\n", conflictErr)
e.l.Info("engine: do conflict",
"sess", cmd0.SessID(),
"conflict", err.Error(),
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return nil
} else if err != nil {
return err
Expand All @@ -99,7 +113,22 @@ func (e *Engine) Do(cmds ...Command) error {
return fmt.Errorf("no commands to do")
}

var sessID int64
for _, cmd := range cmds {
if cmd.SessID() == 0 {
if sessID == 0 {
sessID = sessIDS.Add(1)
}

cmd.setSessID(sessID)

if cmtCmd, ok := cmd.(*CommitCommand); ok {
for _, subCmd := range cmtCmd.Commands {
subCmd.setSessID(sessID)
}
}
}

if err := e.do(cmd); err != nil {
return err
}
Expand Down Expand Up @@ -136,18 +165,28 @@ func (e *Engine) Shutdown(ctx context.Context) error {
}

func (e *Engine) do(cmd0 Command) error {
logDo(cmd0, e.l)

switch cmd := cmd0.(type) {
case *ExecuteCommand:
if cmd.sync {
return nil
}

go func() {
if err := e.Execute(cmd.StateCtx); err != nil {
log.Printf("ERROR: engine: go execute: %s\n", err)
stateCtx := cmd.StateCtx
if err := e.Execute(stateCtx); err != nil {
e.l.Error("execute failed",
"sess", stateCtx.SessID(),
"error", err,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
}
}()
return nil
case *CommitCommand:
return e.d.Do(cmd0)
default:
return e.d.Do(cmd0)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/jackc/pgx/v5 v5.6.0
github.com/mattn/go-sqlite3 v1.14.22
github.com/stretchr/testify v1.9.0
github.com/thejerf/slogassert v0.3.4
github.com/xo/dburl v0.23.2
go.uber.org/goleak v1.3.0
golang.org/x/time v0.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/thejerf/slogassert v0.3.4 h1:VoTsXixRbXMrRSSxDjYTiEDCM4VWbsYPW5rB/hX24kM=
github.com/thejerf/slogassert v0.3.4/go.mod h1:0zn9ISLVKo1aPMTqcGfG1o6dWwt+Rk574GlUxHD4rs8=
github.com/xo/dburl v0.23.2 h1:Fl88cvayrgE56JA/sqhNMLljCW/b7RmG1mMkKMZUFgA=
github.com/xo/dburl v0.23.2/go.mod h1:uazlaAQxj4gkshhfuuYyvwCBouOmNnG2aDxTCFZpmL4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
94 changes: 94 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package flowstate

import (
"fmt"
"log/slog"
)

func logExecute(stateCtx *StateCtx, l *slog.Logger) {
args := []any{
"sess", stateCtx.sessID,
"flow", stateCtx.Current.Transition.ToID,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
}

currTs := stateCtx.Current.Transition

if currTs.Annotations[StateAnnotation] != `` {
args = append(args, "state", currTs.Annotations[StateAnnotation])
}

if currTs.Annotations[DelayDurationAnnotation] != `` {
args = append(args, "delayed", "true")
}

if currTs.Annotations[RecoveryAttemptAnnotation] != `` {
args = append(args, "recovered", currTs.Annotations[RecoveryAttemptAnnotation])
}

l.Info("engine: execute", args...)
}

func logDo(cmd0 Command, l *slog.Logger) {
args := []any{"sess", cmd0.SessID()}

switch cmd := cmd0.(type) {
case *CommitCommand:
args = append(args, "cmd", "commit", "len", len(cmd.Commands))
case *CommitStateCtxCommand:
args = append(args, "cmd", "commit_state_ctx", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
case *TransitCommand:
args = append(args, "cmd", "transit", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev, "to", cmd.FlowID)
case *PauseCommand:
args = append(args, "cmd", "pause", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
if cmd.FlowID != `` {
args = append(args, "to", cmd.FlowID)
}
case *ResumeCommand:
args = append(args, "cmd", "resume", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
case *EndCommand:
args = append(args, "cmd", "end", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
case *DelayCommand:
args = append(args, "cmd", "delay", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev, "dur", cmd.Duration)
case *ExecuteCommand:
args = append(args, "cmd", "execute", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
case *NoopCommand:
args = append(args, "cmd", "noop", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
case *ReferenceDataCommand:
args = append(args,
"cmd", "reference_data",
"id", cmd.StateCtx.Current.ID,
"rev", cmd.StateCtx.Current.Rev,
"data_id", cmd.Data.ID,
"data_rev", cmd.Data.Rev,
"annot", cmd.Annotation,
)
case *DereferenceDataCommand:
args = append(args,
"cmd", "dereference_data",
"id", cmd.StateCtx.Current.ID,
"rev", cmd.StateCtx.Current.Rev,
"data_id", cmd.Data.ID,
"data_rev", cmd.Data.Rev,
"annot", cmd.Annotation,
)
case *StoreDataCommand:
args = append(args, "cmd", "store_data", "data_id", cmd.Data.ID, "data_rev", cmd.Data.Rev)
case *GetDataCommand:
args = append(args, "cmd", "get_data", "data_id", cmd.Data.ID, "data_rev", cmd.Data.Rev)
case *DeserializeCommand:
args = append(args, "cmd", "deserialize")
case *SerializeCommand:
args = append(args, "cmd", "serialize")
case *GetFlowCommand:
args = append(args, "cmd", "get_flow", "flow_id", cmd.StateCtx.Current.Transition.ToID)
case *WatchCommand:
args = append(args, "cmd", "watch")
default:
args = append(args, "cmd", fmt.Sprintf("%T", cmd))
}

l.Info("engine: do", args...)

}
38 changes: 30 additions & 8 deletions memdriver/delayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"sync"
"time"

Expand All @@ -17,11 +17,13 @@ type Delayer struct {
e *flowstate.Engine
stopCh chan struct{}
wg sync.WaitGroup
l *slog.Logger
}

func NewDelayer() *Delayer {
func NewDelayer(l *slog.Logger) *Delayer {
return &Delayer{
stopCh: make(chan struct{}),
l: l,
}
}

Expand All @@ -43,26 +45,46 @@ func (d *Delayer) Do(cmd0 flowstate.Command) error {
t := time.NewTimer(cmd.Duration)
defer t.Stop()

stateCtx := cmd.DelayStateCtx

select {
case <-t.C:
if cmd.DelayStateCtx.Current.Transition.Annotations[flowstate.DelayCommitAnnotation] == `true` {
if stateCtx.Current.Transition.Annotations[flowstate.DelayCommitAnnotation] == `true` {
conflictErr := &flowstate.ErrCommitConflict{}
if err := d.e.Do(flowstate.Commit(
flowstate.CommitStateCtx(cmd.DelayStateCtx),
)); errors.As(err, conflictErr) {
log.Printf(`ERROR: memdriver: delayer: engine: commit: %s\n`, conflictErr)
d.l.Info("delayer: commit conflict",
"sess", cmd.SessID(),
"conflict", err.Error(),
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return
} else if err != nil {
log.Printf(`ERROR: memdriver: delayer: engine: commit: %s`, err)
d.l.Error("delayer: commit failed",
"sess", cmd.SessID(),
"error", err,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return
}
}

if err := d.e.Execute(cmd.DelayStateCtx); err != nil {
log.Printf(`ERROR: memdriver: delayer: engine: execute: %s`, err)
if err := d.e.Execute(stateCtx); err != nil {
d.l.Error("delayer: execute failed",
"sess", stateCtx.SessID(),
"error", err,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
}
case <-d.stopCh:
log.Printf(`ERROR: memdriver: delayer: state delay was terminated`)
d.l.Error("delayer: delaying terminated",
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return
}
}()
Expand Down
Loading
Loading