Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement pool cancellation #15

Closed
plastikfan opened this issue Aug 12, 2023 · 2 comments · Fixed by #17
Closed

implement pool cancellation #15

plastikfan opened this issue Aug 12, 2023 · 2 comments · Fixed by #17
Assignees
Labels
feature New feature or request

Comments

@plastikfan
Copy link
Contributor

plastikfan commented Aug 12, 2023

Output that shows an example run showing the end of run being triggered by stopping the producer ...
WorkerPool when given: a stream of jobs and: Stopped 🧪 should: receive and process all
/home/plastikfan/dev/github/go/snivilised/lorax/async/worker-pool_test.go:161

  Captured StdOut/StdErr Output >>
  ===> 🧊 WorkerPool.run
                >>> 💤 StopAfter - Sleeping before requesting stop (200ms) ...
  <<<< 💠 consumer.run ...
  >>>> ✨ producer.run ...
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:1 Recipient:💀 skeletor}'
  ===> 🧊 (#workers: '0') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(❤️)WORKER-ID-1:acf800d5-5198-48a3-8e29-d0c6351234ad' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:a6dd3d16-e3da-4287-8c9c-a04fe160c2e0)
        ---> 🚀 worker.run((❤️)WORKER-ID-1:acf800d5-5198-48a3-8e29-d0c6351234ad)(input:'{1 💀 skeletor}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:2 Recipient:👹 ogre}'
  ===> 🧊 (#workers: '1') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💙)WORKER-ID-2:e737ea63-f448-4bed-b271-a7ed8381decd' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:32b4b001-91d7-47c1-b4ab-bb6331c15034)
        ---> 🚀 worker.run((💙)WORKER-ID-2:e737ea63-f448-4bed-b271-a7ed8381decd)(input:'{2 👹 ogre}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:3 Recipient:🧙 gandalf}'
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💚)WORKER-ID-3:7d64a128-5be7-48a7-9289-fa47a3a9513b' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:f0adb058-d971-4890-b4eb-f94332105ea6)
        ---> 🚀 worker.run((💚)WORKER-ID-3:7d64a128-5be7-48a7-9289-fa47a3a9513b)(input:'{3 🧙 gandalf}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:4 Recipient:🐉 smaug}'
  ===> 🧊 (#workers: '3') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💜)WORKER-ID-4:eaae46b6-5647-443b-8806-ac3b88b745b1' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:58228323-0e51-4027-a401-ec27c443dc1a)
        ---> 🚀 worker.run((💜)WORKER-ID-4:eaae46b6-5647-443b-8806-ac3b88b745b1)(input:'{4 🐉 smaug}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:5 Recipient:😺 garfield}'
  ===> 🧊 (#workers: '4') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💛)WORKER-ID-5:ea85f951-fb40-4c06-86d4-e69df4478dd0' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:8e4eb3a0-159e-4bb0-bba8-2281a1630bc0)
        ---> 🚀 worker.run((💛)WORKER-ID-5:ea85f951-fb40-4c06-86d4-e69df4478dd0)(input:'{5 😺 garfield}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:6 Recipient:👹 ogre}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:c38c01ab-fca9-4598-8cc7-f6e831dc75c3)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:7 Recipient:👹 ogre}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:c422caec-73ce-4104-a819-33ad857bd091)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:8 Recipient:👹 ogre}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:d2f02c54-41de-4b99-abfb-5e48fe348f77)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:9 Recipient:💀 skeletor}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:fd485c87-7ea3-4cd0-bf34-c4b4a2bb8870)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:10 Recipient:👻 caspar}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:a5a373cb-5bac-4d15-951f-4a280568c028)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:11 Recipient:👻 caspar}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:12 Recipient:🧙 gandalf}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:13 Recipient:😺 garfield}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:14 Recipient:🧙 gandalf}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:15 Recipient:👽 paul}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:16 Recipient:🦄 pegasus}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:17 Recipient:👹 ogre}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:18 Recipient:💩 poo}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:19 Recipient:🦄 pegasus}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:20 Recipient:💩 poo}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:21 Recipient:👾 xenomorph}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> 🧲 producer terminating ...
        ---> 🚀 worker.run((💚)WORKER-ID-3:7d64a128-5be7-48a7-9289-fa47a3a9513b)(input:'{6 👹 ogre}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:6e3bf32b-702a-428e-a850-070964786a1c)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  >>>> ✨ producer.item, posted item: '{sequenceNo:22 Recipient:🤖 rusty}'
  >>>> ✨ producer.run - termination detected (running: false)
  >>>> producer.run - finished (QUIT). ✨✨✨
                >>> StopAfter - 🍧🍧🍧 stop submitted.
  <<<< 💠 consumer.run - new result arrived(#1): '                      ---> 🍉🍉🍉 [Seq: 3] Hello: '🧙 gandalf''
        ---> 🚀 worker.run((💛)WORKER-ID-5:ea85f951-fb40-4c06-86d4-e69df4478dd0)(input:'{7 👹 ogre}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:b516197f-1d97-4b34-85b4-069228cb1100)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  <<<< 💠 consumer.run - new result arrived(#2): '                      ---> 🍉🍉🍉 [Seq: 5] Hello: '😺 garfield''
        ---> 🚀 worker.run((💙)WORKER-ID-2:e737ea63-f448-4bed-b271-a7ed8381decd)(input:'{8 👹 ogre}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:a8f47181-adb3-4a33-a65d-0235638b1de0)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  <<<< 💠 consumer.run - new result arrived(#3): '                      ---> 🍉🍉🍉 [Seq: 2] Hello: '👹 ogre''
        ---> 🚀 worker.run((💚)WORKER-ID-3:7d64a128-5be7-48a7-9289-fa47a3a9513b)(input:'{9 💀 skeletor}')
  <<<< 💠 consumer.run - new result arrived(#4): '                      ---> 🍉🍉🍉 [Seq: 6] Hello: '👹 ogre''
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:4a8073e0-eb92-4e62-92a9-c836db951f76)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
        ---> 🚀 worker.run((💚)WORKER-ID-3:7d64a128-5be7-48a7-9289-fa47a3a9513b)(input:'{10 👻 caspar}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:f4749725-a84b-4c5b-bf4a-f5d8ea5dd034)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  <<<< 💠 consumer.run - new result arrived(#5): '                      ---> 🍉🍉🍉 [Seq: 9] Hello: '💀 skeletor''
        ---> 🚀 worker.run((❤️)WORKER-ID-1:acf800d5-5198-48a3-8e29-d0c6351234ad)(input:'{11 👻 caspar}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:6b31ecc9-2442-460c-85e5-7edb7085fbb5)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  <<<< 💠 consumer.run - new result arrived(#6): '                      ---> 🍉🍉🍉 [Seq: 1] Hello: '💀 skeletor''
        ---> 🚀 worker.run((💜)WORKER-ID-4:eaae46b6-5647-443b-8806-ac3b88b745b1)(input:'{12 🧙 gandalf}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:07797cdc-35b3-4e36-a5e1-955a3a4b7476)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  <<<< 💠 consumer.run - new result arrived(#7): '                      ---> 🍉🍉🍉 [Seq: 4] Hello: '🐉 smaug''
        ---> 🚀 worker.run((💙)WORKER-ID-2:e737ea63-f448-4bed-b271-a7ed8381decd)(input:'{13 😺 garfield}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:c9136e8f-aa0a-4eb4-91c9-1c5f167c75e5)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  <<<< 💠 consumer.run - new result arrived(#8): '                      ---> 🍉🍉🍉 [Seq: 8] Hello: '👹 ogre''
        ---> 🚀 worker.run((💜)WORKER-ID-4:eaae46b6-5647-443b-8806-ac3b88b745b1)(input:'{14 🧙 gandalf}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:dbac676d-6fa1-43d6-92b4-99097b3f2a97)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  <<<< 💠 consumer.run - new result arrived(#9): '                      ---> 🍉🍉🍉 [Seq: 12] Hello: '🧙 gandalf''
        ---> 🚀 worker.run((💛)WORKER-ID-5:ea85f951-fb40-4c06-86d4-e69df4478dd0)(input:'{15 👽 paul}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:09b1bb8c-7631-4516-bfad-7f55f2163106)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  <<<< 💠 consumer.run - new result arrived(#10): '                     ---> 🍉🍉🍉 [Seq: 7] Hello: '👹 ogre''
        ---> 🚀 worker.run((💚)WORKER-ID-3:7d64a128-5be7-48a7-9289-fa47a3a9513b)(input:'{16 🦄 pegasus}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:5c630a2f-65f2-4cdc-a81a-43f19a9bb126)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  <<<< 💠 consumer.run - new result arrived(#11): '                     ---> 🍉🍉🍉 [Seq: 10] Hello: '👻 caspar''
        ---> 🚀 worker.run((💙)WORKER-ID-2:e737ea63-f448-4bed-b271-a7ed8381decd)(input:'{17 👹 ogre}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:f715ee78-33fd-499c-945a-460a4bba74cf)
  ===> 🚀 WorkerPool.run(source jobs chan closed) 🟥🟥🟥
  !!!! 🧊 WorkerPool.drain - waiting for remaining workers: 5 (#GRs: 18); 🧊🧊🧊
  <<<< 💠 consumer.run - new result arrived(#12): '                     ---> 🍉🍉🍉 [Seq: 13] Hello: '😺 garfield''
        ---> 🚀 worker.run((❤️)WORKER-ID-1:acf800d5-5198-48a3-8e29-d0c6351234ad)(input:'{18 💩 poo}')
  <<<< 💠 consumer.run - new result arrived(#13): '                     ---> 🍉🍉🍉 [Seq: 11] Hello: '👻 caspar''
        ---> 🚀 worker.run((💜)WORKER-ID-4:eaae46b6-5647-443b-8806-ac3b88b745b1)(input:'{19 🦄 pegasus}')
  <<<< 💠 consumer.run - new result arrived(#14): '                     ---> 🍉🍉🍉 [Seq: 14] Hello: '🧙 gandalf''
  <<<< 💠 consumer.run - new result arrived(#15): '                     ---> 🍉🍉🍉 [Seq: 19] Hello: '🦄 pegasus''
        ---> 🚀 worker.run((💜)WORKER-ID-4:eaae46b6-5647-443b-8806-ac3b88b745b1)(input:'{20 💩 poo}')
        ---> 🚀 worker.run((💛)WORKER-ID-5:ea85f951-fb40-4c06-86d4-e69df4478dd0)(input:'{21 👾 xenomorph}')
  <<<< 💠 consumer.run - new result arrived(#16): '                     ---> 🍉🍉🍉 [Seq: 15] Hello: '👽 paul''
        ---> 🚀 worker.run((💛)WORKER-ID-5:ea85f951-fb40-4c06-86d4-e69df4478dd0)(input:'{22 🤖 rusty}')
  <<<< 💠 consumer.run - new result arrived(#17): '                     ---> 🍉🍉🍉 [Seq: 21] Hello: '👾 xenomorph''
        ---> 🚀 worker.run((❤️)WORKER-ID-1:acf800d5-5198-48a3-8e29-d0c6351234ad)(jobs chan closed) 🟥🟥🟥
        <--- 🚀 worker.run((❤️)WORKER-ID-1:acf800d5-5198-48a3-8e29-d0c6351234ad) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((❤️)WORKER-ID-1:acf800d5-5198-48a3-8e29-d0c6351234ad) finished, remaining: '4' 🟥
  <<<< 💠 consumer.run - new result arrived(#18): '                     ---> 🍉🍉🍉 [Seq: 18] Hello: '💩 poo''
        ---> 🚀 worker.run((💜)WORKER-ID-4:eaae46b6-5647-443b-8806-ac3b88b745b1)(jobs chan closed) 🟥🟥🟥
        <--- 🚀 worker.run((💜)WORKER-ID-4:eaae46b6-5647-443b-8806-ac3b88b745b1) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💜)WORKER-ID-4:eaae46b6-5647-443b-8806-ac3b88b745b1) finished, remaining: '3' 🟥
  <<<< 💠 consumer.run - new result arrived(#19): '                     ---> 🍉🍉🍉 [Seq: 20] Hello: '💩 poo''
        ---> 🚀 worker.run((💙)WORKER-ID-2:e737ea63-f448-4bed-b271-a7ed8381decd)(jobs chan closed) 🟥🟥🟥
        <--- 🚀 worker.run((💙)WORKER-ID-2:e737ea63-f448-4bed-b271-a7ed8381decd) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💙)WORKER-ID-2:e737ea63-f448-4bed-b271-a7ed8381decd) finished, remaining: '2' 🟥
  <<<< 💠 consumer.run - new result arrived(#20): '                     ---> 🍉🍉🍉 [Seq: 17] Hello: '👹 ogre''
        ---> 🚀 worker.run((💛)WORKER-ID-5:ea85f951-fb40-4c06-86d4-e69df4478dd0)(jobs chan closed) 🟥🟥🟥
        <--- 🚀 worker.run((💛)WORKER-ID-5:ea85f951-fb40-4c06-86d4-e69df4478dd0) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💛)WORKER-ID-5:ea85f951-fb40-4c06-86d4-e69df4478dd0) finished, remaining: '1' 🟥
  <<<< 💠 consumer.run - new result arrived(#21): '                     ---> 🍉🍉🍉 [Seq: 22] Hello: '🤖 rusty''
        ---> 🚀 worker.run((💚)WORKER-ID-3:7d64a128-5be7-48a7-9289-fa47a3a9513b)(jobs chan closed) 🟥🟥🟥
        <--- 🚀 worker.run((💚)WORKER-ID-3:7d64a128-5be7-48a7-9289-fa47a3a9513b) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💚)WORKER-ID-3:7d64a128-5be7-48a7-9289-fa47a3a9513b) finished, remaining: '0' 🟥
  ===> 🧊 WorkerPool.run - drain complete (workers count: '0'). 🎃🎃🎃
  <--- WorkerPool.run (QUIT). 🧊🧊🧊
  <<<< 💠 consumer.run - new result arrived(#22): '                     ---> 🍉🍉🍉 [Seq: 16] Hello: '🦄 pegasus''
  <<<< 💠 consumer.run - no more results available (running: false)
  <<<< consumer.run - finished (QUIT). 💠💠💠
  <--- orpheus(alpha) finished Counts >>> (Producer: '22', Consumer: '22'). 🎯🎯🎯
  << Captured StdOut/StdErr Output
------------------------------
 SUCCESS! 2.152288413s

We have a couple of options for cancellation.

  • 1️⃣ follow the same route as stopping the producer. Might be easier to implement, but may not be in the spirt of golang concurrency, IE, we should be able to cancel the context and get the logic correct so that GRs are not leaked and there are no deadlocks or race conditions.

  • 2️⃣ implement cancellation via context: This should be the way to fix this, but currently, this is not working due to deadlock.

Given a worker pool, its interface is a channel of jobs and a context in which to run. So we know that the pool will complete when the producer stops. So what about cancellation. If we cancel the producer, the producer can follow the same procedure for a stop. Or do we issue a cancellation to the producer and the pool? If we cancel the pool, how are we guaranteed that the producer is stooped?

What we should say is that the producer and the pool work under the same context. So if the context is cancelled, then the producer should stop and the pool should react to the cancellation.

STOP: When the producer indicates no more work, it closes the job channel. In response to this closure, the pool, closes its worker job channel, exits its run loop to enter the drain loop. When the worker sees that the workers job channel is closed, it exits it run loop, then sends a finished notification to the pool.

So here we have 2 notification for the pool, ie the source jobs closure and the multiple finished notifications from the workers. This interaction works fine.

@plastikfan plastikfan added the feature New feature or request label Aug 12, 2023
@plastikfan plastikfan self-assigned this Aug 12, 2023
@plastikfan
Copy link
Contributor Author

plastikfan commented Aug 13, 2023

some legacy code:

test-producer.go

func CancelProducerAfter[I, R any](
	_ context.Context,
	delay time.Duration,
	funcs ...any,
) {
	fmt.Printf("		>>> 💤 CancelAfter - Sleeping before requesting cancellation (%v) ...\n", delay)
	<-time.After(delay)

	if len(funcs) > 0 {
		c, ok := funcs[0].(context.CancelFunc)
		if ok && c != nil {
			fmt.Printf("		>>> CancelAfter - 🛑🛑🛑 cancellation submitted.\n")
			c()
			fmt.Printf("		>>> CancelAfter - ➖➖➖ CANCELLED\n")
		}
	} else {
		fmt.Printf("		>>> CancelAfter - ✖️✖️✖️ cancellation attempt benign.\n")
	}
}
Output that shows an example run of the worker-pool with cancellation ...
                >>> 💤 CancelAfter - Sleeping before requesting cancellation (200ms) ...
  <<<< 💠 consumer.run ...
  >>>> ✨ producer.run ...
  ===> 🧊 WorkerPool.run
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:1 Recipient:🤖 rusty}'
  ===> 🧊 (#workers: '0') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(❤️)WORKER-ID-1:fd466651-7e61-40d2-a49c-f1ac1a7050fb' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:dbaf8187-7d2f-4669-abb6-1bb379583ce7)
        ---> 🚀 worker.run((❤️)WORKER-ID-1:fd466651-7e61-40d2-a49c-f1ac1a7050fb)(input:'{1 🤖 rusty}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, posted item: '{sequenceNo:2 Recipient:😺 garfield}'
  ===> 🧊 (#workers: '1') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💙)WORKER-ID-2:f90fd9c8-f199-4b46-a042-5cddbd1a3810' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:53f32056-7dbd-4215-a5b3-fd90625033e3)
        ---> 🚀 worker.run((💙)WORKER-ID-2:f90fd9c8-f199-4b46-a042-5cddbd1a3810)(input:'{2 😺 garfield}')
  >>>> ✨ producer.run - default (running: true) ...
  ...
                >>> CancelAfter - 🛑🛑🛑 cancellation submitted.
                >>> CancelAfter - ➖➖➖ CANCELLED
        ---> 🚀 worker.run((❤️)WORKER-ID-1:fd466651-7e61-40d2-a49c-f1ac1a7050fb)(input:'{6 👾 xenomorph}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:4ea4da8f-edf1-43a7-a752-97957c13a858)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  >>>> ✨ producer.item, posted item: '{sequenceNo:22 Recipient:💩 poo}'
  <<<< 💠 consumer.run - new result arrived(#1): '                      ---> 🍉🍉🍉 [Seq: 1] Hello: '🤖 rusty''
  >>>> ✨ producer.run - default (running: true) ...
...
        ---> 🚀 worker.run((💙)WORKER-ID-2:f90fd9c8-f199-4b46-a042-5cddbd1a3810)(input:'{49 🐉 smaug}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:e074e827-f94d-44e7-92c0-f2aacf2dcf15)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  >>>> ✨ producer.item, posted item: '{sequenceNo:65 Recipient:💩 poo}'
  <<<< 💠 consumer.run - new result arrived(#44): '                     ---> 🍉🍉🍉 [Seq: 41] Hello: '🧛‍♀️ vampire''
  >>>> ✨ producer.run - default (running: true) ...
  ===> 🧊 (#workers: '5') WorkerPool.run - done received ☢️☢️☢️
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 (#workers: '5') WorkerPool.run - done received ☢️☢️☢️
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 (#workers: '5') WorkerPool.run - done received ☢️☢️☢️
  ===> 🧊 WorkerPool.run - done received ☢️☢️☢️
  ===> 🧊 cancelling worker '(💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d' of 5 📛📛📛...
  ===> 🧊 cancelling worker '(❤️)WORKER-ID-1:fd466651-7e61-40d2-a49c-f1ac1a7050fb' of 5 📛📛📛...
  ===> 🧊 cancelling worker '(💙)WORKER-ID-2:f90fd9c8-f199-4b46-a042-5cddbd1a3810' of 5 📛📛📛...
  ===> 🧊 cancelling worker '(💚)WORKER-ID-3:b12cafc7-7e1c-4f27-954d-cca03204be0c' of 5 📛📛📛...
  ===> 🧊 cancelling worker '(💜)WORKER-ID-4:41459430-97cc-4a3f-8e75-3debd5f99276' of 5 📛📛📛...
  !!!! 🧊 WorkerPool.drain - waiting for remaining workers: 5 (#GRs: 19); 🧊🧊🧊
  >>>> ✨ producer.item, posted item: '{sequenceNo:66 Recipient:💩 poo}'
  >>>> 💠 producer.run - done received ⛔⛔⛔
  >>>> producer.run - finished (QUIT). ✨✨✨
  <<<< 💠 consumer.run - done received 💔💔💔
  <<<< consumer.run - finished (QUIT). 💠💠💠
        ---> 🚀 worker.run((💚)WORKER-ID-3:b12cafc7-7e1c-4f27-954d-cca03204be0c)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💚)WORKER-ID-3:b12cafc7-7e1c-4f27-954d-cca03204be0c) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💚)WORKER-ID-3:b12cafc7-7e1c-4f27-954d-cca03204be0c) finished, remaining: '4' 🟥
        ---> 🚀 worker.run((💙)WORKER-ID-2:f90fd9c8-f199-4b46-a042-5cddbd1a3810)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💙)WORKER-ID-2:f90fd9c8-f199-4b46-a042-5cddbd1a3810) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💙)WORKER-ID-2:f90fd9c8-f199-4b46-a042-5cddbd1a3810) finished, remaining: '3' 🟥
        ---> 🚀 worker.invoke((💜)WORKER-ID-4:41459430-97cc-4a3f-8e75-3debd5f99276)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💜)WORKER-ID-4:41459430-97cc-4a3f-8e75-3debd5f99276)(input:'{50 🦄 pegasus}')
        ---> 🚀 worker.run((❤️)WORKER-ID-1:fd466651-7e61-40d2-a49c-f1ac1a7050fb)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((❤️)WORKER-ID-1:fd466651-7e61-40d2-a49c-f1ac1a7050fb) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((❤️)WORKER-ID-1:fd466651-7e61-40d2-a49c-f1ac1a7050fb) finished, remaining: '2' 🟥
        ---> 🚀 worker.invoke((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d)(input:'{51 👾 xenomorph}')
        ---> 🚀 worker.invoke((💜)WORKER-ID-4:41459430-97cc-4a3f-8e75-3debd5f99276)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💜)WORKER-ID-4:41459430-97cc-4a3f-8e75-3debd5f99276)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💜)WORKER-ID-4:41459430-97cc-4a3f-8e75-3debd5f99276) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💜)WORKER-ID-4:41459430-97cc-4a3f-8e75-3debd5f99276) finished, remaining: '1' 🟥
        ---> 🚀 worker.invoke((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d)(input:'{52 👾 xenomorph}')
        ---> 🚀 worker.invoke((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d)(input:'{53 👺 gobby}')
        ---> 🚀 worker.invoke((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d)(input:'{54 👹 ogre}')
        ---> 🚀 worker.invoke((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💛)WORKER-ID-5:8db1390f-8f5c-4f7c-96d2-0a354fa4934d) finished, remaining: '0' 🟥
  ===> 🧊 WorkerPool.run - drain complete (workers count: '0'). 🎃🎃🎃
  <--- WorkerPool.run (QUIT). 🧊🧊🧊
  <--- orpheus(alpha) finished Counts >>> (Producer: '66', Consumer: '44'). 🎯🎯🎯
  << Captured StdOut/StdErr Output


  Timeline >>
  STEP: 👾 WAIT-GROUP ADD(producer) @ 08/13/23 16:07:36.825
  STEP: 👾 WAIT-GROUP ADD(worker-pool)
   @ 08/13/23 16:07:36.825
  STEP: 👾 WAIT-GROUP ADD(consumer) @ 08/13/23 16:07:36.825
  STEP: 👾 NOW AWAITING TERMINATION @ 08/13/23 16:07:36.825
  [TIMEDOUT] in [It] - /home/plastikfan/dev/github/go/snivilised/lorax/async/worker-pool_test.go:198 @ 08/13/23 16:07:41.821
  [FAILED] in [It] - /home/plastikfan/dev/github/go/snivilised/lorax/async/worker-pool_test.go:229 @ 08/13/23 16:07:45.348
  << Timeline

  [TIMEDOUT] A spec timeout occurred
  In [It] at: /home/plastikfan/dev/github/go/snivilised/lorax/async/worker-pool_test.go:198 @ 08/13/23 16:07:41.821

@plastikfan
Copy link
Contributor Author

plastikfan commented Aug 14, 2023

🎉🎉🎉 Found the resolution to the cancellation problem, it was in the producer. The producer could get blocked on sending the job, at which point it does not see the cancellation, until this fix was applied (pre-empt the job send, with a select on the context, newly passed into the item() function):

func (p *Producer[I, R]) item(ctx context.Context) bool {
	result := true
	i := p.provider()
	j := async.Job[I]{
		ID:    fmt.Sprintf("JOB-ID:%v", uuid.NewString()),
		Input: i,
	}

	fmt.Printf(">>>> ✨ producer.item, 🟠 waiting to post item: '%+v'\n", i)

	select {
	case <-ctx.Done():
		fmt.Println(">>>> 💠 producer.item - done received ⛔⛔⛔")

		result = false

	case p.JobsCh <- j:
	}
	p.Count++

	if result {
		fmt.Printf(">>>> ✨ producer.item, 🟢 posted item: '%+v'\n", i)
	} else {
		fmt.Printf(">>>> ✨ producer.item, 🔴 item NOT posted: '%+v'\n", i)
	}

	return result
}
Output that shows an example run showing the cancellation working ...
WorkerPool when given: a stream of jobs and: Cancelled 🧪 should: handle cancellation and shutdown cleanly
/home/plastikfan/dev/github/go/snivilised/lorax/async/worker-pool_test.go:196

  Captured StdOut/StdErr Output >>
  >>>> ✨ producer.run ...(ctx:*internal.specContext.WithCancel)
                >>> 💤 CancelAfter - Sleeping before requesting cancellation (200ms) ...
  ===> 🧊 WorkerPool.run ...(ctx:*internal.specContext.WithCancel)
  <<<< 💠 consumer.run ...(ctx:*internal.specContext.WithCancel)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:1 Recipient:🤖 rusty}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:1 Recipient:🤖 rusty}'
  ===> 🧊 (#workers: '0') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(❤️)WORKER-ID-1:5a736d26-7fb5-47b1-adfe-62d1aeaf0fa5' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:fc802924-6648-4122-b706-843639161d7f)
        ---> 🚀worker.run((❤️)WORKER-ID-1:5a736d26-7fb5-47b1-adfe-62d1aeaf0fa5) ...(ctx:*internal.specContext.WithCancel)
        ---> 🚀 worker.run((❤️)WORKER-ID-1:5a736d26-7fb5-47b1-adfe-62d1aeaf0fa5)(input:'{1 🤖 rusty}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:2 Recipient:👽 paul}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:2 Recipient:👽 paul}'
  ===> 🧊 (#workers: '1') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💙)WORKER-ID-2:7eeb4c93-5b0a-4691-ba3b-68e16fb34084' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:fc563b13-69d4-4fce-af51-6b155cbdf6f1)
        ---> 🚀worker.run((💙)WORKER-ID-2:7eeb4c93-5b0a-4691-ba3b-68e16fb34084) ...(ctx:*internal.specContext.WithCancel)
        ---> 🚀 worker.run((💙)WORKER-ID-2:7eeb4c93-5b0a-4691-ba3b-68e16fb34084)(input:'{2 👽 paul}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:3 Recipient:🤖 rusty}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:3 Recipient:🤖 rusty}'
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:61a6b91a-be50-4ac7-9b21-801b591e2eae)
        ---> 🚀worker.run((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e) ...(ctx:*internal.specContext.WithCancel)
        ---> 🚀 worker.run((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e)(input:'{3 🤖 rusty}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:4 Recipient:👺 gobby}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:4 Recipient:👺 gobby}'
  ===> 🧊 (#workers: '3') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💜)WORKER-ID-4:9dd26126-31c6-4b9f-acdf-626901105b29' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:194e1813-39d1-4a1c-8730-c7be851106cd)
        ---> 🚀worker.run((💜)WORKER-ID-4:9dd26126-31c6-4b9f-acdf-626901105b29) ...(ctx:*internal.specContext.WithCancel)
        ---> 🚀 worker.run((💜)WORKER-ID-4:9dd26126-31c6-4b9f-acdf-626901105b29)(input:'{4 👺 gobby}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:5 Recipient:💩 poo}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:5 Recipient:💩 poo}'
  ===> 🧊 (#workers: '4') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💛)WORKER-ID-5:f95006fe-79e2-4505-9b41-8129235d386f' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:da782647-11de-49aa-87a2-4287ca64252f)
        ---> 🚀worker.run((💛)WORKER-ID-5:f95006fe-79e2-4505-9b41-8129235d386f) ...(ctx:*internal.specContext.WithCancel)
        ---> 🚀 worker.run((💛)WORKER-ID-5:f95006fe-79e2-4505-9b41-8129235d386f)(input:'{5 💩 poo}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:6 Recipient:🤖 rusty}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:6 Recipient:🤖 rusty}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:69d7e468-da69-4092-980a-bb6f76d315c5)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:7 Recipient:👹 ogre}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:7 Recipient:👹 ogre}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:0df4b0be-c320-428a-ab5d-489b63d53810)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:8 Recipient:💩 poo}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:8 Recipient:💩 poo}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:339d980f-eb69-4f3d-bdc6-ce5ff9706d30)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:9 Recipient:🦄 pegasus}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:9 Recipient:🦄 pegasus}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:293e0563-4b02-4619-97f3-54f9a6728e86)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:10 Recipient:👹 ogre}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:10 Recipient:👹 ogre}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:724446d6-4b18-4979-ac97-30f9d4dfdc1c)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:11 Recipient:💀 skeletor}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:11 Recipient:💀 skeletor}'
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:12 Recipient:👿 nick}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:12 Recipient:👿 nick}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:13 Recipient:👿 nick}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:13 Recipient:👿 nick}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:14 Recipient:💩 poo}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:14 Recipient:💩 poo}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:15 Recipient:👹 ogre}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:15 Recipient:👹 ogre}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:16 Recipient:👻 caspar}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:16 Recipient:👻 caspar}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:17 Recipient:👺 gobby}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:17 Recipient:👺 gobby}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:18 Recipient:🤖 rusty}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:18 Recipient:🤖 rusty}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:19 Recipient:👺 gobby}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:19 Recipient:👺 gobby}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:20 Recipient:👾 xenomorph}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:20 Recipient:👾 xenomorph}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:21 Recipient:👻 caspar}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:21 Recipient:👻 caspar}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:22 Recipient:🦄 pegasus}'
        ---> 🚀 worker.run((💜)WORKER-ID-4:9dd26126-31c6-4b9f-acdf-626901105b29)(input:'{6 🤖 rusty}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:9274430e-e292-477c-bdd8-bf7764b4bef3)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:22 Recipient:🦄 pegasus}'
  <<<< 💠 consumer.run - new result arrived(#1): '                      ---> 🍉🍉🍉 [Seq: 4] Hello: '👺 gobby''
        ---> 🚀 worker.run((💛)WORKER-ID-5:f95006fe-79e2-4505-9b41-8129235d386f)(input:'{7 👹 ogre}')
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:9e809fce-205d-4df7-8fe2-500db419c872)
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  <<<< 💠 consumer.run - new result arrived(#2): '                      ---> 🍉🍉🍉 [Seq: 5] Hello: '💩 poo''
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:23 Recipient:👹 ogre}'
  >>>> ✨ producer.item, 🟢 posted item: '{sequenceNo:23 Recipient:👹 ogre}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{sequenceNo:24 Recipient:🧙 gandalf}'
                >>> CancelAfter - 🛑🛑🛑 cancellation submitted.
                >>> CancelAfter - ➖➖➖ CANCELLED
  ===> 🧊 (#workers: '5') WorkerPool.run - done received ☢️☢️☢️
  ===> 🧊 (#workers: '5') WorkerPool.run - new job received
  ===> 🧊 (#workers: '5') WorkerPool.run - done received ☢️☢️☢️
  ===> 🧊 WorkerPool.run - done received ☢️☢️☢️
  !!!! 🧊 WorkerPool.drain - waiting for remaining workers: 5 (#GRs: 19); 🧊🧊🧊
  >>>> 💠 producer.item - done received ⛔⛔⛔
  >>>> ✨ producer.item, 🔴 item NOT posted: '{sequenceNo:24 Recipient:🧙 gandalf}'
  >>>> producer.run - finished (QUIT). ✨✨✨
  <<<< 💠 consumer.run - done received 💔💔💔
  <<<< consumer.run - finished (QUIT). 💠💠💠
        ---> 🚀 worker.run((💙)WORKER-ID-2:7eeb4c93-5b0a-4691-ba3b-68e16fb34084)(input:'{8 💩 poo}')
        ---> 🚀 worker.run((💜)WORKER-ID-4:9dd26126-31c6-4b9f-acdf-626901105b29)(input:'{9 🦄 pegasus}')
        ---> 🚀 worker.invoke((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e)(input:'{10 👹 ogre}')
        ---> 🚀 worker.run((💜)WORKER-ID-4:9dd26126-31c6-4b9f-acdf-626901105b29)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💜)WORKER-ID-4:9dd26126-31c6-4b9f-acdf-626901105b29) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💜)WORKER-ID-4:9dd26126-31c6-4b9f-acdf-626901105b29) finished, remaining: '4' 🟥
        ---> 🚀 worker.invoke((❤️)WORKER-ID-1:5a736d26-7fb5-47b1-adfe-62d1aeaf0fa5)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((❤️)WORKER-ID-1:5a736d26-7fb5-47b1-adfe-62d1aeaf0fa5)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((❤️)WORKER-ID-1:5a736d26-7fb5-47b1-adfe-62d1aeaf0fa5) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((❤️)WORKER-ID-1:5a736d26-7fb5-47b1-adfe-62d1aeaf0fa5) finished, remaining: '3' 🟥
        ---> 🚀 worker.invoke((💙)WORKER-ID-2:7eeb4c93-5b0a-4691-ba3b-68e16fb34084)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💙)WORKER-ID-2:7eeb4c93-5b0a-4691-ba3b-68e16fb34084)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💙)WORKER-ID-2:7eeb4c93-5b0a-4691-ba3b-68e16fb34084) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💙)WORKER-ID-2:7eeb4c93-5b0a-4691-ba3b-68e16fb34084) finished, remaining: '2' 🟥
        ---> 🚀 worker.run((💛)WORKER-ID-5:f95006fe-79e2-4505-9b41-8129235d386f)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💛)WORKER-ID-5:f95006fe-79e2-4505-9b41-8129235d386f) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💛)WORKER-ID-5:f95006fe-79e2-4505-9b41-8129235d386f) finished, remaining: '1' 🟥
        ---> 🚀 worker.invoke((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e)(input:'{11 💀 skeletor}')
        ---> 🚀 worker.invoke((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e)(input:'{12 👿 nick}')
        ---> 🚀 worker.run((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e) (SENT FINISHED). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker((💚)WORKER-ID-3:dc767706-8fdb-4669-a69a-c081b4089e2e) finished, remaining: '0' 🟥
  ===> 🧊 WorkerPool.run - drain complete (workers count: '0'). 🎃🎃🎃
  <--- WorkerPool.run (QUIT). 🧊🧊🧊
  <--- orpheus(alpha) finished Counts >>> (Producer: '24', Consumer: '2'). 🎯🎯🎯
  << Captured StdOut/StdErr Output
------------------------------
 SUCCESS! 2.987972194s

The producer run loop, checks the return value of the item() function and terminates its loop if the return value = false, indicating the job was not sent, due to the done event occurring.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature New feature or request
Projects
None yet
1 participant