Skip to content

Commit

Permalink
dixed bugs + refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Lambels committed Dec 12, 2022
1 parent 5a396c0 commit 425da7d
Showing 1 changed file with 104 additions and 33 deletions.
137 changes: 104 additions & 33 deletions broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,92 @@ import (
"golang.org/x/sync/errgroup"
)

// handler handels a message value from a transformer.
// responsible for decrementing the waitgroup.
// handlerValue handels a value from the broadcaster.
type handlerValue func(ctx context.Context, wg *sync.WaitGroup, id int, v MessagePacket) error

// handels the exit from the transformer. If true, the exit was caused by a context cancel.
type handlerExit func(*sync.WaitGroup, bool)

// waiter indicates a packet waiting to be handeled.
type waiter struct {
idT, idV int
skipped bool
value MessagePacket
// idT represents the transformers index in the transformer slice.
idT int
// idV represents the id of the packet.
// It is used to reconstruct the stream on the recieving end (processValues go-routine)
// in such way that the results go out in the same order they come in.
idV int
// skipped indicates wether the packet was skipped.
skipped bool
// packet copy.
value MessagePacket
}

// packetBroadcaster broadcasts packages to the transformers.
type packetBroadcaster struct {
src <-chan MessagePacket
ctx context.Context

wg *sync.WaitGroup
// lWg is a local waitgroup used to monitor how many transformer workers are
// currently writing to their wait queues.
//
// Two separate waitgroups are used to eliminate the deadlock which would
// occur when the processing go routine would wait for an exit signal to
// process its last value and the exit signal would wait for the processing
// go routine to finish off its last value for it to be sent.
lWg sync.WaitGroup
// pWg is a waitgroup used to monitor the values which are and will be processed.
pWg *sync.WaitGroup
// used to run go-routines: processValues and runTransformer.
runner *errgroup.Group

transformers []Transformer
receive []chan *waiter
// the wait queue for waiters.
receive []chan *waiter

// valuesCount is used to give
valuesCount int

// handels a value from the transformer.
handleValue handlerValue
handleSkip handlerValue
handleExit handlerExit
// handels a skipped value either from a transformer (via ErrSkip) or from a
// complete layer skip (via the packet Skip field).
//
// If the packet is skipped via a transformer the id argument will be >= 0 .
//
// If the packet has skipped the whole layer the id argument will be = -1.
handleSkip handlerValue
handleExit handlerExit
}

func newPacketBroadcatser(ctx context.Context, src <-chan MessagePacket, g *errgroup.Group, t []Transformer, handleValue, handleSkip handlerValue, handleExit handlerExit) *packetBroadcaster {
recievers := make([]chan *waiter, len(t))
for i := range recievers {
recievers[i] = make(chan *waiter)
}

return &packetBroadcaster{
src: src,
wg: &sync.WaitGroup{},
runner: g,
src: src,
ctx: ctx,

lWg: sync.WaitGroup{},
pWg: &sync.WaitGroup{},
runner: g,

transformers: t,
receive: recievers,

handleValue: handleValue,
handleSkip: handleSkip,
handleExit: handleExit,
}
}

// StartListen starts the packet broadcaster in a separate go routine,
// reading values from the src channel.
func (b *packetBroadcaster) StartListen() {
for _, ch := range b.receive {
b.runner.Go(
b.recieveValues(ch),
b.processValues(ch),
)
}

Expand All @@ -77,7 +120,7 @@ func (b *packetBroadcaster) listen() {
}

idV := b.valuesCount
b.wg.Add(len(b.transformers))
b.lWg.Add(len(b.transformers))
for i, t := range b.transformers {
b.runner.Go(b.runTransformer(t, v, i, idV))
}
Expand All @@ -87,16 +130,26 @@ func (b *packetBroadcaster) listen() {
}

func (b *packetBroadcaster) exit(cancel bool) {
b.handleExit(b.wg, cancel)
b.wg.Wait()
// wait for all values to be writted to their specific recieve channels.
b.lWg.Wait()
for _, ch := range b.receive {
close(ch)
}
b.handleExit(b.pWg, cancel)
}

// runTransformer runs the transformer and sends the value to the waiter queue.
// runTransformer runs the transformer with the provided value.
//
// If the transformation is sucessful the value ends up in a wait queue to be ran by
// the value processor go routine. (waitgroup responsability shifted to value processor go routine)
//
// If the context is cancelled the routine exits and the waitgroup is decremented cleaning everything up.
//
// If the transformer returns an error, simillarly routine exits and the waitgroup is decremented.
// When the routine exits it also returns the transformer error cancelling the context.
func (b *packetBroadcaster) runTransformer(t Transformer, v MessagePacket, idT, idV int) func() error {
return func() error {
defer b.lWg.Done()
out, err := t.Transform(b.ctx, v)
w := &waiter{
idT: idT,
Expand All @@ -111,8 +164,10 @@ func (b *packetBroadcaster) runTransformer(t Transformer, v MessagePacket, idT,
}

ch := b.receive[idT]
b.pWg.Add(1) // shift wg responsability to processor.
select {
case <-b.ctx.Done():
b.pWg.Done() // decrement processor wg since it will never reach it.
return b.ctx.Err()
case ch <- w:
return nil
Expand All @@ -127,12 +182,29 @@ func (s byIdV) Len() int { return len(s) }
func (s byIdV) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byIdV) Less(i, j int) bool { return s[i].idV < s[j].idV }

// recieveValues recieves values from the waiter queue in order.
func (b *packetBroadcaster) recieveValues(ch <-chan *waiter) func() error {
return func() error {
// processValues processes the wait queue for the specific transformer
func (b *packetBroadcaster) processValues(ch <-chan *waiter) func() error {
return func() (err error) {
var localIdV int
waiterBuf := make([]*waiter, 0)

defer func() {
// if exciting with error just decrement pWg since values will be unsignificant.
if err != nil {
for range waiterBuf {
b.pWg.Done()
}
return
}

sort.Sort(byIdV(waiterBuf))
for _, w := range waiterBuf {
if err = b.runWaiter(w); err != nil {
return
}
}
}()

for {
// first check if we can process any values with what we currently have.
sort.Sort(byIdV(waiterBuf))
Expand All @@ -141,47 +213,46 @@ func (b *packetBroadcaster) recieveValues(ch <-chan *waiter) func() error {
break
}

err := b.runWaiter(w)
err = b.runWaiter(w)
if err != nil {
return err
}
// shrink via reslicing (array wont grow allot)
waiterBuf = waiterBuf[i:]
waiterBuf = waiterBuf[i+1:]
}

select {
case <-b.ctx.Done():
// values which will never run but still occupy the wg.
for range waiterBuf {
b.wg.Done()
}

return b.ctx.Err()
err = b.ctx.Err()
return err
case w, ok := <-ch:
if !ok {
return nil
}

switch {
case w.idV < localIdV: // run now but dont increment idV.
return b.runWaiter(w)
if err = b.runWaiter(w); err != nil {
return err
}

case w.idV == localIdV: // run now and increment idV.
localIdV++
return b.runWaiter(w)
if err = b.runWaiter(w); err != nil {
return err
}

default: // waiter which will run in the future.
waiterBuf = append(waiterBuf, w)
}
}

}
}
}

func (b *packetBroadcaster) runWaiter(w *waiter) error {
if w.skipped {
return b.handleSkip(b.ctx, b.wg, w.idT, w.value)
return b.handleSkip(b.ctx, b.pWg, w.idT, w.value)
}
return b.handleValue(b.ctx, b.wg, w.idT, w.value)
return b.handleValue(b.ctx, b.pWg, w.idT, w.value)
}

0 comments on commit 425da7d

Please sign in to comment.