From 82c4df9727fc7c068769c12ee606959e2c1b0c68 Mon Sep 17 00:00:00 2001 From: Henry Robinson Date: Wed, 5 Jun 2024 12:39:58 +0100 Subject: [PATCH] Fix first ready bug Signed-off-by: Henry Robinson --- go/vt/vtgateproxy/firstready_balancer.go | 57 +++++++++++------------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/go/vt/vtgateproxy/firstready_balancer.go b/go/vt/vtgateproxy/firstready_balancer.go index ed884224c64..02c296480dd 100644 --- a/go/vt/vtgateproxy/firstready_balancer.go +++ b/go/vt/vtgateproxy/firstready_balancer.go @@ -32,10 +32,11 @@ limitations under the License. import ( "errors" - "sync" + "sync/atomic" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/resolver" "vitess.io/vitess/go/vt/log" ) @@ -54,8 +55,16 @@ func init() { // Once a conn is chosen and is in the ready state, it will remain as the // active subconn even if other connections become available. type frPickerBuilder struct { - mu sync.Mutex - currentConn balancer.SubConn +} + +type frConnWithAddr struct { + subConn balancer.SubConn + addr resolver.Address +} + +type frPicker struct { + subConns []frConnWithAddr + cur uint32 } func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { @@ -65,36 +74,24 @@ func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { return base.NewErrPicker(errors.New("no available connections")) } - f.mu.Lock() - defer f.mu.Unlock() - - // If we've already chosen a subconn, and it is still in the ready list, then - // no need to change state - if f.currentConn != nil { - log.V(100).Infof("first_ready: currentConn is active, checking if still ready") - for sc := range info.ReadySCs { - if f.currentConn == sc { - log.V(100).Infof("first_ready: currentConn still active - not changing") - return f - } - } + subConns := make([]frConnWithAddr, len(info.ReadySCs)) + idx := 0 + for sc, k := range info.ReadySCs { + subConns[idx] = frConnWithAddr{subConn: sc, addr: k.Address} + idx++ } - // Otherwise either we don't have an active conn or the conn we were using is - // no longer active, so pick an arbitrary new one out of the map. - log.V(100).Infof("first_ready: currentConn is not active, picking a new one") - for sc := range info.ReadySCs { - f.currentConn = sc - break - } - - return f + return &frPicker{subConns: subConns, cur: 0} } // Pick simply returns the currently chosen conn -func (f *frPickerBuilder) Pick(balancer.PickInfo) (balancer.PickResult, error) { - f.mu.Lock() - defer f.mu.Unlock() - - return balancer.PickResult{SubConn: f.currentConn}, nil +func (f *frPicker) Pick(pi balancer.PickInfo) (balancer.PickResult, error) { + curIndex := atomic.LoadUint32(&f.cur) + return balancer.PickResult{SubConn: f.subConns[curIndex].subConn, Done: func(info balancer.DoneInfo) { + if info.Err != nil { + // Only try to move the index at most 1 - if someone else raced and advanced it, do nothing. + nextIndex := (curIndex + 1) % uint32(len(f.subConns)) + atomic.CompareAndSwapUint32(&f.cur, curIndex, nextIndex) + } + }}, nil }