Skip to content

Commit

Permalink
ref(async): use annotated work group (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Aug 25, 2023
1 parent cc3a7ba commit 8004ef8
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 51 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"varcheck",
"watchvc",
"watchvi",
"wgex",
"xenomorph"
]
}
66 changes: 48 additions & 18 deletions async/annotated-wait-group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package async

import (
"fmt"
"strings"
"sync"
"sync/atomic"

"github.com/google/uuid"
"github.com/samber/lo"
)

type WaitGroupName string
type GoRoutineName string
type GoRoutineName = string
type GoRoutineID string
type namesCollection map[GoRoutineName]string

func ComposeGoRoutineID(prefix ...string) GoRoutineID {
id := uuid.NewString()
Expand Down Expand Up @@ -51,32 +52,52 @@ type AssistedWaiter interface {
Done(name ...GoRoutineName)
}

// ===> Counter
type AssistedCounter interface {
Count() int
}

type WaitGroupAssister struct {
counter int32
names namesCollection
}

func (g *WaitGroupAssister) Add(delta int, name ...GoRoutineName) {
func (a *WaitGroupAssister) Add(delta int, name ...GoRoutineName) {
a.counter += int32(delta)

if len(name) > 0 {
fmt.Printf(" 🧩[[ WaitGroupAssister.Add ]] - name: '%v' (delta: '%v')\n", name[0], delta)
}
a.names[name[0]] = "foo"

atomic.AddInt32(&g.counter, int32(delta))
fmt.Printf(" 🟪🟪🟪 [[ WaitGroupAssister.Add ]] - name: '%v' (delta: '%v', count: '%v') (running: '%v')\n",
name[0], delta, a.counter, a.running(),
)
}
}

func (g *WaitGroupAssister) Done(name ...GoRoutineName) {
func (a *WaitGroupAssister) Done(name ...GoRoutineName) {
a.counter--

if len(name) > 0 {
fmt.Printf(" 🧩[[ WaitGroupAssister.Done ]] - name: '%v'\n", name[0])
}
delete(a.names, name[0])

atomic.AddInt32(&g.counter, int32(-1))
fmt.Printf(" 🔷🔷🔷 [[ WaitGroupAssister.Done ]] - name: '%v' (count: '%v') (running: '%v')\n",
name[0], a.counter, a.running(),
)
}
}

func (g *WaitGroupAssister) Wait(name ...GoRoutineName) {
func (a *WaitGroupAssister) Wait(name ...GoRoutineName) {
if len(name) > 0 {
fmt.Printf(" 🧩[[ WaitGroupAssister.Wait ]] - name: '%v'\n", name[0])
fmt.Printf(" 🟤🟤🟤 [[ WaitGroupAssister.Wait ]] - name: '%v' (count: '%v') (running: '%v')\n",
name[0], a.counter, a.running(),
)
}
}

func (a *WaitGroupAssister) running() string {
return strings.Join(lo.Keys(a.names), "/")
}

// You start off with a core instance and from here you can query it to get the
// assistant interfaces ... The WaitGroupCore and the WaitGroupAssister both
// need to know about each other, but making them hold a reference to each other
Expand All @@ -99,24 +120,29 @@ type AnnotatedWaitGroup struct {
mux sync.Mutex
}

func (d *AnnotatedWaitGroup) atomic(operation func()) {
defer d.mux.Unlock()
func NewAnnotatedWaitGroup(_ string) *AnnotatedWaitGroup {
return &AnnotatedWaitGroup{
assistant: WaitGroupAssister{
names: make(namesCollection),
},
}
}

d.mux.Lock()
func (d *AnnotatedWaitGroup) atomic(operation func()) {
operation()
}

func (d *AnnotatedWaitGroup) Add(delta int, name ...GoRoutineName) {
d.atomic(func() {
d.wg.Add(delta)
d.assistant.Add(delta, name...)
d.wg.Add(delta)
})
}

func (d *AnnotatedWaitGroup) Done(name ...GoRoutineName) {
d.atomic(func() {
d.wg.Done()
d.assistant.Done(name...)
d.wg.Done()
})
}

Expand All @@ -132,7 +158,11 @@ func (d *AnnotatedWaitGroup) Wait(name ...GoRoutineName) {
// to the console or preferably writing to a log.
//
d.atomic(func() {
d.wg.Wait()
d.assistant.Wait(name...)
d.wg.Wait()
})
}

func (d *AnnotatedWaitGroup) Count() int {
return int(d.assistant.counter)
}
6 changes: 3 additions & 3 deletions async/annotated-wait-group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ var _ = Describe("AnnotatedWaitGroup", func() {

Context("Add", func() {
It("should: add", func() {
var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{}
var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("add-unit-test")

wg.Add(1, "producer")
})
})

Context("Done", func() {
It("should: quit", func() {
var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{}
var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("done-unit-test")

wg.Add(1, "producer")
wg.Done("producer")
Expand All @@ -29,7 +29,7 @@ var _ = Describe("AnnotatedWaitGroup", func() {

Context("Wait", func() {
It("should: quit", func() {
var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{}
var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("wait-unit-test")

wg.Add(1, "producer")
go func() {
Expand Down
11 changes: 6 additions & 5 deletions async/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"runtime"
"sync"

"github.com/google/uuid"
)
Expand Down Expand Up @@ -43,18 +42,19 @@ type privateWpInfo[I, O any] struct {
type WorkerPool[I, O any] struct {
private privateWpInfo[I, O]
exec ExecutiveFunc[I, O]
RoutineName GoRoutineName
noWorkers int
SourceJobsChIn JobStreamR[I]

Quit *sync.WaitGroup
Quitter AssistedQuitter
}

type NewWorkerPoolParams[I, O any] struct {
NoWorkers int
Exec ExecutiveFunc[I, O]
JobsCh chan Job[I]
CancelCh CancelStream
Quit *sync.WaitGroup
Quitter AssistedQuitter
}

func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O] {
Expand All @@ -71,10 +71,11 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O
cancelCh: params.CancelCh,
},
exec: params.Exec,
RoutineName: GoRoutineName("🧊 worker pool"),
noWorkers: noWorkers,
SourceJobsChIn: params.JobsCh,

Quit: params.Quit,
Quitter: params.Quitter,
}

return wp
Expand Down Expand Up @@ -107,7 +108,7 @@ func (p *WorkerPool[I, O]) run(
) {
defer func() {
close(outputsChOut)
p.Quit.Done()
p.Quitter.Done(p.RoutineName)
fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}()
fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", ctx)
Expand Down
34 changes: 22 additions & 12 deletions async/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/fortytw2/leaktest"
Expand Down Expand Up @@ -52,6 +51,8 @@ var (
}

noOp = func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {}

testMain = async.GoRoutineName("👾 test-main")
)

type TestJobInput struct {
Expand All @@ -76,7 +77,7 @@ var greeter = func(j async.Job[TestJobInput]) (async.JobOutput[TestJobOutput], e
}

type pipeline[I, O any] struct {
wg sync.WaitGroup
wgex async.WaitGroupEx
sequence int
outputsCh chan async.JobOutput[O]
provider helpers.ProviderFunc[I]
Expand All @@ -89,6 +90,7 @@ type pipeline[I, O any] struct {

func start[I, O any]() *pipeline[I, O] {
pipe := &pipeline[I, O]{
wgex: async.NewAnnotatedWaitGroup("pipeline"),
outputsCh: make(chan async.JobOutput[O], OutputsChSize),
stop: noOp,
cancel: noOp,
Expand All @@ -114,13 +116,13 @@ func (p *pipeline[I, O]) produce(ctx context.Context, provider helpers.ProviderF

p.producer = helpers.StartProducer[I, O](
ctx,
&p.wg,
p.wgex,
JobChSize,
provider,
Delay,
)

p.wg.Add(1)
p.wgex.Add(1, p.producer.RoutineName)
}

func (p *pipeline[I, O]) process(ctx context.Context, noWorkers int, executive async.ExecutiveFunc[I, O]) {
Expand All @@ -130,37 +132,45 @@ func (p *pipeline[I, O]) process(ctx context.Context, noWorkers int, executive a
Exec: executive,
JobsCh: p.producer.JobsCh,
CancelCh: make(async.CancelStream),
Quit: &p.wg,
Quitter: p.wgex,
})

go p.pool.Start(ctx, p.outputsCh)

p.wg.Add(1)
p.wgex.Add(1, p.pool.RoutineName)
}

func (p *pipeline[I, O]) consume(ctx context.Context) {
p.consumer = helpers.StartConsumer(ctx,
&p.wg,
p.wgex,
p.outputsCh,
)

p.wg.Add(1)
p.wgex.Add(1, p.consumer.RoutineName)
}

var _ = Describe("WorkerPool", func() {
When("given: a stream of jobs", func() {
Context("and: Stopped", func() {
It("🧪 should: receive and process all", func(ctx SpecContext) {
defer leaktest.Check(GinkgoT())()

pipe := start[TestJobInput, TestJobOutput]()

defer func() {
if counter, ok := (pipe.wgex).(async.AssistedCounter); ok {
fmt.Printf("🎈🎈🎈🎈remaining count: '%v'\n", counter.Count())
}
}()

By("👾 WAIT-GROUP ADD(producer)")
pipe.produce(ctx, func() TestJobInput {
provider := func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
return TestJobInput{
Recipient: audience[recipient],
}
})
}
pipe.produce(ctx, provider)

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.process(ctx, DefaultNoWorkers, greeter)
Expand All @@ -170,7 +180,7 @@ var _ = Describe("WorkerPool", func() {

By("👾 NOW AWAITING TERMINATION")
pipe.stop.After(ctx, time.Second/5)
pipe.wg.Wait()
pipe.wgex.Wait(async.GoRoutineName("👾 test-main"))

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
pipe.producer.Count,
Expand Down Expand Up @@ -209,7 +219,7 @@ var _ = Describe("WorkerPool", func() {
By("👾 NOW AWAITING TERMINATION")
pipe.cancel.After(ctxCancel, time.Second/5, cancellations...)

pipe.wg.Wait()
pipe.wgex.Wait(async.GoRoutineName("👾 test-main"))

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
pipe.producer.Count,
Expand Down
2 changes: 1 addition & 1 deletion async/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (w *worker[I, O]) run(ctx context.Context) {
w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok
fmt.Printf(" <--- 🚀 worker.run(%v) (SENT FINISHED). 🚀🚀🚀\n", w.id)
}()
fmt.Printf(" ---> 🚀worker.run(%v) ...(ctx:%+v)\n", w.id, ctx)
fmt.Printf(" ---> 🚀 worker.run(%v) ...(ctx:%+v)\n", w.id, ctx)

for running := true; running; {
select {
Expand Down
13 changes: 7 additions & 6 deletions internal/helpers/test-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,25 @@ package helpers
import (
"context"
"fmt"
"sync"

"github.com/snivilised/lorax/async"
)

type Consumer[O any] struct {
quit *sync.WaitGroup
quitter async.AssistedQuitter
RoutineName async.GoRoutineName
OutputsChIn async.OutputStreamR[O]
Count int
}

func StartConsumer[O any](
ctx context.Context,
wg *sync.WaitGroup,
quitter async.AssistedQuitter,
outputsChIn async.OutputStreamR[O],
) *Consumer[O] {
consumer := &Consumer[O]{
quit: wg,
quitter: quitter,
RoutineName: async.GoRoutineName("💠 consumer"),
OutputsChIn: outputsChIn,
}
go consumer.run(ctx)
Expand All @@ -30,8 +31,8 @@ func StartConsumer[O any](

func (c *Consumer[O]) run(ctx context.Context) {
defer func() {
c.quit.Done()
fmt.Printf("<<<< consumer.run - finished (QUIT). 💠💠💠 \n")
c.quitter.Done(c.RoutineName)
fmt.Printf("<<<< 💠 consumer.run - finished (QUIT). 💠💠💠 \n")
}()
fmt.Printf("<<<< 💠 consumer.run ...(ctx:%+v)\n", ctx)

Expand Down
Loading

0 comments on commit 8004ef8

Please sign in to comment.