diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index a0804e62e49..0ae86a37a17 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" tablesinkmetrics "github.com/pingcap/tiflow/cdc/sink/metrics/tablesink" "github.com/pingcap/tiflow/cdc/sink/tablesink" - "github.com/pingcap/tiflow/pkg/config" pconfig "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" @@ -330,12 +329,6 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er } } -func (m *SinkManager) needsStuckCheck() bool { - m.sinkFactory.Lock() - defer m.sinkFactory.Unlock() - return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ -} - func (m *SinkManager) initSinkFactory() (chan error, uint64) { m.sinkFactory.Lock() defer m.sinkFactory.Unlock() @@ -403,19 +396,6 @@ func (m *SinkManager) clearSinkFactory() { } } -func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bool) { - m.sinkFactory.Lock() - defer m.sinkFactory.Unlock() - if version == m.sinkFactory.version { - select { - case m.sinkFactory.errors <- err: - default: - } - return true - } - return false -} - func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) { for i := 0; i < sinkWorkerNum; i++ { w := newSinkWorker(m.changefeedID, m.sourceManager, @@ -1023,25 +1003,6 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { m.sinkMemQuota.Release(span, checkpointTs) m.redoMemQuota.Release(span, checkpointTs) - advanceTimeoutInSec := util.GetOrZero(m.config.Sink.AdvanceTimeoutInSec) - if advanceTimeoutInSec <= 0 { - advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec - } - stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second - - if m.needsStuckCheck() { - isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck) - if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) { - log.Warn("Table checkpoint is stuck too long, will restart the sink backend", - zap.String("namespace", m.changefeedID.Namespace), - zap.String("changefeed", m.changefeedID.ID), - zap.Stringer("span", &span), - zap.Any("checkpointTs", checkpointTs), - zap.Float64("stuckCheck", stuckCheck.Seconds()), - zap.Uint64("factoryVersion", sinkVersion)) - } - } - var resolvedTs model.Ts // If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts. if m.redoDMLMgr != nil { diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 0532dcb3b92..ab753105f05 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -378,21 +378,6 @@ func TestSinkManagerRunWithErrors(t *testing.T) { } } -func TestSinkManagerNeedsStuckCheck(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - errCh := make(chan error, 16) - changefeedInfo := getChangefeedInfo() - manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh) - defer func() { - cancel() - manager.Close() - }() - - require.False(t, manager.needsStuckCheck()) -} - func TestSinkManagerRestartTableSinks(t *testing.T) { failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return") defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause") diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index dddba30ea2a..3dfa10c7f93 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -453,25 +453,6 @@ func (t *tableSinkWrapper) cleanRangeEventCounts(upperBound sorter.Position, min return shouldClean } -func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint64) { - t.getCheckpointTs() - - t.tableSink.RLock() - defer t.tableSink.RUnlock() - t.tableSink.innerMu.Lock() - defer t.tableSink.innerMu.Unlock() - - // What these conditions mean: - // 1. the table sink has been associated with a valid sink; - // 2. its checkpoint hasn't been advanced for a while; - version := t.tableSink.version - advanced := t.tableSink.advanced - if version > 0 && time.Since(advanced) > stuckCheck { - return true, version - } - return false, uint64(0) -} - func handleRowChangedEvents( changefeed model.ChangeFeedID, span tablepb.Span, events ...*model.PolymorphicEvent, diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index f94f889f8a5..7bde3a845d0 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -18,7 +18,6 @@ import ( "math" "sync" "testing" - "time" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" @@ -29,7 +28,6 @@ import ( "github.com/pingcap/tiflow/pkg/spanz" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/oracle" ) type mockSink struct { @@ -315,63 +313,3 @@ func TestTableSinkWrapperSinkVersion(t *testing.T) { require.Nil(t, wrapper.tableSink.s) require.Equal(t, wrapper.tableSink.version, uint64(0)) } - -func TestTableSinkWrapperSinkInner(t *testing.T) { - t.Parallel() - - innerTableSink := tablesink.New[*model.RowChangedEvent]( - model.ChangeFeedID{}, tablepb.Span{}, model.Ts(0), - newMockSink(), &dmlsink.RowChangeEventAppender{}, - pdutil.NewClock4Test(), - prometheus.NewCounter(prometheus.CounterOpts{}), - prometheus.NewHistogram(prometheus.HistogramOpts{}), - ) - version := new(uint64) - - wrapper := newTableSinkWrapper( - model.DefaultChangeFeedID("1"), - spanz.TableIDToComparableSpan(1), - func() (tablesink.TableSink, uint64) { - *version += 1 - return innerTableSink, *version - }, - tablepb.TableStatePrepared, - oracle.GoTimeToTS(time.Now()), - oracle.GoTimeToTS(time.Now().Add(10000*time.Second)), - func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil }, - ) - - require.True(t, wrapper.initTableSink()) - - wrapper.closeAndClearTableSink() - - // Shouldn't be stuck because version is 0. - require.Equal(t, wrapper.tableSink.version, uint64(0)) - isStuck, _ := wrapper.sinkMaybeStuck(100 * time.Millisecond) - require.False(t, isStuck) - - // Shouldn't be stuck because tableSink.advanced is just updated. - require.True(t, wrapper.initTableSink()) - isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) - require.False(t, isStuck) - - // Shouldn't be stuck because upperbound hasn't been advanced. - time.Sleep(200 * time.Millisecond) - isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) - require.False(t, isStuck) - - // Shouldn't be stuck because `getCheckpointTs` will update tableSink.advanced. - nowTs := oracle.GoTimeToTS(time.Now()) - wrapper.updateReceivedSorterResolvedTs(nowTs) - wrapper.barrierTs.Store(nowTs) - isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) - require.False(t, isStuck) - - time.Sleep(200 * time.Millisecond) - nowTs = oracle.GoTimeToTS(time.Now()) - wrapper.updateReceivedSorterResolvedTs(nowTs) - wrapper.barrierTs.Store(nowTs) - wrapper.updateResolvedTs(model.NewResolvedTs(nowTs)) - isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) - require.True(t, isStuck) -} diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 915cbd2c097..ea68ee2cc7d 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -178,6 +178,7 @@ type SinkConfig struct { // AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been // advanced for this given duration, the sink will be canceled and re-established. + // Deprecated since v8.1.1 AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"` // Simple Protocol only config, use to control the behavior of sending bootstrap message. diff --git a/pkg/sink/kafka/sarama_factory.go b/pkg/sink/kafka/sarama_factory.go index e8e1e8c68ba..1d569a14566 100644 --- a/pkg/sink/kafka/sarama_factory.go +++ b/pkg/sink/kafka/sarama_factory.go @@ -15,12 +15,15 @@ package kafka import ( "context" + "time" "github.com/IBM/sarama" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/util" "github.com/rcrowley/go-metrics" + "go.uber.org/zap" ) type saramaFactory struct { @@ -43,17 +46,32 @@ func NewSaramaFactory( } func (f *saramaFactory) AdminClient(ctx context.Context) (ClusterAdminClient, error) { + start := time.Now() config, err := NewSaramaConfig(ctx, f.option) + duration := time.Since(start).Seconds() + if duration > 2 { + log.Warn("new sarama config cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID)) + } if err != nil { return nil, err } + start = time.Now() client, err := sarama.NewClient(f.option.BrokerEndpoints, config) + duration = time.Since(start).Seconds() + if duration > 2 { + log.Warn("new sarama client cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID)) + } if err != nil { return nil, errors.Trace(err) } + start = time.Now() admin, err := sarama.NewClusterAdminFromClient(client) + duration = time.Since(start).Seconds() + if duration > 2 { + log.Warn("new sarama cluster admin cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID)) + } if err != nil { return nil, errors.Trace(err) } diff --git a/tests/integration_tests/hang_sink_suicide/conf/changefeed.toml b/tests/integration_tests/hang_sink_suicide/conf/changefeed.toml deleted file mode 100644 index 9161a8511f6..00000000000 --- a/tests/integration_tests/hang_sink_suicide/conf/changefeed.toml +++ /dev/null @@ -1,2 +0,0 @@ -[sink] -advance-timeout-in-sec = 10 diff --git a/tests/integration_tests/hang_sink_suicide/conf/diff_config.toml b/tests/integration_tests/hang_sink_suicide/conf/diff_config.toml deleted file mode 100644 index 9c5a2102afb..00000000000 --- a/tests/integration_tests/hang_sink_suicide/conf/diff_config.toml +++ /dev/null @@ -1,29 +0,0 @@ -# diff Configuration. - -check-thread-count = 4 - -export-fix-sql = true - -check-struct-only = false - -[task] - output-dir = "/tmp/tidb_cdc_test/hang_sink_suicide/sync_diff/output" - - source-instances = ["mysql1"] - - target-instance = "tidb0" - - target-check-tables = ["hang_sink_suicide.t?*"] - -[data-sources] -[data-sources.mysql1] - host = "127.0.0.1" - port = 4000 - user = "root" - password = "" - -[data-sources.tidb0] - host = "127.0.0.1" - port = 3306 - user = "root" - password = "" diff --git a/tests/integration_tests/hang_sink_suicide/run.sh b/tests/integration_tests/hang_sink_suicide/run.sh deleted file mode 100644 index e4e663cb975..00000000000 --- a/tests/integration_tests/hang_sink_suicide/run.sh +++ /dev/null @@ -1,47 +0,0 @@ -#!/bin/bash - -set -eu - -CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test -SINK_TYPE=$1 - -function run() { - # test with mysql sink only - if [ "$SINK_TYPE" != "mysql" ]; then - return - fi - - rm -rf $WORK_DIR && mkdir -p $WORK_DIR - start_tidb_cluster --workdir $WORK_DIR - cd $WORK_DIR - - pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=2*return(true)' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --pd $pd_addr --logsuffix 1 --addr "127.0.0.1:8300" - - SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" - changefeed_id=test - cdc cli changefeed create -c $changefeed_id --config $CUR/conf/changefeed.toml --pd=$pd_addr --sink-uri="$SINK_URI" - - run_sql "CREATE DATABASE hang_sink_suicide;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "CREATE table hang_sink_suicide.t1 (id int primary key auto_increment)" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "CREATE table hang_sink_suicide.t2 (id int primary key auto_increment)" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "hang_sink_suicide.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_exists "hang_sink_suicide.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - - run_sql "insert into hang_sink_suicide.t1 values (),(),(),(),()" - run_sql "insert into hang_sink_suicide.t2 values (),(),(),(),()" - - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - export GO_FAILPOINTS='' - cleanup_process $CDC_BINARY -} - -trap stop_tidb_cluster EXIT -# TODO: update the case to use kafka sink instead of mysql sink. -# run $* -# check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 53629cf9407..2e1c4f6ee86 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -11,7 +11,7 @@ group_num=${group#G} # Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant # changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence # multi_cdc_cluster capture_suicide_while_balance_table -mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide server_config_compatibility changefeed_dup_error_restart" +mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart" mysql_only_http="http_api http_api_tls api_v2 http_api_tls_with_user_auth cli_tls_with_auth" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"