diff --git a/.vscode/settings.json b/.vscode/settings.json index a288c2a..652c01e 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -56,6 +56,7 @@ "varcheck", "watchvc", "watchvi", + "wgex", "xenomorph" ] } diff --git a/async/annotated-wait-group.go b/async/annotated-wait-group.go index 55cde09..d15ef35 100644 --- a/async/annotated-wait-group.go +++ b/async/annotated-wait-group.go @@ -2,16 +2,17 @@ package async import ( "fmt" + "strings" "sync" - "sync/atomic" "github.com/google/uuid" "github.com/samber/lo" ) type WaitGroupName string -type GoRoutineName string +type GoRoutineName = string type GoRoutineID string +type namesCollection map[GoRoutineName]string func ComposeGoRoutineID(prefix ...string) GoRoutineID { id := uuid.NewString() @@ -51,32 +52,52 @@ type AssistedWaiter interface { Done(name ...GoRoutineName) } +// ===> Counter +type AssistedCounter interface { + Count() int +} + type WaitGroupAssister struct { counter int32 + names namesCollection } -func (g *WaitGroupAssister) Add(delta int, name ...GoRoutineName) { +func (a *WaitGroupAssister) Add(delta int, name ...GoRoutineName) { + a.counter += int32(delta) + if len(name) > 0 { - fmt.Printf(" 🧩[[ WaitGroupAssister.Add ]] - name: '%v' (delta: '%v')\n", name[0], delta) - } + a.names[name[0]] = "foo" - atomic.AddInt32(&g.counter, int32(delta)) + fmt.Printf(" πŸŸͺπŸŸͺπŸŸͺ [[ WaitGroupAssister.Add ]] - name: '%v' (delta: '%v', count: '%v') (running: '%v')\n", + name[0], delta, a.counter, a.running(), + ) + } } -func (g *WaitGroupAssister) Done(name ...GoRoutineName) { +func (a *WaitGroupAssister) Done(name ...GoRoutineName) { + a.counter-- + if len(name) > 0 { - fmt.Printf(" 🧩[[ WaitGroupAssister.Done ]] - name: '%v'\n", name[0]) - } + delete(a.names, name[0]) - atomic.AddInt32(&g.counter, int32(-1)) + fmt.Printf(" πŸ”·πŸ”·πŸ”· [[ WaitGroupAssister.Done ]] - name: '%v' (count: '%v') (running: '%v')\n", + name[0], a.counter, a.running(), + ) + } } -func (g *WaitGroupAssister) Wait(name ...GoRoutineName) { +func (a *WaitGroupAssister) Wait(name ...GoRoutineName) { if len(name) > 0 { - fmt.Printf(" 🧩[[ WaitGroupAssister.Wait ]] - name: '%v'\n", name[0]) + fmt.Printf(" 🟀🟀🟀 [[ WaitGroupAssister.Wait ]] - name: '%v' (count: '%v') (running: '%v')\n", + name[0], a.counter, a.running(), + ) } } +func (a *WaitGroupAssister) running() string { + return strings.Join(lo.Keys(a.names), "/") +} + // You start off with a core instance and from here you can query it to get the // assistant interfaces ... The WaitGroupCore and the WaitGroupAssister both // need to know about each other, but making them hold a reference to each other @@ -99,24 +120,29 @@ type AnnotatedWaitGroup struct { mux sync.Mutex } -func (d *AnnotatedWaitGroup) atomic(operation func()) { - defer d.mux.Unlock() +func NewAnnotatedWaitGroup(_ string) *AnnotatedWaitGroup { + return &AnnotatedWaitGroup{ + assistant: WaitGroupAssister{ + names: make(namesCollection), + }, + } +} - d.mux.Lock() +func (d *AnnotatedWaitGroup) atomic(operation func()) { operation() } func (d *AnnotatedWaitGroup) Add(delta int, name ...GoRoutineName) { d.atomic(func() { - d.wg.Add(delta) d.assistant.Add(delta, name...) + d.wg.Add(delta) }) } func (d *AnnotatedWaitGroup) Done(name ...GoRoutineName) { d.atomic(func() { - d.wg.Done() d.assistant.Done(name...) + d.wg.Done() }) } @@ -132,7 +158,11 @@ func (d *AnnotatedWaitGroup) Wait(name ...GoRoutineName) { // to the console or preferably writing to a log. // d.atomic(func() { - d.wg.Wait() d.assistant.Wait(name...) + d.wg.Wait() }) } + +func (d *AnnotatedWaitGroup) Count() int { + return int(d.assistant.counter) +} diff --git a/async/annotated-wait-group_test.go b/async/annotated-wait-group_test.go index 93a21bc..6ba5680 100644 --- a/async/annotated-wait-group_test.go +++ b/async/annotated-wait-group_test.go @@ -12,7 +12,7 @@ var _ = Describe("AnnotatedWaitGroup", func() { Context("Add", func() { It("should: add", func() { - var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{} + var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("add-unit-test") wg.Add(1, "producer") }) @@ -20,7 +20,7 @@ var _ = Describe("AnnotatedWaitGroup", func() { Context("Done", func() { It("should: quit", func() { - var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{} + var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("done-unit-test") wg.Add(1, "producer") wg.Done("producer") @@ -29,7 +29,7 @@ var _ = Describe("AnnotatedWaitGroup", func() { Context("Wait", func() { It("should: quit", func() { - var wg async.WaitGroupEx = &async.AnnotatedWaitGroup{} + var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("wait-unit-test") wg.Add(1, "producer") go func() { diff --git a/async/worker-pool.go b/async/worker-pool.go index 4b946fc..54f0d33 100644 --- a/async/worker-pool.go +++ b/async/worker-pool.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "runtime" - "sync" "github.com/google/uuid" ) @@ -43,10 +42,11 @@ type privateWpInfo[I, O any] struct { type WorkerPool[I, O any] struct { private privateWpInfo[I, O] exec ExecutiveFunc[I, O] + RoutineName GoRoutineName noWorkers int SourceJobsChIn JobStreamR[I] - Quit *sync.WaitGroup + Quitter AssistedQuitter } type NewWorkerPoolParams[I, O any] struct { @@ -54,7 +54,7 @@ type NewWorkerPoolParams[I, O any] struct { Exec ExecutiveFunc[I, O] JobsCh chan Job[I] CancelCh CancelStream - Quit *sync.WaitGroup + Quitter AssistedQuitter } func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O] { @@ -71,10 +71,11 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O cancelCh: params.CancelCh, }, exec: params.Exec, + RoutineName: GoRoutineName("🧊 worker pool"), noWorkers: noWorkers, SourceJobsChIn: params.JobsCh, - Quit: params.Quit, + Quitter: params.Quitter, } return wp @@ -107,7 +108,7 @@ func (p *WorkerPool[I, O]) run( ) { defer func() { close(outputsChOut) - p.Quit.Done() + p.Quitter.Done(p.RoutineName) fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n") }() fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", ctx) diff --git a/async/worker-pool_test.go b/async/worker-pool_test.go index fdb5862..a7d3594 100644 --- a/async/worker-pool_test.go +++ b/async/worker-pool_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/rand" - "sync" "time" "github.com/fortytw2/leaktest" @@ -52,6 +51,8 @@ var ( } noOp = func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {} + + testMain = async.GoRoutineName("πŸ‘Ύ test-main") ) type TestJobInput struct { @@ -76,7 +77,7 @@ var greeter = func(j async.Job[TestJobInput]) (async.JobOutput[TestJobOutput], e } type pipeline[I, O any] struct { - wg sync.WaitGroup + wgex async.WaitGroupEx sequence int outputsCh chan async.JobOutput[O] provider helpers.ProviderFunc[I] @@ -89,6 +90,7 @@ type pipeline[I, O any] struct { func start[I, O any]() *pipeline[I, O] { pipe := &pipeline[I, O]{ + wgex: async.NewAnnotatedWaitGroup("pipeline"), outputsCh: make(chan async.JobOutput[O], OutputsChSize), stop: noOp, cancel: noOp, @@ -114,13 +116,13 @@ func (p *pipeline[I, O]) produce(ctx context.Context, provider helpers.ProviderF p.producer = helpers.StartProducer[I, O]( ctx, - &p.wg, + p.wgex, JobChSize, provider, Delay, ) - p.wg.Add(1) + p.wgex.Add(1, p.producer.RoutineName) } func (p *pipeline[I, O]) process(ctx context.Context, noWorkers int, executive async.ExecutiveFunc[I, O]) { @@ -130,37 +132,45 @@ func (p *pipeline[I, O]) process(ctx context.Context, noWorkers int, executive a Exec: executive, JobsCh: p.producer.JobsCh, CancelCh: make(async.CancelStream), - Quit: &p.wg, + Quitter: p.wgex, }) go p.pool.Start(ctx, p.outputsCh) - p.wg.Add(1) + p.wgex.Add(1, p.pool.RoutineName) } func (p *pipeline[I, O]) consume(ctx context.Context) { p.consumer = helpers.StartConsumer(ctx, - &p.wg, + p.wgex, p.outputsCh, ) - p.wg.Add(1) + p.wgex.Add(1, p.consumer.RoutineName) } -var _ = Describe("WorkerPool", func() { +var _ = FDescribe("WorkerPool", func() { When("given: a stream of jobs", func() { Context("and: Stopped", func() { - It("πŸ§ͺ should: receive and process all", func(ctx SpecContext) { + FIt("πŸ§ͺ should: receive and process all", func(ctx SpecContext) { defer leaktest.Check(GinkgoT())() + pipe := start[TestJobInput, TestJobOutput]() + defer func() { + if counter, ok := (pipe.wgex).(async.AssistedCounter); ok { + fmt.Printf("🎈🎈🎈🎈remaining count: '%v'\n", counter.Count()) + } + }() + By("πŸ‘Ύ WAIT-GROUP ADD(producer)") - pipe.produce(ctx, func() TestJobInput { + provider := func() TestJobInput { recipient := rand.Intn(len(audience)) //nolint:gosec // trivial return TestJobInput{ Recipient: audience[recipient], } - }) + } + pipe.produce(ctx, provider) By("πŸ‘Ύ WAIT-GROUP ADD(worker-pool)\n") pipe.process(ctx, DefaultNoWorkers, greeter) @@ -170,7 +180,7 @@ var _ = Describe("WorkerPool", func() { By("πŸ‘Ύ NOW AWAITING TERMINATION") pipe.stop.After(ctx, time.Second/5) - pipe.wg.Wait() + pipe.wgex.Wait(async.GoRoutineName("πŸ‘Ύ test-main")) fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n", pipe.producer.Count, @@ -209,7 +219,7 @@ var _ = Describe("WorkerPool", func() { By("πŸ‘Ύ NOW AWAITING TERMINATION") pipe.cancel.After(ctxCancel, time.Second/5, cancellations...) - pipe.wg.Wait() + pipe.wgex.Wait(async.GoRoutineName("πŸ‘Ύ test-main")) fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n", pipe.producer.Count, diff --git a/async/worker.go b/async/worker.go index 81f05c1..31f7973 100644 --- a/async/worker.go +++ b/async/worker.go @@ -22,7 +22,7 @@ func (w *worker[I, O]) run(ctx context.Context) { w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok fmt.Printf(" <--- πŸš€ worker.run(%v) (SENT FINISHED). πŸš€πŸš€πŸš€\n", w.id) }() - fmt.Printf(" ---> πŸš€worker.run(%v) ...(ctx:%+v)\n", w.id, ctx) + fmt.Printf(" ---> πŸš€ worker.run(%v) ...(ctx:%+v)\n", w.id, ctx) for running := true; running; { select { diff --git a/internal/helpers/test-consumer.go b/internal/helpers/test-consumer.go index 1666823..51c3f46 100644 --- a/internal/helpers/test-consumer.go +++ b/internal/helpers/test-consumer.go @@ -3,24 +3,25 @@ package helpers import ( "context" "fmt" - "sync" "github.com/snivilised/lorax/async" ) type Consumer[O any] struct { - quit *sync.WaitGroup + quitter async.AssistedQuitter + RoutineName async.GoRoutineName OutputsChIn async.OutputStreamR[O] Count int } func StartConsumer[O any]( ctx context.Context, - wg *sync.WaitGroup, + quitter async.AssistedQuitter, outputsChIn async.OutputStreamR[O], ) *Consumer[O] { consumer := &Consumer[O]{ - quit: wg, + quitter: quitter, + RoutineName: async.GoRoutineName("πŸ’  consumer"), OutputsChIn: outputsChIn, } go consumer.run(ctx) @@ -30,8 +31,8 @@ func StartConsumer[O any]( func (c *Consumer[O]) run(ctx context.Context) { defer func() { - c.quit.Done() - fmt.Printf("<<<< consumer.run - finished (QUIT). πŸ’ πŸ’ πŸ’  \n") + c.quitter.Done(c.RoutineName) + fmt.Printf("<<<< πŸ’  consumer.run - finished (QUIT). πŸ’ πŸ’ πŸ’  \n") }() fmt.Printf("<<<< πŸ’  consumer.run ...(ctx:%+v)\n", ctx) diff --git a/internal/helpers/test-producer.go b/internal/helpers/test-producer.go index a7de0d2..fd02f6c 100644 --- a/internal/helpers/test-producer.go +++ b/internal/helpers/test-producer.go @@ -3,7 +3,6 @@ package helpers import ( "context" "fmt" - "sync" "time" "github.com/google/uuid" @@ -16,7 +15,8 @@ type terminationStream chan termination type ProviderFunc[I any] func() I type Producer[I, O any] struct { - quit *sync.WaitGroup + quitter async.AssistedQuitter + RoutineName async.GoRoutineName sequenceNo int provider ProviderFunc[I] delay int @@ -30,7 +30,7 @@ type Producer[I, O any] struct { // indicate end of the work load. func StartProducer[I, O any]( ctx context.Context, - wg *sync.WaitGroup, + quitter async.AssistedQuitter, capacity int, provider ProviderFunc[I], delay int, @@ -40,7 +40,8 @@ func StartProducer[I, O any]( } producer := Producer[I, O]{ - quit: wg, + quitter: quitter, + RoutineName: async.GoRoutineName("✨ producer"), provider: provider, delay: delay, terminateCh: make(terminationStream), @@ -54,7 +55,7 @@ func StartProducer[I, O any]( func (p *Producer[I, O]) run(ctx context.Context) { defer func() { close(p.JobsCh) - p.quit.Done() + p.quitter.Done(p.RoutineName) fmt.Printf(">>>> producer.run - finished (QUIT). ✨✨✨ \n") }() @@ -65,7 +66,7 @@ func (p *Producer[I, O]) run(ctx context.Context) { case <-ctx.Done(): running = false - fmt.Println(">>>> πŸ’  producer.run - done received β›”β›”β›”") + fmt.Println(">>>> ✨ producer.run - done received β›”β›”β›”") case <-p.terminateCh: running = false