From 5c27b400a81fca6023e809985826e01976632e6a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 19 Feb 2025 10:59:24 -0500 Subject: [PATCH] VReplication: Align VReplication and VTGate VStream Retry Logic (#17783) Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 32 ++++++++++++++----- go/vt/vtgate/vstream_manager_test.go | 11 +++++-- .../tabletmanager/vreplication/utils.go | 3 +- .../tabletmanager/vreplication/utils_test.go | 3 +- .../tabletserver/vstreamer/uvstreamer.go | 8 ++--- 5 files changed, 40 insertions(+), 17 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index dc7a77ed62d..2a48fa9fc6d 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -27,6 +27,7 @@ import ( "golang.org/x/exp/maps" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" @@ -798,20 +799,35 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // An error should be retried if it is expected to be transient. // A tablet should be ignored upon retry if it's likely another tablet will not // produce the same error. -func (vs *vstream) shouldRetry(err error) (bool, bool) { +func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) { errCode := vterrors.Code(err) - + // In this context, where we will run the tablet picker again on retry, these + // codes indicate that it's worth a retry as the error is likely a transient + // one with a tablet or within the shard. if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE { return true, false } - - // If there is a GTIDSet Mismatch on the tablet, omit it from the candidate - // list in the TabletPicker on retry. 
- if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { - return true, true + // This typically indicates that the user provided invalid arguments for the + // VStream so we should not retry. + if errCode == vtrpcpb.Code_INVALID_ARGUMENT { + // But if there is a GTIDSet Mismatch on the tablet, omit that tablet from + // the candidate list in the TabletPicker and retry. The argument was invalid + // *for that specific tablet* but it's not generally invalid. + if strings.Contains(err.Error(), "GTIDSet Mismatch") { + return true, true + } + return false, false + } + // Internal errors such as not having all journaling participants require a new + // VStream. + if errCode == vtrpcpb.Code_INTERNAL { + return false, false } - return false, false + // For anything else, if this is an ephemeral SQL error -- such as a + // MAX_EXECUTION_TIME SQL error during the copy phase -- or any other + // type of non-SQL error, then retry. + return sqlerror.IsEphemeralError(err), false } // sendAll sends a group of events together while holding the lock. 
diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 4e10e60c758..e209e15fb3d 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -420,12 +420,19 @@ func TestVStreamRetriableErrors(t *testing.T) { ignoreTablet: false, }, { - name: "should not retry", + name: "invalid argument", code: vtrpcpb.Code_INVALID_ARGUMENT, msg: "final error", shouldRetry: false, ignoreTablet: false, }, + { + name: "query interrupted", + code: vtrpcpb.Code_UNKNOWN, + msg: "vttablet: rpc error: code = Unknown desc = Query execution was interrupted, maximum statement execution time exceeded (errno 3024) (sqlstate HY000)", + shouldRetry: true, + ignoreTablet: false, + }, } commit := []*binlogdatapb.VEvent{ @@ -928,7 +935,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Variable names are maintained like in OneToMany, but order is different.1 + // Variable names are maintained like in OneToMany, but order is different. ks := "TestVStream" cell := "aa" _ = createSandbox(ks) diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils.go b/go/vt/vttablet/tabletmanager/vreplication/utils.go index 67b52c56261..cff53ce55ee 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/utils.go +++ b/go/vt/vttablet/tabletmanager/vreplication/utils.go @@ -129,7 +129,8 @@ func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, para insertLog(dbClient, action, vreplID, params["state"], message) } -// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate. +// isUnrecoverableError returns true if vreplication cannot recover from the given error and +// should completely terminate. 
func isUnrecoverableError(err error) bool { if err == nil { return false diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils_test.go b/go/vt/vttablet/tabletmanager/vreplication/utils_test.go index 15093e299fc..2406796aace 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/utils_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/utils_test.go @@ -22,8 +22,6 @@ import ( "strings" "testing" - vttablet "vitess.io/vitess/go/vt/vttablet/common" - "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql/sqlerror" @@ -31,6 +29,7 @@ import ( "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/vterrors" + vttablet "vitess.io/vitess/go/vt/vttablet/common" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index ea475d19676..bf8d4330831 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -27,17 +27,17 @@ import ( "time" "vitess.io/vitess/go/mysql/replication" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var uvstreamerTestMode = false // Only used for testing