Skip to content

Commit

Permalink
ref(async): rename type defs/generics (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Aug 21, 2023
1 parent 2eabdba commit ba8aa93
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 114 deletions.
6 changes: 3 additions & 3 deletions async/pool-defs-internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ const (
DefaultChSize = 100
)

type workerWrapper[I any, R any] struct {
type workerWrapper[I any, O any] struct {
cancelChOut chan<- CancelWorkSignal
core *worker[I, R]
core *worker[I, O]
}

type workersCollection[I, R any] map[WorkerID]*workerWrapper[I, R]
type workersCollection[I, O any] map[WorkerID]*workerWrapper[I, O]
26 changes: 13 additions & 13 deletions async/pool-defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,30 @@ type Job[I any] struct {
SequenceNo int
}

type ExecutiveFunc[I, R any] func(j Job[I]) (JobResult[R], error)
type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error)

func (f ExecutiveFunc[I, R]) Invoke(j Job[I]) (JobResult[R], error) {
func (f ExecutiveFunc[I, O]) Invoke(j Job[I]) (JobOutput[O], error) {
return f(j)
}

type JobResult[R any] struct {
Payload R
type JobOutput[O any] struct {
Payload O
}

type JobStream[I any] chan Job[I]
type JobStreamIn[I any] <-chan Job[I]
type JobStreamOut[I any] chan<- Job[I]
type JobStreamR[I any] <-chan Job[I]
type JobStreamW[I any] chan<- Job[I]

type ResultStream[R any] chan JobResult[R]
type ResultStreamIn[R any] <-chan JobResult[R]
type ResultStreamOut[R any] chan<- JobResult[R]
type OutputStream[O any] chan JobOutput[O]
type OutputStreamR[O any] <-chan JobOutput[O]
type OutputStreamW[O any] chan<- JobOutput[O]

type CancelWorkSignal struct{}
type CancelStream = chan CancelWorkSignal
type CancelStreamIn = <-chan CancelWorkSignal
type CancelStreamOut = chan<- CancelWorkSignal
type CancelStreamR = <-chan CancelWorkSignal
type CancelStreamW = chan<- CancelWorkSignal

type WorkerID string
type FinishedStream = chan WorkerID
type FinishedStreamIn = <-chan WorkerID
type FinishedStreamOut = chan<- WorkerID
type FinishedStreamR = <-chan WorkerID
type FinishedStreamW = chan<- WorkerID
62 changes: 31 additions & 31 deletions async/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
// that any channel defined in privateWpInfo should never to accessed directly (other
// than for passing it to another method). This is an experimental convention that
// I'm establishing for all snivilised projects.
type privateWpInfo[I, R any] struct {
pool workersCollection[I, R]
type privateWpInfo[I, O any] struct {
pool workersCollection[I, O]
workersJobsCh chan Job[I]
finishedCh FinishedStream
cancelCh CancelStream
Expand All @@ -40,32 +40,32 @@ type privateWpInfo[I, R any] struct {
// WorkerPool owns the resultOut channel, because it is the only entity that knows
// when all workers have completed their work due to the finished channel, which it also
// owns.
type WorkerPool[I, R any] struct {
private privateWpInfo[I, R]
exec ExecutiveFunc[I, R]
type WorkerPool[I, O any] struct {
private privateWpInfo[I, O]
exec ExecutiveFunc[I, O]
noWorkers int
SourceJobsChIn JobStreamIn[I]
SourceJobsChIn JobStreamR[I]

Quit *sync.WaitGroup
}

type NewWorkerPoolParams[I, R any] struct {
type NewWorkerPoolParams[I, O any] struct {
NoWorkers int
Exec ExecutiveFunc[I, R]
Exec ExecutiveFunc[I, O]
JobsCh chan Job[I]
CancelCh CancelStream
Quit *sync.WaitGroup
}

func NewWorkerPool[I, R any](params *NewWorkerPoolParams[I, R]) *WorkerPool[I, R] {
func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O] {
noWorkers := runtime.NumCPU()
if params.NoWorkers > 1 && params.NoWorkers <= MaxWorkers {
noWorkers = params.NoWorkers
}

wp := &WorkerPool[I, R]{
private: privateWpInfo[I, R]{
pool: make(workersCollection[I, R], noWorkers),
wp := &WorkerPool[I, O]{
private: privateWpInfo[I, O]{
pool: make(workersCollection[I, O], noWorkers),
workersJobsCh: make(chan Job[I], noWorkers),
finishedCh: make(FinishedStream, noWorkers),
cancelCh: params.CancelCh,
Expand All @@ -86,27 +86,27 @@ var eyeballs = []string{
"❤️", "💙", "💚", "💜", "💛", "🤍", "💖", "💗", "💝",
}

func (p *WorkerPool[I, R]) composeID() WorkerID {
func (p *WorkerPool[I, O]) composeID() WorkerID {
n := len(p.private.pool) + 1
emoji := eyeballs[(n-1)%p.noWorkers]

return WorkerID(fmt.Sprintf("(%v)WORKER-ID-%v:%v", emoji, n, uuid.NewString()))
}

func (p *WorkerPool[I, R]) Start(
func (p *WorkerPool[I, O]) Start(
ctx context.Context,
resultsChOut ResultStreamOut[R],
outputsChOut OutputStreamW[O],
) {
p.run(ctx, p.private.workersJobsCh, resultsChOut)
p.run(ctx, p.private.workersJobsCh, outputsChOut)
}

func (p *WorkerPool[I, R]) run(
func (p *WorkerPool[I, O]) run(
ctx context.Context,
forwardChOut chan<- Job[I],
resultsChOut ResultStreamOut[R],
forwardChOut JobStreamW[I],
outputsChOut OutputStreamW[O],
) {
defer func() {
close(resultsChOut)
close(outputsChOut)
p.Quit.Done()
fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}()
Expand All @@ -126,7 +126,7 @@ func (p *WorkerPool[I, R]) run(
)

if len(p.private.pool) < p.noWorkers {
p.spawn(ctx, p.private.workersJobsCh, resultsChOut, p.private.finishedCh)
p.spawn(ctx, p.private.workersJobsCh, outputsChOut, p.private.finishedCh)
}
select {
case forwardChOut <- job:
Expand Down Expand Up @@ -163,20 +163,20 @@ func (p *WorkerPool[I, R]) run(
)
}

func (p *WorkerPool[I, R]) spawn(
func (p *WorkerPool[I, O]) spawn(
ctx context.Context,
jobsChIn JobStreamIn[I],
resultsChOut ResultStreamOut[R],
finishedChOut FinishedStreamOut,
jobsChIn JobStreamR[I],
outputsChOut OutputStreamW[O],
finishedChOut FinishedStreamW,
) {
cancelCh := make(chan CancelWorkSignal, 1)
cancelCh := make(CancelStream, 1)

w := &workerWrapper[I, R]{
core: &worker[I, R]{
w := &workerWrapper[I, O]{
core: &worker[I, O]{
id: p.composeID(),
exec: p.exec,
jobsChIn: jobsChIn,
resultsChOut: resultsChOut,
outputsChOut: outputsChOut,
finishedChOut: finishedChOut,
cancelChIn: cancelCh,
},
Expand All @@ -188,7 +188,7 @@ func (p *WorkerPool[I, R]) spawn(
fmt.Printf("===> 🧊 WorkerPool.spawned new worker: '%v' 🎀🎀🎀\n", w.core.id)
}

func (p *WorkerPool[I, R]) drain(finishedChIn FinishedStreamIn) {
func (p *WorkerPool[I, O]) drain(finishedChIn FinishedStreamR) {
fmt.Printf(
"!!!! 🧊 WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); 🧊🧊🧊 \n",
len(p.private.pool), runtime.NumGoroutine(),
Expand Down Expand Up @@ -246,7 +246,7 @@ func (p *WorkerPool[I, R]) drain(finishedChIn FinishedStreamIn) {
}
}

func (p *WorkerPool[I, R]) cancelWorkers() {
func (p *WorkerPool[I, O]) cancelWorkers() {
// perhaps, we can replace this with another broadcast mechanism such as sync.Cond
//
n := len(p.private.pool)
Expand Down
64 changes: 32 additions & 32 deletions async/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ func init() { rand.Seed(time.Now().Unix()) }

// TerminatorFunc brings the work pool processing to an end, eg
// by stopping or cancellation after the requested amount of time.
type TerminatorFunc[I, R any] func(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc)
type TerminatorFunc[I, O any] func(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc)

func (f TerminatorFunc[I, R]) After(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) {
func (f TerminatorFunc[I, O]) After(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) {
f(ctx, delay, funcs...)
}

const (
JobChSize = 10
ResultChSize = 10
Delay = 750
JobChSize = 10
OutputsChSize = 10
Delay = 750
)

var (
Expand Down Expand Up @@ -58,15 +58,15 @@ type TestJobInput struct {
Recipient string
}

type TestJobResult = string
type TestResultStream chan async.JobResult[TestJobResult]
type TestJobOutput = string
type TestOutputStream chan async.JobOutput[TestJobOutput]

var greeter = func(j async.Job[TestJobInput]) (async.JobResult[TestJobResult], error) {
var greeter = func(j async.Job[TestJobInput]) (async.JobOutput[TestJobOutput], error) {
r := rand.Intn(1000) + 1 //nolint:gosec // trivial
delay := time.Millisecond * time.Duration(r)
time.Sleep(delay)

result := async.JobResult[TestJobResult]{
result := async.JobOutput[TestJobOutput]{
Payload: fmt.Sprintf(" ---> 🍉🍉🍉 [Seq: %v] Hello: '%v'",
j.SequenceNo, j.Input.Recipient,
),
Expand All @@ -75,31 +75,31 @@ var greeter = func(j async.Job[TestJobInput]) (async.JobResult[TestJobResult], e
return result, nil
}

type pipeline[I, R any] struct {
type pipeline[I, O any] struct {
wg sync.WaitGroup
sequence int
resultsCh chan async.JobResult[R]
outputsCh chan async.JobOutput[O]
provider helpers.ProviderFunc[I]
producer *helpers.Producer[I, R]
pool *async.WorkerPool[I, R]
consumer *helpers.Consumer[R]
cancel TerminatorFunc[I, R]
stop TerminatorFunc[I, R]
producer *helpers.Producer[I, O]
pool *async.WorkerPool[I, O]
consumer *helpers.Consumer[O]
cancel TerminatorFunc[I, O]
stop TerminatorFunc[I, O]
}

func start[I, R any]() *pipeline[I, R] {
pipe := &pipeline[I, R]{
resultsCh: make(chan async.JobResult[R], ResultChSize),
func start[I, O any]() *pipeline[I, O] {
pipe := &pipeline[I, O]{
outputsCh: make(chan async.JobOutput[O], OutputsChSize),
stop: noOp,
cancel: noOp,
}

return pipe
}

func (p *pipeline[I, R]) produce(ctx context.Context, provider helpers.ProviderFunc[I]) {
func (p *pipeline[I, O]) produce(ctx context.Context, provider helpers.ProviderFunc[I]) {
p.cancel = func(ctx context.Context, delay time.Duration, cancellations ...context.CancelFunc) {
go helpers.CancelProducerAfter[I, R](
go helpers.CancelProducerAfter[I, O](
delay,
cancellations...,
)
Expand All @@ -112,7 +112,7 @@ func (p *pipeline[I, R]) produce(ctx context.Context, provider helpers.ProviderF
)
}

p.producer = helpers.StartProducer[I, R](
p.producer = helpers.StartProducer[I, O](
ctx,
&p.wg,
JobChSize,
Expand All @@ -123,25 +123,25 @@ func (p *pipeline[I, R]) produce(ctx context.Context, provider helpers.ProviderF
p.wg.Add(1)
}

func (p *pipeline[I, R]) process(ctx context.Context, noWorkers int, executive async.ExecutiveFunc[I, R]) {
p.pool = async.NewWorkerPool[I, R](
&async.NewWorkerPoolParams[I, R]{
func (p *pipeline[I, O]) process(ctx context.Context, noWorkers int, executive async.ExecutiveFunc[I, O]) {
p.pool = async.NewWorkerPool[I, O](
&async.NewWorkerPoolParams[I, O]{
NoWorkers: noWorkers,
Exec: executive,
JobsCh: p.producer.JobsCh,
CancelCh: make(async.CancelStream),
Quit: &p.wg,
})

go p.pool.Start(ctx, p.resultsCh)
go p.pool.Start(ctx, p.outputsCh)

p.wg.Add(1)
}

func (p *pipeline[I, R]) consume(ctx context.Context) {
func (p *pipeline[I, O]) consume(ctx context.Context) {
p.consumer = helpers.StartConsumer(ctx,
&p.wg,
p.resultsCh,
p.outputsCh,
)

p.wg.Add(1)
Expand All @@ -152,7 +152,7 @@ var _ = Describe("WorkerPool", func() {
Context("and: Stopped", func() {
It("🧪 should: receive and process all", func(ctx SpecContext) {
defer leaktest.Check(GinkgoT())()
pipe := start[TestJobInput, TestJobResult]()
pipe := start[TestJobInput, TestJobOutput]()

By("👾 WAIT-GROUP ADD(producer)")
pipe.produce(ctx, func() TestJobInput {
Expand All @@ -178,15 +178,15 @@ var _ = Describe("WorkerPool", func() {
)

Expect(pipe.producer.Count).To(Equal(pipe.consumer.Count))
Eventually(ctx, pipe.resultsCh).WithTimeout(time.Second * 5).Should(BeClosed())
Eventually(ctx, pipe.outputsCh).WithTimeout(time.Second * 5).Should(BeClosed())
Eventually(ctx, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed())
}, SpecTimeout(time.Second*5))
})

Context("and: Cancelled", func() {
It("🧪 should: handle cancellation and shutdown cleanly", func(ctxSpec SpecContext) {
defer leaktest.Check(GinkgoT())()
pipe := start[TestJobInput, TestJobResult]()
pipe := start[TestJobInput, TestJobOutput]()

ctxCancel, cancel := context.WithCancel(ctxSpec)
cancellations := []context.CancelFunc{cancel}
Expand Down Expand Up @@ -221,7 +221,7 @@ var _ = Describe("WorkerPool", func() {
// which jobs were not processed, each indicated with their corresponding Input
// value.

// Eventually(ctxCancel, pipe.resultsCh).WithTimeout(time.Second * 5).Should(BeClosed())
// Eventually(ctxCancel, pipe.outputsCh).WithTimeout(time.Second * 5).Should(BeClosed())
// Eventually(ctxCancel, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed())

}, SpecTimeout(time.Second*5))
Expand Down
Loading

0 comments on commit ba8aa93

Please sign in to comment.