Skip to content

Commit

Permalink
Don't allow session sync to block channel. Fixes #1849
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Mar 20, 2024
1 parent 37e5394 commit 1e17a70
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
2 changes: 1 addition & 1 deletion controller/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *Controller) Initialize() {
//after InitPersistence
c.AppEnv.Broker = env.NewBroker(c.AppEnv, sync2.NewInstantStrategy(c.AppEnv, sync2.InstantStrategyOptions{
MaxQueuedRouterConnects: 100,
MaxQueuedClientHellos: 100,
MaxQueuedClientHellos: 1000,
RouterConnectWorkerCount: 10,
SyncWorkerCount: 10,
RouterTxBufferSize: 100,
Expand Down
31 changes: 29 additions & 2 deletions controller/sync_strats/sync_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,34 @@ func (strategy *InstantStrategy) ReceiveResync(routerId string, _ *edge_ctrl_pb.

rtx.RouterModelIndex = nil

strategy.receivedClientHelloQueue <- rtx
strategy.queueClientHello(rtx)
}

func (strategy *InstantStrategy) queueClientHello(rtx *RouterSender) {
select {
case strategy.receivedClientHelloQueue <- rtx:
return
default:
}

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

go func() {
for {
if ch := rtx.Router.Control; ch == nil || ch.IsClosed() {
return
}

select {
case strategy.receivedClientHelloQueue <- rtx:
return
case <-strategy.stopNotify:
return
case <-ticker.C:
}
}
}()
}

func (strategy *InstantStrategy) ReceiveClientHello(routerId string, msg *channel.Message, respHello *edge_ctrl_pb.ClientHello) {
Expand Down Expand Up @@ -589,7 +616,7 @@ func (strategy *InstantStrategy) ReceiveClientHello(routerId string, msg *channe

serverVersion := build.GetBuildInfo().Version()
logger.Infof("edge router sent hello with version [%s] to controller with version [%s]", respHello.Version, serverVersion)
strategy.receivedClientHelloQueue <- rtx
strategy.queueClientHello(rtx)
}

func (strategy *InstantStrategy) synchronize(rtx *RouterSender) {
Expand Down

0 comments on commit 1e17a70

Please sign in to comment.