From e5999e3c21bee8748126951a869522e182d207b6 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 25 Jan 2024 15:57:21 +0800 Subject: [PATCH] kvclient(ticdc): add worker busy monitor (#10389) (#10412) close pingcap/tiflow#10388 --- cdc/api/v2/model.go | 3 + cdc/kv/client.go | 70 +++++-- cdc/kv/client_bench_test.go | 4 +- cdc/kv/client_test.go | 51 ++--- cdc/kv/metrics.go | 18 +- cdc/kv/region_worker.go | 180 ++++++++++-------- cdc/model/changefeed_test.go | 1 + cdc/processor/pipeline/puller.go | 1 + cdc/processor/processor.go | 2 +- cdc/processor/sinkmanager/manager_test.go | 2 +- .../sinkmanager/table_sink_worker_test.go | 2 +- cdc/processor/sourcemanager/manager.go | 19 +- .../sourcemanager/puller/puller_wrapper.go | 2 + cdc/puller/ddl_puller.go | 1 + cdc/puller/puller.go | 8 +- cdc/puller/puller_test.go | 3 +- pkg/config/config_test_data.go | 3 + pkg/config/replica_config.go | 14 +- 18 files changed, 239 insertions(+), 145 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 602470f5d89..2e35b8deac3 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -189,6 +189,7 @@ type ReplicaConfig struct { IgnoreIneligibleTable bool `json:"ignore_ineligible_table"` CheckGCSafePoint bool `json:"check_gc_safe_point"` EnableSyncPoint bool `json:"enable_sync_point"` + EnableTableMonitor bool `json:"enable_table_monitor,omitempty"` BDRMode bool `json:"bdr_mode"` SyncPointInterval *JSONDuration `json:"sync_point_interval" swaggertype:"string"` @@ -219,6 +220,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( res.ForceReplicate = c.ForceReplicate res.CheckGCSafePoint = c.CheckGCSafePoint res.EnableSyncPoint = c.EnableSyncPoint + res.EnableTableMonitor = c.EnableTableMonitor res.SQLMode = c.SQLMode if c.SyncPointInterval != nil { res.SyncPointInterval = c.SyncPointInterval.duration @@ -392,6 +394,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { IgnoreIneligibleTable: false, CheckGCSafePoint: cloned.CheckGCSafePoint, EnableSyncPoint: cloned.EnableSyncPoint, + EnableTableMonitor: cloned.EnableTableMonitor, SyncPointInterval: &JSONDuration{cloned.SyncPointInterval}, SyncPointRetention: &JSONDuration{cloned.SyncPointRetention}, BDRMode: cloned.BDRMode, diff --git a/cdc/kv/client.go b/cdc/kv/client.go index bde714df5de..7e7e3604931 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -75,6 +75,8 @@ const ( resolveLockMinInterval = 10 * time.Second scanRegionsConcurrency = 1024 + + tableMonitorInterval = 2 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -143,6 +145,7 @@ type CDCKVClient interface { ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error // RegionCount returns the number of captured regions. 
@@ -273,8 +276,9 @@ func (c *CDCClient) EventFeed( ctx context.Context, span regionspan.ComparableSpan, ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error { - s := newEventFeedSession(c, span, lockResolver, ts, eventCh) + s := newEventFeedSession(c, span, lockResolver, ts, eventCh, enableTableMonitor) return s.eventFeed(ctx) } @@ -352,11 +356,10 @@ type eventFeedSession struct { rangeLock *regionspan.RegionRangeLock - // To identify metrics of different eventFeedSession - id string - regionChSizeGauge prometheus.Gauge - errChSizeGauge prometheus.Gauge - rangeChSizeGauge prometheus.Gauge + enableTableMonitor bool + regionChSizeGauge prometheus.Gauge + errChSizeGauge prometheus.Gauge + rangeChSizeGauge prometheus.Gauge streams map[string]*eventFeedStream streamsLock sync.RWMutex @@ -377,9 +380,9 @@ func newEventFeedSession( lockResolver txnutil.LockResolver, startTs uint64, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) *eventFeedSession { id := allocID() - idStr := strconv.FormatUint(id, 10) rangeLock := regionspan.NewRegionRangeLock( id, totalSpan.Start, totalSpan.End, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) @@ -390,16 +393,19 @@ func newEventFeedSession( tableID: client.tableID, tableName: client.tableName, - totalSpan: totalSpan, - eventCh: eventCh, - rangeLock: rangeLock, - lockResolver: lockResolver, - id: idStr, - regionChSizeGauge: clientChannelSize.WithLabelValues("region"), - errChSizeGauge: clientChannelSize.WithLabelValues("err"), - rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), - streams: make(map[string]*eventFeedStream), - streamsCanceller: make(map[string]context.CancelFunc), + totalSpan: totalSpan, + eventCh: eventCh, + rangeLock: rangeLock, + lockResolver: lockResolver, + enableTableMonitor: enableTableMonitor, + regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"), + errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"), + rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"), + streams: make(map[string]*eventFeedStream), + streamsCanceller: make(map[string]context.CancelFunc), resolvedTsPool: sync.Pool{ New: func() any { return ®ionStatefulEvent{ @@ -1012,6 +1018,10 @@ func (s *eventFeedSession) receiveFromStream( metricSendEventBatchResolvedSize := batchResolvedEventSize. 
WithLabelValues(s.changefeed.Namespace, s.changefeed.ID) + metricReceiveBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-receiver") + metricProcessBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-processor") // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic @@ -1031,7 +1041,7 @@ func (s *eventFeedSession) receiveFromStream( eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - err := handleExit(worker.run()) + err := handleExit(worker.run(s.enableTableMonitor)) if err != nil { log.Error("region worker exited with error", zap.Error(err), zap.Any("changefeed", s.changefeed), @@ -1042,10 +1052,32 @@ func (s *eventFeedSession) receiveFromStream( }) receiveEvents := func() error { + var receiveTime time.Duration + var processTime time.Duration + startToWork := time.Now() + maxCommitTs := model.Ts(0) for { + startToReceive := time.Now() cevent, err := stream.Recv() + if s.enableTableMonitor { + receiveTime += time.Since(startToReceive) + if time.Since(startToWork) >= tableMonitorInterval { + now := time.Now() + // Receive busyRatio indicates the blocking time (receive and decode grpc msg) of the worker. + busyRatio := receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + metricReceiveBusyRatio.Set(busyRatio) + receiveTime = 0 + // Process busyRatio indicates the working time (dispatch to region worker) of the worker. + busyRatio = processTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + metricProcessBusyRatio.Set(busyRatio) + processTime = 0 + + startToWork = now + } + } + failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) { if op.(string) == "error" { _ = worker.sendEvents(ctx, []*regionStatefulEvent{nil}) @@ -1104,6 +1136,7 @@ func (s *eventFeedSession) receiveFromStream( return nil } + startToProcess := time.Now() size := cevent.Size() if size > warnRecvMsgSizeThreshold { regionCount := 0 @@ -1148,6 +1181,7 @@ func (s *eventFeedSession) receiveFromStream( tsStat.commitTs.Store(maxCommitTs) } } + processTime += time.Since(startToProcess) } } eg.Go(func() error { diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index 4c00a77064c..c517e71afc9 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -202,7 +202,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( go func() { err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) if errors.Cause(err) != context.Canceled { b.Error(err) } @@ -296,7 +296,7 @@ func prepareBench(b *testing.B, regionNum int) ( go func() { err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) if errors.Cause(err) != context.Canceled { b.Error(err) } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index b9f9cf98e60..6f2ad0f682c 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -329,7 +329,7 @@ func TestConnectOfflineTiKV(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 1, lockResolver, eventCh) + 1, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -429,7 +429,7 
@@ func TestRecvLargeMessageSize(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 1, lockResolver, eventCh) + 1, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -529,7 +529,7 @@ func TestHandleError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -689,7 +689,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { defer wg2.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.True(t, cerror.ErrVersionIncompatible.Equal(err)) }() @@ -757,7 +757,7 @@ func TestClusterIDMismatch(t *testing.T) { defer wg2.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.True(t, cerror.ErrClusterIDMismatch.Equal(err)) }() @@ -824,7 +824,7 @@ func testHandleFeedEvent(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1285,7 +1285,7 @@ func TestStreamSendWithError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - 100, lockerResolver, eventCh) + 100, lockerResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1397,7 +1397,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1531,7 +1531,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { defer close(eventCh) err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1740,7 +1740,7 @@ func TestIncompatibleTiKV(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1817,7 +1817,7 @@ func TestNoPendingRegionError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1895,7 +1895,7 @@ func TestDropStaleRequest(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2009,7 +2009,7 @@ func TestResolveLock(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, 
lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2116,7 +2116,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer clientWg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, errUnreachable, err) }() @@ -2243,7 +2243,7 @@ func testEventAfterFeedStop(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2430,7 +2430,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2648,7 +2648,7 @@ func TestResolveLockNoCandidate(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2744,7 +2744,7 @@ func TestFailRegionReentrant(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2827,7 +2827,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2895,7 +2895,7 @@ func testClientErrNoPendingRegion(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2973,7 +2973,7 @@ func testKVClientForceReconnect(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3124,7 +3124,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3241,7 +3241,7 @@ func TestEvTimeUpdate(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3367,7 +3367,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3461,7 +3461,7 @@ 
func TestPrewriteNotMatchError(t *testing.T) { defer wg.Done() err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3539,5 +3539,6 @@ func createFakeEventFeedSession() *eventFeedSession { nil, /*lockResolver*/ 100, /*startTs*/ nil, /*eventCh*/ + false, ) } diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go index 4f063ed470d..31142175274 100644 --- a/cdc/kv/metrics.go +++ b/cdc/kv/metrics.go @@ -71,7 +71,7 @@ var ( Subsystem: "kvclient", Name: "channel_size", Help: "size of each channel in kv client", - }, []string{"channel"}) + }, []string{"namespace", "changefeed", "table", "type"}) clientRegionTokenSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -110,6 +110,20 @@ var ( Help: "region events batch size", Buckets: prometheus.ExponentialBuckets(1, 2, 20), }) + workerBusyRatio = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_worker_busy_ratio", + Help: "Busy ratio (X ms in 1s) for region worker.", + }, []string{"namespace", "changefeed", "table", "store", "type"}) + workerChannelSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_worker_channel_size", + Help: "size of each channel in region worker", + }, []string{"namespace", "changefeed", "table", "store", "type"}) ) // InitMetrics registers all metrics in the kv package @@ -126,6 +140,8 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(batchResolvedEventSize) registry.MustRegister(grpcPoolStreamGauge) registry.MustRegister(regionEventsBatchSize) + registry.MustRegister(workerBusyRatio) + registry.MustRegister(workerChannelSize) // Register client metrics to registry. registry.MustRegister(grpcMetrics) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index e7a4b24eea1..9a366dc50a8 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -18,6 +18,7 @@ import ( "encoding/hex" "reflect" "runtime" + "strconv" "sync" "sync/atomic" "time" @@ -74,6 +75,9 @@ type regionWorkerMetrics struct { metricSendEventResolvedCounter prometheus.Counter metricSendEventCommitCounter prometheus.Counter metricSendEventCommittedCounter prometheus.Counter + + metricWorkerBusyRatio prometheus.Gauge + metricWorkerChannelSize prometheus.Gauge } /* @@ -116,7 +120,7 @@ type regionWorker struct { pendingRegions *syncRegionFeedStateMap } -func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics { +func newRegionWorkerMetrics(changefeedID model.ChangeFeedID, tableID string, storeAddr string) *regionWorkerMetrics { metrics := ®ionWorkerMetrics{} metrics.metricReceivedEventSize = eventSize.WithLabelValues("received") metrics.metricDroppedEventSize = eventSize.WithLabelValues("dropped") @@ -139,6 +143,11 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric metrics.metricSendEventCommittedCounter = sendEventCounter. 
WithLabelValues("committed", changefeedID.Namespace, changefeedID.ID) + metrics.metricWorkerBusyRatio = workerBusyRatio.WithLabelValues( + changefeedID.Namespace, changefeedID.ID, tableID, storeAddr, "event-handler") + metrics.metricWorkerChannelSize = workerChannelSize.WithLabelValues( + changefeedID.Namespace, changefeedID.ID, tableID, storeAddr, "input") + return metrics } @@ -157,7 +166,7 @@ func newRegionWorker( rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), storeAddr: addr, concurrency: s.client.config.KVClient.WorkerConcurrent, - metrics: newRegionWorkerMetrics(changefeedID), + metrics: newRegionWorkerMetrics(changefeedID, strconv.FormatInt(s.tableID, 10), addr), inputPending: 0, pendingRegions: pendingRegions, @@ -429,23 +438,42 @@ func (w *regionWorker) onHandleExit(err error) { } } -func (w *regionWorker) eventHandler(ctx context.Context) error { - pollEvents := func() ([]*regionStatefulEvent, error) { - exitFn := func() error { - log.Info("region worker closed by error", - zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID)) - return cerror.ErrRegionWorkerExit.GenWithStackByArgs() - } +func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool) error { + exitFn := func() error { + log.Info("region worker closed by error", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName)) + return cerror.ErrRegionWorkerExit.GenWithStackByArgs() + } + + metricsTicker := time.NewTicker(tableMonitorInterval) + defer metricsTicker.Stop() + var processTime time.Duration + startToWork := time.Now() + highWatermarkMet := false + for { select { case <-ctx.Done(): - return nil, errors.Trace(ctx.Err()) + return errors.Trace(ctx.Err()) case err := <-w.errorCh: - return nil, errors.Trace(err) + return errors.Trace(err) + case <-metricsTicker.C: + if enableTableMonitor { + w.metrics.metricWorkerChannelSize.Set(float64(len(w.inputCh))) + + now := time.Now() + // busyRatio indicates the actual working time of the worker. + busyRatio := processTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + w.metrics.metricWorkerBusyRatio.Set(busyRatio) + startToWork = now + processTime = 0 + } case events, ok := <-w.inputCh: if !ok { - return nil, exitFn() + return exitFn() } if len(events) == 0 { log.Panic("regionWorker.inputCh doesn't accept empty slice") @@ -454,80 +482,74 @@ func (w *regionWorker) eventHandler(ctx context.Context) error { // event == nil means the region worker should exit and re-establish // all existing regions. if event == nil { - return nil, exitFn() + return exitFn() } } - return events, nil - } - } - - highWatermarkMet := false - for { - events, err := pollEvents() - if err != nil { - return err - } - regionEventsBatchSize.Observe(float64(len(events))) - inputPending := atomic.LoadInt32(&w.inputPending) - if highWatermarkMet { - highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark - } else { - highWatermarkMet = int(inputPending) >= regionWorkerHighWatermark - } - atomic.AddInt32(&w.inputPending, -int32(len(events))) + regionEventsBatchSize.Observe(float64(len(events))) - if highWatermarkMet { - // All events in one batch can be hashed into one handle slot. 
- slot := w.inputCalcSlot(events[0].regionID) - eventsX := make([]interface{}, 0, len(events)) - for _, event := range events { - eventsX = append(eventsX, event) - } - err = w.handles[slot].AddEvents(ctx, eventsX) - if err != nil { - return err + start := time.Now() + inputPending := atomic.LoadInt32(&w.inputPending) + if highWatermarkMet { + highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark + } else { + highWatermarkMet = int(inputPending) >= regionWorkerHighWatermark } - // Principle: events from the same region must be processed linearly. - // - // When buffered events exceed high watermark, we start to use worker - // pool to improve throughput, and we need a mechanism to quit worker - // pool when buffered events are less than low watermark, which means - // we should have a way to know whether events sent to the worker pool - // are all processed. - // Send a dummy event to each worker pool handler, after each of these - // events are processed, we can ensure all events sent to worker pool - // from this region worker are processed. - finishedCallbackCh := make(chan struct{}, 1) - err = w.handles[slot].AddEvent(ctx, ®ionStatefulEvent{finishedCallbackCh: finishedCallbackCh}) - if err != nil { - return err - } - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case err = <-w.errorCh: - return err - case <-finishedCallbackCh: - } - } else { - // We measure whether the current worker is busy based on the input - // channel size. If the buffered event count is larger than the high - // watermark, we send events to worker pool to increase processing - // throughput. Otherwise, we process event in local region worker to - // ensure low processing latency. - for _, event := range events { - err = w.processEvent(ctx, event) + atomic.AddInt32(&w.inputPending, -int32(len(events))) + + if highWatermarkMet { + // All events in one batch can be hashed into one handle slot. + slot := w.inputCalcSlot(events[0].regionID) + eventsX := make([]interface{}, 0, len(events)) + for _, event := range events { + eventsX = append(eventsX, event) + } + err := w.handles[slot].AddEvents(ctx, eventsX) if err != nil { return err } + // Principle: events from the same region must be processed linearly. + // + // When buffered events exceed high watermark, we start to use worker + // pool to improve throughput, and we need a mechanism to quit worker + // pool when buffered events are less than low watermark, which means + // we should have a way to know whether events sent to the worker pool + // are all processed. + // Send a dummy event to each worker pool handler, after each of these + // events are processed, we can ensure all events sent to worker pool + // from this region worker are processed. + finishedCallbackCh := make(chan struct{}, 1) + err = w.handles[slot].AddEvent(ctx, ®ionStatefulEvent{finishedCallbackCh: finishedCallbackCh}) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err = <-w.errorCh: + return err + case <-finishedCallbackCh: + } + } else { + // We measure whether the current worker is busy based on the input + // channel size. If the buffered event count is larger than the high + // watermark, we send events to worker pool to increase processing + // throughput. Otherwise, we process event in local region worker to + // ensure low processing latency. 
+ for _, event := range events { + err := w.processEvent(ctx, event) + if err != nil { + return err + } + } } - } - for _, ev := range events { - // resolved ts event has been consumed, it is safe to put back. - if ev.resolvedTsEvent != nil { - w.session.resolvedTsPool.Put(ev) + for _, ev := range events { + // resolved ts event has been consumed, it is safe to put back. + if ev.resolvedTsEvent != nil { + w.session.resolvedTsPool.Put(ev) + } } + processTime += time.Since(start) } } } @@ -581,7 +603,7 @@ func (w *regionWorker) cancelStream(delay time.Duration) { } } -func (w *regionWorker) run() error { +func (w *regionWorker) run(enableTableMonitor bool) error { defer func() { for _, h := range w.handles { h.Unregister() @@ -606,7 +628,7 @@ func (w *regionWorker) run() error { return handleError(w.checkErrorReconnect(w.resolveLock(ctx))) }) wg.Go(func() error { - return handleError(w.eventHandler(ctx)) + return handleError(w.eventHandler(ctx, enableTableMonitor)) }) _ = handleError(w.collectWorkpoolError(ctx)) _ = wg.Wait() diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index a13c9890c30..00720949e28 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -148,6 +148,7 @@ func TestVerifyAndComplete(t *testing.T) { MemoryQuota: 1073741824, CaseSensitive: false, EnableOldValue: true, + EnableTableMonitor: false, CheckGCSafePoint: true, SyncPointInterval: time.Minute * 10, SyncPointRetention: time.Hour * 24, diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index 188b0167e97..5e5b63d9e44 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -85,6 +85,7 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext, n.tableID, n.tableName, filterLoop, + false, ) n.wg.Go(func() error { ctx.Throw(errors.Trace(n.plr.Run(ctxC))) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 342739ae8c0..9703986f517 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -831,7 +831,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { return errors.Trace(err) } p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, p.mg, - sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode) + sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, p.changefeed.Info.Config.EnableTableMonitor) p.sinkManager, err = sinkmanager.New(stdCtx, p.changefeedID, p.changefeed.Info, p.upstream, p.schemaStorage, p.redoDMLMgr, p.sourceManager, diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 26d0e8fdfc0..6c4ce3ba9ce 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -57,7 +57,7 @@ func createManagerWithMemEngine( ) (*SinkManager, engine.SortEngine) { sortEngine := memory.New(context.Background()) up := upstream.NewUpstream4Test(&mockPD{}) - sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan, false) + sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan, false, false) manager, err := New( ctx, changefeedID, changefeedInfo, up, &entry.MockSchemaStorage{Resolved: math.MaxUint64}, diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 4f9891deb9a..bf72629c302 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -39,7 +39,7 @@ func 
createWorker( ) (*sinkWorker, engine.SortEngine) { sortEngine := memory.New(context.Background()) sm := sourcemanager.New(changefeedID, upstream.NewUpstream4Test(&mockPD{}), - &entry.MockMountGroup{}, sortEngine, make(chan error, 1), false) + &entry.MockMountGroup{}, sortEngine, make(chan error, 1), false, false) // To avoid refund or release panics. quota := memquota.NewMemQuota(changefeedID, memQuota+1024*1024*1024, "") diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 15dc8cfcb63..329cc84f0c5 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -48,7 +48,8 @@ type SourceManager struct { // Used to report the error to the processor. errChan chan error // Used to indicate whether the changefeed is in BDR mode. - bdrMode bool + bdrMode bool + enableTableMonitor bool } // New creates a new source manager. @@ -59,14 +60,16 @@ func New( engine engine.SortEngine, errChan chan error, bdrMode bool, + enableTableMonitor bool, ) *SourceManager { return &SourceManager{ - changefeedID: changefeedID, - up: up, - mg: mg, - engine: engine, - errChan: errChan, - bdrMode: bdrMode, + changefeedID: changefeedID, + up: up, + mg: mg, + engine: engine, + errChan: errChan, + bdrMode: bdrMode, + enableTableMonitor: enableTableMonitor, } } @@ -75,7 +78,7 @@ func (m *SourceManager) AddTable(ctx cdccontext.Context, tableID model.TableID, // Add table to the engine first, so that the engine can receive the events from the puller. m.engine.AddTable(tableID) p := pullerwrapper.NewPullerWrapper(m.changefeedID, tableID, tableName, startTs, m.bdrMode) - p.Start(ctx, m.up, m.engine, m.errChan) + p.Start(ctx, m.up, m.engine, m.errChan, m.enableTableMonitor) m.pullers.Store(tableID, p) } diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go index 5aa38008a97..a01045c8929 100644 --- a/cdc/processor/sourcemanager/puller/puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go @@ -78,6 +78,7 @@ func (n *Wrapper) Start( up *upstream.Upstream, eventSortEngine engine.SortEngine, errChan chan<- error, + enableTableMonitor bool, ) { failpoint.Inject("ProcessorAddTableError", func() { errChan <- cerrors.New("processor add table injected error") @@ -102,6 +103,7 @@ func (n *Wrapper) Start( n.tableID, n.tableName, n.bdrMode, + enableTableMonitor, ) n.wg.Add(1) go func() { diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index ed7f505ca37..d29b131acc8 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -511,6 +511,7 @@ func NewDDLJobPuller( changefeed, -1, DDLPullerTableName, ddLPullerFilterLoop, + false, ), kvStorage: kvStorage, outputCh: make(chan *model.DDLJobEntry, defaultPullerOutputChanSize), diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index a89559f5445..930571dddb3 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -80,6 +80,8 @@ type pullerImpl struct { lastForwardResolvedTs uint64 // startResolvedTs is the resolvedTs when puller is initialized startResolvedTs uint64 + + enableTableMonitor bool } // New create a new Puller fetch event start from checkpointTs and put into buf. 
@@ -96,6 +98,7 @@ func New(ctx context.Context, tableID model.TableID, tableName string, filterLoop bool, + enableTableMonitor bool, ) Puller { tikvStorage, ok := kvStorage.(tikv.Storage) if !ok { @@ -130,7 +133,8 @@ func New(ctx context.Context, tableName: tableName, cfg: cfg, - startResolvedTs: checkpointTs, + startResolvedTs: checkpointTs, + enableTableMonitor: enableTableMonitor, } return p } @@ -148,7 +152,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { span := span g.Go(func() error { - return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, eventCh) + return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, eventCh, p.enableTableMonitor) }) } diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 408a86ba903..5f888e89848 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -79,6 +79,7 @@ func (mc *mockCDCKVClient) EventFeed( ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error { for { select { @@ -131,7 +132,7 @@ func newPullerForTest( plr := New( ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(), checkpointTs, spans, config.GetDefaultServerConfig(), - model.DefaultChangeFeedID("changefeed-id-test"), 0, "table-test", false) + model.DefaultChangeFeedID("changefeed-id-test"), 0, "table-test", false, false) wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 14a9623e55d..2faf2419b98 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -21,6 +21,7 @@ const ( "force-replicate": true, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, "filter": { @@ -179,6 +180,7 @@ const ( "force-replicate": true, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, @@ -256,6 +258,7 @@ const ( "force-replicate": true, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 695d3989646..5b2760fffc7 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -50,6 +50,7 @@ var defaultReplicaConfig = &ReplicaConfig{ EnableOldValue: true, CheckGCSafePoint: true, EnableSyncPoint: false, + EnableTableMonitor: false, SyncPointInterval: time.Minute * 10, SyncPointRetention: time.Hour * 24, Filter: &FilterConfig{ @@ -113,12 +114,13 @@ func (d *Duration) UnmarshalText(text []byte) error { type ReplicaConfig replicaConfig type replicaConfig struct { - MemoryQuota uint64 `toml:"memory-quota" json:"memory-quota"` - CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` - EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"` - ForceReplicate bool `toml:"force-replicate" json:"force-replicate"` - CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"` - EnableSyncPoint bool `toml:"enable-sync-point" json:"enable-sync-point"` + MemoryQuota uint64 `toml:"memory-quota" json:"memory-quota"` + CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` + EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"` + ForceReplicate bool 
`toml:"force-replicate" json:"force-replicate"` + CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"` + EnableSyncPoint bool `toml:"enable-sync-point" json:"enable-sync-point"` + EnableTableMonitor bool `toml:"enable-table-monitor" json:"enable-table-monitor"` // BDR(Bidirectional Replication) is a feature that allows users to // replicate data of same tables from TiDB-1 to TiDB-2 and vice versa. // This feature is only available for TiDB.