From a2b7686c876ce35d6aafdd7472ea81dc4dff971f Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 24 Jan 2025 17:08:52 +0100 Subject: [PATCH] move things around again Signed-off-by: Tim Vaillancourt --- .../tabletmanager/health/primary_monitor.go | 174 ------------------ go/vt/vttablet/tabletmanager/tm_init.go | 4 +- go/vt/vttablet/tabletserver/controller.go | 3 + .../health/disk_monitor.go | 0 .../health/disk_monitor_test.go | 0 .../health/health.go | 2 +- .../tabletserver/health/tablet_monitor.go | 162 ++++++++++++++++ .../health/tablet_monitor_test.go} | 24 ++- go/vt/vttablet/tabletserver/state_manager.go | 28 +-- go/vt/vttablet/tabletserver/tabletserver.go | 4 + go/vt/vttablet/tabletservermock/controller.go | 6 + 11 files changed, 213 insertions(+), 194 deletions(-) delete mode 100644 go/vt/vttablet/tabletmanager/health/primary_monitor.go rename go/vt/vttablet/{tabletmanager => tabletserver}/health/disk_monitor.go (100%) rename go/vt/vttablet/{tabletmanager => tabletserver}/health/disk_monitor_test.go (100%) rename go/vt/vttablet/{tabletmanager => tabletserver}/health/health.go (92%) create mode 100644 go/vt/vttablet/tabletserver/health/tablet_monitor.go rename go/vt/vttablet/{tabletmanager/health/primary_monitor_test.go => tabletserver/health/tablet_monitor_test.go} (70%) diff --git a/go/vt/vttablet/tabletmanager/health/primary_monitor.go b/go/vt/vttablet/tabletmanager/health/primary_monitor.go deleted file mode 100644 index e91296f7acf..00000000000 --- a/go/vt/vttablet/tabletmanager/health/primary_monitor.go +++ /dev/null @@ -1,174 +0,0 @@ -/* -Copyright 2025 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package health - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/spf13/pflag" - - "vitess.io/vitess/go/vt/log" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/topo/topoproto" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/tmclient" -) - -var ( - ErrPrimaryMonitorOpen = vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, - "primary monitor is already open") - ErrPrimaryUnreachable = vterrors.New(vtrpcpb.Code_UNAVAILABLE, - "primary is unreachable") - - primaryMonitorInterval = time.Second * 5 - primaryMonitorTimeout = time.Second * 3 -) - -func registerPrimaryMonitorFlags(fs *pflag.FlagSet) { - fs.DurationVar(&primaryMonitorInterval, "primary-monitor-interval", primaryMonitorInterval, "interval for non-primaries to monitor the primary") - fs.DurationVar(&primaryMonitorTimeout, "primary-monitor-timeout", primaryMonitorTimeout, "timeout for pings when monitoring the primary") -} - -// PrimaryMonitor is an interface for monitoring a primary. -type PrimaryMonitor interface { - Open() error - Close() - IsReachable() error - SetPrimary(*topodatapb.Tablet) -} - -// NewPrimaryMonitor creates a TMClientPrimaryMonitor. -func NewPrimaryMonitor(tmc tmclient.TabletManagerClient) PrimaryMonitor { - return &TMClientPrimaryMonitor{ - pingNowChan: make(chan struct{}, 1), - tmc: tmc, - } -} - -// TMClientPrimaryMonitor monitors the health of a primary by pinging it using grpctmclient. 
-type TMClientPrimaryMonitor struct { - cancel context.CancelFunc - ctx context.Context - mu sync.Mutex - opened bool - pingNowChan chan struct{} - primary *topodatapb.Tablet - reachable uint32 - tmc tmclient.TabletManagerClient -} - -// getPrimary returns the primary to monitor under lock. -func (pm *TMClientPrimaryMonitor) getPrimary() *topodatapb.Tablet { - pm.mu.Lock() - defer pm.mu.Unlock() - return pm.primary -} - -// SetPrimary sets the primary to monitor under lock. -func (pm *TMClientPrimaryMonitor) SetPrimary(primary *topodatapb.Tablet) { - pm.mu.Lock() - defer pm.mu.Unlock() - pm.primary = primary - if pm.opened { - pm.pingNowChan <- struct{}{} - } -} - -// ping performs a Ping RPC to the tmserver of a primary. -func (pm *TMClientPrimaryMonitor) ping(primary *topodatapb.Tablet) { - if pm.tmc == nil { - return - } - if primary == nil { - return - } - - ctx, cancel := context.WithTimeout(pm.ctx, primaryMonitorTimeout) - defer cancel() - - var reachable uint32 - if err := pm.tmc.Ping(ctx, primary); err != nil { - log.Errorf("Failed to ping primary %s: %+v", topoproto.TabletAliasString(primary.Alias), err) - } else { - reachable = 1 - } - atomic.StoreUint32(&pm.reachable, reachable) -} - -// poll pings the primary periodically and on-demand when the address changes. -// The provided firstPingChan channel is closed after the initial ping. -func (pm *TMClientPrimaryMonitor) poll(firstPingChan chan struct{}) { - // initial ping - pm.ping(pm.primary) - close(firstPingChan) - - // periodic/on-demand ping - ticker := time.NewTicker(primaryMonitorInterval) - defer ticker.Stop() - for { - select { - case <-pm.ctx.Done(): - return - case <-pm.pingNowChan: - pm.ping(pm.getPrimary()) - case <-ticker.C: - pm.ping(pm.getPrimary()) - } - } -} - -// Open opens the primary health monitor. This causes an initial ping, then periodic/on-demand pings in a loop. 
-func (pm *TMClientPrimaryMonitor) Open() error { - pm.mu.Lock() - defer pm.mu.Unlock() - - if pm.opened { - return ErrPrimaryMonitorOpen - } - - pm.ctx, pm.cancel = context.WithCancel(context.Background()) - firstPingChan := make(chan struct{}, 1) - go pm.poll(firstPingChan) - <-firstPingChan - pm.opened = true - return nil -} - -// Close stops background pings and closes the primary health monitor. -func (pm *TMClientPrimaryMonitor) Close() { - pm.mu.Lock() - defer pm.mu.Unlock() - - if pm.cancel != nil { - pm.cancel() - } - atomic.StoreUint32(&pm.reachable, 0) - pm.opened = false - pm.primary = nil -} - -// IsReachable returns nil if the primary is reachable or an error. -func (pm *TMClientPrimaryMonitor) IsReachable() error { - if atomic.LoadUint32(&pm.reachable) != 1 { - return ErrPrimaryUnreachable - } - return nil -} diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index f29f16b2480..65c69e293e6 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -73,10 +73,10 @@ import ( "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/health" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletserver" + "vitess.io/vitess/go/vt/vttablet/tabletserver/health" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tmclient" ) @@ -166,7 +166,6 @@ type TabletManager struct { VDiffEngine *vdiff.Engine Env *vtenv.Environment dhMonitor health.DiskMonitor - primaryMonitor health.PrimaryMonitor // tmc is used to run an RPC against other vttablets. 
tmc tmclient.TabletManagerClient @@ -376,7 +375,6 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl tm.tmState = newTMState(tm, tablet) tm.actionSema = semaphore.NewWeighted(1) tm.dhMonitor = health.NewDiskMonitor(tm.BatchCtx) - tm.primaryMonitor = health.NewPrimaryMonitor(tm.tmc) tm._waitForGrantsComplete = make(chan struct{}) tm.baseTabletType = tablet.Type diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index c4a4bef99fc..5103546a073 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -122,6 +122,9 @@ type Controller interface { // SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. SetDemotePrimaryStalled() + + // IsPrimaryVttabletUnreachable returns true if the primary vttablet is unreachable. + IsPrimaryVttabletUnreachable() bool } // Ensure TabletServer satisfies Controller interface. diff --git a/go/vt/vttablet/tabletmanager/health/disk_monitor.go b/go/vt/vttablet/tabletserver/health/disk_monitor.go similarity index 100% rename from go/vt/vttablet/tabletmanager/health/disk_monitor.go rename to go/vt/vttablet/tabletserver/health/disk_monitor.go diff --git a/go/vt/vttablet/tabletmanager/health/disk_monitor_test.go b/go/vt/vttablet/tabletserver/health/disk_monitor_test.go similarity index 100% rename from go/vt/vttablet/tabletmanager/health/disk_monitor_test.go rename to go/vt/vttablet/tabletserver/health/disk_monitor_test.go diff --git a/go/vt/vttablet/tabletmanager/health/health.go b/go/vt/vttablet/tabletserver/health/health.go similarity index 92% rename from go/vt/vttablet/tabletmanager/health/health.go rename to go/vt/vttablet/tabletserver/health/health.go index 6874790d701..3743c5262f0 100644 --- a/go/vt/vttablet/tabletmanager/health/health.go +++ b/go/vt/vttablet/tabletserver/health/health.go @@ -22,5 +22,5 @@ func init() { for _, daemon := range []string{"vtcombo", "vttablet"} { 
servenv.OnParseFor(daemon, registerDiskMonitorFlags) } - servenv.OnParseFor("vttablet", registerPrimaryMonitorFlags) + servenv.OnParseFor("vttablet", registerTabletMonitorFlags) } diff --git a/go/vt/vttablet/tabletserver/health/tablet_monitor.go b/go/vt/vttablet/tabletserver/health/tablet_monitor.go new file mode 100644 index 00000000000..4008b6ba378 --- /dev/null +++ b/go/vt/vttablet/tabletserver/health/tablet_monitor.go @@ -0,0 +1,162 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package health + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/spf13/pflag" + + "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" +) + +var ( + ErrTabletMonitorOpen = vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, + "tablet monitor is already open") + + tabletMonitorInterval = time.Second * 5 + tabletMonitorTimeout = time.Second * 3 +) + +func registerTabletMonitorFlags(fs *pflag.FlagSet) { + fs.DurationVar(&tabletMonitorInterval, "tablet-monitor-interval", tabletMonitorInterval, "interval for non-primaries to monitor the tablet") + fs.DurationVar(&tabletMonitorTimeout, "tablet-monitor-timeout", tabletMonitorTimeout, "timeout for pings when monitoring the tablet") +} + +// TabletMonitor is an interface for monitoring a tablet. 
+type TabletMonitor interface { + Open() error + Close() + IsReachable() bool + SetTablet(*topodatapb.Tablet) +} + +// NewTabletMonitor creates a TMClientTabletMonitor. +func NewTabletMonitor(tmc tmclient.TabletManagerClient) TabletMonitor { + return &TMClientTabletMonitor{ + pingNowChan: make(chan *topodatapb.Tablet, 1), + tmc: tmc, + } +} + +// TMClientTabletMonitor monitors the health of a tablet by pinging it using grpctmclient. +type TMClientTabletMonitor struct { + cancel context.CancelFunc + ctx context.Context + mu sync.Mutex + opened bool + pingNowChan chan *topodatapb.Tablet + tablet *topodatapb.Tablet + reachable uint32 + tmc tmclient.TabletManagerClient +} + +// getTablet returns the tablet to monitor under lock. +func (pm *TMClientTabletMonitor) getTablet() *topodatapb.Tablet { + pm.mu.Lock() + defer pm.mu.Unlock() + return pm.tablet +} + +// SetTablet sets the tablet to monitor under lock. +func (pm *TMClientTabletMonitor) SetTablet(tablet *topodatapb.Tablet) { + pm.mu.Lock() + defer pm.mu.Unlock() + pm.tablet = tablet + if pm.opened { + pm.pingNowChan <- tablet + } +} + +// ping performs a Ping RPC to the tmserver of a tablet. +func (pm *TMClientTabletMonitor) ping(tablet *topodatapb.Tablet) { + if pm.tmc == nil { + return + } + if tablet == nil { + return + } + + ctx, cancel := context.WithTimeout(pm.ctx, tabletMonitorTimeout) + defer cancel() + + var reachable uint32 + if err := pm.tmc.Ping(ctx, tablet); err != nil { + log.Errorf("Failed to ping tablet %s: %+v", topoproto.TabletAliasString(tablet.Alias), err) + } else { + reachable = 1 + } + atomic.StoreUint32(&pm.reachable, reachable) +} + +// poll pings the tablet periodically and on-demand when the address changes. 
+func (pm *TMClientTabletMonitor) poll() { + ticker := time.NewTicker(tabletMonitorInterval) + defer ticker.Stop() + for { + select { + case <-pm.ctx.Done(): + return + case tablet := <-pm.pingNowChan: + pm.ping(tablet) + case <-ticker.C: + pm.ping(pm.getTablet()) + } + } +} + +// Open opens the tablet health monitor. This causes an initial ping, then periodic/on-demand pings in a loop. +func (pm *TMClientTabletMonitor) Open() error { + pm.mu.Lock() + defer pm.mu.Unlock() + + if pm.opened { + return ErrTabletMonitorOpen + } + + pm.ctx, pm.cancel = context.WithCancel(context.Background()) + pm.ping(pm.tablet) + go pm.poll() + pm.opened = true + return nil +} + +// Close stops background pings and closes the tablet health monitor. +func (pm *TMClientTabletMonitor) Close() { + pm.mu.Lock() + defer pm.mu.Unlock() + + if pm.cancel != nil { + pm.cancel() + } + atomic.StoreUint32(&pm.reachable, 0) + pm.opened = false + pm.tablet = nil +} + +// IsReachable returns true if the tablet is reachable. 
+func (pm *TMClientTabletMonitor) IsReachable() bool { + return atomic.LoadUint32(&pm.reachable) == 1 +} diff --git a/go/vt/vttablet/tabletmanager/health/primary_monitor_test.go b/go/vt/vttablet/tabletserver/health/tablet_monitor_test.go similarity index 70% rename from go/vt/vttablet/tabletmanager/health/primary_monitor_test.go rename to go/vt/vttablet/tabletserver/health/tablet_monitor_test.go index 1afcce75004..96420a4faee 100644 --- a/go/vt/vttablet/tabletmanager/health/primary_monitor_test.go +++ b/go/vt/vttablet/tabletserver/health/tablet_monitor_test.go @@ -25,15 +25,35 @@ import ( "google.golang.org/grpc" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/testutils" + "vitess.io/vitess/go/vt/vttablet/grpctmserver" + "vitess.io/vitess/go/vt/vttablet/tmrpctest" ) +// initTestTMServer starts a gRPC server with a tmrpctest fake registered on a +// random 127.0.0.1 port and returns the server, its listener, host and port. +func initTestTMServer(t *testing.T) (*grpc.Server, net.Listener, string, int32) { + // Listen on a random port + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Cannot listen: %v", err) + } + host := listener.Addr().(*net.TCPAddr).IP.String() + port := int32(listener.Addr().(*net.TCPAddr).Port) + + // Create a gRPC server and listen on the port. 
+ s := grpc.NewServer() + fakeTM := tmrpctest.NewFakeRPCTM(t) + grpctmserver.RegisterForTest(s, fakeTM) + go s.Serve(listener) + return s, listener, host, port +} + func TestPrimaryHealthMonitor(t *testing.T) { - s, listener := testutils.InitTestTMServer(t) + s, listener, host, port := initTestTMServer(t) defer listener.Close() interval := time.Millisecond * 5 - phm := NewPrimaryHealthMonitor(interval) + phm := NewPrimaryMonitor(interval) require.NotNil(t, phm) // open diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 4512b26f177..49fa914c863 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -32,6 +32,7 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/health" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) @@ -111,19 +112,20 @@ type stateManager struct { // Open must be done in forward order. // Close must be done in reverse order. // All Close functions must be called before Open. - hs *healthStreamer - se schemaEngine - rt replTracker - vstreamer subComponent - tracker subComponent - watcher subComponent - qe queryEngine - txThrottler txThrottler - te txEngine - messager subComponent - ddle onlineDDLExecutor - throttler lagThrottler - tableGC tableGarbageCollector + hs *healthStreamer + se schemaEngine + rt replTracker + vstreamer subComponent + tracker subComponent + watcher subComponent + qe queryEngine + txThrottler txThrottler + te txEngine + messager subComponent + ddle onlineDDLExecutor + throttler lagThrottler + tableGC tableGarbageCollector + tabletMonitor health.TabletMonitor // hcticks starts on initialization and runs forever. 
hcticks *timer.Timer diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 30f73d2d818..ad2900b51fe 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -767,6 +767,10 @@ func (tsv *TabletServer) SetDemotePrimaryStalled() { tsv.BroadcastHealth() } +func (tsv *TabletServer) IsPrimaryVttabletUnreachable() bool { + return tsv.sm.tabletMonitor.IsReachable() +} + // CreateTransaction creates the metadata for a 2PC transaction. func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) { return tsv.execRequest( diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index a5242751454..dc50dd06839 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -279,6 +279,12 @@ func (tqsc *Controller) SetDemotePrimaryStalled() { tqsc.MethodCalled["SetDemotePrimaryStalled"] = true } +// IsPrimaryVttabletUnreachable is part of the tabletserver.Controller interface +func (tqsc *Controller) IsPrimaryVttabletUnreachable() bool { + tqsc.MethodCalled["IsPrimaryVttabletUnreachable"] = true + return false +} + // EnterLameduck implements tabletserver.Controller. func (tqsc *Controller) EnterLameduck() { tqsc.mu.Lock()