From 2f82164ff0b47a280887e2599d508fd6351dd51a Mon Sep 17 00:00:00 2001 From: plastikfan Date: Fri, 25 Aug 2023 20:26:24 +0100 Subject: [PATCH] ref(async): tidy up annotated wait group (#40) --- async/annotated-wait-group.go | 117 ++++++++++++++++-------------- async/worker-pool_test.go | 4 +- internal/helpers/test-producer.go | 4 +- 3 files changed, 67 insertions(+), 58 deletions(-) diff --git a/async/annotated-wait-group.go b/async/annotated-wait-group.go index d15ef35..58bd427 100644 --- a/async/annotated-wait-group.go +++ b/async/annotated-wait-group.go @@ -10,7 +10,7 @@ import ( ) type WaitGroupName string -type GoRoutineName = string +type GoRoutineName string type GoRoutineID string type namesCollection map[GoRoutineName]string @@ -27,6 +27,12 @@ func ComposeGoRoutineID(prefix ...string) GoRoutineID { ) } +// +// We need to use variadic parameter list in the methods because of go's lack of +// overloading methods; we need to support calls to wait group methods with or +// without the go routine name behind the same methods name, ie Add needs to be +// able to be invoked either way. + // WaitGroupEx the extended WaitGroup type WaitGroupEx interface { Add(delta int, name ...GoRoutineName) @@ -34,96 +40,100 @@ type WaitGroupEx interface { Wait(name ...GoRoutineName) } -// ===> Adder - +// AssistedAdder is the interface that is a restricted view of a wait group +// that only allows adding to the wait group with the addition of being +// able to specify the name representing the calling go routine. This interface +// can be acquired from the wait group using a standard interface type query. type AssistedAdder interface { Add(delta int, name ...GoRoutineName) } -// ===> Quitter - +// AssistedQuitter is the interface that is a restricted view of a wait group +// that only allows Done signalling on the wait group with the addition of being +// able to specify the name representing the calling go routine. This interface +// can be acquired from the wait group using a standard interface type query. type AssistedQuitter interface { Done(name ...GoRoutineName) } -// ===> Waiter - +// AssistedWaiter is the interface that is a restricted view of a wait group +// that only allows waiting on the wait group with the addition of being +// able to specify the name representing the calling go routine. This interface +// can be acquired from the wait group using a standard interface type query. type AssistedWaiter interface { Done(name ...GoRoutineName) } -// ===> Counter +// AssistedCounter is the interface that is a restricted view of a wait group +// that only allows querying the wait group count. This interface +// can be acquired from the wait group using a standard interface type query. type AssistedCounter interface { Count() int } -type WaitGroupAssister struct { - counter int32 - names namesCollection +type waitGroupAssister struct { + counter int32 + names namesCollection + waitGroupName string } -func (a *WaitGroupAssister) Add(delta int, name ...GoRoutineName) { +func (a *waitGroupAssister) Add(delta int, name ...GoRoutineName) { a.counter += int32(delta) if len(name) > 0 { a.names[name[0]] = "foo" - fmt.Printf(" πŸŸͺπŸŸͺπŸŸͺ [[ WaitGroupAssister.Add ]] - name: '%v' (delta: '%v', count: '%v') (running: '%v')\n", - name[0], delta, a.counter, a.running(), - ) + a.indicate("βž•βž•βž•", string(name[0]), "Add") } } -func (a *WaitGroupAssister) Done(name ...GoRoutineName) { +func (a *waitGroupAssister) Done(name ...GoRoutineName) { a.counter-- if len(name) > 0 { delete(a.names, name[0]) - fmt.Printf(" πŸ”·πŸ”·πŸ”· [[ WaitGroupAssister.Done ]] - name: '%v' (count: '%v') (running: '%v')\n", - name[0], a.counter, a.running(), - ) + a.indicate("🚩🚩🚩", string(name[0]), "Done") } } -func (a *WaitGroupAssister) Wait(name ...GoRoutineName) { +func (a *waitGroupAssister) Wait(name ...GoRoutineName) { if len(name) > 0 { - fmt.Printf(" 🟀🟀🟀 [[ WaitGroupAssister.Wait ]] - name: '%v' (count: '%v') (running: '%v')\n", - name[0], a.counter, a.running(), - ) + a.indicate("🧭🧭🧭", string(name[0]), "Wait") } } -func (a *WaitGroupAssister) running() string { - return strings.Join(lo.Keys(a.names), "/") +func (a *waitGroupAssister) indicate(highlight, name, op string) { + fmt.Printf( + " %v [[ WaitGroupAssister(%v).%v ]] - gr-name: '%v' (count: '%v') (running: '%v')\n", + highlight, a.waitGroupName, op, name, a.counter, a.running(), + ) } -// You start off with a core instance and from here you can query it to get the -// assistant interfaces ... The WaitGroupCore and the WaitGroupAssister both -// need to know about each other, but making them hold a reference to each other -// will cause a memory leak. To resolve this, there is a release mechanism -// which is automatically enacted after the Wait has become unblocked. The -// consequence of this characteristic is that once a Wait has completed, it -// should not be re-used. -// -// We need to use variadic parameter list in the methods because of go's lack of -// overloading methods; we need to support calls to wait group methods with or -// without the go routine name behind the same methods name, ie Add needs to be -// able to be invoked either way. -// type WaitGroupCore struct { -// wg sync.WaitGroup -// } +func (a *waitGroupAssister) running() string { + runners := lo.Map(lo.Keys(a.names), func(item GoRoutineName, _ int) string { + return string(item) + }) + + return strings.Join(runners, "/") +} +// AnnotatedWaitGroup is a wrapper around the standard WaitGroup that +// provides annotations to wait group operations that can assist in +// diagnosing concurrency issues. type AnnotatedWaitGroup struct { wg sync.WaitGroup - assistant WaitGroupAssister + assistant waitGroupAssister mux sync.Mutex } -func NewAnnotatedWaitGroup(_ string) *AnnotatedWaitGroup { +// NewAnnotatedWaitGroup creates a new AnnotatedWaitGroup instance containing +// the core WaitGroup instance. +func NewAnnotatedWaitGroup(name string) *AnnotatedWaitGroup { return &AnnotatedWaitGroup{ - assistant: WaitGroupAssister{ - names: make(namesCollection), + assistant: waitGroupAssister{ + waitGroupName: name, + names: make(namesCollection), }, } } @@ -132,6 +142,9 @@ func (d *AnnotatedWaitGroup) atomic(operation func()) { operation() } +// Add wraps the standard WaitGroup Add operation with the addition of +// being able to associate a go routine (identified by a client provided +// name) with the Add request. func (d *AnnotatedWaitGroup) Add(delta int, name ...GoRoutineName) { d.atomic(func() { d.assistant.Add(delta, name...) @@ -139,6 +152,9 @@ func (d *AnnotatedWaitGroup) Add(delta int, name ...GoRoutineName) { }) } +// Done wraps the standard WaitGroup Done operation with the addition of +// being able to associate a go routine (identified by a client provided +// name) with the Done request. func (d *AnnotatedWaitGroup) Done(name ...GoRoutineName) { d.atomic(func() { d.assistant.Done(name...) @@ -146,17 +162,10 @@ func (d *AnnotatedWaitGroup) Done(name ...GoRoutineName) { }) } +// Wait wraps the standard WaitGroup Wait operation with the addition of +// being able to associate a go routine (identified by a client provided +// name) with the Wait request. func (d *AnnotatedWaitGroup) Wait(name ...GoRoutineName) { - // We could make the wait an active operation, that includes - // creating a new go routine and a channel. The go routine - // will monitor the state of the wait group, every time - // either an Add or Done occurs, those send a message to the go - // routine via this channel. The go routing will simply keep - // reading the channel and displaying the current count. The - // active-ness is just a debugging tool, not intended to be - // used in production as it will be noisy by design either writing - // to the console or preferably writing to a log. - // d.atomic(func() { d.assistant.Wait(name...) d.wg.Wait() diff --git a/async/worker-pool_test.go b/async/worker-pool_test.go index 459ab2a..f856126 100644 --- a/async/worker-pool_test.go +++ b/async/worker-pool_test.go @@ -90,7 +90,7 @@ type pipeline[I, O any] struct { func start[I, O any]() *pipeline[I, O] { pipe := &pipeline[I, O]{ - wgex: async.NewAnnotatedWaitGroup("pipeline"), + wgex: async.NewAnnotatedWaitGroup("πŸ‚ pipeline"), outputsCh: make(chan async.JobOutput[O], OutputsChSize), stop: noOp, cancel: noOp, @@ -159,7 +159,7 @@ var _ = Describe("WorkerPool", func() { defer func() { if counter, ok := (pipe.wgex).(async.AssistedCounter); ok { - fmt.Printf("🎈🎈🎈🎈remaining count: '%v'\n", counter.Count()) + fmt.Printf("🎈🎈🎈🎈 remaining count: '%v'\n", counter.Count()) } }() diff --git a/internal/helpers/test-producer.go b/internal/helpers/test-producer.go index fd02f6c..3453e23 100644 --- a/internal/helpers/test-producer.go +++ b/internal/helpers/test-producer.go @@ -56,7 +56,7 @@ func (p *Producer[I, O]) run(ctx context.Context) { defer func() { close(p.JobsCh) p.quitter.Done(p.RoutineName) - fmt.Printf(">>>> producer.run - finished (QUIT). ✨✨✨ \n") + fmt.Printf(">>>> ✨ producer.run - finished (QUIT). ✨✨✨ \n") }() fmt.Printf(">>>> ✨ producer.run ...(ctx:%+v)\n", ctx) @@ -98,7 +98,7 @@ func (p *Producer[I, O]) item(ctx context.Context) bool { select { case <-ctx.Done(): - fmt.Println(">>>> πŸ’  producer.item - done received β›”β›”β›”") + fmt.Println(">>>> ✨ producer.item - done received β›”β›”β›”") result = false