Skip to content

Commit

Permalink
ref(async): tidy up annotated wait group (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Aug 25, 2023
1 parent 653e58b commit 2f82164
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 58 deletions.
117 changes: 63 additions & 54 deletions async/annotated-wait-group.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type WaitGroupName string
type GoRoutineName = string
type GoRoutineName string
type GoRoutineID string
type namesCollection map[GoRoutineName]string

Expand All @@ -27,103 +27,113 @@ func ComposeGoRoutineID(prefix ...string) GoRoutineID {
)
}

//
// We need to use variadic parameter list in the methods because of go's lack of
// overloading methods; we need to support calls to wait group methods with or
// without the go routine name behind the same methods name, ie Add needs to be
// able to be invoked either way.

// WaitGroupEx the extended WaitGroup
type WaitGroupEx interface {
Add(delta int, name ...GoRoutineName)
Done(name ...GoRoutineName)
Wait(name ...GoRoutineName)
}

// ===> Adder

// AssistedAdder is the interface that is a restricted view of a wait group
// that only allows adding to the wait group with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
type AssistedAdder interface {
Add(delta int, name ...GoRoutineName)
}

// ===> Quitter

// AssistedQuitter is the interface that is a restricted view of a wait group
// that only allows Done signalling on the wait group with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
type AssistedQuitter interface {
Done(name ...GoRoutineName)
}

// ===> Waiter

// AssistedWaiter is the interface that is a restricted view of a wait group
// that only allows waiting on the wait group with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
type AssistedWaiter interface {
Done(name ...GoRoutineName)
}

// ===> Counter
// AssistedCounter is the interface that is a restricted view of a wait group
// that only allows querying the wait group count. This interface
// can be acquired from the wait group using a standard interface type query.
type AssistedCounter interface {
Count() int
}

type WaitGroupAssister struct {
counter int32
names namesCollection
type waitGroupAssister struct {
counter int32
names namesCollection
waitGroupName string
}

func (a *WaitGroupAssister) Add(delta int, name ...GoRoutineName) {
func (a *waitGroupAssister) Add(delta int, name ...GoRoutineName) {
a.counter += int32(delta)

if len(name) > 0 {
a.names[name[0]] = "foo"

fmt.Printf(" 🟪🟪🟪 [[ WaitGroupAssister.Add ]] - name: '%v' (delta: '%v', count: '%v') (running: '%v')\n",
name[0], delta, a.counter, a.running(),
)
a.indicate("➕➕➕", string(name[0]), "Add")
}
}

func (a *WaitGroupAssister) Done(name ...GoRoutineName) {
func (a *waitGroupAssister) Done(name ...GoRoutineName) {
a.counter--

if len(name) > 0 {
delete(a.names, name[0])

fmt.Printf(" 🔷🔷🔷 [[ WaitGroupAssister.Done ]] - name: '%v' (count: '%v') (running: '%v')\n",
name[0], a.counter, a.running(),
)
a.indicate("🚩🚩🚩", string(name[0]), "Done")
}
}

func (a *WaitGroupAssister) Wait(name ...GoRoutineName) {
func (a *waitGroupAssister) Wait(name ...GoRoutineName) {
if len(name) > 0 {
fmt.Printf(" 🟤🟤🟤 [[ WaitGroupAssister.Wait ]] - name: '%v' (count: '%v') (running: '%v')\n",
name[0], a.counter, a.running(),
)
a.indicate("🧭🧭🧭", string(name[0]), "Wait")
}
}

func (a *WaitGroupAssister) running() string {
return strings.Join(lo.Keys(a.names), "/")
func (a *waitGroupAssister) indicate(highlight, name, op string) {
fmt.Printf(
" %v [[ WaitGroupAssister(%v).%v ]] - gr-name: '%v' (count: '%v') (running: '%v')\n",
highlight, a.waitGroupName, op, name, a.counter, a.running(),
)
}

// You start off with a core instance and from here you can query it to get the
// assistant interfaces ... The WaitGroupCore and the WaitGroupAssister both
// need to know about each other, but making them hold a reference to each other
// will cause a memory leak. To resolve this, there is a release mechanism
// which is automatically enacted after the Wait has become unblocked. The
// consequence of this characteristic is that once a Wait has completed, it
// should not be re-used.
//
// We need to use variadic parameter list in the methods because of go's lack of
// overloading methods; we need to support calls to wait group methods with or
// without the go routine name behind the same methods name, ie Add needs to be
// able to be invoked either way.
// type WaitGroupCore struct {
// wg sync.WaitGroup
// }
func (a *waitGroupAssister) running() string {
runners := lo.Map(lo.Keys(a.names), func(item GoRoutineName, _ int) string {
return string(item)
})

return strings.Join(runners, "/")
}

// AnnotatedWaitGroup is a wrapper around the standard WaitGroup that
// provides annotations to wait group operations that can assist in
// diagnosing concurrency issues.
type AnnotatedWaitGroup struct {
wg sync.WaitGroup
assistant WaitGroupAssister
assistant waitGroupAssister
mux sync.Mutex
}

func NewAnnotatedWaitGroup(_ string) *AnnotatedWaitGroup {
// NewAnnotatedWaitGroup creates a new AnnotatedWaitGroup instance containing
// the core WaitGroup instance.
func NewAnnotatedWaitGroup(name string) *AnnotatedWaitGroup {
return &AnnotatedWaitGroup{
assistant: WaitGroupAssister{
names: make(namesCollection),
assistant: waitGroupAssister{
waitGroupName: name,
names: make(namesCollection),
},
}
}
Expand All @@ -132,31 +142,30 @@ func (d *AnnotatedWaitGroup) atomic(operation func()) {
operation()
}

// Add wraps the standard WaitGroup Add operation with the addition of
// being able to associate a go routine (identified by a client provided
// name) with the Add request.
func (d *AnnotatedWaitGroup) Add(delta int, name ...GoRoutineName) {
d.atomic(func() {
d.assistant.Add(delta, name...)
d.wg.Add(delta)
})
}

// Done wraps the standard WaitGroup Done operation with the addition of
// being able to associate a go routine (identified by a client provided
// name) with the Done request.
func (d *AnnotatedWaitGroup) Done(name ...GoRoutineName) {
d.atomic(func() {
d.assistant.Done(name...)
d.wg.Done()
})
}

// Wait wraps the standard WaitGroup Wait operation with the addition of
// being able to associate a go routine (identified by a client provided
// name) with the Wait request.
func (d *AnnotatedWaitGroup) Wait(name ...GoRoutineName) {
// We could make the wait an active operation, that includes
// creating a new go routine and a channel. The go routine
// will monitor the state of the wait group, every time
// either an Add or Done occurs, those send a message to the go
// routine via this channel. The go routing will simply keep
// reading the channel and displaying the current count. The
// active-ness is just a debugging tool, not intended to be
// used in production as it will be noisy by design either writing
// to the console or preferably writing to a log.
//
d.atomic(func() {
d.assistant.Wait(name...)
d.wg.Wait()
Expand Down
4 changes: 2 additions & 2 deletions async/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type pipeline[I, O any] struct {

func start[I, O any]() *pipeline[I, O] {
pipe := &pipeline[I, O]{
wgex: async.NewAnnotatedWaitGroup("pipeline"),
wgex: async.NewAnnotatedWaitGroup("🍂 pipeline"),
outputsCh: make(chan async.JobOutput[O], OutputsChSize),
stop: noOp,
cancel: noOp,
Expand Down Expand Up @@ -159,7 +159,7 @@ var _ = Describe("WorkerPool", func() {

defer func() {
if counter, ok := (pipe.wgex).(async.AssistedCounter); ok {
fmt.Printf("🎈🎈🎈🎈remaining count: '%v'\n", counter.Count())
fmt.Printf("🎈🎈🎈🎈 remaining count: '%v'\n", counter.Count())
}
}()

Expand Down
4 changes: 2 additions & 2 deletions internal/helpers/test-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (p *Producer[I, O]) run(ctx context.Context) {
defer func() {
close(p.JobsCh)
p.quitter.Done(p.RoutineName)
fmt.Printf(">>>> producer.run - finished (QUIT). ✨✨✨ \n")
fmt.Printf(">>>> producer.run - finished (QUIT). ✨✨✨ \n")
}()

fmt.Printf(">>>> ✨ producer.run ...(ctx:%+v)\n", ctx)
Expand Down Expand Up @@ -98,7 +98,7 @@ func (p *Producer[I, O]) item(ctx context.Context) bool {

select {
case <-ctx.Done():
fmt.Println(">>>> 💠 producer.item - done received ⛔⛔⛔")
fmt.Println(">>>> producer.item - done received ⛔⛔⛔")

result = false

Expand Down

0 comments on commit 2f82164

Please sign in to comment.