Skip to content

Commit

Permalink
ref(async): rename wait group interfaces (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Aug 27, 2023
1 parent e099daa commit cbb17c7
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 34 deletions.
58 changes: 34 additions & 24 deletions async/annotated-wait-group.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,51 +19,61 @@ type namesCollection map[GoRoutineName]string
// without the go routine name behind the same methods name, ie Add needs to be
// able to be invoked either way.

// AssistedAdder is the interface that is a restricted view of a wait group
// AnnotatedWgAdder is the interface that is a restricted view of a wait group
// that only allows adding to the wait group with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
type AssistedAdder interface {
type AnnotatedWgAdder interface {
Add(delta int, name ...GoRoutineName)
}

// AssistedQuitter is the interface that is a restricted view of a wait group
// AnnotatedWgQuitter is the interface that is a restricted view of a wait group
// that only allows Done signalling on the wait group with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
type AssistedQuitter interface {
type AnnotatedWgQuitter interface {
Done(name ...GoRoutineName)
}

// AssistedWaiter is the interface that is a restricted view of a wait group
// that only allows waiting on the wait group with the addition of being
// AnnotatedWgAQ is the interface that is a restricted view of a wait group
// that allows adding to the wait group and Done signalling with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
type AssistedWaiter interface {
Wait(name ...GoRoutineName)
type AnnotatedWgAQ interface {
AnnotatedWgAdder
AnnotatedWgQuitter
}

// WaitGroupEx the extended WaitGroup
type WaitGroupEx interface {
AssistedAdder
AssistedQuitter
AssistedWaiter
// AnnotatedWgWaiter is the interface that is a restricted view of a wait group
// that only allows waiting on the wait group with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
type AnnotatedWgWaiter interface {
Wait(name ...GoRoutineName)
}

// AssistedCounter is the interface that is a restricted view of a wait group
// AnnotatedWgCounter is the interface that is a restricted view of a wait group
// that only allows querying the wait group count. This interface
// can be acquired from the wait group using a standard interface type query.
type AssistedCounter interface {
type AnnotatedWgCounter interface {
Count() int
}

type waitGroupAssister struct {
// WaitGroupEx the extended WaitGroup
type WaitGroupEx interface {
AnnotatedWgAdder
AnnotatedWgQuitter
AnnotatedWgWaiter
AnnotatedWgCounter
}

type waitGroupEx struct {
counter int32
names namesCollection
waitGroupName string
}

func (a *waitGroupAssister) Add(delta int, name ...GoRoutineName) {
func (a *waitGroupEx) Add(delta int, name ...GoRoutineName) {
a.counter += int32(delta)

if len(name) > 0 {
Expand All @@ -73,7 +83,7 @@ func (a *waitGroupAssister) Add(delta int, name ...GoRoutineName) {
}
}

func (a *waitGroupAssister) Done(name ...GoRoutineName) {
func (a *waitGroupEx) Done(name ...GoRoutineName) {
a.counter--

if len(name) > 0 {
Expand All @@ -83,20 +93,20 @@ func (a *waitGroupAssister) Done(name ...GoRoutineName) {
}
}

func (a *waitGroupAssister) Wait(name ...GoRoutineName) {
func (a *waitGroupEx) Wait(name ...GoRoutineName) {
if len(name) > 0 {
a.indicate("🧭🧭🧭", string(name[0]), "Wait")
}
}

func (a *waitGroupAssister) indicate(highlight, name, op string) {
func (a *waitGroupEx) indicate(highlight, name, op string) {
fmt.Printf(
" %v [[ WaitGroupAssister(%v).%v ]] - gr-name: '%v' (count: '%v') (running: '%v')\n",
highlight, a.waitGroupName, op, name, a.counter, a.running(),
)
}

func (a *waitGroupAssister) running() string {
func (a *waitGroupEx) running() string {
runners := lo.Map(lo.Keys(a.names), func(item GoRoutineName, _ int) string {
return string(item)
})
Expand All @@ -109,15 +119,15 @@ func (a *waitGroupAssister) running() string {
// diagnosing concurrency issues.
type AnnotatedWaitGroup struct {
wg sync.WaitGroup
assistant waitGroupAssister
assistant waitGroupEx
mux sync.Mutex
}

// NewAnnotatedWaitGroup creates a new AnnotatedWaitGroup instance containing
// the core WaitGroup instance.
func NewAnnotatedWaitGroup(name string) *AnnotatedWaitGroup {
func NewAnnotatedWaitGroup(name string) WaitGroupEx {
return &AnnotatedWaitGroup{
assistant: waitGroupAssister{
assistant: waitGroupEx{
waitGroupName: name,
names: make(namesCollection),
},
Expand Down
6 changes: 3 additions & 3 deletions async/annotated-wait-group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ var _ = Describe("AnnotatedWaitGroup", func() {

Context("Add", func() {
It("should: add", func() {
var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("add-unit-test")
wg := async.NewAnnotatedWaitGroup("add-unit-test")

wg.Add(1, "producer")
})
})

Context("Done", func() {
It("should: quit", func() {
var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("done-unit-test")
wg := async.NewAnnotatedWaitGroup("done-unit-test")

wg.Add(1, "producer")
wg.Done("producer")
Expand All @@ -29,7 +29,7 @@ var _ = Describe("AnnotatedWaitGroup", func() {

Context("Wait", func() {
It("should: quit", func() {
var wg async.WaitGroupEx = async.NewAnnotatedWaitGroup("wait-unit-test")
wg := async.NewAnnotatedWaitGroup("wait-unit-test")

wg.Add(1, "producer")
go func() {
Expand Down
4 changes: 2 additions & 2 deletions async/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ type WorkerPool[I, O any] struct {
noWorkers int
SourceJobsChIn JobStreamR[I]

Quitter AssistedQuitter
Quitter AnnotatedWgQuitter
}

type NewWorkerPoolParams[I, O any] struct {
NoWorkers int
Exec ExecutiveFunc[I, O]
JobsCh chan Job[I]
CancelCh CancelStream
Quitter AssistedQuitter
Quitter AnnotatedWgQuitter
}

func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O] {
Expand Down
2 changes: 1 addition & 1 deletion async/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ var _ = Describe("WorkerPool", func() {
pipe := start[TestJobInput, TestJobOutput]()

defer func() {
if counter, ok := (pipe.wgex).(async.AssistedCounter); ok {
if counter, ok := (pipe.wgex).(async.AnnotatedWgCounter); ok {
fmt.Printf("🎈🎈🎈🎈 remaining count: '%v'\n", counter.Count())
}
}()
Expand Down
4 changes: 2 additions & 2 deletions internal/helpers/test-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
)

type Consumer[O any] struct {
quitter async.AssistedQuitter
quitter async.AnnotatedWgQuitter
RoutineName async.GoRoutineName
OutputsChIn async.OutputStreamR[O]
Count int
}

func StartConsumer[O any](
ctx context.Context,
quitter async.AssistedQuitter,
quitter async.AnnotatedWgQuitter,
outputsChIn async.OutputStreamR[O],
) *Consumer[O] {
consumer := &Consumer[O]{
Expand Down
4 changes: 2 additions & 2 deletions internal/helpers/test-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type terminationStream chan termination
type ProviderFunc[I any] func() I

type Producer[I, O any] struct {
quitter async.AssistedQuitter
quitter async.AnnotatedWgQuitter
RoutineName async.GoRoutineName
sequenceNo int
provider ProviderFunc[I]
Expand All @@ -30,7 +30,7 @@ type Producer[I, O any] struct {
// indicate end of the work load.
func StartProducer[I, O any](
ctx context.Context,
quitter async.AssistedQuitter,
quitter async.AnnotatedWgQuitter,
capacity int,
provider ProviderFunc[I],
delay int,
Expand Down

0 comments on commit cbb17c7

Please sign in to comment.