Skip to content

Commit

Permalink
ref(async): define seq no in job (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Aug 15, 2023
1 parent 807e0f6 commit 6f4d0b6
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 43 deletions.
5 changes: 3 additions & 2 deletions async/pool-defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ const (
//

type Job[I any] struct {
ID string
Input I
ID string
Input I
SequenceNo int
}

type ExecutiveFunc[I, R any] func(j Job[I]) (JobResult[R], error)
Expand Down
5 changes: 4 additions & 1 deletion async/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ func (p *WorkerPool[I, R]) run(
}
select {
case forwardChOut <- job:
fmt.Printf("===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(%v)\n", job.ID)
fmt.Printf("===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(%v) [Seq: %v]\n",
job.ID,
job.SequenceNo,
)
case <-ctx.Done(): // ☣️☣️☣️ CHECK THIS, IT MIGHT BE INVALID
fmt.Printf("===> 🧊 (#workers: '%v') WorkerPool.run - done received ☢️☢️☢️\n",
len(p.private.pool),
Expand Down
64 changes: 27 additions & 37 deletions async/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,29 @@ const (
Delay = 750
)

var audience = []string{
"👻 caspar",
"🧙 gandalf",
"😺 garfield",
"👺 gobby",
"👿 nick",
"👹 ogre",
"👽 paul",
"🦄 pegasus",
"💩 poo",
"🤖 rusty",
"💀 skeletor",
"🐉 smaug",
"🧛‍♀️ vampire",
"👾 xenomorph",
}
var (
audience = []string{
"👻 caspar",
"🧙 gandalf",
"😺 garfield",
"👺 gobby",
"👿 nick",
"👹 ogre",
"👽 paul",
"🦄 pegasus",
"💩 poo",
"🤖 rusty",
"💀 skeletor",
"🐉 smaug",
"🧛‍♀️ vampire",
"👾 xenomorph",
}

type TestJobInput struct {
sequenceNo int // allocated by observer
Recipient string
}
noOp = func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {}
)

func (i TestJobInput) SequenceNo() int {
return i.sequenceNo
type TestJobInput struct {
Recipient string
}

type TestJobResult = string
Expand All @@ -69,7 +68,7 @@ var greeter = func(j async.Job[TestJobInput]) (async.JobResult[TestJobResult], e

result := async.JobResult[TestJobResult]{
Payload: fmt.Sprintf(" ---> 🍉🍉🍉 [Seq: %v] Hello: '%v'",
j.Input.SequenceNo(), j.Input.Recipient,
j.SequenceNo, j.Input.Recipient,
),
}

Expand All @@ -91,12 +90,8 @@ type pipeline[I, R any] struct {
func start[I, R any]() *pipeline[I, R] {
pipe := &pipeline[I, R]{
resultsCh: make(chan async.JobResult[R], ResultChSize),
stop: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {
// no-op
},
cancel: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {
// no-op
},
stop: noOp,
cancel: noOp,
}

return pipe
Expand Down Expand Up @@ -160,13 +155,10 @@ var _ = Describe("WorkerPool", func() {
pipe := start[TestJobInput, TestJobResult]()

By("👾 WAIT-GROUP ADD(producer)")
sequence := 0
pipe.produce(ctx, func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
sequence++
return TestJobInput{
sequenceNo: sequence,
Recipient: audience[recipient],
Recipient: audience[recipient],
}
})

Expand Down Expand Up @@ -200,13 +192,11 @@ var _ = Describe("WorkerPool", func() {
cancellations := []context.CancelFunc{cancel}

By("👾 WAIT-GROUP ADD(producer)")
sequence := 0
pipe.produce(ctxCancel, func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
sequence++

return TestJobInput{
sequenceNo: sequence,
Recipient: audience[recipient],
Recipient: audience[recipient],
}
})

Expand Down
9 changes: 6 additions & 3 deletions internal/helpers/test-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,15 @@ func (p *Producer[I, R]) run(ctx context.Context) {
}

func (p *Producer[I, R]) item(ctx context.Context) bool {
p.sequenceNo++
p.Count++

result := true
i := p.provider()
j := async.Job[I]{
ID: fmt.Sprintf("JOB-ID:%v", uuid.NewString()),
Input: i,
ID: fmt.Sprintf("JOB-ID:%v", uuid.NewString()),
Input: i,
SequenceNo: p.sequenceNo,
}

fmt.Printf(">>>> ✨ producer.item, 🟠 waiting to post item: '%+v'\n", i)
Expand All @@ -96,7 +100,6 @@ func (p *Producer[I, R]) item(ctx context.Context) bool {

case p.JobsCh <- j:
}
p.Count++

if result {
fmt.Printf(">>>> ✨ producer.item, 🟢 posted item: '%+v'\n", i)
Expand Down

0 comments on commit 6f4d0b6

Please sign in to comment.