Skip to content

Commit

Permalink
chore: setup ginkgo linter (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Aug 14, 2023
1 parent 1a0acc7 commit 5e243be
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 41 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ linters:
- errcheck
- exportloopref
- exhaustive
- ginkgolinter
- goconst
- gocritic
- gofmt
Expand Down
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
"exportloopref",
"extendio",
"fieldalignment",
"fortytw",
"ginkgolinter",
"gobby",
"goconst",
"gocritic",
Expand Down
9 changes: 4 additions & 5 deletions async/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,12 @@ func (p *WorkerPool[I, R]) run(
p.Quit.Done()
fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}()
fmt.Println("===> 🧊 WorkerPool.run")
fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", ctx)

for running := true; running; {
select {
case <-ctx.Done():
fmt.Println("===> 🧊 WorkerPool.run - done received ☢️☢️☢️")
p.cancelWorkers()

running = false

Expand Down Expand Up @@ -163,7 +162,7 @@ func (p *WorkerPool[I, R]) run(

func (p *WorkerPool[I, R]) spawn(
ctx context.Context,
jobsInCh JobStreamIn[I],
jobsChIn JobStreamIn[I],
resultsChOut ResultStreamOut[R],
finishedChOut FinishedStreamOut,
) {
Expand All @@ -173,8 +172,8 @@ func (p *WorkerPool[I, R]) spawn(
core: &worker[I, R]{
id: p.composeID(),
exec: p.exec,
jobsInCh: jobsInCh,
resultsOutCh: resultsChOut,
jobsChIn: jobsChIn,
resultsChOut: resultsChOut,
finishedChOut: finishedChOut,
cancelChIn: cancelCh,
},
Expand Down
103 changes: 82 additions & 21 deletions async/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ import (

func init() { rand.Seed(time.Now().Unix()) }

// TerminatorFunc brings the work pool processing to an end, eg
// by stopping or cancellation after the requested amount of time.
type TerminatorFunc[I, R any] func(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc)

func (f TerminatorFunc[I, R]) After(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) {
f(ctx, delay, funcs...)
}

const (
JobChSize = 10
ResultChSize = 10
Expand Down Expand Up @@ -70,23 +78,44 @@ type pipeline[I, R any] struct {
wg sync.WaitGroup
sequence int
resultsCh chan async.JobResult[R]
provider helpers.ProviderFn[I]
provider helpers.ProviderFunc[I]
producer *helpers.Producer[I, R]
pool *async.WorkerPool[I, R]
consumer *helpers.Consumer[R]
cancel TerminatorFunc[I, R]
stop TerminatorFunc[I, R]
}

func start[I, R any]() *pipeline[I, R] {
resultsCh := make(chan async.JobResult[R], ResultChSize)

pipe := &pipeline[I, R]{
resultsCh: resultsCh,
resultsCh: make(chan async.JobResult[R], ResultChSize),
stop: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {
// no-op
},
cancel: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {
// no-op
},
}

return pipe
}

func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.ProviderFn[I]) {
func (p *pipeline[I, R]) produce(ctx context.Context, provider helpers.ProviderFunc[I]) {
p.cancel = func(ctx context.Context, delay time.Duration, cancellations ...context.CancelFunc) {
go helpers.CancelProducerAfter[I, R](
ctx,
delay,
cancellations...,
)
}
p.stop = func(ctx context.Context, delay time.Duration, _ ...context.CancelFunc) {
go helpers.StopProducerAfter(
ctx,
p.producer,
delay,
)
}

p.producer = helpers.StartProducer[I, R](
ctx,
&p.wg,
Expand All @@ -98,7 +127,7 @@ func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.Pro
p.wg.Add(1)
}

func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.ExecutiveFunc[I, R]) {
func (p *pipeline[I, R]) process(ctx context.Context, executive async.ExecutiveFunc[I, R]) {
p.pool = async.NewWorkerPool[I, R](
&async.NewWorkerPoolParams[I, R]{
NoWorkers: 5,
Expand All @@ -113,7 +142,7 @@ func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.Executiv
p.wg.Add(1)
}

func (p *pipeline[I, R]) startConsumer(ctx context.Context) {
func (p *pipeline[I, R]) consume(ctx context.Context) {
p.consumer = helpers.StartConsumer(ctx,
&p.wg,
p.resultsCh,
Expand All @@ -122,14 +151,6 @@ func (p *pipeline[I, R]) startConsumer(ctx context.Context) {
p.wg.Add(1)
}

func (p *pipeline[I, R]) stopProducerAfter(ctx context.Context, after time.Duration) {
go helpers.StopProducerAfter(
ctx,
p.producer,
after,
)
}

var _ = Describe("WorkerPool", func() {
When("given: a stream of jobs", func() {
Context("and: Stopped", func() {
Expand All @@ -139,7 +160,7 @@ var _ = Describe("WorkerPool", func() {

By("👾 WAIT-GROUP ADD(producer)")
sequence := 0
pipe.startProducer(ctx, func() TestJobInput {
pipe.produce(ctx, func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
sequence++
return TestJobInput{
Expand All @@ -149,13 +170,13 @@ var _ = Describe("WorkerPool", func() {
})

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.startPool(ctx, greeter)
pipe.process(ctx, greeter)

By("👾 WAIT-GROUP ADD(consumer)")
pipe.startConsumer(ctx)
pipe.consume(ctx)

By("👾 NOW AWAITING TERMINATION")
pipe.stopProducerAfter(ctx, time.Second/5)
pipe.stop.After(ctx, time.Second/5)
pipe.wg.Wait()

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
Expand All @@ -170,9 +191,49 @@ var _ = Describe("WorkerPool", func() {
})

Context("and: Cancelled", func() {
It("🧪 should: handle cancellation and shutdown cleanly", func(_ SpecContext) {
It("🧪 should: handle cancellation and shutdown cleanly", func(ctxSpec SpecContext) {
defer leaktest.Check(GinkgoT())()
pipe := start[TestJobInput, TestJobResult]()

ctxCancel, cancel := context.WithCancel(ctxSpec)
cancellations := []context.CancelFunc{cancel}

By("👾 WAIT-GROUP ADD(producer)")
sequence := 0
pipe.produce(ctxCancel, func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
sequence++
return TestJobInput{
sequenceNo: sequence,
Recipient: audience[recipient],
}
})

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.process(ctxCancel, greeter)

By("👾 WAIT-GROUP ADD(consumer)")
pipe.consume(ctxCancel)

By("👾 NOW AWAITING TERMINATION")
pipe.cancel.After(ctxCancel, time.Second/5, cancellations...)

})
pipe.wg.Wait()

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
pipe.producer.Count,
pipe.consumer.Count,
)

// The producer count is higher than the consumer count. As a feature, we could
// collate the numbers produced vs the numbers consumed and perhaps also calculate
// which jobs were not processed, each indicated with their corresponding Input
// value.

// Eventually(ctxCancel, pipe.resultsCh).WithTimeout(time.Second * 5).Should(BeClosed())
// Eventually(ctxCancel, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed())

}, SpecTimeout(time.Second*5))
})
})
})
9 changes: 5 additions & 4 deletions async/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
type worker[I any, R any] struct {
id WorkerID
exec ExecutiveFunc[I, R]
jobsInCh JobStreamIn[I]
resultsOutCh ResultStreamOut[R]
jobsChIn JobStreamIn[I]
resultsChOut ResultStreamOut[R]
finishedChOut FinishedStreamOut

// this might be better replaced with a broadcast mechanism such as sync.Cond
Expand All @@ -22,14 +22,15 @@ func (w *worker[I, R]) run(ctx context.Context) {
w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok
fmt.Printf(" <--- 🚀 worker.run(%v) (SENT FINISHED). 🚀🚀🚀\n", w.id)
}()
fmt.Printf(" ---> 🚀worker.run(%v) ...(ctx:%+v)\n", w.id, ctx)

for running := true; running; {
select {
case <-ctx.Done():
fmt.Printf(" ---> 🚀 worker.run(%v)(finished) - done received 🔶🔶🔶\n", w.id)

running = false
case job, ok := <-w.jobsInCh:
case job, ok := <-w.jobsChIn:
if ok {
fmt.Printf(" ---> 🚀 worker.run(%v)(input:'%v')\n", w.id, job.Input)
w.invoke(ctx, job)
Expand All @@ -49,6 +50,6 @@ func (w *worker[I, R]) invoke(ctx context.Context, job Job[I]) {
case <-ctx.Done():
fmt.Printf(" ---> 🚀 worker.invoke(%v)(cancel) - done received 💥💥💥\n", w.id)

case w.resultsOutCh <- result:
case w.resultsChOut <- result:
}
}
2 changes: 1 addition & 1 deletion internal/helpers/test-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *Consumer[R]) run(ctx context.Context) {
c.quit.Done()
fmt.Printf("<<<< consumer.run - finished (QUIT). 💠💠💠 \n")
}()
fmt.Printf("<<<< 💠 consumer.run ...\n")
fmt.Printf("<<<< 💠 consumer.run ...(ctx:%+v)\n", ctx)

for running := true; running; {
select {
Expand Down
66 changes: 56 additions & 10 deletions internal/helpers/test-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
"github.com/snivilised/lorax/async"
)

type ProviderFn[I any] func() I
type ProviderFunc[I any] func() I

type Producer[I, R any] struct {
sequenceNo int
JobsCh async.JobStream[I]
quit *sync.WaitGroup
Count int
provider ProviderFn[I]
provider ProviderFunc[I]
delay int
terminateCh chan string
}
Expand All @@ -29,7 +29,7 @@ func StartProducer[I, R any](
ctx context.Context,
wg *sync.WaitGroup,
capacity int,
provider ProviderFn[I],
provider ProviderFunc[I],
delay int,
) *Producer[I, R] {
if delay == 0 {
Expand All @@ -55,8 +55,12 @@ func (p *Producer[I, R]) run(ctx context.Context) {
fmt.Printf(">>>> producer.run - finished (QUIT). ✨✨✨ \n")
}()

fmt.Printf(">>>> ✨ producer.run ...\n")
fmt.Printf(">>>> ✨ producer.run ...(ctx:%+v)\n", ctx)

// ⚠️ The producer is not responding to the done message, and therefore
// doesn't get to exit its loop and then invoke p.quit.Done(), hence
// the deadlock.
//
for running := true; running; {
select {
case <-ctx.Done():
Expand All @@ -70,21 +74,41 @@ func (p *Producer[I, R]) run(ctx context.Context) {

case <-time.After(time.Second / time.Duration(p.delay)):
fmt.Printf(">>>> ✨ producer.run - default (running: %v) ...\n", running)
p.item()

if !p.item(ctx) {
running = false
}
}
}
}

func (p *Producer[I, R]) item() {
func (p *Producer[I, R]) item(ctx context.Context) bool {
result := true
i := p.provider()
j := async.Job[I]{
ID: fmt.Sprintf("JOB-ID:%v", uuid.NewString()),
Input: i,
}
p.JobsCh <- j

fmt.Printf(">>>> ✨ producer.item, 🟠 waiting to post item: '%+v'\n", i)

select {
case <-ctx.Done():
fmt.Println(">>>> 💠 producer.item - done received ⛔⛔⛔")

result = false

case p.JobsCh <- j:
}
p.Count++

fmt.Printf(">>>> ✨ producer.item, posted item: '%+v'\n", i)
if result {
fmt.Printf(">>>> ✨ producer.item, 🟢 posted item: '%+v'\n", i)
} else {
fmt.Printf(">>>> ✨ producer.item, 🔴 item NOT posted: '%+v'\n", i)
}

return result
}

func (p *Producer[I, R]) Stop() {
Expand All @@ -99,12 +123,34 @@ func StopProducerAfter[I, R any](
producer *Producer[I, R],
delay time.Duration,
) {
fmt.Printf(" >>> 💤 Sleeping before requesting stop (%v) ...\n", delay)
fmt.Printf(" >>> 💤 StopAfter - Sleeping before requesting stop (%v) ...\n", delay)
select {
case <-ctx.Done():
case <-time.After(delay):
}

producer.Stop()
fmt.Printf(" >>> 🍧🍧🍧 stop submitted.\n")
fmt.Printf(" >>> StopAfter - 🍧🍧🍧 stop submitted.\n")
}

func CancelProducerAfter[I, R any](
_ context.Context,
delay time.Duration,
cancellation ...context.CancelFunc,
) {
fmt.Printf(" >>> 💤 CancelAfter - Sleeping before requesting cancellation (%v) ...\n", delay)
<-time.After(delay)

// we should always expect to get a cancel function back, even if we don't
// ever use it, so it is still relevant to get it in the stop test case
//
if len(cancellation) > 0 {
cancel := cancellation[0]

fmt.Printf(" >>> CancelAfter - 🛑🛑🛑 cancellation submitted.\n")
cancel()
fmt.Printf(" >>> CancelAfter - ➖➖➖ CANCELLED\n")
} else {
fmt.Printf(" >>> CancelAfter(noc) - ✖️✖️✖️ cancellation attempt benign.\n")
}
}

0 comments on commit 5e243be

Please sign in to comment.