Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store the health check subscriber name to improve error message #17863

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (fhc *FakeHealthCheck) GetTabletHealth(kst KeyspaceShardTabletType, alias *
}

// Subscribe returns the channel held in the struct (fhc.ch); it does not
// create a new channel per call. The subscriber-name argument is unnamed and
// ignored by this fake.
// Subscribe should only be called in one place for this fake health check
// NOTE(review): the two signature lines below appear to be the pre-change and
// post-change versions of the same line (diff markers lost) — confirm.
func (fhc *FakeHealthCheck) Subscribe() chan *TabletHealth {
func (fhc *FakeHealthCheck) Subscribe(string) chan *TabletHealth {
return fhc.ch
}

Expand Down
23 changes: 16 additions & 7 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"fmt"
"hash/crc32"
"net/http"
"runtime"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -241,7 +242,7 @@ type HealthCheck interface {
GetTabletHealthByAlias(alias *topodata.TabletAlias) (*TabletHealth, error)

// Subscribe adds a listener. Used by vtgate buffer to learn about primary changes.
Subscribe() chan *TabletHealth
Subscribe(name string) chan *TabletHealth

// Unsubscribe removes a listener.
Unsubscribe(c chan *TabletHealth)
Expand Down Expand Up @@ -296,7 +297,7 @@ type HealthCheckImpl struct {
// mutex to protect subscribers
subMu sync.Mutex
// subscribers
subscribers map[chan *TabletHealth]struct{}
subscribers map[chan *TabletHealth]string
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
}
Expand Down Expand Up @@ -361,7 +362,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
subscribers: make(map[chan *TabletHealth]struct{}),
subscribers: make(map[chan *TabletHealth]string),
cellAliases: make(map[string]string),
loadTabletsTrigger: make(chan struct{}, 1),
}
Expand Down Expand Up @@ -632,11 +633,11 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) {
}

// Subscribe adds a listener. Used by vtgate buffer to learn about primary changes.
// It takes subMu, creates a channel buffered to 2048 entries, registers it in
// hc.subscribers and returns it. In the variant that takes a subscriber name,
// the name is stored as the map value for that channel.
// NOTE(review): the paired lines below (signature and map assignment) appear
// to be the pre-change line followed by its replacement (diff markers lost) — confirm.
func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
func (hc *HealthCheckImpl) Subscribe(subscriber string) chan *TabletHealth {
hc.subMu.Lock()
defer hc.subMu.Unlock()
c := make(chan *TabletHealth, 2048)
hc.subscribers[c] = struct{}{}
hc.subscribers[c] = subscriber
return c
}

Expand All @@ -647,16 +648,24 @@ func (hc *HealthCheckImpl) Unsubscribe(c chan *TabletHealth) {
delete(hc.subscribers, c)
}

// printStack writes the stack traces of all goroutines to standard output.
// It is wrapped in sync.OnceFunc, so only the first call in the process
// produces output; later calls are no-ops.
var printStack = sync.OnceFunc(func() {
	// runtime.Stack silently truncates the dump when the buffer is too small,
	// so keep doubling the buffer until the whole trace fits. The upper bound
	// keeps a pathological goroutine count from allocating without limit.
	const maxStackDump = 64 << 20
	buf := make([]byte, 64<<10)
	for {
		n := runtime.Stack(buf, true)
		if n < len(buf) || len(buf) >= maxStackDump {
			buf = buf[:n]
			break
		}
		buf = make([]byte, 2*len(buf))
	}
	fmt.Printf("All Goroutines Stack Trace:\n%s\n", buf)
})
Comment on lines +651 to +655
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this happen only for debugging?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we want to merge this, so that if we exhaust the pool in production we have the stack trace in the logs. This is why it's behind a sync.Once call, so that we print it only once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please help me understand why we need to print the stack traces of all goroutines?
What information from them will be used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can see why the subscriber is blocked and not processing the updates, because we'll be able to see their stack trace.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is the reason, we can use the pprof to output the logs with

pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)

Note that runtime.Stack(buf, true), with the second argument set to true, may stop the world in order to inspect all goroutines.


// broadcast offers th to every channel registered in hc.subscribers while
// holding subMu. Each send is non-blocking: when a subscriber's channel cannot
// accept the value, the update is dropped for that subscriber,
// hcChannelFullCounter is incremented, a warning is logged and printStack is
// invoked.
// NOTE(review): the paired `for` and `log.Warningf` lines below appear to be
// the pre-change line followed by its replacement (diff markers lost) — confirm.
func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
hc.subMu.Lock()
defer hc.subMu.Unlock()
for c := range hc.subscribers {
for c, subscriber := range hc.subscribers {
select {
case c <- th:
default:
// If the channel is full, we drop the message.
hcChannelFullCounter.Add(1)
log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
log.Warningf("HealthCheck broadcast channel is full for %v, dropping message for %s", subscriber, topotools.TabletIdent(th.Tablet))
// Print the stack trace only once.
printStack()
}
}
}
Expand Down
63 changes: 46 additions & 17 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io"
"slices"
"strings"
"sync"
"sync/atomic"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/google/safehtml/template"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"

"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/grpcclient"
Expand Down Expand Up @@ -150,7 +152,7 @@ func TestHealthCheck(t *testing.T) {
conn := createFakeConn(tablet, input)

// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")
testChecksum(t, 0, hc.stateChecksum())
hc.AddTablet(tablet)
testChecksum(t, 1027934207, hc.stateChecksum())
Expand Down Expand Up @@ -289,7 +291,7 @@ func TestHealthCheckStreamError(t *testing.T) {

tablet := createTestTablet(0, "cell", "a")
input := make(chan *querypb.StreamHealthResponse)
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")
fc := createFakeConn(tablet, input)
fc.errCh = make(chan error)
hc.AddTablet(tablet)
Expand Down Expand Up @@ -353,7 +355,7 @@ func TestHealthCheckErrorOnPrimary(t *testing.T) {

tablet := createTestTablet(0, "cell", "a")
input := make(chan *querypb.StreamHealthResponse)
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")
fc := createFakeConn(tablet, input)
fc.errCh = make(chan error)
hc.AddTablet(tablet)
Expand Down Expand Up @@ -414,7 +416,7 @@ func TestHealthCheckErrorOnPrimaryAfterExternalReparent(t *testing.T) {
hc := createTestHc(ctx, ts)
defer hc.Close()

resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")

tablet1 := createTestTablet(0, "cell", "a")
input1 := make(chan *querypb.StreamHealthResponse)
Expand Down Expand Up @@ -498,7 +500,7 @@ func TestHealthCheckVerifiesTabletAlias(t *testing.T) {
tablet := createTestTablet(0, "cell", "a")
input := make(chan *querypb.StreamHealthResponse, 1)
fc := createFakeConn(tablet, input)
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")

hc.AddTablet(tablet)

Expand Down Expand Up @@ -543,7 +545,7 @@ func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) {
tablet := createTestTablet(0, "cell", "a")
input := make(chan *querypb.StreamHealthResponse, 1)
createFakeConn(tablet, input)
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")

hc.AddTablet(tablet)

Expand Down Expand Up @@ -610,7 +612,7 @@ func TestHealthCheckTimeout(t *testing.T) {
tablet := createTestTablet(0, "cell", "a")
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")
hc.AddTablet(tablet)
// Immediately after AddTablet() there will be the first notification.
want := &TabletHealth{
Expand Down Expand Up @@ -692,7 +694,7 @@ func TestWaitForAllServingTablets(t *testing.T) {
createFakeConn(tablet, input)

// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")
hc.AddTablet(tablet)
// there will be a first result, get and discard it
<-resultChan
Expand Down Expand Up @@ -760,7 +762,7 @@ func TestRemoveTablet(t *testing.T) {
createFakeConn(tablet, input)

// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")
hc.AddTablet(tablet)
// there will be a first result, get and discard it
<-resultChan
Expand Down Expand Up @@ -896,7 +898,7 @@ func TestRemoveTabletDuringExternalReparenting(t *testing.T) {
thirdTabletConn := createFakeConn(thirdTablet, thirdTabletHealthStream)
thirdTabletConn.errCh = make(chan error)

resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")

hc.AddTablet(firstTablet)
<-resultChan
Expand Down Expand Up @@ -991,7 +993,7 @@ func TestGetHealthyTablets(t *testing.T) {
createFakeConn(tablet, input)

// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")
hc.AddTablet(tablet)
// there will be a first result, get and discard it
<-resultChan
Expand Down Expand Up @@ -1181,7 +1183,7 @@ func TestPrimaryInOtherCell(t *testing.T) {
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")
hc.AddTablet(tablet)
// should get a result, but this will hang if multi-cell logic is broken
// so wait and timeout
Expand Down Expand Up @@ -1241,7 +1243,7 @@ func TestReplicaInOtherCell(t *testing.T) {
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(local, input)
// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")
hc.AddTablet(local)

ticker := time.NewTicker(1 * time.Second)
Expand Down Expand Up @@ -1286,7 +1288,7 @@ func TestReplicaInOtherCell(t *testing.T) {
input2 := make(chan *querypb.StreamHealthResponse)
fc2 := createFakeConn(remote, input2)
// create a channel and subscribe to healthcheck
resultChan2 := hc.Subscribe()
resultChan2 := hc.Subscribe("")
hc.AddTablet(remote)
// should get a result, but this will hang if multi-cell logic is broken
// so wait and timeout
Expand Down Expand Up @@ -1352,7 +1354,7 @@ func TestCellAliases(t *testing.T) {
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")
hc.AddTablet(tablet)
// should get a result, but this will hang if cell alias logic is broken
// so wait and timeout
Expand Down Expand Up @@ -1405,7 +1407,7 @@ func TestHealthCheckChecksGrpcPort(t *testing.T) {

tablet := createTestTablet(0, "cell", "a")
tablet.PortMap["grpc"] = 0
resultChan := hc.Subscribe()
resultChan := hc.Subscribe("")

// AddTablet should not add the tablet because port is 0
hc.AddTablet(tablet)
Expand Down Expand Up @@ -1446,6 +1448,33 @@ func TestTemplate(t *testing.T) {
require.Nil(t, err, "error executing template: %v", err)
}

// TestHealthCheckImplSubscriberName verifies that the healthcheck stores the
// name given to Subscribe for each subscriber and forgets it on Unsubscribe.
func TestHealthCheckImplSubscriberName(t *testing.T) {
	ctx := utils.LeakCheckContext(t)

	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, nil, "", "", nil)
	defer hc.Close()

	const (
		firstName  = "SubscriberName1"
		secondName = "SubscriberName2"
	)
	firstCh := hc.Subscribe(firstName)
	secondCh := hc.Subscribe(secondName)

	// Map iteration order is not deterministic, so sort before comparing.
	names := maps.Values(hc.subscribers)
	slices.Sort(names)
	require.Len(t, names, 2, "expected 2 subscribers")
	require.EqualValues(t, []string{firstName, secondName}, names, "unexpected subscribers")

	// Dropping the first subscriber must leave only the second one registered.
	hc.Unsubscribe(firstCh)
	names = maps.Values(hc.subscribers)
	require.Len(t, names, 1, "expected 1 subscriber")
	require.EqualValues(t, []string{secondName}, names, "unexpected subscribers")

	// Dropping the last subscriber must leave the registry empty.
	hc.Unsubscribe(secondCh)
	require.Empty(t, maps.Values(hc.subscribers), "expected no subscribers")
}

func TestDebugURLFormatting(t *testing.T) {
defer utils.EnsureNoLeaks(t)
TabletURLTemplateString = "https://{{.GetHostNameLevel 0}}.bastion.{{.Tablet.Alias.Cell}}.corp"
Expand Down Expand Up @@ -1490,7 +1519,7 @@ func TestConcurrentUpdates(t *testing.T) {

// Subscribe to the healthcheck
// Make the receiver keep track of the updates received.
ch := hc.Subscribe()
ch := hc.Subscribe("")
var totalCount atomic.Int32
go func() {
for range ch {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (kew *KeyspaceEventWatcher) broadcast(ev *KeyspaceEvent) {
}

func (kew *KeyspaceEventWatcher) run(ctx context.Context) {
hcChan := kew.hc.Subscribe()
hcChan := kew.hc.Subscribe("KeyspaceEventWatcher")
bufferCtx, bufferCancel := context.WithCancel(ctx)

go func() {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/throttler/demo/throttler_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func newClient(ctx context.Context, primary *primary, replica *replica, ts *topo
throttler: t,
stopChan: make(chan struct{}),
}
healthcheckCh := c.healthCheck.Subscribe()
healthcheckCh := c.healthCheck.Subscribe("ThrottlerDemo")
c.healthcheckCh = healthcheckCh
c.healthCheck.AddTablet(replica.fakeTablet.Tablet)
return c
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func Init(
// ScatterConn depends on TxConn to perform forced rollbacks.
sc := NewScatterConn("VttabletCall", tc, gw)
// TxResolver depends on TxConn to complete distributed transaction.
tr := txresolver.NewTxResolver(gw.hc.Subscribe(), tc)
tr := txresolver.NewTxResolver(gw.hc.Subscribe("TxResolver"), tc)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
resolver := NewResolver(srvResolver, serv, cell, sc)
vsm := newVStreamManager(srvResolver, serv, cell)
Expand All @@ -345,7 +345,7 @@ func Init(
var si SchemaInfo // default nil
var st *vtschema.Tracker
if enableSchemaChangeSignal {
st = vtschema.NewTracker(gw.hc.Subscribe(), enableViews, enableUdfs, env.Parser())
st = vtschema.NewTracker(gw.hc.Subscribe("SchemaTracker"), enableViews, enableUdfs, env.Parser())
addKeyspacesToTracker(ctx, srvResolver, st, gw)
si = st
}
Expand Down
Loading
Loading