Skip to content

Commit

Permalink
add sync and change debug page to do the shuffle
Browse files Browse the repository at this point in the history
  • Loading branch information
demmer committed Apr 25, 2024
1 parent c4c2c24 commit 5132118
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand"
"os"
"sort"
"sync"
"time"

"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -64,6 +65,7 @@ type JSONGateResolverBuilder struct {
affinityField string
affinityValue string

mu sync.RWMutex
targets map[string][]targetHost
resolvers []*JSONGateResolver

Expand All @@ -83,7 +85,6 @@ type JSONGateResolver struct {
target resolver.Target
clientConn resolver.ClientConn
poolType string
affinity string
}

var (
Expand Down Expand Up @@ -156,7 +157,7 @@ func (b *JSONGateResolverBuilder) start() error {

parseCount.Add("changed", 1)

log.Infof("loaded targets, pool types %v, affinity groups %v", poolTypes, affinityTypes)
log.Infof("loaded targets, pool types %v, affinity %s, groups %v", poolTypes, *affinityValue, affinityTypes)

// Start a config watcher
b.ticker = time.NewTicker(1 * time.Second)
Expand Down Expand Up @@ -292,17 +293,30 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
targetCount.Set(poolType, int64(len(targets[poolType])))
}

b.mu.Lock()
b.targets = targets
b.mu.Unlock()

return true, nil
}

// Update the current list of hosts for the given resolver
func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {

log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)
func (b *JSONGateResolverBuilder) GetPools() []string {
b.mu.RLock()
defer b.mu.RUnlock()
var pools []string
for pool := range b.targets {
pools = append(pools, pool)
}
sort.Strings(pools)
return pools
}

targets := b.targets[r.poolType]
func (b *JSONGateResolverBuilder) GetTargets(poolType string) []targetHost {
// Copy the target slice
b.mu.RLock()
targets := []targetHost{}
targets = append(targets, b.targets[poolType]...)
b.mu.RUnlock()

// Shuffle to ensure every host has a different order to iterate through, putting
// the affinity matching (e.g. same az) hosts at the front and the non-matching ones
Expand All @@ -315,7 +329,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {
for i := 0; i < n-1; i++ {
j := head + b.rand.Intn(tail-head+1)

if r.affinity == "" || r.affinity == targets[j].Affinity {
if *affinityField != "" && *affinityValue == targets[j].Affinity {
targets[head], targets[j] = targets[j], targets[head]
head++
} else {
Expand All @@ -324,6 +338,16 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {
}
}

return targets
}

// Update the current list of hosts for the given resolver
func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {

log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)

targets := b.GetTargets(r.poolType)

var addrs []resolver.Address
for _, target := range targets {
addrs = append(addrs, resolver.Address{Addr: target.Addr})
Expand Down Expand Up @@ -354,7 +378,6 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie
target: target,
clientConn: cc,
poolType: poolType,
affinity: b.affinityValue,
}

b.update(r)
Expand All @@ -366,17 +389,17 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie
// debugTargets will return the builder's targets with a sorted slice of
// poolTypes for rendering debug output
func (b *JSONGateResolverBuilder) debugTargets() any {
var pools []string
pools := b.GetPools()
targets := map[string][]targetHost{}
for pool := range b.targets {
pools = append(pools, pool)
targets[pool] = b.GetTargets(pool)
}
sort.Strings(pools)
return struct {
Pools []string
Targets map[string][]targetHost
}{
Pools: pools,
Targets: b.targets,
Targets: targets,
}
}

Expand Down

0 comments on commit 5132118

Please sign in to comment.