diff --git a/async/pool-defs-internal.go b/async/pool-defs-internal.go
index 5a4ef12..8a380c7 100644
--- a/async/pool-defs-internal.go
+++ b/async/pool-defs-internal.go
@@ -7,9 +7,9 @@ const (
 	DefaultChSize = 100
 )
 
-type workerWrapper[I any, R any] struct {
+type workerWrapper[I any, O any] struct {
 	cancelChOut chan<- CancelWorkSignal
-	core        *worker[I, R]
+	core        *worker[I, O]
 }
 
-type workersCollection[I, R any] map[WorkerID]*workerWrapper[I, R]
+type workersCollection[I, O any] map[WorkerID]*workerWrapper[I, O]
diff --git a/async/pool-defs.go b/async/pool-defs.go
index 69c1ef5..b9a8c87 100644
--- a/async/pool-defs.go
+++ b/async/pool-defs.go
@@ -20,30 +20,30 @@ type Job[I any] struct {
 	SequenceNo int
 }
 
-type ExecutiveFunc[I, R any] func(j Job[I]) (JobResult[R], error)
+type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error)
 
-func (f ExecutiveFunc[I, R]) Invoke(j Job[I]) (JobResult[R], error) {
+func (f ExecutiveFunc[I, O]) Invoke(j Job[I]) (JobOutput[O], error) {
 	return f(j)
 }
 
-type JobResult[R any] struct {
-	Payload R
+type JobOutput[O any] struct {
+	Payload O
 }
 
 type JobStream[I any] chan Job[I]
-type JobStreamIn[I any] <-chan Job[I]
-type JobStreamOut[I any] chan<- Job[I]
+type JobStreamR[I any] <-chan Job[I]
+type JobStreamW[I any] chan<- Job[I]
 
-type ResultStream[R any] chan JobResult[R]
-type ResultStreamIn[R any] <-chan JobResult[R]
-type ResultStreamOut[R any] chan<- JobResult[R]
+type OutputStream[O any] chan JobOutput[O]
+type OutputStreamR[O any] <-chan JobOutput[O]
+type OutputStreamW[O any] chan<- JobOutput[O]
 
 type CancelWorkSignal struct{}
 type CancelStream = chan CancelWorkSignal
-type CancelStreamIn = <-chan CancelWorkSignal
-type CancelStreamOut = chan<- CancelWorkSignal
+type CancelStreamR = <-chan CancelWorkSignal
+type CancelStreamW = chan<- CancelWorkSignal
 
 type WorkerID string
 type FinishedStream = chan WorkerID
-type FinishedStreamIn = <-chan WorkerID
-type FinishedStreamOut = chan<- WorkerID
+type FinishedStreamR = <-chan WorkerID
+type FinishedStreamW = chan<- WorkerID
diff --git a/async/worker-pool.go b/async/worker-pool.go
index 3867bb3..4b946fc 100644
--- a/async/worker-pool.go
+++ b/async/worker-pool.go
@@ -30,8 +30,8 @@ import (
 // that any channel defined in privateWpInfo should never be accessed directly (other
 // than for passing it to another method). This is an experimental convention that
 // I'm establishing for all snivilised projects.
-type privateWpInfo[I, R any] struct {
-	pool          workersCollection[I, R]
+type privateWpInfo[I, O any] struct {
+	pool          workersCollection[I, O]
 	workersJobsCh chan Job[I]
 	finishedCh    FinishedStream
 	cancelCh      CancelStream
@@ -40,32 +40,32 @@ type privateWpInfo[I, O any] struct {
 // WorkerPool owns the resultOut channel, because it is the only entity that knows
 // when all workers have completed their work due to the finished channel, which it also
 // owns.
-type WorkerPool[I, R any] struct {
-	private privateWpInfo[I, R]
-	exec    ExecutiveFunc[I, R]
+type WorkerPool[I, O any] struct {
+	private privateWpInfo[I, O]
+	exec    ExecutiveFunc[I, O]
 
 	noWorkers int
 
-	SourceJobsChIn JobStreamIn[I]
+	SourceJobsChIn JobStreamR[I]
 
 	Quit *sync.WaitGroup
 }
 
-type NewWorkerPoolParams[I, R any] struct {
+type NewWorkerPoolParams[I, O any] struct {
 	NoWorkers int
-	Exec      ExecutiveFunc[I, R]
+	Exec      ExecutiveFunc[I, O]
 	JobsCh    chan Job[I]
 	CancelCh  CancelStream
 	Quit      *sync.WaitGroup
 }
 
-func NewWorkerPool[I, R any](params *NewWorkerPoolParams[I, R]) *WorkerPool[I, R] {
+func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O] {
 	noWorkers := runtime.NumCPU()
 	if params.NoWorkers > 1 && params.NoWorkers <= MaxWorkers {
 		noWorkers = params.NoWorkers
 	}
 
-	wp := &WorkerPool[I, R]{
-		private: privateWpInfo[I, R]{
-			pool:          make(workersCollection[I, R], noWorkers),
+	wp := &WorkerPool[I, O]{
+		private: privateWpInfo[I, O]{
+			pool:          make(workersCollection[I, O], noWorkers),
 			workersJobsCh: make(chan Job[I], noWorkers),
 			finishedCh:    make(FinishedStream, noWorkers),
 			cancelCh:      params.CancelCh,
@@ -86,27 +86,27 @@ var eyeballs = []string{
 	"❤️", "💙", "💚", "💜", "💛", "🤍", "💖", "💗", "💝",
 }
 
-func (p *WorkerPool[I, R]) composeID() WorkerID {
+func (p *WorkerPool[I, O]) composeID() WorkerID {
 	n := len(p.private.pool) + 1
 	emoji := eyeballs[(n-1)%p.noWorkers]
 
 	return WorkerID(fmt.Sprintf("(%v)WORKER-ID-%v:%v", emoji, n, uuid.NewString()))
 }
 
-func (p *WorkerPool[I, R]) Start(
+func (p *WorkerPool[I, O]) Start(
 	ctx context.Context,
-	resultsChOut ResultStreamOut[R],
+	outputsChOut OutputStreamW[O],
 ) {
-	p.run(ctx, p.private.workersJobsCh, resultsChOut)
+	p.run(ctx, p.private.workersJobsCh, outputsChOut)
 }
 
-func (p *WorkerPool[I, R]) run(
+func (p *WorkerPool[I, O]) run(
 	ctx context.Context,
-	forwardChOut chan<- Job[I],
-	resultsChOut ResultStreamOut[R],
+	forwardChOut JobStreamW[I],
+	outputsChOut OutputStreamW[O],
 ) {
 	defer func() {
-		close(resultsChOut)
+		close(outputsChOut)
 		p.Quit.Done()
 		fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
 	}()
@@ -126,7 +126,7 @@ func (p *WorkerPool[I, R]) run(
 			)
 
 			if len(p.private.pool) < p.noWorkers {
-				p.spawn(ctx, p.private.workersJobsCh, resultsChOut, p.private.finishedCh)
+				p.spawn(ctx, p.private.workersJobsCh, outputsChOut, p.private.finishedCh)
 			}
 			select {
 			case forwardChOut <- job:
@@ -163,20 +163,20 @@ func (p *WorkerPool[I, R]) run(
 	)
 }
 
-func (p *WorkerPool[I, R]) spawn(
+func (p *WorkerPool[I, O]) spawn(
 	ctx context.Context,
-	jobsChIn JobStreamIn[I],
-	resultsChOut ResultStreamOut[R],
-	finishedChOut FinishedStreamOut,
+	jobsChIn JobStreamR[I],
+	outputsChOut OutputStreamW[O],
+	finishedChOut FinishedStreamW,
 ) {
-	cancelCh := make(chan CancelWorkSignal, 1)
+	cancelCh := make(CancelStream, 1)
 
-	w := &workerWrapper[I, R]{
-		core: &worker[I, R]{
+	w := &workerWrapper[I, O]{
+		core: &worker[I, O]{
 			id:            p.composeID(),
 			exec:          p.exec,
 			jobsChIn:      jobsChIn,
-			resultsChOut:  resultsChOut,
+			outputsChOut:  outputsChOut,
 			finishedChOut: finishedChOut,
 			cancelChIn:    cancelCh,
 		},
@@ -188,7 +188,7 @@ func (p *WorkerPool[I, R]) spawn(
 	fmt.Printf("===> 🧊 WorkerPool.spawned new worker: '%v' 🎀🎀🎀\n", w.core.id)
 }
 
-func (p *WorkerPool[I, R]) drain(finishedChIn FinishedStreamIn) {
+func (p *WorkerPool[I, O]) drain(finishedChIn FinishedStreamR) {
 	fmt.Printf(
๐ŸงŠ WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); ๐ŸงŠ๐ŸงŠ๐ŸงŠ \n", len(p.private.pool), runtime.NumGoroutine(), @@ -246,7 +246,7 @@ func (p *WorkerPool[I, R]) drain(finishedChIn FinishedStreamIn) { } } -func (p *WorkerPool[I, R]) cancelWorkers() { +func (p *WorkerPool[I, O]) cancelWorkers() { // perhaps, we can replace this with another broadcast mechanism such as sync.Cond // n := len(p.private.pool) diff --git a/async/worker-pool_test.go b/async/worker-pool_test.go index 6f958dd..fdb5862 100644 --- a/async/worker-pool_test.go +++ b/async/worker-pool_test.go @@ -21,16 +21,16 @@ func init() { rand.Seed(time.Now().Unix()) } // TerminatorFunc brings the work pool processing to an end, eg // by stopping or cancellation after the requested amount of time. -type TerminatorFunc[I, R any] func(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) +type TerminatorFunc[I, O any] func(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) -func (f TerminatorFunc[I, R]) After(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) { +func (f TerminatorFunc[I, O]) After(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) { f(ctx, delay, funcs...) } const ( - JobChSize = 10 - ResultChSize = 10 - Delay = 750 + JobChSize = 10 + OutputsChSize = 10 + Delay = 750 ) var ( @@ -58,15 +58,15 @@ type TestJobInput struct { Recipient string } -type TestJobResult = string -type TestResultStream chan async.JobResult[TestJobResult] +type TestJobOutput = string +type TestOutputStream chan async.JobOutput[TestJobOutput] -var greeter = func(j async.Job[TestJobInput]) (async.JobResult[TestJobResult], error) { +var greeter = func(j async.Job[TestJobInput]) (async.JobOutput[TestJobOutput], error) { r := rand.Intn(1000) + 1 //nolint:gosec // trivial delay := time.Millisecond * time.Duration(r) time.Sleep(delay) - result := async.JobResult[TestJobResult]{ + result := async.JobOutput[TestJobOutput]{ Payload: fmt.Sprintf(" ---> ๐Ÿ‰๐Ÿ‰๐Ÿ‰ [Seq: %v] Hello: '%v'", j.SequenceNo, j.Input.Recipient, ), @@ -75,21 +75,21 @@ var greeter = func(j async.Job[TestJobInput]) (async.JobResult[TestJobResult], e return result, nil } -type pipeline[I, R any] struct { +type pipeline[I, O any] struct { wg sync.WaitGroup sequence int - resultsCh chan async.JobResult[R] + outputsCh chan async.JobOutput[O] provider helpers.ProviderFunc[I] - producer *helpers.Producer[I, R] - pool *async.WorkerPool[I, R] - consumer *helpers.Consumer[R] - cancel TerminatorFunc[I, R] - stop TerminatorFunc[I, R] + producer *helpers.Producer[I, O] + pool *async.WorkerPool[I, O] + consumer *helpers.Consumer[O] + cancel TerminatorFunc[I, O] + stop TerminatorFunc[I, O] } -func start[I, R any]() *pipeline[I, R] { - pipe := &pipeline[I, R]{ - resultsCh: make(chan async.JobResult[R], ResultChSize), +func start[I, O any]() *pipeline[I, O] { + pipe := &pipeline[I, O]{ + outputsCh: make(chan async.JobOutput[O], OutputsChSize), stop: noOp, cancel: noOp, } @@ -97,9 +97,9 @@ func start[I, R any]() *pipeline[I, R] { return pipe } -func (p *pipeline[I, R]) produce(ctx context.Context, provider helpers.ProviderFunc[I]) { +func (p *pipeline[I, O]) produce(ctx context.Context, provider helpers.ProviderFunc[I]) { p.cancel = func(ctx context.Context, delay time.Duration, cancellations ...context.CancelFunc) { - go helpers.CancelProducerAfter[I, R]( + go helpers.CancelProducerAfter[I, O]( delay, cancellations..., ) @@ -112,7 +112,7 @@ func (p *pipeline[I, R]) produce(ctx context.Context, provider 
 		)
 	}
 
-	p.producer = helpers.StartProducer[I, R](
+	p.producer = helpers.StartProducer[I, O](
 		ctx,
 		&p.wg,
 		JobChSize,
@@ -123,9 +123,9 @@ func (p *pipeline[I, R]) produce(ctx context.Context, provider helpers.ProviderF
 	p.wg.Add(1)
 }
 
-func (p *pipeline[I, R]) process(ctx context.Context, noWorkers int, executive async.ExecutiveFunc[I, R]) {
-	p.pool = async.NewWorkerPool[I, R](
-		&async.NewWorkerPoolParams[I, R]{
+func (p *pipeline[I, O]) process(ctx context.Context, noWorkers int, executive async.ExecutiveFunc[I, O]) {
+	p.pool = async.NewWorkerPool[I, O](
+		&async.NewWorkerPoolParams[I, O]{
 			NoWorkers: noWorkers,
 			Exec:      executive,
 			JobsCh:    p.producer.JobsCh,
@@ -133,15 +133,15 @@ func (p *pipeline[I, R]) process(ctx context.Context, noWorkers int, executive a
 			Quit:      &p.wg,
 		})
 
-	go p.pool.Start(ctx, p.resultsCh)
+	go p.pool.Start(ctx, p.outputsCh)
 
 	p.wg.Add(1)
 }
 
-func (p *pipeline[I, R]) consume(ctx context.Context) {
+func (p *pipeline[I, O]) consume(ctx context.Context) {
 	p.consumer = helpers.StartConsumer(ctx,
 		&p.wg,
-		p.resultsCh,
+		p.outputsCh,
 	)
 
 	p.wg.Add(1)
@@ -152,7 +152,7 @@ var _ = Describe("WorkerPool", func() {
 		Context("and: Stopped", func() {
 			It("🧪 should: receive and process all", func(ctx SpecContext) {
 				defer leaktest.Check(GinkgoT())()
-				pipe := start[TestJobInput, TestJobResult]()
+				pipe := start[TestJobInput, TestJobOutput]()
 
 				By("👾 WAIT-GROUP ADD(producer)")
 				pipe.produce(ctx, func() TestJobInput {
@@ -178,7 +178,7 @@ var _ = Describe("WorkerPool", func() {
 				)
 
 				Expect(pipe.producer.Count).To(Equal(pipe.consumer.Count))
-				Eventually(ctx, pipe.resultsCh).WithTimeout(time.Second * 5).Should(BeClosed())
+				Eventually(ctx, pipe.outputsCh).WithTimeout(time.Second * 5).Should(BeClosed())
 				Eventually(ctx, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed())
 			}, SpecTimeout(time.Second*5))
 		})
@@ -186,7 +186,7 @@ var _ = Describe("WorkerPool", func() {
 		Context("and: Cancelled", func() {
 			It("🧪 should: handle cancellation and shutdown cleanly", func(ctxSpec SpecContext) {
 				defer leaktest.Check(GinkgoT())()
-				pipe := start[TestJobInput, TestJobResult]()
+				pipe := start[TestJobInput, TestJobOutput]()
 
 				ctxCancel, cancel := context.WithCancel(ctxSpec)
 				cancellations := []context.CancelFunc{cancel}
@@ -221,7 +221,7 @@ var _ = Describe("WorkerPool", func() {
 				// which jobs were not processed, each indicated with their corresponding Input
 				// value.
-				// Eventually(ctxCancel, pipe.resultsCh).WithTimeout(time.Second * 5).Should(BeClosed())
+				// Eventually(ctxCancel, pipe.outputsCh).WithTimeout(time.Second * 5).Should(BeClosed())
 				// Eventually(ctxCancel, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed())
 			}, SpecTimeout(time.Second*5))
diff --git a/async/worker.go b/async/worker.go
index 60ed9e9..81f05c1 100644
--- a/async/worker.go
+++ b/async/worker.go
@@ -5,19 +5,19 @@ import (
 	"fmt"
 )
 
-type worker[I any, R any] struct {
+type worker[I any, O any] struct {
 	id            WorkerID
-	exec          ExecutiveFunc[I, R]
-	jobsChIn      JobStreamIn[I]
-	resultsChOut  ResultStreamOut[R]
-	finishedChOut FinishedStreamOut
+	exec          ExecutiveFunc[I, O]
+	jobsChIn      JobStreamR[I]
+	outputsChOut  OutputStreamW[O]
+	finishedChOut FinishedStreamW
 
 	// this might be better replaced with a broadcast mechanism such as sync.Cond
 	//
-	cancelChIn CancelStreamIn
+	cancelChIn CancelStreamR
 }
 
-func (w *worker[I, R]) run(ctx context.Context) {
+func (w *worker[I, O]) run(ctx context.Context) {
 	defer func() {
 		w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok
 		fmt.Printf(" <--- 🚀 worker.run(%v) (SENT FINISHED). 🚀🚀🚀\n", w.id)
@@ -43,13 +43,13 @@ func (w *worker[I, R]) run(ctx context.Context) {
 		}
 	}
 }
 
-func (w *worker[I, R]) invoke(ctx context.Context, job Job[I]) {
+func (w *worker[I, O]) invoke(ctx context.Context, job Job[I]) {
 	result, _ := w.exec(job)
 
 	select {
 	case <-ctx.Done():
 		fmt.Printf(" ---> 🚀 worker.invoke(%v)(cancel) - done received 💥💥💥\n", w.id)
 
-	case w.resultsChOut <- result:
+	case w.outputsChOut <- result:
 	}
 }
diff --git a/internal/helpers/test-consumer.go b/internal/helpers/test-consumer.go
index e50f8da..1666823 100644
--- a/internal/helpers/test-consumer.go
+++ b/internal/helpers/test-consumer.go
@@ -8,27 +8,27 @@ import (
 	"github.com/snivilised/lorax/async"
 )
 
-type Consumer[R any] struct {
-	ResultsChIn async.ResultStreamIn[R]
+type Consumer[O any] struct {
 	quit        *sync.WaitGroup
+	OutputsChIn async.OutputStreamR[O]
 	Count       int
 }
 
-func StartConsumer[R any](
+func StartConsumer[O any](
 	ctx context.Context,
 	wg *sync.WaitGroup,
-	resultsChIn async.ResultStreamIn[R],
-) *Consumer[R] {
-	consumer := &Consumer[R]{
-		ResultsChIn: resultsChIn,
+	outputsChIn async.OutputStreamR[O],
+) *Consumer[O] {
+	consumer := &Consumer[O]{
 		quit:        wg,
+		OutputsChIn: outputsChIn,
 	}
 	go consumer.run(ctx)
 
 	return consumer
 }
 
-func (c *Consumer[R]) run(ctx context.Context) {
+func (c *Consumer[O]) run(ctx context.Context) {
 	defer func() {
 		c.quit.Done()
 		fmt.Printf("<<<< consumer.run - finished (QUIT). 💠💠💠 \n")
๐Ÿ’ ๐Ÿ’ ๐Ÿ’  \n") @@ -42,7 +42,7 @@ func (c *Consumer[R]) run(ctx context.Context) { fmt.Println("<<<< ๐Ÿ’  consumer.run - done received ๐Ÿ’”๐Ÿ’”๐Ÿ’”") - case result, ok := <-c.ResultsChIn: + case result, ok := <-c.OutputsChIn: if ok { c.Count++ fmt.Printf("<<<< ๐Ÿ’  consumer.run - new result arrived(#%v): '%+v' \n", diff --git a/internal/helpers/test-producer.go b/internal/helpers/test-producer.go index 3aec359..a7de0d2 100644 --- a/internal/helpers/test-producer.go +++ b/internal/helpers/test-producer.go @@ -10,45 +10,48 @@ import ( "github.com/snivilised/lorax/async" ) +type termination string +type terminationStream chan termination + type ProviderFunc[I any] func() I -type Producer[I, R any] struct { - sequenceNo int - JobsCh async.JobStream[I] +type Producer[I, O any] struct { quit *sync.WaitGroup - Count int + sequenceNo int provider ProviderFunc[I] delay int - terminateCh chan string + terminateCh terminationStream + JobsCh async.JobStream[I] + Count int } // The producer owns the Jobs channel as it knows when to close it. This producer is // a fake producer and exposes a stop method that the client go routing can call to // indicate end of the work load. -func StartProducer[I, R any]( +func StartProducer[I, O any]( ctx context.Context, wg *sync.WaitGroup, capacity int, provider ProviderFunc[I], delay int, -) *Producer[I, R] { +) *Producer[I, O] { if delay == 0 { panic(fmt.Sprintf("Invalid delay requested: '%v'", delay)) } - producer := Producer[I, R]{ - JobsCh: make(async.JobStream[I], capacity), + producer := Producer[I, O]{ quit: wg, provider: provider, delay: delay, - terminateCh: make(chan string), + terminateCh: make(terminationStream), + JobsCh: make(async.JobStream[I], capacity), } go producer.run(ctx) return &producer } -func (p *Producer[I, R]) run(ctx context.Context) { +func (p *Producer[I, O]) run(ctx context.Context) { defer func() { close(p.JobsCh) p.quit.Done() @@ -78,7 +81,7 @@ func (p *Producer[I, R]) run(ctx context.Context) { } } -func (p *Producer[I, R]) item(ctx context.Context) bool { +func (p *Producer[I, O]) item(ctx context.Context) bool { p.sequenceNo++ p.Count++ @@ -110,16 +113,16 @@ func (p *Producer[I, R]) item(ctx context.Context) bool { return result } -func (p *Producer[I, R]) Stop() { +func (p *Producer[I, O]) Stop() { fmt.Println(">>>> ๐Ÿงฒ producer terminating ...") - p.terminateCh <- "done" + p.terminateCh <- termination("done") close(p.terminateCh) } // StopProducerAfter, run in a new go routine -func StopProducerAfter[I, R any]( +func StopProducerAfter[I, O any]( ctx context.Context, - producer *Producer[I, R], + producer *Producer[I, O], delay time.Duration, ) { fmt.Printf(" >>> ๐Ÿ’ค StopAfter - Sleeping before requesting stop (%v) ...\n", delay) @@ -132,7 +135,7 @@ func StopProducerAfter[I, R any]( fmt.Printf(" >>> StopAfter - ๐Ÿง๐Ÿง๐Ÿง stop submitted.\n") } -func CancelProducerAfter[I, R any]( +func CancelProducerAfter[I, O any]( delay time.Duration, cancellation ...context.CancelFunc, ) {