Skip to content

Commit

Permalink
kvclient(ticdc): add worker busy monitor (#10389) (#10412)
Browse files Browse the repository at this point in the history
close #10388
  • Loading branch information
ti-chi-bot authored Jan 25, 2024
1 parent f4ff47d commit e5999e3
Show file tree
Hide file tree
Showing 18 changed files with 239 additions and 145 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ type ReplicaConfig struct {
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
EnableSyncPoint bool `json:"enable_sync_point"`
EnableTableMonitor bool `json:"enable_table_monitor,omitempty"`
BDRMode bool `json:"bdr_mode"`

SyncPointInterval *JSONDuration `json:"sync_point_interval" swaggertype:"string"`
Expand Down Expand Up @@ -219,6 +220,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
res.ForceReplicate = c.ForceReplicate
res.CheckGCSafePoint = c.CheckGCSafePoint
res.EnableSyncPoint = c.EnableSyncPoint
res.EnableTableMonitor = c.EnableTableMonitor
res.SQLMode = c.SQLMode
if c.SyncPointInterval != nil {
res.SyncPointInterval = c.SyncPointInterval.duration
Expand Down Expand Up @@ -392,6 +394,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
IgnoreIneligibleTable: false,
CheckGCSafePoint: cloned.CheckGCSafePoint,
EnableSyncPoint: cloned.EnableSyncPoint,
EnableTableMonitor: cloned.EnableTableMonitor,
SyncPointInterval: &JSONDuration{cloned.SyncPointInterval},
SyncPointRetention: &JSONDuration{cloned.SyncPointRetention},
BDRMode: cloned.BDRMode,
Expand Down
70 changes: 52 additions & 18 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ const (
resolveLockMinInterval = 10 * time.Second

scanRegionsConcurrency = 1024

tableMonitorInterval = 2 * time.Second
)

// time interval to force kv client to terminate gRPC stream and reconnect
Expand Down Expand Up @@ -143,6 +145,7 @@ type CDCKVClient interface {
ts uint64,
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
enableTableMonitor bool,
) error

// RegionCount returns the number of captured regions.
Expand Down Expand Up @@ -273,8 +276,9 @@ func (c *CDCClient) EventFeed(
ctx context.Context, span regionspan.ComparableSpan, ts uint64,
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
enableTableMonitor bool,
) error {
s := newEventFeedSession(c, span, lockResolver, ts, eventCh)
s := newEventFeedSession(c, span, lockResolver, ts, eventCh, enableTableMonitor)
return s.eventFeed(ctx)
}

Expand Down Expand Up @@ -352,11 +356,10 @@ type eventFeedSession struct {

rangeLock *regionspan.RegionRangeLock

// To identify metrics of different eventFeedSession
id string
regionChSizeGauge prometheus.Gauge
errChSizeGauge prometheus.Gauge
rangeChSizeGauge prometheus.Gauge
enableTableMonitor bool
regionChSizeGauge prometheus.Gauge
errChSizeGauge prometheus.Gauge
rangeChSizeGauge prometheus.Gauge

streams map[string]*eventFeedStream
streamsLock sync.RWMutex
Expand All @@ -377,9 +380,9 @@ func newEventFeedSession(
lockResolver txnutil.LockResolver,
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
enableTableMonitor bool,
) *eventFeedSession {
id := allocID()
idStr := strconv.FormatUint(id, 10)
rangeLock := regionspan.NewRegionRangeLock(
id, totalSpan.Start, totalSpan.End, startTs,
client.changefeed.Namespace+"."+client.changefeed.ID)
Expand All @@ -390,16 +393,19 @@ func newEventFeedSession(
tableID: client.tableID,
tableName: client.tableName,

totalSpan: totalSpan,
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
id: idStr,
regionChSizeGauge: clientChannelSize.WithLabelValues("region"),
errChSizeGauge: clientChannelSize.WithLabelValues("err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues("range"),
streams: make(map[string]*eventFeedStream),
streamsCanceller: make(map[string]context.CancelFunc),
totalSpan: totalSpan,
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
enableTableMonitor: enableTableMonitor,
regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"),
errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"),
streams: make(map[string]*eventFeedStream),
streamsCanceller: make(map[string]context.CancelFunc),
resolvedTsPool: sync.Pool{
New: func() any {
return &regionStatefulEvent{
Expand Down Expand Up @@ -1012,6 +1018,10 @@ func (s *eventFeedSession) receiveFromStream(

metricSendEventBatchResolvedSize := batchResolvedEventSize.
WithLabelValues(s.changefeed.Namespace, s.changefeed.ID)
metricReceiveBusyRatio := workerBusyRatio.WithLabelValues(
s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-receiver")
metricProcessBusyRatio := workerBusyRatio.WithLabelValues(
s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-processor")

// always create a new region worker, because `receiveFromStream` is ensured
// to call exactly once from outer code logic
Expand All @@ -1031,7 +1041,7 @@ func (s *eventFeedSession) receiveFromStream(

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
err := handleExit(worker.run())
err := handleExit(worker.run(s.enableTableMonitor))
if err != nil {
log.Error("region worker exited with error", zap.Error(err),
zap.Any("changefeed", s.changefeed),
Expand All @@ -1042,10 +1052,32 @@ func (s *eventFeedSession) receiveFromStream(
})

receiveEvents := func() error {
var receiveTime time.Duration
var processTime time.Duration
startToWork := time.Now()

maxCommitTs := model.Ts(0)
for {
startToReceive := time.Now()
cevent, err := stream.Recv()

if s.enableTableMonitor {
receiveTime += time.Since(startToReceive)
if time.Since(startToWork) >= tableMonitorInterval {
now := time.Now()
// Receive busyRatio indicates the blocking time (receive and decode grpc msg) of the worker.
busyRatio := receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 100
metricReceiveBusyRatio.Set(busyRatio)
receiveTime = 0
// Process busyRatio indicates the working time (dispatch to region worker) of the worker.
busyRatio = processTime.Seconds() / now.Sub(startToWork).Seconds() * 100
metricProcessBusyRatio.Set(busyRatio)
processTime = 0

startToWork = now
}
}

failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) {
if op.(string) == "error" {
_ = worker.sendEvents(ctx, []*regionStatefulEvent{nil})
Expand Down Expand Up @@ -1104,6 +1136,7 @@ func (s *eventFeedSession) receiveFromStream(
return nil
}

startToProcess := time.Now()
size := cevent.Size()
if size > warnRecvMsgSizeThreshold {
regionCount := 0
Expand Down Expand Up @@ -1148,6 +1181,7 @@ func (s *eventFeedSession) receiveFromStream(
tsStat.commitTs.Store(maxCommitTs)
}
}
processTime += time.Since(startToProcess)
}
}
eg.Go(func() error {
Expand Down
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
go func() {
err := cdcClient.EventFeed(ctx,
regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
100, lockResolver, eventCh)
100, lockResolver, eventCh, false)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
Expand Down Expand Up @@ -296,7 +296,7 @@ func prepareBench(b *testing.B, regionNum int) (
go func() {
err := cdcClient.EventFeed(ctx,
regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")},
100, lockResolver, eventCh)
100, lockResolver, eventCh, false)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
Expand Down
Loading

0 comments on commit e5999e3

Please sign in to comment.