diff --git a/.vscode/settings.json b/.vscode/settings.json index 16ba0a4..0697148 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,6 +16,7 @@ "exportloopref", "extendio", "fieldalignment", + "fortytw", "ginkgolinter", "gobby", "goconst", diff --git a/async/worker-pool.go b/async/worker-pool.go index de34f70..c43573d 100644 --- a/async/worker-pool.go +++ b/async/worker-pool.go @@ -110,13 +110,12 @@ func (p *WorkerPool[I, R]) run( p.Quit.Done() fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n") }() - fmt.Println("===> 🧊 WorkerPool.run") + fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", ctx) for running := true; running; { select { case <-ctx.Done(): fmt.Println("===> 🧊 WorkerPool.run - done received ☒️☒️☒️") - p.cancelWorkers() running = false @@ -163,7 +162,7 @@ func (p *WorkerPool[I, R]) run( func (p *WorkerPool[I, R]) spawn( ctx context.Context, - jobsInCh JobStreamIn[I], + jobsChIn JobStreamIn[I], resultsChOut ResultStreamOut[R], finishedChOut FinishedStreamOut, ) { @@ -173,8 +172,8 @@ func (p *WorkerPool[I, R]) spawn( core: &worker[I, R]{ id: p.composeID(), exec: p.exec, - jobsInCh: jobsInCh, - resultsOutCh: resultsChOut, + jobsChIn: jobsChIn, + resultsChOut: resultsChOut, finishedChOut: finishedChOut, cancelChIn: cancelCh, }, @@ -196,9 +195,9 @@ func (p *WorkerPool[I, R]) drain(finishedChIn FinishedStreamIn) { // πŸ“ Here, we don't access the finishedChIn channel in a pre-emptive way via // the ctx.Done() channel. This is because in a unit test, we define a timeout as // part of the test spec using SpecTimeout. When this fires, this is handled by the - // run loop, which ends that loop then enters drain. When this happens, you can't - // reuse that same done channel as it will immediately return the value already - // handled. This has the effect of short-circuiting this loop meaning that + // run loop, which ends that loop then enters drain the phase. When this happens, + // you can't reuse that same done channel as it will immediately return the value + // already handled. This has the effect of short-circuiting this loop meaning that // workerID := <-finishedChIn never has a chance to be selected and the drain loop // exits early. The end result of which means that the p.private.pool collection is // never depleted. diff --git a/async/worker-pool_test.go b/async/worker-pool_test.go index 85b732a..6b63f29 100644 --- a/async/worker-pool_test.go +++ b/async/worker-pool_test.go @@ -15,8 +15,18 @@ import ( "github.com/snivilised/lorax/internal/helpers" ) +const DefaultNoWorkers = 5 + func init() { rand.Seed(time.Now().Unix()) } +// TerminatorFunc brings the work pool processing to an end, eg +// by stopping or cancellation after the requested amount of time. +type TerminatorFunc[I, R any] func(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) + +func (f TerminatorFunc[I, R]) After(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) { + f(ctx, delay, funcs...) +} + const ( JobChSize = 10 ResultChSize = 10 @@ -70,23 +80,43 @@ type pipeline[I, R any] struct { wg sync.WaitGroup sequence int resultsCh chan async.JobResult[R] - provider helpers.ProviderFn[I] + provider helpers.ProviderFunc[I] producer *helpers.Producer[I, R] pool *async.WorkerPool[I, R] consumer *helpers.Consumer[R] + cancel TerminatorFunc[I, R] + stop TerminatorFunc[I, R] } func start[I, R any]() *pipeline[I, R] { - resultsCh := make(chan async.JobResult[R], ResultChSize) - pipe := &pipeline[I, R]{ - resultsCh: resultsCh, + resultsCh: make(chan async.JobResult[R], ResultChSize), + stop: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) { + // no-op + }, + cancel: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) { + // no-op + }, } return pipe } -func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.ProviderFn[I]) { +func (p *pipeline[I, R]) produce(ctx context.Context, provider helpers.ProviderFunc[I]) { + p.cancel = func(ctx context.Context, delay time.Duration, cancellations ...context.CancelFunc) { + go helpers.CancelProducerAfter[I, R]( + delay, + cancellations..., + ) + } + p.stop = func(ctx context.Context, delay time.Duration, _ ...context.CancelFunc) { + go helpers.StopProducerAfter( + ctx, + p.producer, + delay, + ) + } + p.producer = helpers.StartProducer[I, R]( ctx, &p.wg, @@ -98,10 +128,10 @@ func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.Pro p.wg.Add(1) } -func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.ExecutiveFunc[I, R]) { +func (p *pipeline[I, R]) process(ctx context.Context, noWorkers int, executive async.ExecutiveFunc[I, R]) { p.pool = async.NewWorkerPool[I, R]( &async.NewWorkerPoolParams[I, R]{ - NoWorkers: 5, + NoWorkers: noWorkers, Exec: executive, JobsCh: p.producer.JobsCh, CancelCh: make(async.CancelStream), @@ -113,7 +143,7 @@ func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.Executiv p.wg.Add(1) } -func (p *pipeline[I, R]) startConsumer(ctx context.Context) { +func (p *pipeline[I, R]) consume(ctx context.Context) { p.consumer = helpers.StartConsumer(ctx, &p.wg, p.resultsCh, @@ -122,14 +152,6 @@ func (p *pipeline[I, R]) startConsumer(ctx context.Context) { p.wg.Add(1) } -func (p *pipeline[I, R]) stopProducerAfter(ctx context.Context, after time.Duration) { - go helpers.StopProducerAfter( - ctx, - p.producer, - after, - ) -} - var _ = Describe("WorkerPool", func() { When("given: a stream of jobs", func() { Context("and: Stopped", func() { @@ -139,7 +161,7 @@ var _ = Describe("WorkerPool", func() { By("πŸ‘Ύ WAIT-GROUP ADD(producer)") sequence := 0 - pipe.startProducer(ctx, func() TestJobInput { + pipe.produce(ctx, func() TestJobInput { recipient := rand.Intn(len(audience)) //nolint:gosec // trivial sequence++ return TestJobInput{ @@ -149,13 +171,13 @@ var _ = Describe("WorkerPool", func() { }) By("πŸ‘Ύ WAIT-GROUP ADD(worker-pool)\n") - pipe.startPool(ctx, greeter) + pipe.process(ctx, DefaultNoWorkers, greeter) By("πŸ‘Ύ WAIT-GROUP ADD(consumer)") - pipe.startConsumer(ctx) + pipe.consume(ctx) By("πŸ‘Ύ NOW AWAITING TERMINATION") - pipe.stopProducerAfter(ctx, time.Second/5) + pipe.stop.After(ctx, time.Second/5) pipe.wg.Wait() fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n", @@ -170,13 +192,49 @@ var _ = Describe("WorkerPool", func() { }) Context("and: Cancelled", func() { - It("should test something", func() { // It is ginkgo test case function - Expect(audience).To(HaveLen(14)) - }) + It("πŸ§ͺ should: handle cancellation and shutdown cleanly", func(ctxSpec SpecContext) { + defer leaktest.Check(GinkgoT())() + pipe := start[TestJobInput, TestJobResult]() + + ctxCancel, cancel := context.WithCancel(ctxSpec) + cancellations := []context.CancelFunc{cancel} + + By("πŸ‘Ύ WAIT-GROUP ADD(producer)") + sequence := 0 + pipe.produce(ctxCancel, func() TestJobInput { + recipient := rand.Intn(len(audience)) //nolint:gosec // trivial + sequence++ + return TestJobInput{ + sequenceNo: sequence, + Recipient: audience[recipient], + } + }) + + By("πŸ‘Ύ WAIT-GROUP ADD(worker-pool)\n") + pipe.process(ctxCancel, DefaultNoWorkers, greeter) + + By("πŸ‘Ύ WAIT-GROUP ADD(consumer)") + pipe.consume(ctxCancel) + + By("πŸ‘Ύ NOW AWAITING TERMINATION") + pipe.cancel.After(ctxCancel, time.Second/5, cancellations...) + + pipe.wg.Wait() + + fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n", + pipe.producer.Count, + pipe.consumer.Count, + ) - It("πŸ§ͺ should: handle cancellation and shutdown cleanly", func(_ SpecContext) { + // The producer count is higher than the consumer count. As a feature, we could + // collate the numbers produced vs the numbers consumed and perhaps also calculate + // which jobs were not processed, each indicated with their corresponding Input + // value. - }) + // Eventually(ctxCancel, pipe.resultsCh).WithTimeout(time.Second * 5).Should(BeClosed()) + // Eventually(ctxCancel, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed()) + + }, SpecTimeout(time.Second*5)) }) }) }) diff --git a/async/worker.go b/async/worker.go index 6fd5a42..60ed9e9 100644 --- a/async/worker.go +++ b/async/worker.go @@ -8,8 +8,8 @@ import ( type worker[I any, R any] struct { id WorkerID exec ExecutiveFunc[I, R] - jobsInCh JobStreamIn[I] - resultsOutCh ResultStreamOut[R] + jobsChIn JobStreamIn[I] + resultsChOut ResultStreamOut[R] finishedChOut FinishedStreamOut // this might be better replaced with a broadcast mechanism such as sync.Cond @@ -22,6 +22,7 @@ func (w *worker[I, R]) run(ctx context.Context) { w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok fmt.Printf(" <--- πŸš€ worker.run(%v) (SENT FINISHED). πŸš€πŸš€πŸš€\n", w.id) }() + fmt.Printf(" ---> πŸš€worker.run(%v) ...(ctx:%+v)\n", w.id, ctx) for running := true; running; { select { @@ -29,7 +30,7 @@ func (w *worker[I, R]) run(ctx context.Context) { fmt.Printf(" ---> πŸš€ worker.run(%v)(finished) - done received πŸ”ΆπŸ”ΆπŸ”Ά\n", w.id) running = false - case job, ok := <-w.jobsInCh: + case job, ok := <-w.jobsChIn: if ok { fmt.Printf(" ---> πŸš€ worker.run(%v)(input:'%v')\n", w.id, job.Input) w.invoke(ctx, job) @@ -49,6 +50,6 @@ func (w *worker[I, R]) invoke(ctx context.Context, job Job[I]) { case <-ctx.Done(): fmt.Printf(" ---> πŸš€ worker.invoke(%v)(cancel) - done received πŸ’₯πŸ’₯πŸ’₯\n", w.id) - case w.resultsOutCh <- result: + case w.resultsChOut <- result: } } diff --git a/internal/helpers/test-consumer.go b/internal/helpers/test-consumer.go index 9cd91e6..e50f8da 100644 --- a/internal/helpers/test-consumer.go +++ b/internal/helpers/test-consumer.go @@ -9,19 +9,19 @@ import ( ) type Consumer[R any] struct { - ResultsCh <-chan async.JobResult[R] - quit *sync.WaitGroup - Count int + ResultsChIn async.ResultStreamIn[R] + quit *sync.WaitGroup + Count int } func StartConsumer[R any]( ctx context.Context, wg *sync.WaitGroup, - resultsCh <-chan async.JobResult[R], + resultsChIn async.ResultStreamIn[R], ) *Consumer[R] { consumer := &Consumer[R]{ - ResultsCh: resultsCh, - quit: wg, + ResultsChIn: resultsChIn, + quit: wg, } go consumer.run(ctx) @@ -33,7 +33,7 @@ func (c *Consumer[R]) run(ctx context.Context) { c.quit.Done() fmt.Printf("<<<< consumer.run - finished (QUIT). πŸ’ πŸ’ πŸ’  \n") }() - fmt.Printf("<<<< πŸ’  consumer.run ...\n") + fmt.Printf("<<<< πŸ’  consumer.run ...(ctx:%+v)\n", ctx) for running := true; running; { select { @@ -42,7 +42,7 @@ func (c *Consumer[R]) run(ctx context.Context) { fmt.Println("<<<< πŸ’  consumer.run - done received πŸ’”πŸ’”πŸ’”") - case result, ok := <-c.ResultsCh: + case result, ok := <-c.ResultsChIn: if ok { c.Count++ fmt.Printf("<<<< πŸ’  consumer.run - new result arrived(#%v): '%+v' \n", diff --git a/internal/helpers/test-producer.go b/internal/helpers/test-producer.go index 2d2d166..47b801f 100644 --- a/internal/helpers/test-producer.go +++ b/internal/helpers/test-producer.go @@ -10,14 +10,14 @@ import ( "github.com/snivilised/lorax/async" ) -type ProviderFn[I any] func() I +type ProviderFunc[I any] func() I type Producer[I, R any] struct { sequenceNo int JobsCh async.JobStream[I] quit *sync.WaitGroup Count int - provider ProviderFn[I] + provider ProviderFunc[I] delay int terminateCh chan string } @@ -29,7 +29,7 @@ func StartProducer[I, R any]( ctx context.Context, wg *sync.WaitGroup, capacity int, - provider ProviderFn[I], + provider ProviderFunc[I], delay int, ) *Producer[I, R] { if delay == 0 { @@ -55,7 +55,7 @@ func (p *Producer[I, R]) run(ctx context.Context) { fmt.Printf(">>>> producer.run - finished (QUIT). ✨✨✨ \n") }() - fmt.Printf(">>>> ✨ producer.run ...\n") + fmt.Printf(">>>> ✨ producer.run ...(ctx:%+v)\n", ctx) for running := true; running; { select { @@ -70,21 +70,41 @@ func (p *Producer[I, R]) run(ctx context.Context) { case <-time.After(time.Second / time.Duration(p.delay)): fmt.Printf(">>>> ✨ producer.run - default (running: %v) ...\n", running) - p.item() + + if !p.item(ctx) { + running = false + } } } } -func (p *Producer[I, R]) item() { +func (p *Producer[I, R]) item(ctx context.Context) bool { + result := true i := p.provider() j := async.Job[I]{ ID: fmt.Sprintf("JOB-ID:%v", uuid.NewString()), Input: i, } - p.JobsCh <- j + + fmt.Printf(">>>> ✨ producer.item, 🟠 waiting to post item: '%+v'\n", i) + + select { + case <-ctx.Done(): + fmt.Println(">>>> πŸ’  producer.item - done received β›”β›”β›”") + + result = false + + case p.JobsCh <- j: + } p.Count++ - fmt.Printf(">>>> ✨ producer.item, posted item: '%+v'\n", i) + if result { + fmt.Printf(">>>> ✨ producer.item, 🟒 posted item: '%+v'\n", i) + } else { + fmt.Printf(">>>> ✨ producer.item, πŸ”΄ item NOT posted: '%+v'\n", i) + } + + return result } func (p *Producer[I, R]) Stop() { @@ -99,12 +119,33 @@ func StopProducerAfter[I, R any]( producer *Producer[I, R], delay time.Duration, ) { - fmt.Printf(" >>> πŸ’€ Sleeping before requesting stop (%v) ...\n", delay) + fmt.Printf(" >>> πŸ’€ StopAfter - Sleeping before requesting stop (%v) ...\n", delay) select { case <-ctx.Done(): case <-time.After(delay): } producer.Stop() - fmt.Printf(" >>> 🍧🍧🍧 stop submitted.\n") + fmt.Printf(" >>> StopAfter - 🍧🍧🍧 stop submitted.\n") +} + +func CancelProducerAfter[I, R any]( + delay time.Duration, + cancellation ...context.CancelFunc, +) { + fmt.Printf(" >>> πŸ’€ CancelAfter - Sleeping before requesting cancellation (%v) ...\n", delay) + <-time.After(delay) + + // we should always expect to get a cancel function back, even if we don't + // ever use it, so it is still relevant to get it in the stop test case + // + if len(cancellation) > 0 { + cancel := cancellation[0] + + fmt.Printf(" >>> CancelAfter - πŸ›‘πŸ›‘πŸ›‘ cancellation submitted.\n") + cancel() + fmt.Printf(" >>> CancelAfter - βž–βž–βž– CANCELLED\n") + } else { + fmt.Printf(" >>> CancelAfter(noc) - βœ–οΈβœ–οΈβœ–οΈ cancellation attempt benign.\n") + } }