chore: setup ginkgo linter (#8) #16

Closed · wants to merge 2 commits
1 change: 1 addition & 0 deletions .vscode/settings.json
@@ -16,6 +16,7 @@
"exportloopref",
"extendio",
"fieldalignment",
"fortytw",
"ginkgolinter",
"gobby",
"goconst",
9 changes: 4 additions & 5 deletions async/worker-pool.go
@@ -110,13 +110,12 @@ func (p *WorkerPool[I, R]) run(
p.Quit.Done()
fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}()
fmt.Println("===> 🧊 WorkerPool.run")
fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", ctx)

for running := true; running; {
select {
case <-ctx.Done():
fmt.Println("===> 🧊 WorkerPool.run - done received ☢️☢️☢️")
p.cancelWorkers()

running = false

@@ -163,7 +162,7 @@ func (p *WorkerPool[I, R]) run(

func (p *WorkerPool[I, R]) spawn(
ctx context.Context,
jobsInCh JobStreamIn[I],
jobsChIn JobStreamIn[I],
resultsChOut ResultStreamOut[R],
finishedChOut FinishedStreamOut,
) {
@@ -173,8 +172,8 @@ func (p *WorkerPool[I, R]) spawn(
core: &worker[I, R]{
id: p.composeID(),
exec: p.exec,
jobsInCh: jobsInCh,
resultsOutCh: resultsChOut,
jobsChIn: jobsChIn,
resultsChOut: resultsChOut,
finishedChOut: finishedChOut,
cancelChIn: cancelCh,
},
102 changes: 82 additions & 20 deletions async/worker-pool_test.go
@@ -17,6 +17,14 @@

func init() { rand.Seed(time.Now().Unix()) }

// TerminatorFunc brings the worker pool processing to an end, e.g.
// by stopping or cancelling after the requested delay.
type TerminatorFunc[I, R any] func(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc)

func (f TerminatorFunc[I, R]) After(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) {
f(ctx, delay, funcs...)
}

const (
JobChSize = 10
ResultChSize = 10
@@ -70,23 +78,44 @@
wg sync.WaitGroup
sequence int
resultsCh chan async.JobResult[R]
provider helpers.ProviderFn[I]
provider helpers.ProviderFunc[I]
producer *helpers.Producer[I, R]
pool *async.WorkerPool[I, R]
consumer *helpers.Consumer[R]
cancel TerminatorFunc[I, R]
stop TerminatorFunc[I, R]
}

func start[I, R any]() *pipeline[I, R] {
resultsCh := make(chan async.JobResult[R], ResultChSize)

pipe := &pipeline[I, R]{
resultsCh: resultsCh,
resultsCh: make(chan async.JobResult[R], ResultChSize),
stop: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {
// no-op
},
cancel: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {
// no-op
},
}

return pipe
}

func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.ProviderFn[I]) {
func (p *pipeline[I, R]) produce(ctx context.Context, provider helpers.ProviderFunc[I]) {
p.cancel = func(ctx context.Context, delay time.Duration, cancellations ...context.CancelFunc) {
go helpers.CancelProducerAfter[I, R](
ctx,
delay,
cancellations...,
)
}
p.stop = func(ctx context.Context, delay time.Duration, _ ...context.CancelFunc) {
go helpers.StopProducerAfter(
ctx,
p.producer,
delay,
)
}

p.producer = helpers.StartProducer[I, R](
ctx,
&p.wg,
@@ -98,7 +127,7 @@
p.wg.Add(1)
}

func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.ExecutiveFunc[I, R]) {
func (p *pipeline[I, R]) process(ctx context.Context, executive async.ExecutiveFunc[I, R]) {
p.pool = async.NewWorkerPool[I, R](
&async.NewWorkerPoolParams[I, R]{
NoWorkers: 5,
@@ -113,7 +142,7 @@
p.wg.Add(1)
}

func (p *pipeline[I, R]) startConsumer(ctx context.Context) {
func (p *pipeline[I, R]) consume(ctx context.Context) {
p.consumer = helpers.StartConsumer(ctx,
&p.wg,
p.resultsCh,
@@ -122,14 +151,6 @@
p.wg.Add(1)
}

func (p *pipeline[I, R]) stopProducerAfter(ctx context.Context, after time.Duration) {
go helpers.StopProducerAfter(
ctx,
p.producer,
after,
)
}

var _ = Describe("WorkerPool", func() {
When("given: a stream of jobs", func() {
Context("and: Stopped", func() {
@@ -139,7 +160,7 @@

By("👾 WAIT-GROUP ADD(producer)")
sequence := 0
pipe.startProducer(ctx, func() TestJobInput {
pipe.produce(ctx, func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
sequence++
return TestJobInput{
@@ -149,13 +170,13 @@
})

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.startPool(ctx, greeter)
pipe.process(ctx, greeter)

By("👾 WAIT-GROUP ADD(consumer)")
pipe.startConsumer(ctx)
pipe.consume(ctx)

By("👾 NOW AWAITING TERMINATION")
pipe.stopProducerAfter(ctx, time.Second/5)
pipe.stop.After(ctx, time.Second/5)
pipe.wg.Wait()

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
@@ -170,13 +191,54 @@
})

Context("and: Cancelled", func() {
It("🧪 should: handle cancellation and shutdown cleanly", func(ctxSpec SpecContext) {
defer leaktest.Check(GinkgoT())()
pipe := start[TestJobInput, TestJobResult]()

ctxCancel, cancel := context.WithCancel(ctxSpec)
cancellations := []context.CancelFunc{cancel}

By("👾 WAIT-GROUP ADD(producer)")
sequence := 0
pipe.produce(ctxCancel, func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
sequence++
return TestJobInput{
sequenceNo: sequence,
Recipient: audience[recipient],
}
})

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.process(ctxCancel, greeter)

By("👾 WAIT-GROUP ADD(consumer)")
pipe.consume(ctxCancel)

By("👾 NOW AWAITING TERMINATION")
pipe.cancel.After(ctxCancel, time.Second/5, cancellations...)
It("should test something", func() { // It is ginkgo test case function
Expect(audience).To(HaveLen(14))
})

It("🧪 should: handle cancellation and shutdown cleanly", func(_ SpecContext) {

})
pipe.wg.Wait()

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
pipe.producer.Count,
pipe.consumer.Count,
)

// The producer count is higher than the consumer count. As a feature, we could
// collate the numbers produced vs the numbers consumed and perhaps also calculate
// which jobs were not processed, each identified by its corresponding Input
// value.

// Eventually(ctxCancel, pipe.resultsCh).WithTimeout(time.Second * 5).Should(BeClosed())
// Eventually(ctxCancel, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed())

}, SpecTimeout(time.Second*5))
})
})
})

Check failure on line 244 in async/worker-pool_test.go
GitHub Actions / lint: expected '}', found 'EOF' (typecheck)

Check failure on line 244 in async/worker-pool_test.go
GitHub Actions / test (1.19, ubuntu-latest): expected '}', found 'EOF'
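
As an aside on the collation idea floated in the test comment above ("we could collate the numbers produced vs the numbers consumed"), a rough sketch of what that could look like follows. None of these names (Tally, Produced, Consumed, Unprocessed) exist in the repository; this is only an illustration of recording job IDs on both sides and diffing them.

// Hypothetical helper, not part of this PR: collates produced vs consumed
// job IDs so that unprocessed jobs can be reported with their Input values.
package helpers

type Tally[I any] struct {
	produced map[string]I        // job ID -> input, recorded by the producer
	consumed map[string]struct{} // job IDs seen by the consumer
}

func NewTally[I any]() *Tally[I] {
	return &Tally[I]{
		produced: map[string]I{},
		consumed: map[string]struct{}{},
	}
}

func (t *Tally[I]) Produced(id string, input I) { t.produced[id] = input }
func (t *Tally[I]) Consumed(id string)          { t.consumed[id] = struct{}{} }

// Unprocessed returns the inputs of jobs that were produced but never consumed.
func (t *Tally[I]) Unprocessed() map[string]I {
	missing := map[string]I{}
	for id, input := range t.produced {
		if _, ok := t.consumed[id]; !ok {
			missing[id] = input
		}
	}
	return missing
}

A test could then assert that Unprocessed() is empty in the stop scenario, and simply report its contents in the cancellation scenario.
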
9 changes: 5 additions & 4 deletions async/worker.go
@@ -8,8 +8,8 @@ import (
type worker[I any, R any] struct {
id WorkerID
exec ExecutiveFunc[I, R]
jobsInCh JobStreamIn[I]
resultsOutCh ResultStreamOut[R]
jobsChIn JobStreamIn[I]
resultsChOut ResultStreamOut[R]
finishedChOut FinishedStreamOut

// this might be better replaced with a broadcast mechanism such as sync.Cond
@@ -22,14 +22,15 @@ func (w *worker[I, R]) run(ctx context.Context) {
w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok
fmt.Printf(" <--- 🚀 worker.run(%v) (SENT FINISHED). 🚀🚀🚀\n", w.id)
}()
fmt.Printf(" ---> 🚀worker.run(%v) ...(ctx:%+v)\n", w.id, ctx)

for running := true; running; {
select {
case <-ctx.Done():
fmt.Printf(" ---> 🚀 worker.run(%v)(finished) - done received 🔶🔶🔶\n", w.id)

running = false
case job, ok := <-w.jobsInCh:
case job, ok := <-w.jobsChIn:
if ok {
fmt.Printf(" ---> 🚀 worker.run(%v)(input:'%v')\n", w.id, job.Input)
w.invoke(ctx, job)
@@ -49,6 +50,6 @@ func (w *worker[I, R]) invoke(ctx context.Context, job Job[I]) {
case <-ctx.Done():
fmt.Printf(" ---> 🚀 worker.invoke(%v)(cancel) - done received 💥💥💥\n", w.id)

case w.resultsOutCh <- result:
case w.resultsChOut <- result:
}
}
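
The worker flags its finished notification as a non-pre-emptive send, presumably relying on the finished channel having spare capacity. For contrast, a pre-emptive version would wrap the send in a select that also watches ctx.Done(); the helper below is only a sketch of that alternative (notifyFinished is not part of the repository).

package async

import "context"

// notifyFinished is a hypothetical helper illustrating a pre-emptive finished
// notification: if the context is cancelled before the send can proceed, the
// notification is abandoned rather than blocking, so the worker can always exit.
func notifyFinished[ID any](ctx context.Context, finishedChOut chan<- ID, id ID) {
	select {
	case finishedChOut <- id:
	case <-ctx.Done():
	}
}

The deferred block in worker.run could then call notifyFinished(ctx, w.finishedChOut, w.id) instead of the bare send, at the cost of possibly dropping the notification on cancellation.
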
2 changes: 1 addition & 1 deletion internal/helpers/test-consumer.go
@@ -33,7 +33,7 @@ func (c *Consumer[R]) run(ctx context.Context) {
c.quit.Done()
fmt.Printf("<<<< consumer.run - finished (QUIT). 💠💠💠 \n")
}()
fmt.Printf("<<<< 💠 consumer.run ...\n")
fmt.Printf("<<<< 💠 consumer.run ...(ctx:%+v)\n", ctx)

for running := true; running; {
select {
66 changes: 56 additions & 10 deletions internal/helpers/test-producer.go
@@ -10,14 +10,14 @@ import (
"github.com/snivilised/lorax/async"
)

type ProviderFn[I any] func() I
type ProviderFunc[I any] func() I

type Producer[I, R any] struct {
sequenceNo int
JobsCh async.JobStream[I]
quit *sync.WaitGroup
Count int
provider ProviderFn[I]
provider ProviderFunc[I]
delay int
terminateCh chan string
}
@@ -29,7 +29,7 @@ func StartProducer[I, R any](
ctx context.Context,
wg *sync.WaitGroup,
capacity int,
provider ProviderFn[I],
provider ProviderFunc[I],
delay int,
) *Producer[I, R] {
if delay == 0 {
@@ -55,8 +55,12 @@ func (p *Producer[I, R]) run(ctx context.Context) {
fmt.Printf(">>>> producer.run - finished (QUIT). ✨✨✨ \n")
}()

fmt.Printf(">>>> ✨ producer.run ...\n")
fmt.Printf(">>>> ✨ producer.run ...(ctx:%+v)\n", ctx)

// ⚠️ The producer is not responding to the done message, and therefore
// doesn't get to exit its loop and then invoke p.quit.Done(), hence
// the deadlock.
//
for running := true; running; {
select {
case <-ctx.Done():
@@ -70,21 +74,41 @@

case <-time.After(time.Second / time.Duration(p.delay)):
fmt.Printf(">>>> ✨ producer.run - default (running: %v) ...\n", running)
p.item()

if !p.item(ctx) {
running = false
}
}
}
}

func (p *Producer[I, R]) item() {
func (p *Producer[I, R]) item(ctx context.Context) bool {
result := true
i := p.provider()
j := async.Job[I]{
ID: fmt.Sprintf("JOB-ID:%v", uuid.NewString()),
Input: i,
}
p.JobsCh <- j

fmt.Printf(">>>> ✨ producer.item, 🟠 waiting to post item: '%+v'\n", i)

select {
case <-ctx.Done():
fmt.Println(">>>> 💠 producer.item - done received ⛔⛔⛔")

result = false

case p.JobsCh <- j:
}
p.Count++

fmt.Printf(">>>> ✨ producer.item, posted item: '%+v'\n", i)
if result {
fmt.Printf(">>>> ✨ producer.item, 🟢 posted item: '%+v'\n", i)
} else {
fmt.Printf(">>>> ✨ producer.item, 🔴 item NOT posted: '%+v'\n", i)
}

return result
}

func (p *Producer[I, R]) Stop() {
@@ -99,12 +123,34 @@ func StopProducerAfter[I, R any](
producer *Producer[I, R],
delay time.Duration,
) {
fmt.Printf(" >>> 💤 Sleeping before requesting stop (%v) ...\n", delay)
fmt.Printf(" >>> 💤 StopAfter - Sleeping before requesting stop (%v) ...\n", delay)
select {
case <-ctx.Done():
case <-time.After(delay):
}

producer.Stop()
fmt.Printf(" >>> 🍧🍧🍧 stop submitted.\n")
fmt.Printf(" >>> StopAfter - 🍧🍧🍧 stop submitted.\n")
}

func CancelProducerAfter[I, R any](
_ context.Context,
delay time.Duration,
cancellation ...context.CancelFunc,
) {
fmt.Printf(" >>> 💤 CancelAfter - Sleeping before requesting cancellation (%v) ...\n", delay)
<-time.After(delay)

// we should always expect to receive a cancel function, even if we never
// use it, so it is still relevant to obtain one in the stop test case
//
if len(cancellation) > 0 {
cancel := cancellation[0]

fmt.Printf(" >>> CancelAfter - 🛑🛑🛑 cancellation submitted.\n")
cancel()
fmt.Printf(" >>> CancelAfter - ➖➖➖ CANCELLED\n")
} else {
fmt.Printf(" >>> CancelAfter(noc) - ✖️✖️✖️ cancellation attempt benign.\n")
}
}
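
The deadlock remarked on in producer.run, and the cancellable send introduced in item(), come down to one rule: any potentially blocking channel send on the shutdown path must also select on ctx.Done(). The self-contained snippet below is not repository code; it only reproduces that pattern in miniature.

// Standalone illustration of the deadlock class described in the producer
// comment: a producer that sends without also selecting on ctx.Done() blocks
// forever once nobody is receiving, so wg.Done() is never reached.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	jobs := make(chan int) // unbuffered: a send blocks until a receiver is ready
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return // responding to cancellation lets wg.Done() run
			case jobs <- i: // without the ctx case this send could block forever
			}
		}
	}()

	cancel() // nobody receives from jobs, so only ctx.Done() can unblock the producer

	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()

	select {
	case <-done:
		fmt.Println("producer exited cleanly: the send was cancellable")
	case <-time.After(time.Second):
		fmt.Println("deadlock: the producer never reached wg.Done()")
	}
}

Removing the <-ctx.Done() case from the inner select recreates the hang described in the producer comment.
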