From 851b600257aa511c0b70204c096e19585bc9d001 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 25 Feb 2025 14:46:11 +0530 Subject: [PATCH 1/4] feat: add name to the subscriber to have a more helpful error message Signed-off-by: Manan Gupta --- go/vt/discovery/fake_healthcheck.go | 2 +- go/vt/discovery/healthcheck.go | 14 +-- go/vt/discovery/healthcheck_test.go | 34 +++--- go/vt/discovery/keyspace_events.go | 2 +- go/vt/throttler/demo/throttler_demo.go | 2 +- go/vt/vtgate/vtgate.go | 4 +- .../txthrottler/mock_healthcheck_test.go | 108 ++++++++++-------- .../txthrottler/mock_throttler_test.go | 54 +++++---- .../tabletserver/txthrottler/tx_throttler.go | 2 +- .../txthrottler/tx_throttler_test.go | 2 +- 10 files changed, 121 insertions(+), 103 deletions(-) diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index 6ae7ee105c2..6d06512f422 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -109,7 +109,7 @@ func (fhc *FakeHealthCheck) GetTabletHealth(kst KeyspaceShardTabletType, alias * } // Subscribe returns the channel in the struct. Subscribe should only be called in one place for this fake health check -func (fhc *FakeHealthCheck) Subscribe() chan *TabletHealth { +func (fhc *FakeHealthCheck) Subscribe(string) chan *TabletHealth { return fhc.ch } diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 5734749b167..d57d568db40 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -241,7 +241,7 @@ type HealthCheck interface { GetTabletHealthByAlias(alias *topodata.TabletAlias) (*TabletHealth, error) // Subscribe adds a listener. Used by vtgate buffer to learn about primary changes. - Subscribe() chan *TabletHealth + Subscribe(name string) chan *TabletHealth // Unsubscribe removes a listener. Unsubscribe(c chan *TabletHealth) @@ -296,7 +296,7 @@ type HealthCheckImpl struct { // mutex to protect subscribers subMu sync.Mutex // subscribers - subscribers map[chan *TabletHealth]struct{} + subscribers map[chan *TabletHealth]string // loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted loadTabletsTrigger chan struct{} } @@ -361,7 +361,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur healthByAlias: make(map[tabletAliasString]*tabletHealthCheck), healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth), healthy: make(map[KeyspaceShardTabletType][]*TabletHealth), - subscribers: make(map[chan *TabletHealth]struct{}), + subscribers: make(map[chan *TabletHealth]string), cellAliases: make(map[string]string), loadTabletsTrigger: make(chan struct{}, 1), } @@ -632,11 +632,11 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) { } // Subscribe adds a listener. Used by vtgate buffer to learn about primary changes. -func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth { +func (hc *HealthCheckImpl) Subscribe(subscriber string) chan *TabletHealth { hc.subMu.Lock() defer hc.subMu.Unlock() c := make(chan *TabletHealth, 2048) - hc.subscribers[c] = struct{}{} + hc.subscribers[c] = subscriber return c } @@ -650,13 +650,13 @@ func (hc *HealthCheckImpl) Unsubscribe(c chan *TabletHealth) { func (hc *HealthCheckImpl) broadcast(th *TabletHealth) { hc.subMu.Lock() defer hc.subMu.Unlock() - for c := range hc.subscribers { + for c, subscriber := range hc.subscribers { select { case c <- th: default: // If the channel is full, we drop the message. hcChannelFullCounter.Add(1) - log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet)) + log.Warningf("HealthCheck broadcast channel is full for %v, dropping message for %s", subscriber, topotools.TabletIdent(th.Tablet)) } } } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 81f7d8b80b1..6ccb7c0b844 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -150,7 +150,7 @@ func TestHealthCheck(t *testing.T) { conn := createFakeConn(tablet, input) // create a channel and subscribe to healthcheck - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") testChecksum(t, 0, hc.stateChecksum()) hc.AddTablet(tablet) testChecksum(t, 1027934207, hc.stateChecksum()) @@ -289,7 +289,7 @@ func TestHealthCheckStreamError(t *testing.T) { tablet := createTestTablet(0, "cell", "a") input := make(chan *querypb.StreamHealthResponse) - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") fc := createFakeConn(tablet, input) fc.errCh = make(chan error) hc.AddTablet(tablet) @@ -353,7 +353,7 @@ func TestHealthCheckErrorOnPrimary(t *testing.T) { tablet := createTestTablet(0, "cell", "a") input := make(chan *querypb.StreamHealthResponse) - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") fc := createFakeConn(tablet, input) fc.errCh = make(chan error) hc.AddTablet(tablet) @@ -414,7 +414,7 @@ func TestHealthCheckErrorOnPrimaryAfterExternalReparent(t *testing.T) { hc := createTestHc(ctx, ts) defer hc.Close() - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") tablet1 := createTestTablet(0, "cell", "a") input1 := make(chan *querypb.StreamHealthResponse) @@ -498,7 +498,7 @@ func TestHealthCheckVerifiesTabletAlias(t *testing.T) { tablet := createTestTablet(0, "cell", "a") input := make(chan *querypb.StreamHealthResponse, 1) fc := createFakeConn(tablet, input) - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") hc.AddTablet(tablet) @@ -543,7 +543,7 @@ func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) { tablet := createTestTablet(0, "cell", "a") input := make(chan *querypb.StreamHealthResponse, 1) createFakeConn(tablet, input) - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") hc.AddTablet(tablet) @@ -610,7 +610,7 @@ func TestHealthCheckTimeout(t *testing.T) { tablet := createTestTablet(0, "cell", "a") input := make(chan *querypb.StreamHealthResponse) fc := createFakeConn(tablet, input) - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") hc.AddTablet(tablet) // Immediately after AddTablet() there will be the first notification. want := &TabletHealth{ @@ -692,7 +692,7 @@ func TestWaitForAllServingTablets(t *testing.T) { createFakeConn(tablet, input) // create a channel and subscribe to healthcheck - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") hc.AddTablet(tablet) // there will be a first result, get and discard it <-resultChan @@ -760,7 +760,7 @@ func TestRemoveTablet(t *testing.T) { createFakeConn(tablet, input) // create a channel and subscribe to healthcheck - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") hc.AddTablet(tablet) // there will be a first result, get and discard it <-resultChan @@ -896,7 +896,7 @@ func TestRemoveTabletDuringExternalReparenting(t *testing.T) { thirdTabletConn := createFakeConn(thirdTablet, thirdTabletHealthStream) thirdTabletConn.errCh = make(chan error) - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") hc.AddTablet(firstTablet) <-resultChan @@ -991,7 +991,7 @@ func TestGetHealthyTablets(t *testing.T) { createFakeConn(tablet, input) // create a channel and subscribe to healthcheck - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") hc.AddTablet(tablet) // there will be a first result, get and discard it <-resultChan @@ -1181,7 +1181,7 @@ func TestPrimaryInOtherCell(t *testing.T) { input := make(chan *querypb.StreamHealthResponse) fc := createFakeConn(tablet, input) // create a channel and subscribe to healthcheck - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") hc.AddTablet(tablet) // should get a result, but this will hang if multi-cell logic is broken // so wait and timeout @@ -1241,7 +1241,7 @@ func TestReplicaInOtherCell(t *testing.T) { input := make(chan *querypb.StreamHealthResponse) fc := createFakeConn(local, input) // create a channel and subscribe to healthcheck - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") hc.AddTablet(local) ticker := time.NewTicker(1 * time.Second) @@ -1286,7 +1286,7 @@ func TestReplicaInOtherCell(t *testing.T) { input2 := make(chan *querypb.StreamHealthResponse) fc2 := createFakeConn(remote, input2) // create a channel and subscribe to healthcheck - resultChan2 := hc.Subscribe() + resultChan2 := hc.Subscribe("") hc.AddTablet(remote) // should get a result, but this will hang if multi-cell logic is broken // so wait and timeout @@ -1352,7 +1352,7 @@ func TestCellAliases(t *testing.T) { input := make(chan *querypb.StreamHealthResponse) fc := createFakeConn(tablet, input) // create a channel and subscribe to healthcheck - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") hc.AddTablet(tablet) // should get a result, but this will hang if cell alias logic is broken // so wait and timeout @@ -1405,7 +1405,7 @@ func TestHealthCheckChecksGrpcPort(t *testing.T) { tablet := createTestTablet(0, "cell", "a") tablet.PortMap["grpc"] = 0 - resultChan := hc.Subscribe() + resultChan := hc.Subscribe("") // AddTablet should not add the tablet because port is 0 hc.AddTablet(tablet) @@ -1490,7 +1490,7 @@ func TestConcurrentUpdates(t *testing.T) { // Subscribe to the healthcheck // Make the receiver keep track of the updates received. - ch := hc.Subscribe() + ch := hc.Subscribe("") var totalCount atomic.Int32 go func() { for range ch { diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 147606c553b..d48f58e654c 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -221,7 +221,7 @@ func (kew *KeyspaceEventWatcher) broadcast(ev *KeyspaceEvent) { } func (kew *KeyspaceEventWatcher) run(ctx context.Context) { - hcChan := kew.hc.Subscribe() + hcChan := kew.hc.Subscribe("KeyspaceEventWatcher") bufferCtx, bufferCancel := context.WithCancel(ctx) go func() { diff --git a/go/vt/throttler/demo/throttler_demo.go b/go/vt/throttler/demo/throttler_demo.go index 13927f90c12..d72749c5914 100644 --- a/go/vt/throttler/demo/throttler_demo.go +++ b/go/vt/throttler/demo/throttler_demo.go @@ -246,7 +246,7 @@ func newClient(ctx context.Context, primary *primary, replica *replica, ts *topo throttler: t, stopChan: make(chan struct{}), } - healthcheckCh := c.healthCheck.Subscribe() + healthcheckCh := c.healthCheck.Subscribe("ThrottlerDemo") c.healthcheckCh = healthcheckCh c.healthCheck.AddTablet(replica.fakeTablet.Tablet) return c diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 8b8302d77d4..faa1bca66b3 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -319,7 +319,7 @@ func Init( // ScatterConn depends on TxConn to perform forced rollbacks. sc := NewScatterConn("VttabletCall", tc, gw) // TxResolver depends on TxConn to complete distributed transaction. - tr := txresolver.NewTxResolver(gw.hc.Subscribe(), tc) + tr := txresolver.NewTxResolver(gw.hc.Subscribe("TxResolver"), tc) srvResolver := srvtopo.NewResolver(serv, gw, cell) resolver := NewResolver(srvResolver, serv, cell, sc) vsm := newVStreamManager(srvResolver, serv, cell) @@ -345,7 +345,7 @@ func Init( var si SchemaInfo // default nil var st *vtschema.Tracker if enableSchemaChangeSignal { - st = vtschema.NewTracker(gw.hc.Subscribe(), enableViews, enableUdfs, env.Parser()) + st = vtschema.NewTracker(gw.hc.Subscribe("SchemaTracker"), enableViews, enableUdfs, env.Parser()) addKeyspacesToTracker(ctx, srvResolver, st, gw) si = st } diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_healthcheck_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_healthcheck_test.go index ecc6688fb9d..3d21f052396 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_healthcheck_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_healthcheck_test.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: vitess.io/vitess/go/vt/discovery (interfaces: HealthCheck) +// +// Generated by this command: +// +// mockgen -destination mock_healthcheck_test.go -package txthrottler -mock_names HealthCheck=MockHealthCheck vitess.io/vitess/go/vt/discovery HealthCheck +// // Package txthrottler is a generated GoMock package. package txthrottler @@ -20,6 +25,7 @@ import ( type MockHealthCheck struct { ctrl *gomock.Controller recorder *MockHealthCheckMockRecorder + isgomock struct{} } // MockHealthCheckMockRecorder is the mock recorder for MockHealthCheck. @@ -40,15 +46,15 @@ func (m *MockHealthCheck) EXPECT() *MockHealthCheckMockRecorder { } // AddTablet mocks base method. -func (m *MockHealthCheck) AddTablet(arg0 *topodata.Tablet) { +func (m *MockHealthCheck) AddTablet(tablet *topodata.Tablet) { m.ctrl.T.Helper() - m.ctrl.Call(m, "AddTablet", arg0) + m.ctrl.Call(m, "AddTablet", tablet) } // AddTablet indicates an expected call of AddTablet. -func (mr *MockHealthCheckMockRecorder) AddTablet(arg0 interface{}) *gomock.Call { +func (mr *MockHealthCheckMockRecorder) AddTablet(tablet any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTablet", reflect.TypeOf((*MockHealthCheck)(nil).AddTablet), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTablet", reflect.TypeOf((*MockHealthCheck)(nil).AddTablet), tablet) } // CacheStatus mocks base method. @@ -59,14 +65,6 @@ func (m *MockHealthCheck) CacheStatus() discovery.TabletsCacheStatusList { return ret0 } -// HealthyStatus mocks base method. -func (m *MockHealthCheck) HealthyStatus() discovery.TabletsCacheStatusList { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HealthyStatus") - ret0, _ := ret[0].(discovery.TabletsCacheStatusList) - return ret0 -} - // CacheStatus indicates an expected call of CacheStatus. func (mr *MockHealthCheckMockRecorder) CacheStatus() *gomock.Call { mr.mock.ctrl.T.Helper() @@ -102,17 +100,17 @@ func (mr *MockHealthCheckMockRecorder) Close() *gomock.Call { } // GetHealthyTabletStats mocks base method. -func (m *MockHealthCheck) GetHealthyTabletStats(arg0 *query.Target) []*discovery.TabletHealth { +func (m *MockHealthCheck) GetHealthyTabletStats(target *query.Target) []*discovery.TabletHealth { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetHealthyTabletStats", arg0) + ret := m.ctrl.Call(m, "GetHealthyTabletStats", target) ret0, _ := ret[0].([]*discovery.TabletHealth) return ret0 } // GetHealthyTabletStats indicates an expected call of GetHealthyTabletStats. -func (mr *MockHealthCheckMockRecorder) GetHealthyTabletStats(arg0 interface{}) *gomock.Call { +func (mr *MockHealthCheckMockRecorder) GetHealthyTabletStats(target any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHealthyTabletStats", reflect.TypeOf((*MockHealthCheck)(nil).GetHealthyTabletStats), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHealthyTabletStats", reflect.TypeOf((*MockHealthCheck)(nil).GetHealthyTabletStats), target) } // GetLoadTabletsTrigger mocks base method. @@ -130,33 +128,47 @@ func (mr *MockHealthCheckMockRecorder) GetLoadTabletsTrigger() *gomock.Call { } // GetTabletHealth mocks base method. -func (m *MockHealthCheck) GetTabletHealth(arg0 discovery.KeyspaceShardTabletType, arg1 *topodata.TabletAlias) (*discovery.TabletHealth, error) { +func (m *MockHealthCheck) GetTabletHealth(kst discovery.KeyspaceShardTabletType, alias *topodata.TabletAlias) (*discovery.TabletHealth, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetTabletHealth", arg0, arg1) + ret := m.ctrl.Call(m, "GetTabletHealth", kst, alias) ret0, _ := ret[0].(*discovery.TabletHealth) ret1, _ := ret[1].(error) return ret0, ret1 } // GetTabletHealth indicates an expected call of GetTabletHealth. -func (mr *MockHealthCheckMockRecorder) GetTabletHealth(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockHealthCheckMockRecorder) GetTabletHealth(kst, alias any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTabletHealth", reflect.TypeOf((*MockHealthCheck)(nil).GetTabletHealth), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTabletHealth", reflect.TypeOf((*MockHealthCheck)(nil).GetTabletHealth), kst, alias) } // GetTabletHealthByAlias mocks base method. -func (m *MockHealthCheck) GetTabletHealthByAlias(arg0 *topodata.TabletAlias) (*discovery.TabletHealth, error) { +func (m *MockHealthCheck) GetTabletHealthByAlias(alias *topodata.TabletAlias) (*discovery.TabletHealth, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetTabletHealthByAlias", arg0) + ret := m.ctrl.Call(m, "GetTabletHealthByAlias", alias) ret0, _ := ret[0].(*discovery.TabletHealth) ret1, _ := ret[1].(error) return ret0, ret1 } // GetTabletHealthByAlias indicates an expected call of GetTabletHealthByAlias. -func (mr *MockHealthCheckMockRecorder) GetTabletHealthByAlias(arg0 interface{}) *gomock.Call { +func (mr *MockHealthCheckMockRecorder) GetTabletHealthByAlias(alias any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTabletHealthByAlias", reflect.TypeOf((*MockHealthCheck)(nil).GetTabletHealthByAlias), alias) +} + +// HealthyStatus mocks base method. +func (m *MockHealthCheck) HealthyStatus() discovery.TabletsCacheStatusList { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HealthyStatus") + ret0, _ := ret[0].(discovery.TabletsCacheStatusList) + return ret0 +} + +// HealthyStatus indicates an expected call of HealthyStatus. +func (mr *MockHealthCheckMockRecorder) HealthyStatus() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTabletHealthByAlias", reflect.TypeOf((*MockHealthCheck)(nil).GetTabletHealthByAlias), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HealthyStatus", reflect.TypeOf((*MockHealthCheck)(nil).HealthyStatus)) } // RegisterStats mocks base method. @@ -172,80 +184,80 @@ func (mr *MockHealthCheckMockRecorder) RegisterStats() *gomock.Call { } // RemoveTablet mocks base method. -func (m *MockHealthCheck) RemoveTablet(arg0 *topodata.Tablet) { +func (m *MockHealthCheck) RemoveTablet(tablet *topodata.Tablet) { m.ctrl.T.Helper() - m.ctrl.Call(m, "RemoveTablet", arg0) + m.ctrl.Call(m, "RemoveTablet", tablet) } // RemoveTablet indicates an expected call of RemoveTablet. -func (mr *MockHealthCheckMockRecorder) RemoveTablet(arg0 interface{}) *gomock.Call { +func (mr *MockHealthCheckMockRecorder) RemoveTablet(tablet any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveTablet", reflect.TypeOf((*MockHealthCheck)(nil).RemoveTablet), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveTablet", reflect.TypeOf((*MockHealthCheck)(nil).RemoveTablet), tablet) } // ReplaceTablet mocks base method. -func (m *MockHealthCheck) ReplaceTablet(arg0, arg1 *topodata.Tablet) { +func (m *MockHealthCheck) ReplaceTablet(old, new *topodata.Tablet) { m.ctrl.T.Helper() - m.ctrl.Call(m, "ReplaceTablet", arg0, arg1) + m.ctrl.Call(m, "ReplaceTablet", old, new) } // ReplaceTablet indicates an expected call of ReplaceTablet. -func (mr *MockHealthCheckMockRecorder) ReplaceTablet(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockHealthCheckMockRecorder) ReplaceTablet(old, new any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplaceTablet", reflect.TypeOf((*MockHealthCheck)(nil).ReplaceTablet), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplaceTablet", reflect.TypeOf((*MockHealthCheck)(nil).ReplaceTablet), old, new) } // Subscribe mocks base method. -func (m *MockHealthCheck) Subscribe() chan *discovery.TabletHealth { +func (m *MockHealthCheck) Subscribe(name string) chan *discovery.TabletHealth { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Subscribe") + ret := m.ctrl.Call(m, "Subscribe", name) ret0, _ := ret[0].(chan *discovery.TabletHealth) return ret0 } // Subscribe indicates an expected call of Subscribe. -func (mr *MockHealthCheckMockRecorder) Subscribe() *gomock.Call { +func (mr *MockHealthCheckMockRecorder) Subscribe(name any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockHealthCheck)(nil).Subscribe)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockHealthCheck)(nil).Subscribe), name) } // TabletConnection mocks base method. -func (m *MockHealthCheck) TabletConnection(arg0 context.Context, arg1 *topodata.TabletAlias, arg2 *query.Target) (queryservice.QueryService, error) { +func (m *MockHealthCheck) TabletConnection(ctx context.Context, alias *topodata.TabletAlias, target *query.Target) (queryservice.QueryService, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TabletConnection", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "TabletConnection", ctx, alias, target) ret0, _ := ret[0].(queryservice.QueryService) ret1, _ := ret[1].(error) return ret0, ret1 } // TabletConnection indicates an expected call of TabletConnection. -func (mr *MockHealthCheckMockRecorder) TabletConnection(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockHealthCheckMockRecorder) TabletConnection(ctx, alias, target any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TabletConnection", reflect.TypeOf((*MockHealthCheck)(nil).TabletConnection), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TabletConnection", reflect.TypeOf((*MockHealthCheck)(nil).TabletConnection), ctx, alias, target) } // Unsubscribe mocks base method. -func (m *MockHealthCheck) Unsubscribe(arg0 chan *discovery.TabletHealth) { +func (m *MockHealthCheck) Unsubscribe(c chan *discovery.TabletHealth) { m.ctrl.T.Helper() - m.ctrl.Call(m, "Unsubscribe", arg0) + m.ctrl.Call(m, "Unsubscribe", c) } // Unsubscribe indicates an expected call of Unsubscribe. -func (mr *MockHealthCheckMockRecorder) Unsubscribe(arg0 interface{}) *gomock.Call { +func (mr *MockHealthCheckMockRecorder) Unsubscribe(c any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unsubscribe", reflect.TypeOf((*MockHealthCheck)(nil).Unsubscribe), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unsubscribe", reflect.TypeOf((*MockHealthCheck)(nil).Unsubscribe), c) } // WaitForAllServingTablets mocks base method. -func (m *MockHealthCheck) WaitForAllServingTablets(arg0 context.Context, arg1 []*query.Target) error { +func (m *MockHealthCheck) WaitForAllServingTablets(ctx context.Context, targets []*query.Target) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WaitForAllServingTablets", arg0, arg1) + ret := m.ctrl.Call(m, "WaitForAllServingTablets", ctx, targets) ret0, _ := ret[0].(error) return ret0 } // WaitForAllServingTablets indicates an expected call of WaitForAllServingTablets. -func (mr *MockHealthCheckMockRecorder) WaitForAllServingTablets(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockHealthCheckMockRecorder) WaitForAllServingTablets(ctx, targets any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForAllServingTablets", reflect.TypeOf((*MockHealthCheck)(nil).WaitForAllServingTablets), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForAllServingTablets", reflect.TypeOf((*MockHealthCheck)(nil).WaitForAllServingTablets), ctx, targets) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index 327a37dc43f..00ac81b010b 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: vitess.io/vitess/go/vt/throttler (interfaces: Throttler) +// +// Generated by this command: +// +// mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/throttler Throttler +// // Package txthrottler is a generated GoMock package. package txthrottler @@ -20,6 +25,7 @@ import ( type MockThrottler struct { ctrl *gomock.Controller recorder *MockThrottlerMockRecorder + isgomock struct{} } // MockThrottlerMockRecorder is the mock recorder for MockThrottler. @@ -80,17 +86,17 @@ func (mr *MockThrottlerMockRecorder) Log() *gomock.Call { } // MaxLag mocks base method. -func (m *MockThrottler) MaxLag(arg0 topodata.TabletType) uint32 { +func (m *MockThrottler) MaxLag(tabletType topodata.TabletType) uint32 { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MaxLag", arg0) + ret := m.ctrl.Call(m, "MaxLag", tabletType) ret0, _ := ret[0].(uint32) return ret0 } // MaxLag indicates an expected call of MaxLag. -func (mr *MockThrottlerMockRecorder) MaxLag(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) MaxLag(tabletType any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottler)(nil).MaxLag), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottler)(nil).MaxLag), tabletType) } // MaxRate mocks base method. @@ -108,15 +114,15 @@ func (mr *MockThrottlerMockRecorder) MaxRate() *gomock.Call { } // RecordReplicationLag mocks base method. -func (m *MockThrottler) RecordReplicationLag(arg0 time.Time, arg1 *discovery.TabletHealth) { +func (m *MockThrottler) RecordReplicationLag(time time.Time, th *discovery.TabletHealth) { m.ctrl.T.Helper() - m.ctrl.Call(m, "RecordReplicationLag", arg0, arg1) + m.ctrl.Call(m, "RecordReplicationLag", time, th) } // RecordReplicationLag indicates an expected call of RecordReplicationLag. -func (mr *MockThrottlerMockRecorder) RecordReplicationLag(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) RecordReplicationLag(time, th any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordReplicationLag", reflect.TypeOf((*MockThrottler)(nil).RecordReplicationLag), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordReplicationLag", reflect.TypeOf((*MockThrottler)(nil).RecordReplicationLag), time, th) } // ResetConfiguration mocks base method. @@ -132,53 +138,53 @@ func (mr *MockThrottlerMockRecorder) ResetConfiguration() *gomock.Call { } // SetMaxRate mocks base method. -func (m *MockThrottler) SetMaxRate(arg0 int64) { +func (m *MockThrottler) SetMaxRate(rate int64) { m.ctrl.T.Helper() - m.ctrl.Call(m, "SetMaxRate", arg0) + m.ctrl.Call(m, "SetMaxRate", rate) } // SetMaxRate indicates an expected call of SetMaxRate. -func (mr *MockThrottlerMockRecorder) SetMaxRate(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) SetMaxRate(rate any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxRate", reflect.TypeOf((*MockThrottler)(nil).SetMaxRate), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxRate", reflect.TypeOf((*MockThrottler)(nil).SetMaxRate), rate) } // ThreadFinished mocks base method. -func (m *MockThrottler) ThreadFinished(arg0 int) { +func (m *MockThrottler) ThreadFinished(threadID int) { m.ctrl.T.Helper() - m.ctrl.Call(m, "ThreadFinished", arg0) + m.ctrl.Call(m, "ThreadFinished", threadID) } // ThreadFinished indicates an expected call of ThreadFinished. -func (mr *MockThrottlerMockRecorder) ThreadFinished(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) ThreadFinished(threadID any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThreadFinished", reflect.TypeOf((*MockThrottler)(nil).ThreadFinished), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThreadFinished", reflect.TypeOf((*MockThrottler)(nil).ThreadFinished), threadID) } // Throttle mocks base method. -func (m *MockThrottler) Throttle(arg0 int) time.Duration { +func (m *MockThrottler) Throttle(threadID int) time.Duration { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Throttle", arg0) + ret := m.ctrl.Call(m, "Throttle", threadID) ret0, _ := ret[0].(time.Duration) return ret0 } // Throttle indicates an expected call of Throttle. -func (mr *MockThrottlerMockRecorder) Throttle(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) Throttle(threadID any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Throttle", reflect.TypeOf((*MockThrottler)(nil).Throttle), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Throttle", reflect.TypeOf((*MockThrottler)(nil).Throttle), threadID) } // UpdateConfiguration mocks base method. -func (m *MockThrottler) UpdateConfiguration(arg0 *throttlerdata.Configuration, arg1 bool) error { +func (m *MockThrottler) UpdateConfiguration(configuration *throttlerdata.Configuration, copyZeroValues bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateConfiguration", arg0, arg1) + ret := m.ctrl.Call(m, "UpdateConfiguration", configuration, copyZeroValues) ret0, _ := ret[0].(error) return ret0 } // UpdateConfiguration indicates an expected call of UpdateConfiguration. -func (mr *MockThrottlerMockRecorder) UpdateConfiguration(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) UpdateConfiguration(configuration, copyZeroValues any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfiguration", reflect.TypeOf((*MockThrottler)(nil).UpdateConfiguration), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfiguration", reflect.TypeOf((*MockThrottler)(nil).UpdateConfiguration), configuration, copyZeroValues) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index cf0a88ad310..5fc46fdc724 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -311,7 +311,7 @@ func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, t if err != nil { return err } - ts.healthCheckChan = ts.healthCheck.Subscribe() + ts.healthCheckChan = ts.healthCheck.Subscribe("TxThrottler") return nil } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 1d3c9f57e72..9b70981a521 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -67,7 +67,7 @@ func TestEnabledThrottler(t *testing.T) { ts := memorytopo.NewServer(ctx, "cell1", "cell2") mockHealthCheck := NewMockHealthCheck(mockCtrl) - hcCall1 := mockHealthCheck.EXPECT().Subscribe() + hcCall1 := mockHealthCheck.EXPECT().Subscribe("TestEnabledThrottler") hcCall1.Do(func() {}) hcCall2 := mockHealthCheck.EXPECT().RegisterStats() hcCall2.Do(func() {}) From 8c04a0a6691971d9f536dbb15e732d73aebe2d99 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 25 Feb 2025 14:51:09 +0530 Subject: [PATCH 2/4] feat: add printing of the stack trace to healthcheck Signed-off-by: Manan Gupta --- go/vt/discovery/healthcheck.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index d57d568db40..7bb98fb63cd 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -39,6 +39,7 @@ import ( "fmt" "hash/crc32" "net/http" + "runtime" "sort" "strings" "sync" @@ -647,6 +648,12 @@ func (hc *HealthCheckImpl) Unsubscribe(c chan *TabletHealth) { delete(hc.subscribers, c) } +var printStack = sync.OnceFunc(func() { + buf := make([]byte, 10240) // Allocate buffer large enough + n := runtime.Stack(buf, true) + fmt.Printf("All Goroutines Stack Trace:\n%s\n", buf[:n]) +}) + func (hc *HealthCheckImpl) broadcast(th *TabletHealth) { hc.subMu.Lock() defer hc.subMu.Unlock() @@ -657,6 +664,8 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) { // If the channel is full, we drop the message. hcChannelFullCounter.Add(1) log.Warningf("HealthCheck broadcast channel is full for %v, dropping message for %s", subscriber, topotools.TabletIdent(th.Tablet)) + // Print the stack trace only once. + printStack() } } } From f129aa0d45d92aa353aa3a802dbb0034dc77bb96 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 25 Feb 2025 15:31:50 +0530 Subject: [PATCH 3/4] feat: fix test expectations Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 9b70981a521..4e3d6d4c60a 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -67,8 +67,8 @@ func TestEnabledThrottler(t *testing.T) { ts := memorytopo.NewServer(ctx, "cell1", "cell2") mockHealthCheck := NewMockHealthCheck(mockCtrl) - hcCall1 := mockHealthCheck.EXPECT().Subscribe("TestEnabledThrottler") - hcCall1.Do(func() {}) + hcCall1 := mockHealthCheck.EXPECT().Subscribe("TxThrottler") + hcCall1.Do(func(string) {}) hcCall2 := mockHealthCheck.EXPECT().RegisterStats() hcCall2.Do(func() {}) hcCall2.After(hcCall1) From 406260d6b86e1e63ea156719f1081846e12a58d2 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 27 Feb 2025 12:21:54 +0530 Subject: [PATCH 4/4] test: add a unit test verifying we have the subscriber names in the healthcheck Signed-off-by: Manan Gupta --- go/vt/discovery/healthcheck_test.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 6ccb7c0b844..e42a98989b2 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "io" + "slices" "strings" "sync" "sync/atomic" @@ -30,6 +31,7 @@ import ( "github.com/google/safehtml/template" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/grpcclient" @@ -1446,6 +1448,33 @@ func TestTemplate(t *testing.T) { require.Nil(t, err, "error executing template: %v", err) } +// TestHealthCheckImplSubscriberName tests that we have the subscirber name in the healthcheck. +func TestHealthCheckImplSubscriberName(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, nil, "", "", nil) + defer hc.Close() + + subsName := "SubscriberName1" + subsName2 := "SubscriberName2" + ch := hc.Subscribe(subsName) + ch2 := hc.Subscribe(subsName2) + + subsNames := maps.Values(hc.subscribers) + slices.Sort(subsNames) + require.Equal(t, 2, len(subsNames), "expected 2 subscribers") + require.EqualValues(t, []string{subsName, subsName2}, subsNames, "unexpected subscribers") + + hc.Unsubscribe(ch) + subsNames = maps.Values(hc.subscribers) + require.Equal(t, 1, len(subsNames), "expected 1 subscriber") + require.EqualValues(t, []string{subsName2}, subsNames, "unexpected subscribers") + + hc.Unsubscribe(ch2) + subsNames = maps.Values(hc.subscribers) + require.Empty(t, subsNames, "expected no subscribers") +} + func TestDebugURLFormatting(t *testing.T) { defer utils.EnsureNoLeaks(t) TabletURLTemplateString = "https://{{.GetHostNameLevel 0}}.bastion.{{.Tablet.Alias.Cell}}.corp"