From 425da7d7d13231b9e2e609ef490c566deb9ce53f Mon Sep 17 00:00:00 2001
From: Lambels
Date: Mon, 12 Dec 2022 20:27:50 +0200
Subject: [PATCH] fixed bugs + refactor

---
 broadcaster.go | 137 +++++++++++++++++++++++++++++++++++++------------
 1 file changed, 104 insertions(+), 33 deletions(-)

diff --git a/broadcaster.go b/broadcaster.go
index e869b71..a403547 100644
--- a/broadcaster.go
+++ b/broadcaster.go
@@ -8,49 +8,92 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-// handler handels a message value from a transformer.
-// responsible for decrementing the waitgroup.
+// handlerValue handles a value from the broadcaster.
 type handlerValue func(ctx context.Context, wg *sync.WaitGroup, id int, v MessagePacket) error
 
+// handlerExit handles the exit from the transformer. If true, the exit was caused by a context cancel.
 type handlerExit func(*sync.WaitGroup, bool)
 
+// waiter indicates a packet waiting to be handled.
 type waiter struct {
-	idT, idV int
-	skipped  bool
-	value    MessagePacket
+	// idT represents the transformer's index in the transformer slice.
+	idT int
+	// idV represents the id of the packet.
+	// It is used to reconstruct the stream on the receiving end (processValues go-routine)
+	// in such a way that the results go out in the same order they came in.
+	idV int
+	// skipped indicates whether the packet was skipped.
+	skipped bool
+	// value is a copy of the packet.
+	value MessagePacket
 }
 
+// packetBroadcaster broadcasts packets to the transformers.
 type packetBroadcaster struct {
 	src <-chan MessagePacket
 	ctx context.Context
 
-	wg     *sync.WaitGroup
+	// lWg is a local waitgroup used to monitor how many transformer workers are
+	// currently writing to their wait queues.
+	//
+	// Two separate waitgroups are used to eliminate the deadlock that would
+	// occur when the processing go-routine waits for an exit signal before
+	// processing its last value while the exit signal waits for the processing
+	// go-routine to finish off its last value before it can be sent.
+	lWg sync.WaitGroup
+	// pWg is a waitgroup used to monitor the values which are and will be processed.
+	pWg *sync.WaitGroup
+	// runner is used to run the go-routines: processValues and runTransformer.
 	runner *errgroup.Group
 
 	transformers []Transformer
-	receive      []chan *waiter
+	// receive holds the wait queues for waiters, one per transformer.
+	receive []chan *waiter
 
+	// valuesCount is used to give each incoming packet its id (idV).
 	valuesCount int
 
+	// handleValue handles a value from the transformer.
 	handleValue handlerValue
-	handleSkip  handlerValue
-	handleExit  handlerExit
+	// handleSkip handles a skipped value either from a transformer (via ErrSkip) or from a
+	// complete layer skip (via the packet Skip field).
+	//
+	// If the packet is skipped via a transformer the id argument will be >= 0.
+	//
+	// If the packet has skipped the whole layer the id argument will be -1.
+	handleSkip handlerValue
+	handleExit handlerExit
 }
 
 func newPacketBroadcatser(ctx context.Context, src <-chan MessagePacket, g *errgroup.Group, t []Transformer, handleValue, handleSkip handlerValue, handleExit handlerExit) *packetBroadcaster {
+	receivers := make([]chan *waiter, len(t))
+	for i := range receivers {
+		receivers[i] = make(chan *waiter)
+	}
+
 	return &packetBroadcaster{
-		src:    src,
-		wg:     &sync.WaitGroup{},
-		runner: g,
+		src: src,
+		ctx: ctx,
+
+		lWg:    sync.WaitGroup{},
+		pWg:    &sync.WaitGroup{},
+		runner: g,
+
+		transformers: t,
+		receive:      receivers,
+
 		handleValue: handleValue,
+		handleSkip:  handleSkip,
 		handleExit:  handleExit,
 	}
 }
 
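Side note on the lWg/pWg comment above: the handoff is easier to see in isolation. Below is a minimal, runnable sketch of the same two-waitgroup pattern reduced to a single worker and a single queue; the names (queue, done) and the int payload are illustrative stand-ins, not code from this patch.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var lWg, pWg sync.WaitGroup // lWg: in-flight sends, pWg: values awaiting processing.
	queue := make(chan int)

	// worker, mirroring runTransformer: announce the value to the
	// processor's waitgroup before handing it over.
	lWg.Add(1)
	go func() {
		defer lWg.Done()
		pWg.Add(1) // shift wg responsibility to the processor.
		queue <- 42
	}()

	// processor, mirroring processValues: consume until the queue closes.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range queue {
			fmt.Println("processed", v)
			pWg.Done()
		}
	}()

	// exit, mirroring exit(): waiting on lWg first guarantees no worker is
	// mid-send when the queue is closed; only then is pWg waited on, so the
	// processor never blocks on a send that the exit path is waiting for.
	lWg.Wait()
	close(queue)
	pWg.Wait()
	<-done
}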
+// StartListen starts the packet broadcaster in a separate go-routine,
+// reading values from the src channel.
 func (b *packetBroadcaster) StartListen() {
 	for _, ch := range b.receive {
 		b.runner.Go(
-			b.recieveValues(ch),
+			b.processValues(ch),
 		)
 	}
 
@@ -77,7 +120,7 @@ func (b *packetBroadcaster) listen() {
 		}
 
 		idV := b.valuesCount
-		b.wg.Add(len(b.transformers))
+		b.lWg.Add(len(b.transformers))
 		for i, t := range b.transformers {
 			b.runner.Go(b.runTransformer(t, v, i, idV))
 		}
@@ -87,16 +130,26 @@ func (b *packetBroadcaster) listen() {
 }
 
 func (b *packetBroadcaster) exit(cancel bool) {
-	b.handleExit(b.wg, cancel)
-	b.wg.Wait()
+	// wait for all values to be written to their specific receive channels.
+	b.lWg.Wait()
 	for _, ch := range b.receive {
 		close(ch)
 	}
+	b.handleExit(b.pWg, cancel)
 }
 
-// runTransformer runs the transformer and sends the value to the waiter queue.
+// runTransformer runs the transformer with the provided value.
+//
+// If the transformation is successful the value ends up in a wait queue to be run by
+// the value processor go-routine (waitgroup responsibility is shifted to the value processor go-routine).
+//
+// If the context is cancelled the routine exits and the waitgroup is decremented, cleaning everything up.
+//
+// If the transformer returns an error the routine similarly exits and the waitgroup is decremented.
+// When the routine exits it also returns the transformer error, cancelling the context.
 func (b *packetBroadcaster) runTransformer(t Transformer, v MessagePacket, idT, idV int) func() error {
 	return func() error {
+		defer b.lWg.Done()
 		out, err := t.Transform(b.ctx, v)
 		w := &waiter{
 			idT: idT,
@@ -111,8 +164,10 @@ func (b *packetBroadcaster) runTransformer(t Transformer, v MessagePacket, idT,
 		}
 
 		ch := b.receive[idT]
+		b.pWg.Add(1) // shift wg responsibility to the processor.
 		select {
 		case <-b.ctx.Done():
+			b.pWg.Done() // decrement the processor wg since the value will never reach it.
 			return b.ctx.Err()
 		case ch <- w:
 			return nil
@@ -127,12 +182,29 @@ func (s byIdV) Len() int { return len(s) }
 func (s byIdV) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 func (s byIdV) Less(i, j int) bool { return s[i].idV < s[j].idV }
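byIdV exists so that processValues (next hunk) can rebuild arrival order from out-of-order completions: buffer whatever arrives early, sort, then drain the consecutive prefix. Here is a self-contained sketch of that drain logic, using plain ints in place of *waiter values and assuming unique, consecutive ids; it is illustrative only, not code from this patch.

package main

import (
	"fmt"
	"sort"
)

func main() {
	arrivals := []int{2, 0, 3, 1, 4} // idV values in completion order.
	next := 0                        // plays the role of localIdV.
	buf := []int{}                   // plays the role of waiterBuf.

	for _, id := range arrivals {
		if id != next {
			buf = append(buf, id) // waiter which will run in the future.
			continue
		}
		fmt.Println("run", id)
		next++

		// drain the now-runnable consecutive prefix of the buffer.
		sort.Ints(buf)
		i := 0
		for ; i < len(buf) && buf[i] == next; i++ {
			fmt.Println("run", buf[i])
			next++
		}
		buf = buf[i:] // shrink via reslicing.
	}
}

Despite the scrambled arrivals, the output runs 0 through 4 in order, which is exactly the ordering guarantee the idV field documents.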
 
-// recieveValues recieves values from the waiter queue in order.
-func (b *packetBroadcaster) recieveValues(ch <-chan *waiter) func() error {
-	return func() error {
+// processValues processes the wait queue for a specific transformer.
+func (b *packetBroadcaster) processValues(ch <-chan *waiter) func() error {
+	return func() (err error) {
 		var localIdV int
 		waiterBuf := make([]*waiter, 0)
 
+		defer func() {
+			// if exiting with an error just decrement pWg since the values will be insignificant.
+			if err != nil {
+				for range waiterBuf {
+					b.pWg.Done()
+				}
+				return
+			}
+
+			sort.Sort(byIdV(waiterBuf))
+			for _, w := range waiterBuf {
+				if err = b.runWaiter(w); err != nil {
+					return
+				}
+			}
+		}()
+
 		for {
 			// first check if we can process any values with what we currently have.
 			sort.Sort(byIdV(waiterBuf))
@@ -141,22 +213,18 @@ func (b *packetBroadcaster) recieveValues(ch <-chan *waiter) func() error {
 				break
 			}
 
-			err := b.runWaiter(w)
+			err = b.runWaiter(w)
 			if err != nil {
 				return err
 			}
 			// shrink via reslicing (array wont grow allot)
-			waiterBuf = waiterBuf[i:]
+			waiterBuf = waiterBuf[i+1:]
 		}
 
 		select {
 		case <-b.ctx.Done():
-			// values which will never run but still occupy the wg.
-			for range waiterBuf {
-				b.wg.Done()
-			}
-
-			return b.ctx.Err()
+			err = b.ctx.Err()
+			return err
 		case w, ok := <-ch:
 			if !ok {
 				return nil
 			}
 
 			switch {
 			case w.idV < localIdV: // run now but dont increment idV.
-				return b.runWaiter(w)
+				if err = b.runWaiter(w); err != nil {
+					return err
+				}
 			case w.idV == localIdV: // run now and increment idV.
 				localIdV++
-				return b.runWaiter(w)
+				if err = b.runWaiter(w); err != nil {
+					return err
+				}
 			default: // waiter which will run in the future.
 				waiterBuf = append(waiterBuf, w)
 			}
-
 		}
 	}
 }
 
 func (b *packetBroadcaster) runWaiter(w *waiter) error {
 	if w.skipped {
-		return b.handleSkip(b.ctx, b.wg, w.idT, w.value)
+		return b.handleSkip(b.ctx, b.pWg, w.idT, w.value)
 	}
 
-	return b.handleValue(b.ctx, b.wg, w.idT, w.value)
+	return b.handleValue(b.ctx, b.pWg, w.idT, w.value)
 }
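For reference, the named-return-plus-defer shape that processValues now relies on (one deferred function that either flushes the buffer in order on a clean exit or only releases WaitGroup slots on error) can be isolated like this; drain, its flush output, and the fail switch are hypothetical stand-ins for illustration, not part of the patch.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// drain mimics the shape of processValues: a named error return lets the
// deferred function distinguish a clean exit (flush the buffer in order)
// from a failed one (only release the WaitGroup slots).
func drain(buf []string, pWg *sync.WaitGroup, fail bool) (err error) {
	defer func() {
		if err != nil {
			// exiting with an error: the buffered values will never run,
			// so just release their WaitGroup slots.
			for range buf {
				pWg.Done()
			}
			return
		}
		// clean exit: flush whatever is still buffered.
		for _, v := range buf {
			fmt.Println("flush", v)
			pWg.Done()
		}
	}()

	if fail {
		err = errors.New("context cancelled")
		return err
	}
	return nil
}

func main() {
	var pWg sync.WaitGroup

	pWg.Add(2)
	_ = drain([]string{"a", "b"}, &pWg, false) // clean exit: both values flush.
	pWg.Wait()

	pWg.Add(2)
	_ = drain([]string{"c", "d"}, &pWg, true) // error exit: slots released, nothing flushes.
	pWg.Wait()
}

Because every return path funnels through the single deferred function, no exit can leave pWg permanently incremented, which is what previously made exit() hang.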