Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(async): use annotated work group (#37) #38

Merged
merged 1 commit into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"varcheck",
"watchvc",
"watchvi",
"wgex",
"xenomorph"
]
}
66 changes: 48 additions & 18 deletions async/annotated-wait-group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package async

import (
"fmt"
"strings"
"sync"
"sync/atomic"

"github.com/google/uuid"
"github.com/samber/lo"
)

type WaitGroupName string
type GoRoutineName string
type GoRoutineName = string
type GoRoutineID string
type namesCollection map[GoRoutineName]string

func ComposeGoRoutineID(prefix ...string) GoRoutineID {
id := uuid.NewString()
Expand Down Expand Up @@ -51,32 +52,52 @@ type AssistedWaiter interface {
Done(name ...GoRoutineName)
}

// ===> Counter
type AssistedCounter interface {
Count() int
}

type WaitGroupAssister struct {
counter int32
names namesCollection
}

func (g *WaitGroupAssister) Add(delta int, name ...GoRoutineName) {
func (a *WaitGroupAssister) Add(delta int, name ...GoRoutineName) {
a.counter += int32(delta)

if len(name) > 0 {
fmt.Printf(" 🧩[[ WaitGroupAssister.Add ]] - name: '%v' (delta: '%v')\n", name[0], delta)
}
a.names[name[0]] = "foo"

atomic.AddInt32(&g.counter, int32(delta))
fmt.Printf(" 🟪🟪🟪 [[ WaitGroupAssister.Add ]] - name: '%v' (delta: '%v', count: '%v') (running: '%v')\n",
name[0], delta, a.counter, a.running(),
)
}
}

func (g *WaitGroupAssister) Done(name ...GoRoutineName) {
func (a *WaitGroupAssister) Done(name ...GoRoutineName) {
a.counter--

if len(name) > 0 {
fmt.Printf(" 🧩[[ WaitGroupAssister.Done ]] - name: '%v'\n", name[0])
}
delete(a.names, name[0])

atomic.AddInt32(&g.counter, int32(-1))
fmt.Printf(" 🔷🔷🔷 [[ WaitGroupAssister.Done ]] - name: '%v' (count: '%v') (running: '%v')\n",
name[0], a.counter, a.running(),
)
}
}

func (g *WaitGroupAssister) Wait(name ...GoRoutineName) {
func (a *WaitGroupAssister) Wait(name ...GoRoutineName) {
if len(name) > 0 {
fmt.Printf(" 🧩[[ WaitGroupAssister.Wait ]] - name: '%v'\n", name[0])
fmt.Printf(" 🟤🟤🟤 [[ WaitGroupAssister.Wait ]] - name: '%v' (count: '%v') (running: '%v')\n",
name[0], a.counter, a.running(),
)
}
}

func (a *WaitGroupAssister) running() string {
return strings.Join(lo.Keys(a.names), "/")
}

// You start off with a core instance and from here you can query it to get the
// assistant interfaces ... The WaitGroupCore and the WaitGroupAssister both
// need to know about each other, but making them hold a reference to each other
Expand All @@ -99,24 +120,29 @@ type AnnotatedWaitGroup struct {
mux sync.Mutex
}

func (d *AnnotatedWaitGroup) atomic(operation func()) {
defer d.mux.Unlock()
func NewAnnotatedWaitGroup(_ string) *AnnotatedWaitGroup {
return &AnnotatedWaitGroup{
assistant: WaitGroupAssister{
names: make(namesCollection),
},
}
}

d.mux.Lock()
func (d *AnnotatedWaitGroup) atomic(operation func()) {
operation()
}

func (d *AnnotatedWaitGroup) Add(delta int, name ...GoRoutineName) {
d.atomic(func() {
d.wg.Add(delta)
d.assistant.Add(delta, name...)
d.wg.Add(delta)
})
}

func (d *AnnotatedWaitGroup) Done(name ...GoRoutineName) {
d.atomic(func() {
d.wg.Done()
d.assistant.Done(name...)
d.wg.Done()
})
}

Expand All @@ -132,7 +158,11 @@ func (d *AnnotatedWaitGroup) Wait(name ...GoRoutineName) {
// to the console or preferably writing to a log.
//
d.atomic(func() {
d.wg.Wait()
d.assistant.Wait(name...)
d.wg.Wait()
})
}

func (d *AnnotatedWaitGroup) Count() int {
return int(d.assistant.counter)
}
6 changes: 3 additions & 3 deletions async/annotated-wait-group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ var _ = Describe("AnnotatedWaitGroup", func() {

Context("Add", func() {
It("should: add", func() {
var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{}
var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("add-unit-test")

wg.Add(1, "producer")
})
})

Context("Done", func() {
It("should: quit", func() {
var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{}
var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("done-unit-test")

wg.Add(1, "producer")
wg.Done("producer")
Expand All @@ -29,7 +29,7 @@ var _ = Describe("AnnotatedWaitGroup", func() {

Context("Wait", func() {
It("should: quit", func() {
var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{}
var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("wait-unit-test")

wg.Add(1, "producer")
go func() {
Expand Down
11 changes: 6 additions & 5 deletions async/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"runtime"
"sync"

"github.com/google/uuid"
)
Expand Down Expand Up @@ -43,18 +42,19 @@ type privateWpInfo[I, O any] struct {
type WorkerPool[I, O any] struct {
private privateWpInfo[I, O]
exec ExecutiveFunc[I, O]
RoutineName GoRoutineName
noWorkers int
SourceJobsChIn JobStreamR[I]

Quit *sync.WaitGroup
Quitter AssistedQuitter
}

type NewWorkerPoolParams[I, O any] struct {
NoWorkers int
Exec ExecutiveFunc[I, O]
JobsCh chan Job[I]
CancelCh CancelStream
Quit *sync.WaitGroup
Quitter AssistedQuitter
}

func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O] {
Expand All @@ -71,10 +71,11 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O
cancelCh: params.CancelCh,
},
exec: params.Exec,
RoutineName: GoRoutineName("🧊 worker pool"),
noWorkers: noWorkers,
SourceJobsChIn: params.JobsCh,

Quit: params.Quit,
Quitter: params.Quitter,
}

return wp
Expand Down Expand Up @@ -107,7 +108,7 @@ func (p *WorkerPool[I, O]) run(
) {
defer func() {
close(outputsChOut)
p.Quit.Done()
p.Quitter.Done(p.RoutineName)
fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}()
fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", ctx)
Expand Down
34 changes: 22 additions & 12 deletions async/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/fortytw2/leaktest"
Expand Down Expand Up @@ -52,6 +51,8 @@ var (
}

noOp = func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {}

testMain = async.GoRoutineName("👾 test-main")
)

type TestJobInput struct {
Expand All @@ -76,7 +77,7 @@ var greeter = func(j async.Job[TestJobInput]) (async.JobOutput[TestJobOutput], e
}

type pipeline[I, O any] struct {
wg sync.WaitGroup
wgex async.WaitGroupEx
sequence int
outputsCh chan async.JobOutput[O]
provider helpers.ProviderFunc[I]
Expand All @@ -89,6 +90,7 @@ type pipeline[I, O any] struct {

func start[I, O any]() *pipeline[I, O] {
pipe := &pipeline[I, O]{
wgex: async.NewAnnotatedWaitGroup("pipeline"),
outputsCh: make(chan async.JobOutput[O], OutputsChSize),
stop: noOp,
cancel: noOp,
Expand All @@ -114,13 +116,13 @@ func (p *pipeline[I, O]) produce(ctx context.Context, provider helpers.ProviderF

p.producer = helpers.StartProducer[I, O](
ctx,
&p.wg,
p.wgex,
JobChSize,
provider,
Delay,
)

p.wg.Add(1)
p.wgex.Add(1, p.producer.RoutineName)
}

func (p *pipeline[I, O]) process(ctx context.Context, noWorkers int, executive async.ExecutiveFunc[I, O]) {
Expand All @@ -130,37 +132,45 @@ func (p *pipeline[I, O]) process(ctx context.Context, noWorkers int, executive a
Exec: executive,
JobsCh: p.producer.JobsCh,
CancelCh: make(async.CancelStream),
Quit: &p.wg,
Quitter: p.wgex,
})

go p.pool.Start(ctx, p.outputsCh)

p.wg.Add(1)
p.wgex.Add(1, p.pool.RoutineName)
}

func (p *pipeline[I, O]) consume(ctx context.Context) {
p.consumer = helpers.StartConsumer(ctx,
&p.wg,
p.wgex,
p.outputsCh,
)

p.wg.Add(1)
p.wgex.Add(1, p.consumer.RoutineName)
}

var _ = Describe("WorkerPool", func() {
When("given: a stream of jobs", func() {
Context("and: Stopped", func() {
It("🧪 should: receive and process all", func(ctx SpecContext) {
defer leaktest.Check(GinkgoT())()

pipe := start[TestJobInput, TestJobOutput]()

defer func() {
if counter, ok := (pipe.wgex).(async.AssistedCounter); ok {
fmt.Printf("🎈🎈🎈🎈remaining count: '%v'\n", counter.Count())
}
}()

By("👾 WAIT-GROUP ADD(producer)")
pipe.produce(ctx, func() TestJobInput {
provider := func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
return TestJobInput{
Recipient: audience[recipient],
}
})
}
pipe.produce(ctx, provider)

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.process(ctx, DefaultNoWorkers, greeter)
Expand All @@ -170,7 +180,7 @@ var _ = Describe("WorkerPool", func() {

By("👾 NOW AWAITING TERMINATION")
pipe.stop.After(ctx, time.Second/5)
pipe.wg.Wait()
pipe.wgex.Wait(async.GoRoutineName("👾 test-main"))

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
pipe.producer.Count,
Expand Down Expand Up @@ -209,7 +219,7 @@ var _ = Describe("WorkerPool", func() {
By("👾 NOW AWAITING TERMINATION")
pipe.cancel.After(ctxCancel, time.Second/5, cancellations...)

pipe.wg.Wait()
pipe.wgex.Wait(async.GoRoutineName("👾 test-main"))

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
pipe.producer.Count,
Expand Down
2 changes: 1 addition & 1 deletion async/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (w *worker[I, O]) run(ctx context.Context) {
w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok
fmt.Printf(" <--- 🚀 worker.run(%v) (SENT FINISHED). 🚀🚀🚀\n", w.id)
}()
fmt.Printf(" ---> 🚀worker.run(%v) ...(ctx:%+v)\n", w.id, ctx)
fmt.Printf(" ---> 🚀 worker.run(%v) ...(ctx:%+v)\n", w.id, ctx)

for running := true; running; {
select {
Expand Down
13 changes: 7 additions & 6 deletions internal/helpers/test-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,25 @@ package helpers
import (
"context"
"fmt"
"sync"

"github.com/snivilised/lorax/async"
)

type Consumer[O any] struct {
quit *sync.WaitGroup
quitter async.AssistedQuitter
RoutineName async.GoRoutineName
OutputsChIn async.OutputStreamR[O]
Count int
}

func StartConsumer[O any](
ctx context.Context,
wg *sync.WaitGroup,
quitter async.AssistedQuitter,
outputsChIn async.OutputStreamR[O],
) *Consumer[O] {
consumer := &Consumer[O]{
quit: wg,
quitter: quitter,
RoutineName: async.GoRoutineName("💠 consumer"),
OutputsChIn: outputsChIn,
}
go consumer.run(ctx)
Expand All @@ -30,8 +31,8 @@ func StartConsumer[O any](

func (c *Consumer[O]) run(ctx context.Context) {
defer func() {
c.quit.Done()
fmt.Printf("<<<< consumer.run - finished (QUIT). 💠💠💠 \n")
c.quitter.Done(c.RoutineName)
fmt.Printf("<<<< 💠 consumer.run - finished (QUIT). 💠💠💠 \n")
}()
fmt.Printf("<<<< 💠 consumer.run ...(ctx:%+v)\n", ctx)

Expand Down
Loading