feat(async): implement pool cancellation (#15) #17

Merged · 1 commit · Aug 15, 2023
1 change: 1 addition & 0 deletions .vscode/settings.json
@@ -16,6 +16,7 @@
"exportloopref",
"extendio",
"fieldalignment",
"fortytw",
"ginkgolinter",
"gobby",
"goconst",
15 changes: 7 additions & 8 deletions async/worker-pool.go
@@ -110,13 +110,12 @@ func (p *WorkerPool[I, R]) run(
p.Quit.Done()
fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}()
fmt.Println("===> 🧊 WorkerPool.run")
fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", ctx)

for running := true; running; {
select {
case <-ctx.Done():
fmt.Println("===> 🧊 WorkerPool.run - done received ☢️☢️☢️")
p.cancelWorkers()

running = false

@@ -163,7 +162,7 @@ func (p *WorkerPool[I, R]) run(

func (p *WorkerPool[I, R]) spawn(
ctx context.Context,
jobsInCh JobStreamIn[I],
jobsChIn JobStreamIn[I],
resultsChOut ResultStreamOut[R],
finishedChOut FinishedStreamOut,
) {
@@ -173,8 +172,8 @@ func (p *WorkerPool[I, R]) spawn(
core: &worker[I, R]{
id: p.composeID(),
exec: p.exec,
jobsInCh: jobsInCh,
resultsOutCh: resultsChOut,
jobsChIn: jobsChIn,
resultsChOut: resultsChOut,
finishedChOut: finishedChOut,
cancelChIn: cancelCh,
},
@@ -196,9 +195,9 @@ func (p *WorkerPool[I, R]) drain(finishedChIn FinishedStreamIn) {
// 📍 Here, we don't access the finishedChIn channel in a pre-emptive way via
// the ctx.Done() channel. This is because in a unit test, we define a timeout as
// part of the test spec using SpecTimeout. When this fires, this is handled by the
// run loop, which ends that loop then enters drain. When this happens, you can't
// reuse that same done channel as it will immediately return the value already
// handled. This has the effect of short-circuiting this loop meaning that
// run loop, which ends that loop then enters the drain phase. When this happens,
// you can't reuse that same done channel as it will immediately return the value
// already handled. This has the effect of short-circuiting this loop meaning that
// workerID := <-finishedChIn never has a chance to be selected and the drain loop
// exits early. The end result of which means that the p.private.pool collection is
// never depleted.
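The drain comment above warns against selecting on a context's Done channel once its cancellation has already been handled: a closed Done channel is ready on every iteration, so the drain loop can exit before each worker has reported finished. A minimal standalone sketch of that short-circuit (illustrative names, not code from this PR):

package main

import (
	"context"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the run loop has already handled this cancellation

	finishedCh := make(chan int, 3)
	pool := map[int]struct{}{1: {}, 2: {}, 3: {}}

	go func() {
		for _, id := range []int{1, 2, 3} {
			finishedCh <- id // workers reporting finished
		}
	}()

	for len(pool) > 0 {
		select {
		case <-ctx.Done(): // closed, therefore always ready
			fmt.Printf("short-circuit: %d worker(s) never drained\n", len(pool))
			return
		case id := <-finishedCh:
			delete(pool, id)
		}
	}
	fmt.Println("pool fully drained") // unlikely to be reached with Done in the select
}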
108 changes: 83 additions & 25 deletions async/worker-pool_test.go
@@ -15,8 +15,18 @@ import (
"github.com/snivilised/lorax/internal/helpers"
)

const DefaultNoWorkers = 5

func init() { rand.Seed(time.Now().Unix()) }

// TerminatorFunc brings the worker pool processing to an end, e.g.
// by stopping or cancelling after the requested amount of time.
type TerminatorFunc[I, R any] func(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc)

func (f TerminatorFunc[I, R]) After(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) {
f(ctx, delay, funcs...)
}

const (
JobChSize = 10
ResultChSize = 10
@@ -70,23 +80,43 @@ type pipeline[I, R any] struct {
wg sync.WaitGroup
sequence int
resultsCh chan async.JobResult[R]
provider helpers.ProviderFn[I]
provider helpers.ProviderFunc[I]
producer *helpers.Producer[I, R]
pool *async.WorkerPool[I, R]
consumer *helpers.Consumer[R]
cancel TerminatorFunc[I, R]
stop TerminatorFunc[I, R]
}

func start[I, R any]() *pipeline[I, R] {
resultsCh := make(chan async.JobResult[R], ResultChSize)

pipe := &pipeline[I, R]{
resultsCh: resultsCh,
resultsCh: make(chan async.JobResult[R], ResultChSize),
stop: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {
// no-op
},
cancel: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {
// no-op
},
}

return pipe
}

func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.ProviderFn[I]) {
func (p *pipeline[I, R]) produce(ctx context.Context, provider helpers.ProviderFunc[I]) {
p.cancel = func(ctx context.Context, delay time.Duration, cancellations ...context.CancelFunc) {
go helpers.CancelProducerAfter[I, R](
delay,
cancellations...,
)
}
p.stop = func(ctx context.Context, delay time.Duration, _ ...context.CancelFunc) {
go helpers.StopProducerAfter(
ctx,
p.producer,
delay,
)
}

p.producer = helpers.StartProducer[I, R](
ctx,
&p.wg,
@@ -98,10 +128,10 @@ func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.Pro
p.wg.Add(1)
}

func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.ExecutiveFunc[I, R]) {
func (p *pipeline[I, R]) process(ctx context.Context, noWorkers int, executive async.ExecutiveFunc[I, R]) {
p.pool = async.NewWorkerPool[I, R](
&async.NewWorkerPoolParams[I, R]{
NoWorkers: 5,
NoWorkers: noWorkers,
Exec: executive,
JobsCh: p.producer.JobsCh,
CancelCh: make(async.CancelStream),
@@ -113,7 +143,7 @@ func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.Executiv
p.wg.Add(1)
}

func (p *pipeline[I, R]) startConsumer(ctx context.Context) {
func (p *pipeline[I, R]) consume(ctx context.Context) {
p.consumer = helpers.StartConsumer(ctx,
&p.wg,
p.resultsCh,
@@ -122,14 +152,6 @@ func (p *pipeline[I, R]) startConsumer(ctx context.Context) {
p.wg.Add(1)
}

func (p *pipeline[I, R]) stopProducerAfter(ctx context.Context, after time.Duration) {
go helpers.StopProducerAfter(
ctx,
p.producer,
after,
)
}

var _ = Describe("WorkerPool", func() {
When("given: a stream of jobs", func() {
Context("and: Stopped", func() {
@@ -139,7 +161,7 @@ var _ = Describe("WorkerPool", func() {

By("👾 WAIT-GROUP ADD(producer)")
sequence := 0
pipe.startProducer(ctx, func() TestJobInput {
pipe.produce(ctx, func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
sequence++
return TestJobInput{
@@ -149,13 +171,13 @@ })
})

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.startPool(ctx, greeter)
pipe.process(ctx, DefaultNoWorkers, greeter)

By("👾 WAIT-GROUP ADD(consumer)")
pipe.startConsumer(ctx)
pipe.consume(ctx)

By("👾 NOW AWAITING TERMINATION")
pipe.stopProducerAfter(ctx, time.Second/5)
pipe.stop.After(ctx, time.Second/5)
pipe.wg.Wait()

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
@@ -170,13 +192,49 @@ var _ = Describe("WorkerPool", func() {
})

Context("and: Cancelled", func() {
It("should test something", func() { // It is ginkgo test case function
Expect(audience).To(HaveLen(14))
})
It("🧪 should: handle cancellation and shutdown cleanly", func(ctxSpec SpecContext) {
defer leaktest.Check(GinkgoT())()
pipe := start[TestJobInput, TestJobResult]()

ctxCancel, cancel := context.WithCancel(ctxSpec)
cancellations := []context.CancelFunc{cancel}

By("👾 WAIT-GROUP ADD(producer)")
sequence := 0
pipe.produce(ctxCancel, func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
sequence++
return TestJobInput{
sequenceNo: sequence,
Recipient: audience[recipient],
}
})

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.process(ctxCancel, DefaultNoWorkers, greeter)

By("👾 WAIT-GROUP ADD(consumer)")
pipe.consume(ctxCancel)

By("👾 NOW AWAITING TERMINATION")
pipe.cancel.After(ctxCancel, time.Second/5, cancellations...)

pipe.wg.Wait()

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
pipe.producer.Count,
pipe.consumer.Count,
)

It("🧪 should: handle cancellation and shutdown cleanly", func(_ SpecContext) {
// The producer count is higher than the consumer count. As a feature, we could
// collate the numbers produced vs the numbers consumed and perhaps also calculate
// which jobs were not processed, each identified by its corresponding Input
// value.

})
// Eventually(ctxCancel, pipe.resultsCh).WithTimeout(time.Second * 5).Should(BeClosed())
// Eventually(ctxCancel, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed())

}, SpecTimeout(time.Second*5))
})
})
})
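The TerminatorFunc type added in this file is a plain func type carrying an After method, which lets the pipeline default to a no-op terminator until produce swaps in a real stop or cancel trigger. A condensed, self-contained sketch of the idiom (assumed names, not an excerpt of the repository):

package main

import (
	"context"
	"fmt"
	"time"
)

type TerminatorFunc func(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc)

// After simply invokes the underlying func, so call sites read as pipe.cancel.After(...).
func (f TerminatorFunc) After(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) {
	f(ctx, delay, funcs...)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Default: a no-op, safe to call before any producer exists.
	var terminate TerminatorFunc = func(context.Context, time.Duration, ...context.CancelFunc) {}

	// produce would replace it with a real trigger that fires the
	// cancellations after the requested delay.
	terminate = func(_ context.Context, delay time.Duration, cancellations ...context.CancelFunc) {
		go func() {
			time.Sleep(delay)
			for _, c := range cancellations {
				c()
			}
		}()
	}

	terminate.After(ctx, 200*time.Millisecond, cancel)
	<-ctx.Done()
	fmt.Println("cancelled:", ctx.Err())
}

Because stop and cancel share the TerminatorFunc shape, the Stopped and Cancelled specs terminate the pipeline through the same call site: pipe.stop.After(...) versus pipe.cancel.After(...).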
9 changes: 5 additions & 4 deletions async/worker.go
@@ -8,8 +8,8 @@ import (
type worker[I any, R any] struct {
id WorkerID
exec ExecutiveFunc[I, R]
jobsInCh JobStreamIn[I]
resultsOutCh ResultStreamOut[R]
jobsChIn JobStreamIn[I]
resultsChOut ResultStreamOut[R]
finishedChOut FinishedStreamOut

// this might be better replaced with a broadcast mechanism such as sync.Cond
@@ -22,14 +22,15 @@ func (w *worker[I, R]) run(ctx context.Context) {
w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok
fmt.Printf(" <--- 🚀 worker.run(%v) (SENT FINISHED). 🚀🚀🚀\n", w.id)
}()
fmt.Printf(" ---> 🚀worker.run(%v) ...(ctx:%+v)\n", w.id, ctx)

for running := true; running; {
select {
case <-ctx.Done():
fmt.Printf(" ---> 🚀 worker.run(%v)(finished) - done received 🔶🔶🔶\n", w.id)

running = false
case job, ok := <-w.jobsInCh:
case job, ok := <-w.jobsChIn:
if ok {
fmt.Printf(" ---> 🚀 worker.run(%v)(input:'%v')\n", w.id, job.Input)
w.invoke(ctx, job)
@@ -49,6 +50,6 @@ func (w *worker[I, R]) invoke(ctx context.Context, job Job[I]) {
case <-ctx.Done():
fmt.Printf(" ---> 🚀 worker.invoke(%v)(cancel) - done received 💥💥💥\n", w.id)

case w.resultsOutCh <- result:
case w.resultsChOut <- result:
}
}
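worker.invoke pairs the result send with ctx.Done() so a cancelled worker never blocks forever on a full or abandoned results channel. The same guarded-send pattern in a self-contained sketch (illustrative names, not code from this PR):

package main

import (
	"context"
	"fmt"
	"time"
)

// send delivers result unless the context is cancelled first; it reports
// whether the delivery happened.
func send(ctx context.Context, resultsChOut chan<- string, result string) bool {
	select {
	case <-ctx.Done():
		return false // cancelled before the result could be delivered
	case resultsChOut <- result:
		return true
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	resultsChOut := make(chan string) // unbuffered and never read: a plain send would block forever

	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()

	fmt.Println("delivered:", send(ctx, resultsChOut, "hello"))
}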
16 changes: 8 additions & 8 deletions internal/helpers/test-consumer.go
@@ -9,19 +9,19 @@ import (
)

type Consumer[R any] struct {
ResultsCh <-chan async.JobResult[R]
quit *sync.WaitGroup
Count int
ResultsChIn async.ResultStreamIn[R]
quit *sync.WaitGroup
Count int
}

func StartConsumer[R any](
ctx context.Context,
wg *sync.WaitGroup,
resultsCh <-chan async.JobResult[R],
resultsChIn async.ResultStreamIn[R],
) *Consumer[R] {
consumer := &Consumer[R]{
ResultsCh: resultsCh,
quit: wg,
ResultsChIn: resultsChIn,
quit: wg,
}
go consumer.run(ctx)

@@ -33,7 +33,7 @@ func (c *Consumer[R]) run(ctx context.Context) {
c.quit.Done()
fmt.Printf("<<<< consumer.run - finished (QUIT). 💠💠💠 \n")
}()
fmt.Printf("<<<< 💠 consumer.run ...\n")
fmt.Printf("<<<< 💠 consumer.run ...(ctx:%+v)\n", ctx)

for running := true; running; {
select {
@@ -42,7 +42,7 @@ func (c *Consumer[R]) run(ctx context.Context) {

fmt.Println("<<<< 💠 consumer.run - done received 💔💔💔")

case result, ok := <-c.ResultsCh:
case result, ok := <-c.ResultsChIn:
if ok {
c.Count++
fmt.Printf("<<<< 💠 consumer.run - new result arrived(#%v): '%+v' \n",