From 53ee8afde7b88813a59cd5f06c348aab9a44fdd5 Mon Sep 17 00:00:00 2001 From: Dede Lamb Date: Wed, 24 Apr 2024 19:07:57 +1000 Subject: [PATCH 1/6] move organization of target hosts to parse time --- go/vt/vtgateproxy/discovery.go | 93 ++++++++++++-------------------- go/vt/vtgateproxy/vtgateproxy.go | 4 +- 2 files changed, 36 insertions(+), 61 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 40df37f376b..04cc94b1185 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -23,6 +23,7 @@ import ( "io" "math/rand" "os" + "sort" "time" "google.golang.org/grpc/resolver" @@ -60,8 +61,9 @@ type JSONGateResolverBuilder struct { portField string poolTypeField string affinityField string + affinityValue string - targets []targetHost + targets map[string][]targetHost resolvers []*JSONGateResolver rand *rand.Rand @@ -96,13 +98,16 @@ func RegisterJSONGateResolver( portField string, poolTypeField string, affinityField string, + affinityValue string, ) (*JSONGateResolverBuilder, error) { jsonDiscovery := &JSONGateResolverBuilder{ + targets: map[string][]targetHost{}, jsonPath: jsonPath, addressField: addressField, portField: portField, poolTypeField: poolTypeField, affinityField: affinityField, + affinityValue: affinityValue, } resolver.Register(jsonDiscovery) @@ -138,17 +143,19 @@ func (b *JSONGateResolverBuilder) start() error { poolTypes := map[string]int{} affinityTypes := map[string]int{} - for _, t := range b.targets { - count := poolTypes[t.poolType] - poolTypes[t.poolType] = count + 1 + for _, ts := range b.targets { + for _, t := range ts { + count := poolTypes[t.poolType] + poolTypes[t.poolType] = count + 1 - count = affinityTypes[t.affinity] - affinityTypes[t.affinity] = count + 1 + count = affinityTypes[t.affinity] + affinityTypes[t.affinity] = count + 1 + } } buildCount.Add(1) - log.Infof("loaded %d targets, pool types %v, affinity groups %v", len(b.targets), poolTypes, affinityTypes) + log.Infof("loaded targets, pool types %v, affinity groups %v", poolTypes, affinityTypes) // Start a config watcher b.ticker = time.NewTicker(1 * time.Second) @@ -217,7 +224,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) { return false, fmt.Errorf("error parsing JSON discovery file %s: %v", b.jsonPath, err) } - var targets []targetHost + var targets = map[string][]targetHost{} for _, host := range hosts { address, hasAddress := host[b.addressField] port, hasPort := host[b.portField] @@ -258,8 +265,21 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) { return false, fmt.Errorf("error parsing JSON discovery file %s: port field %s has invalid value %v", b.jsonPath, b.portField, port) } - targets = append(targets, targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)}) + target := targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)} + targets[target.poolType] = append(targets[target.poolType], target) + } + + for poolType := range targets { + if b.affinityField != "" { + sort.Slice(targets[poolType], func(i, j int) bool { + return b.affinityValue == targets[poolType][i].affinity + }) + } + if len(targets[poolType]) > *numConnections { + targets[poolType] = targets[poolType][:*numConnections] + } } + b.targets = targets return true, nil @@ -270,17 +290,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) - // filter to only targets that match the pool type. if unset, this will just be a copy - // of the full target list. - targets := []targetHost{} - for _, target := range b.targets { - if r.poolType == target.poolType { - targets = append(targets, target) - log.V(1000).Infof("matched target %v with type %s", target, r.poolType) - } else { - log.V(1000).Infof("skipping host %v with type %s", target, r.poolType) - } - } + targets := b.targets[r.poolType] // Shuffle to ensure every host has a different order to iterate through, putting // the affinity matching (e.g. same az) hosts at the front and the non-matching ones @@ -302,32 +312,12 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { } } - // Grab the first N addresses, and voila! var addrs []resolver.Address - targets = targets[:min(*numConnections, len(targets))] for _, target := range targets { addrs = append(addrs, resolver.Address{Addr: target.addr}) } - // Count some metrics - var unknown, local, remote int64 - for _, target := range targets { - if r.affinity == "" { - unknown++ - } else if r.affinity == target.affinity { - local++ - } else { - remote++ - } - } - if unknown != 0 { - affinityCount.Add("unknown", unknown) - } - affinityCount.Add("local", local) - affinityCount.Add("remote", remote) - poolTypeCount.Add(r.poolType, int64(len(targets))) - - log.V(100).Infof("updated targets for %s to %v (local %d / remote %d)", r.target.URL.String(), targets, local, remote) + log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets) r.clientConn.UpdateState(resolver.State{Addresses: addrs}) } @@ -346,19 +336,13 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie } } - // Affinity on the other hand is just an optimization - affinity := "" - if b.affinityField != "" { - affinity = attrs.Get(b.affinityField) - } - - log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity) + log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, b.affinityValue) r := &JSONGateResolver{ target: target, clientConn: cc, poolType: poolType, - affinity: affinity, + affinity: b.affinityValue, } b.update(r) @@ -372,14 +356,3 @@ func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} func (r *JSONGateResolver) Close() { log.Infof("Closing resolver for target %s", r.target.URL.String()) } - -// Utilities -func min(a, b int) int { - if a < b { - return a - } - return b -} - -func init() { -} diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index bdf44348450..86ebb6350ff 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -43,7 +43,8 @@ var ( vtgateHostsFile = flag.String("vtgate_hosts_file", "", "json file describing the host list to use for vtgate:// resolution") numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain") poolTypeField = flag.String("pool_type_field", "", "Field name used to specify the target vtgate type and filter the hosts") - affinityField = flag.String("affinity_field", "", "Attribute (both mysql protocol connection and JSON file) used to specify the routing affinity , e.g. 'az_id'") + affinityField = flag.String("affinity_field", "", "Attribute (JSON file) used to specify the routing affinity , e.g. 'az_id'") + affinityValue = flag.String("affinity_value", "", "Value to match for routing affinity , e.g. 'use-az1'") addressField = flag.String("address_field", "address", "field name in the json file containing the address") portField = flag.String("port_field", "port", "field name in the json file containing the port") @@ -194,5 +195,6 @@ func Init() { *portField, *poolTypeField, *affinityField, + *affinityValue, ) } From 3037810f21ed7b8a41533cdd8fbd59cd44f98303 Mon Sep 17 00:00:00 2001 From: Dede Lamb Date: Wed, 24 Apr 2024 19:58:16 +1000 Subject: [PATCH 2/6] rework metrics and logging of parse errors --- go/vt/vtgateproxy/discovery.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 04cc94b1185..0463dfcd1c6 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -86,10 +86,8 @@ type JSONGateResolver struct { } var ( - buildCount = stats.NewCounter("JsonDiscoveryBuild", "JSON host discovery rebuilt the host list") - unchangedCount = stats.NewCounter("JsonDiscoveryUnchanged", "JSON host discovery parsed and determined no change to the file") - affinityCount = stats.NewCountersWithSingleLabel("JsonDiscoveryHostAffinity", "Count of hosts returned from discovery by AZ affinity", "affinity") - poolTypeCount = stats.NewCountersWithSingleLabel("JsonDiscoveryHostPoolType", "Count of hosts returned from discovery by pool type", "type") + parseCount = stats.NewCountersWithSingleLabel("JsonDiscoveryParseCount", "Count of results of JSON host file parsing (changed, unchanged, error)", "result") + targetCount = stats.NewGaugesWithSingleLabel("JsonDiscoveryTargetCount", "Count of hosts returned from discovery by pool type", "pool") ) func RegisterJSONGateResolver( @@ -153,7 +151,7 @@ func (b *JSONGateResolverBuilder) start() error { } } - buildCount.Add(1) + parseCount.Add("changed", 1) log.Infof("loaded targets, pool types %v, affinity groups %v", poolTypes, affinityTypes) @@ -165,10 +163,12 @@ func (b *JSONGateResolverBuilder) start() error { } go func() { + var parseErr error for range b.ticker.C { checkFileStat, err := os.Stat(b.jsonPath) if err != nil { log.Errorf("Error stat'ing config %v\n", err) + parseCount.Add("error", 1) continue } isUnchanged := checkFileStat.Size() == fileStat.Size() && checkFileStat.ModTime() == fileStat.ModTime() @@ -180,12 +180,20 @@ func (b *JSONGateResolverBuilder) start() error { fileStat = checkFileStat contentsChanged, err := b.parse() - if err != nil || !contentsChanged { - unchangedCount.Add(1) + if err != nil { + parseCount.Add("error", 1) + if parseErr == nil || err.Error() != parseErr.Error() { + parseErr = err + log.Error(err) + } continue } - - buildCount.Add(1) + if !contentsChanged { + parseCount.Add("unchanged", 1) + continue + } + parseErr = nil + parseCount.Add("changed", 1) // notify all the resolvers that the targets changed for _, r := range b.resolvers { @@ -278,6 +286,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) { if len(targets[poolType]) > *numConnections { targets[poolType] = targets[poolType][:*numConnections] } + targetCount.Set(poolType, int64(len(targets[poolType]))) } b.targets = targets From 101d2beeea19175fc8a654711a6464c18da892ba Mon Sep 17 00:00:00 2001 From: Dede Lamb Date: Wed, 24 Apr 2024 21:35:45 +1000 Subject: [PATCH 3/6] add discovery bits to debug status page --- go/vt/vtgateproxy/discovery.go | 67 ++++++++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 0463dfcd1c6..2b3f90d35ad 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/servenv" ) // File based discovery for vtgate grpc endpoints @@ -72,9 +73,9 @@ type JSONGateResolverBuilder struct { } type targetHost struct { - addr string - poolType string - affinity string + Addr string + PoolType string + Affinity string } // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). @@ -116,6 +117,8 @@ func RegisterJSONGateResolver( return nil, err } + servenv.AddStatusPart("JSON Discovery", targetsTemplate, jsonDiscovery.debugTargets) + return jsonDiscovery, nil } @@ -143,11 +146,11 @@ func (b *JSONGateResolverBuilder) start() error { for _, ts := range b.targets { for _, t := range ts { - count := poolTypes[t.poolType] - poolTypes[t.poolType] = count + 1 + count := poolTypes[t.PoolType] + poolTypes[t.PoolType] = count + 1 - count = affinityTypes[t.affinity] - affinityTypes[t.affinity] = count + 1 + count = affinityTypes[t.Affinity] + affinityTypes[t.Affinity] = count + 1 } } @@ -274,13 +277,13 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) { } target := targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)} - targets[target.poolType] = append(targets[target.poolType], target) + targets[target.PoolType] = append(targets[target.PoolType], target) } for poolType := range targets { if b.affinityField != "" { sort.Slice(targets[poolType], func(i, j int) bool { - return b.affinityValue == targets[poolType][i].affinity + return b.affinityValue == targets[poolType][i].Affinity }) } if len(targets[poolType]) > *numConnections { @@ -312,7 +315,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { for i := 0; i < n-1; i++ { j := head + b.rand.Intn(tail-head+1) - if r.affinity == "" || r.affinity == targets[j].affinity { + if r.affinity == "" || r.affinity == targets[j].Affinity { targets[head], targets[j] = targets[j], targets[head] head++ } else { @@ -323,7 +326,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { var addrs []resolver.Address for _, target := range targets { - addrs = append(addrs, resolver.Address{Addr: target.addr}) + addrs = append(addrs, resolver.Address{Addr: target.Addr}) } log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets) @@ -360,8 +363,50 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie return r, nil } +// debugTargets will return the builder's targets with a sorted slice of +// poolTypes for rendering debug output +func (b *JSONGateResolverBuilder) debugTargets() any { + var pools []string + for pool := range b.targets { + pools = append(pools, pool) + } + sort.Strings(pools) + return struct { + Pools []string + Targets map[string][]targetHost + }{ + Pools: pools, + Targets: b.targets, + } +} + func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} func (r *JSONGateResolver) Close() { log.Infof("Closing resolver for target %s", r.target.URL.String()) } + +const ( + // targetsTemplate is a HTML template to display the gate resolver's target hosts. + targetsTemplate = ` + + +{{range $i, $p := .Pools}} + + +{{range index $.Targets $p}} + + + {{end}} +{{end}} +
{{$p}}
{{.Addr}}{{.Affinity}}
+` +) From c4c2c24988087e30a363960299a3f9aef7deb84a Mon Sep 17 00:00:00 2001 From: Dede Lamb Date: Thu, 25 Apr 2024 07:34:13 +1000 Subject: [PATCH 4/6] reset parseErr in the right place --- go/vt/vtgateproxy/discovery.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 2b3f90d35ad..ed69086d687 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -191,11 +191,11 @@ func (b *JSONGateResolverBuilder) start() error { } continue } + parseErr = nil if !contentsChanged { parseCount.Add("unchanged", 1) continue } - parseErr = nil parseCount.Add("changed", 1) // notify all the resolvers that the targets changed From 5132118355c3a11622280424a90239723ba35905 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Wed, 24 Apr 2024 17:56:56 -0700 Subject: [PATCH 5/6] add sync and change debug page to do the shuffle --- go/vt/vtgateproxy/discovery.go | 49 +++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index ed69086d687..4b533cc0002 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -24,6 +24,7 @@ import ( "math/rand" "os" "sort" + "sync" "time" "google.golang.org/grpc/resolver" @@ -64,6 +65,7 @@ type JSONGateResolverBuilder struct { affinityField string affinityValue string + mu sync.RWMutex targets map[string][]targetHost resolvers []*JSONGateResolver @@ -83,7 +85,6 @@ type JSONGateResolver struct { target resolver.Target clientConn resolver.ClientConn poolType string - affinity string } var ( @@ -156,7 +157,7 @@ func (b *JSONGateResolverBuilder) start() error { parseCount.Add("changed", 1) - log.Infof("loaded targets, pool types %v, affinity groups %v", poolTypes, affinityTypes) + log.Infof("loaded targets, pool types %v, affinity %s, groups %v", poolTypes, *affinityValue, affinityTypes) // Start a config watcher b.ticker = time.NewTicker(1 * time.Second) @@ -292,17 +293,30 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) { targetCount.Set(poolType, int64(len(targets[poolType]))) } + b.mu.Lock() b.targets = targets + b.mu.Unlock() return true, nil } -// Update the current list of hosts for the given resolver -func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { - - log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) +func (b *JSONGateResolverBuilder) GetPools() []string { + b.mu.RLock() + defer b.mu.RUnlock() + var pools []string + for pool := range b.targets { + pools = append(pools, pool) + } + sort.Strings(pools) + return pools +} - targets := b.targets[r.poolType] +func (b *JSONGateResolverBuilder) GetTargets(poolType string) []targetHost { + // Copy the target slice + b.mu.RLock() + targets := []targetHost{} + targets = append(targets, b.targets[poolType]...) + b.mu.RUnlock() // Shuffle to ensure every host has a different order to iterate through, putting // the affinity matching (e.g. same az) hosts at the front and the non-matching ones @@ -315,7 +329,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { for i := 0; i < n-1; i++ { j := head + b.rand.Intn(tail-head+1) - if r.affinity == "" || r.affinity == targets[j].Affinity { + if *affinityField != "" && *affinityValue == targets[j].Affinity { targets[head], targets[j] = targets[j], targets[head] head++ } else { @@ -324,6 +338,16 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { } } + return targets +} + +// Update the current list of hosts for the given resolver +func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { + + log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) + + targets := b.GetTargets(r.poolType) + var addrs []resolver.Address for _, target := range targets { addrs = append(addrs, resolver.Address{Addr: target.Addr}) @@ -354,7 +378,6 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie target: target, clientConn: cc, poolType: poolType, - affinity: b.affinityValue, } b.update(r) @@ -366,17 +389,17 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie // debugTargets will return the builder's targets with a sorted slice of // poolTypes for rendering debug output func (b *JSONGateResolverBuilder) debugTargets() any { - var pools []string + pools := b.GetPools() + targets := map[string][]targetHost{} for pool := range b.targets { - pools = append(pools, pool) + targets[pool] = b.GetTargets(pool) } - sort.Strings(pools) return struct { Pools []string Targets map[string][]targetHost }{ Pools: pools, - Targets: b.targets, + Targets: targets, } } From 621ba65276d7ab7ce65095c2b0988fd8df1700db Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Wed, 24 Apr 2024 18:10:05 -0700 Subject: [PATCH 6/6] unrelated but just move some code around --- go/vt/vtgateproxy/discovery.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 4b533cc0002..a4b48340bfd 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -57,6 +57,19 @@ import ( // type: Only select from hosts of this type (required) // +// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). +type JSONGateResolver struct { + target resolver.Target + clientConn resolver.ClientConn + poolType string +} + +func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} + +func (r *JSONGateResolver) Close() { + log.Infof("Closing resolver for target %s", r.target.URL.String()) +} + type JSONGateResolverBuilder struct { jsonPath string addressField string @@ -80,13 +93,6 @@ type targetHost struct { Affinity string } -// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). -type JSONGateResolver struct { - target resolver.Target - clientConn resolver.ClientConn - poolType string -} - var ( parseCount = stats.NewCountersWithSingleLabel("JsonDiscoveryParseCount", "Count of results of JSON host file parsing (changed, unchanged, error)", "result") targetCount = stats.NewGaugesWithSingleLabel("JsonDiscoveryTargetCount", "Count of hosts returned from discovery by pool type", "pool") @@ -403,12 +409,6 @@ func (b *JSONGateResolverBuilder) debugTargets() any { } } -func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} - -func (r *JSONGateResolver) Close() { - log.Infof("Closing resolver for target %s", r.target.URL.String()) -} - const ( // targetsTemplate is a HTML template to display the gate resolver's target hosts. targetsTemplate = `