redesign worker GR lifetime to span lifetime of app instead of a new GR for each job #4
Some temp test code ...
When("given: a stream of jobs", func() {
It("🧪 should: receive and process all", func(specCtx SpecContext) {
var (
wg sync.WaitGroup
)
sequence := 0
resultsCh := make(chan async.JobResult[TestJobResult], ResultChSize)
By("👾 WAIT-GROUP ADD(producer)")
wg.Add(1)
provider := func() TestJobInput {
sequence++
return TestJobInput{
sequenceNo: sequence,
Recipient: "jimmy 🦊",
}
}
producer := helpers.NewProducer[TestJobInput, TestJobResult](
specCtx, &wg, JobChSize, provider, Delay,
)
pool := async.NewWorkerPool[TestJobInput, TestJobResult](
&async.NewWorkerPoolParams[TestJobInput, TestJobResult]{
Exec: &exec{},
JobsCh: producer.JobsCh,
Cancel: make(async.CancelStream),
Quit: &wg,
})
By("👾 WAIT-GROUP ADD(worker-pool)\n")
wg.Add(1)
go pool.Run(specCtx, resultsCh)
By("👾 WAIT-GROUP ADD(consumer)")
wg.Add(1)
consumer := helpers.NewConsumer(specCtx, &wg, resultsCh)
go func() {
snooze := time.Second / 5
fmt.Printf(" >>> 💤 Sleeping before requesting stop (%v) ...\n", snooze)
time.Sleep(snooze)
producer.Stop()
fmt.Printf(" >>> 🍧🍧🍧 stop submitted.\n")
}()
wg.Wait()
fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
producer.Count,
consumer.Count,
)
Expect(producer.Count).To(Equal(consumer.Count))
Eventually(specCtx, resultsCh).WithTimeout(time.Second * 2).Should(BeClosed())
Eventually(specCtx, producer.JobsCh).WithTimeout(time.Second * 2).Should(BeClosed())
}, SpecTimeout(time.Second*2))
})
When("given: cancellation invoked before end of work", func() {
XIt("🧪 should: close down gracefully", func(specCtx SpecContext) {
// this case shows that worker pool needs a redesign. Each worker
// go routine needs to have a lifetime that spans the lifetime of
// the session, rather than a short lifetime that matches that of
// an individual job. This will make processing more reliable,
// especially when it comes to cancellation. As it is, since the
// worker GR only exists for the lifetime of the job, when the
// job is short (in duration), it is very unlikely it will see
// the cancellation request and therefore and therefore likely
// to send to a closed channel (the result channel).
//
var (
wg sync.WaitGroup
)
sequence := 0
resultsCh := make(chan async.JobResult[TestJobResult], ResultChSize)
By("👾 WAIT-GROUP ADD(producer)")
wg.Add(1)
provider := func() TestJobInput {
sequence++
return TestJobInput{
sequenceNo: sequence,
Recipient: "johnny 😈",
}
}
ctx, cancel := context.WithCancel(specCtx)
producer := helpers.NewProducer[TestJobInput, TestJobResult](ctx, &wg, JobChSize, provider, Delay)
pool := async.NewWorkerPool[TestJobInput, TestJobResult](&async.NewWorkerPoolParams[TestJobInput, TestJobResult]{
Exec: &exec{},
JobsCh: producer.JobsCh,
Cancel: make(async.CancelStream),
Quit: &wg,
})
By("👾 WAIT-GROUP ADD(worker-pool)\n")
wg.Add(1)
go pool.Run(ctx, resultsCh)
By("👾 WAIT-GROUP ADD(consumer)")
wg.Add(1)
consumer := helpers.NewConsumer(ctx, &wg, resultsCh)
go func() {
snooze := time.Second / 10
fmt.Printf(" >>> 💤 Sleeping before requesting cancellation (%v) ...\n", snooze)
time.Sleep(snooze)
cancel()
fmt.Printf(" >>> 🍧🍧🍧 cancel submitted.\n")
}()
wg.Wait()
fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
producer.Count,
consumer.Count,
)
Eventually(specCtx, resultsCh).WithTimeout(time.Second * 2).Should(BeClosed())
Eventually(specCtx, producer.JobsCh).WithTimeout(time.Second * 2).Should(BeClosed())
}, SpecTimeout(time.Second*2))
})
Context("ginkgo consumer", func() {
It("🧪 should: receive and process all", func(specCtx SpecContext) {
var (
wg sync.WaitGroup
)
sequence := 0
resultsCh := make(chan async.JobResult[TestJobResult], ResultChSize)
By("👾 WAIT-GROUP ADD(producer)")
wg.Add(1)
provider := func() TestJobInput {
sequence++
return TestJobInput{
sequenceNo: sequence,
Recipient: "cosmo 👽",
}
}
producer := helpers.NewProducer[TestJobInput, TestJobResult](specCtx, &wg, JobChSize, provider, Delay)
pool := async.NewWorkerPool[TestJobInput, TestJobResult](&async.NewWorkerPoolParams[TestJobInput, TestJobResult]{
Exec: &exec{},
JobsCh: producer.JobsCh,
Cancel: make(async.CancelStream),
Quit: &wg,
})
By("👾 WAIT-GROUP ADD(worker-pool)\n")
wg.Add(1)
go pool.Run(specCtx, resultsCh)
By("👾 WAIT-GROUP ADD(consumer)")
wg.Add(1)
consumer := helpers.NewConsumer(specCtx, &wg, resultsCh)
go func() {
snooze := time.Second / 5
fmt.Printf(" >>> 💤 Sleeping before requesting stop (%v) ...\n", snooze)
time.Sleep(snooze)
producer.Stop()
fmt.Printf(" >>> 🍧🍧🍧 stop submitted.\n")
}()
wg.Wait()
fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
producer.Count,
consumer.Count,
)
Expect(producer.Count).To(Equal(consumer.Count))
Eventually(specCtx, resultsCh).WithTimeout(time.Second * 2).Should(BeClosed())
Eventually(specCtx, producer.JobsCh).WithTimeout(time.Second * 2).Should(BeClosed())
}, SpecTimeout(time.Second*2))
}) |
Some temp source code
```go
// Run
func (p *WorkerPool[I, R]) Run(ctx context.Context, resultsOut ResultStreamOut[R]) {
    defer func() {
        fmt.Printf("<--- WorkerPool finished (Quit). 🧊🧊🧊\n")
        p.Quit.Done()
        close(resultsOut)
    }()
    fmt.Printf("---> 🧊 WorkerPool.Run (with %v workers)\n", p.noWorkers)

    for running := true; running; {
        select {
        case <-ctx.Done():
            fmt.Println("---> 🧊 WorkerPool.Run - done received ☢️☢️☢️")
            running = false

        case job, ok := <-p.JobsCh:
            if ok {
                fmt.Println("---> 🧊 WorkerPool.Run - new job received")
                p.dispatch(ctx, &workerInfo[I, R]{
                    job:          job,
                    resultsOutCh: resultsOut,
                    finishedOut:  p.finishedCh,
                })
            } else {
                running = false
            }

        case workerID := <-p.finishedCh:
            fmt.Printf("---> 🧊 WorkerPool.Run - worker(%v) finished\n", workerID)
            delete(p.private.pool, workerID)
        }
    }

    // we still need to wait for all workers to finish ...
    //
    p.drain(ctx)
}

func (p *WorkerPool[I, R]) dispatch(ctx context.Context, info *workerInfo[I, R]) {
    w := &workerWrapper[I, R]{
        core: &worker[I, R]{
            id: p.composeID(),
            fn: p.fn,
        },
    }
    p.private.pool[w.core.id] = w
    fmt.Printf("---> 🧊 (pool-size: %v) dispatch worker: id-'%v'\n", len(p.private.pool), w.core.id)

    go w.core.accept(ctx, info) // BREAKS: when cancellation occurs, send on closed chan
}

func (w *worker[I, R]) accept(ctx context.Context, info *workerInfo[I, R]) {
    fmt.Printf("---> 🚀 worker.accept: '%v', input:'%v'\n", w.id, info.job.Input)
    result, _ := w.fn.Invoke(info.job)

    select { // BREAKS: when cancellation occurs, send on closed chan
    case <-ctx.Done():
        fmt.Println("---> 🚀 worker.accept(result) - done received 💥💥💥")
    case info.resultsOutCh <- result:
    }

    select {
    case <-ctx.Done():
        fmt.Println("---> 🚀 worker.accept(finished) - done received ❌❌❌")
    case info.finishedOut <- w.id:
    }
}
```
Some speculative join channels code:
```go
// JoinChannels bridges an input channel to an output channel, forwarding
// items until the input channel is closed or the context is cancelled.
func JoinChannels[T any](ctx context.Context, inCh <-chan T, outCh chan<- T, fn JoinChannelsFunc[T]) {
    fn(inCh, outCh) // hook invoked once before forwarding begins (as in the original sketch)

    for {
        select {
        case <-ctx.Done():
            fmt.Println("---> 💎 JoinChannels(in) - done received")
            return
        case item, ok := <-inCh:
            if !ok {
                // input channel closed; nothing left to forward
                return
            }
            select {
            case <-ctx.Done():
                fmt.Println("---> 💎 JoinChannels(out) - done received")
                return
            case outCh <- item:
            }
        }
    }
}
```
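A minimal usage sketch, assuming this lives in the same package as JoinChannels and that JoinChannelsFunc[T] is simply func(<-chan T, chan<- T) (inferred from the fn(inCh, outCh) call above); it only shows the call shape, not how the worker pool would wire it up:

```go
// Hypothetical example, not repo code: forward a value from one channel to
// another via JoinChannels, using a no-op join hook.
func ExampleJoinChannels() {
    ctx := context.Background()
    in := make(chan int, 4)
    out := make(chan int, 4)

    go JoinChannels(ctx, in, out, func(<-chan int, chan<- int) { /* no-op join hook */ })

    in <- 42
    close(in)
    fmt.Println(<-out) // 42
}
```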
An important discovery was made while working on this issue: care must be taken with the ctx.Done() channel, and it must not be abused. Once ctx.Done() has fired, the channel is closed, so do not keep reusing it in subsequent select statements; a later select that includes this channel will short-circuit on it and the other cases may never get a chance to be selected.
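A minimal standalone sketch of the pitfall (not code from this repo):

```go
package main

import (
    "context"
    "fmt"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    items := make(chan int, 3)
    for i := 1; i <= 3; i++ {
        items <- i
    }
    cancel() // ctx.Done() is now closed, and stays closed

    // ❌ Re-checking ctx.Done() while draining: the closed Done channel is
    // always ready, so select may pick it instead of the items case, and some
    // buffered items never get drained.
    for i := 0; i < 3; i++ {
        select {
        case <-ctx.Done():
            fmt.Println("done fired (again) - drain iteration wasted")
        case n := <-items:
            fmt.Println("drained item", n)
        }
    }

    // ✅ During a drain phase, just range over the (closed) channel instead.
    close(items)
    for n := range items {
        fmt.Println("drained remaining item", n)
    }
}
```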
Although this did fix the drain functionality, it still hasn't exposed the remaining problem of the wait() being blocked indefinitely, which results in the test spec timing out. The current state of the test-case run is as indicated below: Output that shows an example run of the spec timing out, blocking on the final wait ...
What the above spec output shows us is:
After the job channel's capacity of 10 is reached, the producer starts to block until the workers have a chance to consume the jobs. (This has been confirmed in another run, not illustrated here: extending the job channel's capacity extends this initial burst to match the size of the buffer.)
The output also shows that results are not guaranteed to be received in order; note the #Count versus the sequence number. Depending on the requirements of the client application, this ordering may or may not be significant. In a future update, we'll add a feature that guarantees correct sequencing, probably using a reactive model.
When this time elapses, we see the producer close the job channel to indicate the end of the workload:
The pool and workers continue to process the remaining workload until the closure of the job channel is detected:
The pool needs to wait for the remaining workers to complete their work. They will continue whilst there are still outstanding jobs in the worker job queue. The pool exits its run loop and enters the drain phase:
This is how the pool waits for the remaining workers; a hedged sketch of that drain loop is shown below.
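As a rough approximation only, based on the Run and dispatch code shown earlier (it reuses the p.private.pool and p.finishedCh fields from that snippet, and is not necessarily the repo's exact drain implementation), the drain phase amounts to waiting on the finished channel until the pool map is empty:

```go
// Hedged approximation of the drain phase: keep receiving finished
// notifications until no workers remain in the pool map. It deliberately
// avoids selecting on ctx.Done() here, for the reason described above -
// once Done has fired, that case is always ready.
func (p *WorkerPool[I, R]) drain(_ context.Context) {
    fmt.Printf("---> 🧊 WorkerPool.drain - waiting for %v remaining workers\n", len(p.private.pool))

    for len(p.private.pool) > 0 {
        workerID := <-p.finishedCh
        delete(p.private.pool, workerID)
        fmt.Printf("---> 🧊 WorkerPool.drain - worker(%v) finished\n", workerID)
    }
}
```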
Bingo, I found the problem: when closure of the source job queue is detected, we then also need to close the workers' job queue. This fixed the wait deadlock! A hedged sketch of the shape of the fix appears after the output. So a successful run looks like this: Output that shows an example run of the worker-pool with cancellation ...
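The sketch below is illustrative only and mirrors the speculative JoinChannels code above; sourceJobsCh and workersJobsCh are assumed names, not the repo's actual fields. The point it demonstrates is that once the source jobs channel is detected as closed, the channel the workers read from must also be closed, otherwise the final wait never completes:

```go
// Illustrative bridge between the producer's jobs channel and a pool-owned
// channel that long-lived workers read from. The crucial line is the deferred
// close: without it, workers ranging over workersJobsCh never terminate and
// the final WaitGroup wait deadlocks.
func bridgeJobs[J any](ctx context.Context, sourceJobsCh <-chan J, workersJobsCh chan<- J) {
    defer close(workersJobsCh) // 🔑 the fix

    for {
        select {
        case <-ctx.Done():
            return
        case job, ok := <-sourceJobsCh:
            if !ok {
                return // source closed; defer closes the workers' channel
            }
            select {
            case <-ctx.Done():
                return
            case workersJobsCh <- job:
            }
        }
    }
}
```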
Another lesson to learn from this issue is that it is not necessary to always wrap channel operations in a select statement with a ctx.Done() case. Actually, doing so may even cause problems, as discovered above. An example of this is in the worker.run method:

```go
func (w *worker[I, R]) run(ctx context.Context) {
    defer func() {
        w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok
        fmt.Printf(" <--- 🚀 worker.run(%v) (SENT FINISHED). 🚀🚀🚀\n", w.id)
    }()
    // ...
}
```

This defer statement sends a finished notification to the pool, but there is little risk of it ever blocking, because the channel is buffered with its capacity set to the number of workers, i.e. there is space for every worker to send its finished notification. Because of this, the send does not need to be made pre-emptive by some other means. So, do not make all interactions with channels pre-emptive; it depends on the situation at hand.
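To make the sizing argument concrete, here is a tiny self-contained sketch (not the repo's actual setup): because the finished channel has one buffer slot per worker, each worker's single deferred send always has somewhere to go.

```go
package main

import "fmt"

type workerID string

func main() {
    const noWorkers = 3

    // One slot per worker: a worker's single "I'm finished" send never blocks,
    // even if the pool isn't receiving at that instant, so it does not need a
    // pre-emptive select with ctx.Done().
    finishedCh := make(chan workerID, noWorkers)

    for i := 1; i <= noWorkers; i++ {
        finishedCh <- workerID(fmt.Sprintf("worker-%v", i)) // never blocks
    }
    close(finishedCh)

    for id := range finishedCh {
        fmt.Println("finished:", id)
    }
}
```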
It was discovered that the current design of the worker pool does not work particularly well when cancellation is requested and a job's execution time is minutely small, i.e. the task to be performed is trivial and only takes milliseconds to run. There is also a perplexing characteristic of there being many more worker GRs active in the pool than the number of CPUs; it was expected that the number of worker GRs would never exceed the number of CPUs, but in practice that expectation almost never holds.
When a cancellation is received, the supplementary GRs (representing the producer, consumer and worker pool) did act upon the cancellation request in the expected manner. However, the same could not be said of the worker GRs. Since the supplementary GRs are long lived, there was enough time for the Go scheduler to run them correctly. But with the current design, where a worker's GR only lasts for the lifetime of its job (which for trivial tasks is a minute fraction of time), it was difficult if not impossible for the worker GR to receive the cancellation request, leaving it with no check to prevent it sending a message to a closed result channel, resulting in a panic. This was confirmed by introducing a delay into the execution path of the worker, which intermittently resulted in a higher number of worker GRs seeing the cancellation request. The problem, however, was that this was not very dependable, which probably indicates a race condition (although, having said this, using the -race flag never actually resulted in a race condition being reported; this may be expected, as described by "Concurrency In Go" (CiG), specific reference TBD).
Output that shows an example run of the worker-pool with cancellation ...
Worker attempts to write to closed channel
The new design should ensure that the pool only spins up workers as work arrives. It should not spin up everything regardless of the workload, as that would be inefficient.
Use the parent-child relationship between the pool and the workers, as documented on page 91 of CiG, in order to prevent GR leakage: the source cancellation is handled by the pool, which delegates it to the workers (a hedged sketch follows). ❗Remember, GRs are not garbage collected, which is a possible explanation of the larger than expected number of GRs at the end of the processing batch in the current design of the pool.
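A minimal sketch of what this redesign could look like, purely illustrative: names such as workersJobsCh, spawn, Job and the lowercase run methods are assumptions rather than the repo's final API, and the finished-notification mechanism is omitted for brevity. Workers are spawned lazily as jobs arrive, live for the lifetime of the pool, and exit either when the pool closes their jobs channel or when the pool's context is cancelled; the pool, as parent, waits for all of its children before closing the results channel, so a worker can never send on a closed channel.

```go
// Illustrative only: the pool is the parent. It owns the workers' jobs
// channel and the results channel, spawns workers lazily up to a limit,
// and delegates shutdown/cancellation to the workers.
func (p *WorkerPool[I, R]) run(ctx context.Context, resultsOut chan<- R) {
    var workers sync.WaitGroup

    defer func() {
        close(p.workersJobsCh) // delegate shutdown/cancellation to the workers
        workers.Wait()         // parent waits for its children ...
        close(resultsOut)      // ... and only then closes the results channel
    }()

    for {
        select {
        case <-ctx.Done():
            return
        case job, ok := <-p.JobsCh:
            if !ok {
                return // source exhausted
            }
            if len(p.private.pool) < p.noWorkers {
                p.spawn(ctx, &workers, resultsOut) // hypothetical lazy spawn, up to the limit
            }
            select {
            case <-ctx.Done():
                return
            case p.workersJobsCh <- job:
            }
        }
    }
}

// A worker goroutine now lives until the pool closes workersJobsCh or the
// context is cancelled, rather than for the duration of a single job.
func (w *worker[I, R]) run(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job[I], resultsOut chan<- R) {
    defer wg.Done()

    for {
        select {
        case <-ctx.Done():
            return
        case job, ok := <-jobs:
            if !ok {
                return
            }
            result, _ := w.fn.Invoke(job)
            select {
            case <-ctx.Done():
                return
            case resultsOut <- result: // safe: pool closes resultsOut only after wg.Wait()
            }
        }
    }
}
```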
In the worker pool, create another struct (possibly embedded) that contains any state that is not synchronised and is for the sole use of the worker-pool goroutine. This way, we have a clear separation between state that should be considered private to the worker-pool GR (and therefore does not need to be synchronised) and state that can be accessed by methods also callable from other GRs (and therefore must be synchronised). This will help to show clear intent in the code as to which GRs may call specific methods.
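A small sketch of that separation; field and type names beyond p.private.pool (which already appears in the Run code above) are illustrative, not the repo's actual definitions:

```go
// privateState is only ever touched from the worker pool's own goroutine
// (its run loop), so none of its fields need synchronisation.
type privateState[I, R any] struct {
    pool   map[workerID]*workerWrapper[I, R] // live workers, keyed by id
    lastID int                               // hypothetical counter behind composeID
}

// WorkerPool keeps goroutine-private state clearly separated from state that
// other goroutines may access, which must be guarded.
type WorkerPool[I, R any] struct {
    private privateState[I, R] // only the pool's run-loop GR may touch this

    mx      sync.Mutex // guards anything below, accessible from other GRs
    running bool       // hypothetical example of shared, synchronised state
}
```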
Goroutine leakage can be checked for using Uber's goleak; also see "Detecting Goroutine Leaks with Test Cases".
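For reference, a minimal example of wiring goleak into a test binary using the standard go.uber.org/goleak API; whether it is applied per test or via TestMain is a project choice, and the test names here are just placeholders:

```go
package async_test

import (
    "testing"

    "go.uber.org/goleak"
)

// Fails the test binary if any goroutines are still running when the tests
// finish - useful for catching leaked worker goroutines.
func TestMain(m *testing.M) {
    goleak.VerifyTestMain(m)
}

// Alternatively, check at the end of an individual test:
func TestWorkerPoolDoesNotLeak(t *testing.T) {
    defer goleak.VerifyNone(t)

    // ... exercise the worker pool here ...
}
```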