Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(nav): make resume strategies use of work pool consistent (#298) #300

Merged
merged 2 commits into from
Aug 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"gocyclo",
"godirwalk",
"gofmt",
"Goid",
"goimports",
"goleak",
"gomnd",
Expand Down Expand Up @@ -68,6 +69,7 @@
"unparam",
"usao",
"varcheck",
"wgex",
"xname",
"xpander",
"Zemlya",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
require (
github.com/fortytw2/leaktest v1.3.0
github.com/google/uuid v1.3.0
github.com/snivilised/lorax v0.2.4
github.com/snivilised/lorax v0.2.5
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/snivilised/lorax v0.2.4 h1:bTjZmN3MBBB4SMNseJJRBjCT2oIKOn4hL9e2HymLBH0=
github.com/snivilised/lorax v0.2.4/go.mod h1:Oa1QbZH3E5zfU5y5lmfKEcwPvuv3cHjNqk1/XhszucE=
github.com/snivilised/lorax v0.2.5 h1:0WOjjHfj08Ou85FY5V/p2QHUL4U9YZebWz2qhKjYwsQ=
github.com/snivilised/lorax v0.2.5/go.mod h1:Oa1QbZH3E5zfU5y5lmfKEcwPvuv3cHjNqk1/XhszucE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
Expand Down
8 changes: 5 additions & 3 deletions xfs/nav/navigation-accelerator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package nav

import "github.com/snivilised/lorax/async"
import (
"github.com/snivilised/lorax/async"
)

type navigationAccelerator struct {
noWorkers int
Expand All @@ -13,7 +15,7 @@ func (a *navigationAccelerator) start(ai *AsyncInfo) {
NoWorkers: a.noWorkers,
Exec: traverseExecutive,
JobsCh: ai.JobsChanOut,
Quit: ai.Wg,
Quitter: ai.Wgex,
})

// We are handing over ownership of this channel (ai.OutputsChIn) to the pool as
Expand All @@ -22,7 +24,7 @@ func (a *navigationAccelerator) start(ai *AsyncInfo) {
//
go a.pool.Start(ai.Ctx, ai.OutputsChOut)

ai.Wg.Add(1)
ai.Adder.Add(1, a.pool.RoutineName)
}

func traverseExecutive(job async.Job[TraverseItemInput]) (async.JobOutput[TraverseOutput], error) {
Expand Down
13 changes: 7 additions & 6 deletions xfs/nav/navigation-async-defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package nav

import (
"context"
"sync"

"github.com/snivilised/lorax/async"
)
Expand All @@ -27,9 +26,11 @@ type TraverseItemStreamR async.JobStreamR[TraverseItemJobOutput]
type TraverseItemStreamW async.JobStreamW[TraverseItemJobOutput]

type AsyncInfo struct {
Ctx context.Context
Wg *sync.WaitGroup
JobsChanOut TraverseItemJobStream
OutputsChOut async.OutputStreamW[TraverseOutput] // consume this???
NoWorkers int
Ctx context.Context
NavigatorRoutineName async.GoRoutineName
Wgex async.WaitGroupEx
Adder async.AssistedAdder
Quitter async.AssistedQuitter
JobsChanOut TraverseItemJobStream
OutputsChOut async.OutputStreamW[TraverseOutput] // consume this???
}
15 changes: 12 additions & 3 deletions xfs/nav/navigation-session.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ func (s *session) start() {
func (s *session) finish(_ *TraverseResult, _ error, ai ...*AsyncInfo) {
defer func() {
if len(ai) > 0 {
fmt.Printf("---> 😈😈😈 defer session.finish\n")
close(ai[0].JobsChanOut)
fmt.Printf("---> observable navigator 😈😈😈 defer session.finish (CLOSE(JobsChanOut)/QUIT)\n")
close(ai[0].JobsChanOut) // ⚠️ fastward: intermittent panic on close
ai[0].Quitter.Done(ai[0].NavigatorRoutineName)
}
}()

Expand Down Expand Up @@ -111,7 +112,11 @@ func (s *PrimarySession) Run(ai ...*AsyncInfo) (result *TraverseResult, err erro

s.session.start()

return s.navigator.walk(s.Path, ai...)
if len(ai) > 0 {
s.navigator.ensync(ai[0])
}

return s.navigator.walk(s.Path)
}

func (s *PrimarySession) StartedAt() time.Time {
Expand Down Expand Up @@ -191,6 +196,10 @@ func (s *ResumeSession) Run(ai ...*AsyncInfo) (result *TraverseResult, err error

s.session.start()

if len(ai) > 0 {
s.rc.navigator.ensync(ai[0])
}

return s.rc.Continue(ai...)
}

Expand Down
17 changes: 16 additions & 1 deletion xfs/nav/navigator-abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nav
import (
"fmt"
"io/fs"
"strings"

"github.com/google/uuid"
"github.com/snivilised/extendio/internal/log"
Expand All @@ -25,7 +26,16 @@ func (n *navigator) ensync(frame *navigationFrame, ai *AsyncInfo) {
decorator := &LabelledTraverseCallback{
Label: "async decorator",
Fn: func(item *TraverseItem) error {
fmt.Printf("---> 🐬 ASYNC-CALLBACK: '%v' \n", item.Path)
defer func() {
pe := recover()
if err, ok := pe.(error); !ok || !strings.Contains(err.Error(),
"send on closed channel") {
fmt.Printf("---> ☠️☠️☠️ ENSYNC-NAV-CALLBACK(panic on close): '%v' (err:'%v')\n",
item.Path, pe,
)
}
}()
fmt.Printf("---> 🐬 ENSYNC-NAV-CALLBACK: '%v' \n", item.Path)

var err error
select {
Expand All @@ -46,6 +56,11 @@ func (n *navigator) ensync(frame *navigationFrame, ai *AsyncInfo) {
err = fs.SkipDir

case ai.JobsChanOut <- async.Job[TraverseItemInput](j):
//
// intermittent panic: send on closed channel, in fastward resume scenarios
// 'gr:observable-navigator'

fmt.Printf("-->> 🍆🍆 sending job(%v)\n", j.ID)
}
}

Expand Down
10 changes: 5 additions & 5 deletions xfs/nav/navigator-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func (c *navigatorController) logger() log.Logger {
return c.impl.logger()
}

func (c *navigatorController) walk(root string, ai ...*AsyncInfo) (*TraverseResult, error) {
func (c *navigatorController) ensync(ai *AsyncInfo) {
c.impl.ensync(c.frame, ai)
}

func (c *navigatorController) walk(root string) (*TraverseResult, error) {
c.frame.root.Set(root)
c.impl.logger().Info("walk", log.String("root", root))

if len(ai) > 0 {
c.impl.ensync(c.frame, ai[0])
}

c.frame.notifiers.begin.invoke(c.ns)

result, err := c.impl.top(c.frame, root)
Expand Down
4 changes: 0 additions & 4 deletions xfs/nav/resume-strategy-fastward.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ func (s *fastwardStrategy) resume(info *strategyResumeInfo) (*TraverseResult, er
log.String("resume-at-path", resumeAt),
)

if info.ai != nil {
s.ensync(info.ai)
}

// fast-forward doesn't need to restore the entire state, eg, the
// Depth can begin as per usual, without being restored.
//
Expand Down
4 changes: 0 additions & 4 deletions xfs/nav/resume-strategy-spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ func (s *spawnStrategy) resume(info *strategyResumeInfo) (*TraverseResult, error
log.String("resume-at-path", resumeAt),
)

if info.ai != nil {
s.ensync(info.ai)
}

return s.conclude(&concludeInfo{
active: info.ps.Active,
root: info.ps.Active.Root,
Expand Down
3 changes: 2 additions & 1 deletion xfs/nav/traverse-defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func (r *TraverseResult) merge(other *TraverseResult) (*TraverseResult, error) {

// TraverseNavigator interface to the main traverse instance.
type TraverseNavigator interface {
walk(_ string, _ ...*AsyncInfo) (*TraverseResult, error)
ensync(_ *AsyncInfo)
walk(_ string) (*TraverseResult, error)
save(_ string) error
finish() error
}
Expand Down
35 changes: 22 additions & 13 deletions xfs/nav/traverse-navigator-async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package nav_test

import (
"fmt"
"sync"
"time"

"github.com/fortytw2/leaktest"
Expand Down Expand Up @@ -35,7 +34,7 @@ const (
// we use a large job queue size to prevent blocking as these unit
// tests don't have a consumer
JobsChSize = 50
OutputsChSize = 20
OutputsChSize = 50
)

// TODO: rename this file to navigation-async_test.go
Expand All @@ -45,7 +44,7 @@ var _ = Describe("navigation", Ordered, func() {
jroot string
fromJSONPath string
jobsChOut nav.TraverseItemJobStream
outputsChIn async.OutputStreamW[nav.TraverseOutput]
outputsChOut async.OutputStreamW[nav.TraverseOutput]
)

BeforeAll(func() {
Expand All @@ -61,20 +60,22 @@ var _ = Describe("navigation", Ordered, func() {
Fail(err.Error())
}
jobsChOut = make(nav.TraverseItemJobStream, JobsChSize)
outputsChIn = make(async.OutputStreamW[nav.TraverseOutput], OutputsChSize)
outputsChOut = make(async.OutputStreamW[nav.TraverseOutput], OutputsChSize)
})

DescribeTable("async",
func(ctx SpecContext, entry *asyncTE) {
defer leaktest.Check(GinkgoT())()

var wg sync.WaitGroup
var (
wgex async.WaitGroupEx
)

path := helpers.Path(root, "RETRO-WAVE")
optionFn := func(o *nav.TraverseOptions) {
o.Store.Subscription = nav.SubscribeFolders
o.Store.DoExtend = true
o.Callback = asyncCallback("WithCPUPool/primary session")
o.Callback = asyncCallback("async primary session")
o.Notify.OnBegin = begin("🛡️")
}

Expand Down Expand Up @@ -113,14 +114,21 @@ var _ = Describe("navigation", Ordered, func() {
entry.operator(runner)
}

navigatorRoutineName := async.GoRoutineName("✨ observable-navigator")
wgex = async.NewAnnotatedWaitGroup("🍂 traversal")
wgex.Add(1, navigatorRoutineName)
_, err := runner.Run(&nav.AsyncInfo{
Ctx: ctx,
Wg: &wg,
JobsChanOut: jobsChOut,
OutputsChOut: outputsChIn,
Ctx: ctx,
NavigatorRoutineName: navigatorRoutineName,
Wgex: wgex,
Adder: wgex,
Quitter: wgex,
JobsChanOut: jobsChOut,
OutputsChOut: outputsChOut,
})

wg.Wait()
wgex.Wait("👾 test-main")

Expect(err).To(BeNil())
},
func(entry *asyncTE) string {
Expand All @@ -143,8 +151,9 @@ var _ = Describe("navigation", Ordered, func() {
},
}, SpecTimeout(time.Second*2)),

XEntry(nil, &asyncTE{
// 🔥 panic: send on closed channel
Entry(nil, &asyncTE{
// 🔥 panic: send on closed channel; this is intermittent
// probably a race condition
//
resume: &asyncResumeTE{
Strategy: nav.ResumeStrategyFastwardEn,
Expand Down