Skip to content

Commit

Permalink
feat(async): add annotated wait group (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Aug 25, 2023
1 parent de1c0b9 commit cc3a7ba
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 0 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"--fast"
],
"cSpell.words": [
"Assistable",
"astrolib",
"bodyclose",
"coverpkg",
Expand Down
138 changes: 138 additions & 0 deletions async/annotated-wait-group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package async

import (
"fmt"
"sync"
"sync/atomic"

"github.com/google/uuid"
"github.com/samber/lo"
)

type WaitGroupName string
type GoRoutineName string
type GoRoutineID string

func ComposeGoRoutineID(prefix ...string) GoRoutineID {
id := uuid.NewString()

return lo.TernaryF(len(prefix) > 0,
func() GoRoutineID {
return GoRoutineID(fmt.Sprintf("GR-ID(%v):%v", prefix[0], id))
},
func() GoRoutineID {
return GoRoutineID(fmt.Sprintf("GR-ID:%v", id))
},
)
}

// WaitGroupEx the extended WaitGroup
type WaitGroupEx interface {
Add(delta int, name ...GoRoutineName)
Done(name ...GoRoutineName)
Wait(name ...GoRoutineName)
}

// ===> Adder

type AssistedAdder interface {
Add(delta int, name ...GoRoutineName)
}

// ===> Quitter

type AssistedQuitter interface {
Done(name ...GoRoutineName)
}

// ===> Waiter

type AssistedWaiter interface {
Done(name ...GoRoutineName)
}

type WaitGroupAssister struct {
counter int32
}

func (g *WaitGroupAssister) Add(delta int, name ...GoRoutineName) {
if len(name) > 0 {
fmt.Printf(" 🧩[[ WaitGroupAssister.Add ]] - name: '%v' (delta: '%v')\n", name[0], delta)
}

atomic.AddInt32(&g.counter, int32(delta))
}

func (g *WaitGroupAssister) Done(name ...GoRoutineName) {
if len(name) > 0 {
fmt.Printf(" 🧩[[ WaitGroupAssister.Done ]] - name: '%v'\n", name[0])
}

atomic.AddInt32(&g.counter, int32(-1))
}

func (g *WaitGroupAssister) Wait(name ...GoRoutineName) {
if len(name) > 0 {
fmt.Printf(" 🧩[[ WaitGroupAssister.Wait ]] - name: '%v'\n", name[0])
}
}

// You start off with a core instance and from here you can query it to get the
// assistant interfaces ... The WaitGroupCore and the WaitGroupAssister both
// need to know about each other, but making them hold a reference to each other
// will cause a memory leak. To resolve this, there is a release mechanism
// which is automatically enacted after the Wait has become unblocked. The
// consequence of this characteristic is that once a Wait has completed, it
// should not be re-used.
//
// We need to use variadic parameter list in the methods because of go's lack of
// overloading methods; we need to support calls to wait group methods with or
// without the go routine name behind the same methods name, ie Add needs to be
// able to be invoked either way.
// type WaitGroupCore struct {
// wg sync.WaitGroup
// }

type AnnotatedWaitGroup struct {
wg sync.WaitGroup
assistant WaitGroupAssister
mux sync.Mutex
}

func (d *AnnotatedWaitGroup) atomic(operation func()) {
defer d.mux.Unlock()

d.mux.Lock()
operation()
}

func (d *AnnotatedWaitGroup) Add(delta int, name ...GoRoutineName) {
d.atomic(func() {
d.wg.Add(delta)
d.assistant.Add(delta, name...)
})
}

func (d *AnnotatedWaitGroup) Done(name ...GoRoutineName) {
d.atomic(func() {
d.wg.Done()
d.assistant.Done(name...)
})
}

func (d *AnnotatedWaitGroup) Wait(name ...GoRoutineName) {
// We could make the wait an active operation, that includes
// creating a new go routine and a channel. The go routine
// will monitor the state of the wait group, every time
// either an Add or Done occurs, those send a message to the go
// routine via this channel. The go routing will simply keep
// reading the channel and displaying the current count. The
// active-ness is just a debugging tool, not intended to be
// used in production as it will be noisy by design either writing
// to the console or preferably writing to a log.
//
d.atomic(func() {
d.wg.Wait()
d.assistant.Wait(name...)
})
}
42 changes: 42 additions & 0 deletions async/annotated-wait-group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package async_test

import (
"time"

. "github.com/onsi/ginkgo/v2"

"github.com/snivilised/lorax/async"
)

var _ = Describe("AnnotatedWaitGroup", func() {

Context("Add", func() {
It("should: add", func() {
var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{}

wg.Add(1, "producer")
})
})

Context("Done", func() {
It("should: quit", func() {
var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{}

wg.Add(1, "producer")
wg.Done("producer")
})
})

Context("Wait", func() {
It("should: quit", func() {
var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{}

wg.Add(1, "producer")
go func() {
wg.Done("producer")
}()
<-time.After(time.Second / 10)
wg.Wait("main")
})
})
})

0 comments on commit cc3a7ba

Please sign in to comment.