Skip to content

Commit

Permalink
Mimic vreplication handling w/o using the same code
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 18, 2025
1 parent fd73c53 commit 7c5c747
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 8 deletions.
9 changes: 5 additions & 4 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"golang.org/x/exp/maps"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
Expand All @@ -36,7 +37,6 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -824,9 +824,10 @@ func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) {
return false, false
}

// For anything else, if this is a recoverable/ephemeral error -- such as a
// MAX_EXECUTION_TIME SQL error during the copy phase -- then retry.
return !vreplication.IsUnrecoverableError(err), false
// For anything else, if this is an ephemeral SQL error -- such as a
// MAX_EXECUTION_TIME SQL error during the copy phase -- or any other
// type of non-SQL error, then retry.
return sqlerror.IsEphemeralError(err), false
}

// sendAll sends a group of events together while holding the lock.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
// (maxTimeToRetryError). In addition, we cannot restart a workflow
// started with AtomicCopy which has _any_ error.
if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) ||
IsUnrecoverableError(err) ||
isUnrecoverableError(err) ||
!ct.lastWorkflowError.ShouldRetry() {
err = vterrors.Wrapf(err, TerminalErrorIndicator)
if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, para
insertLog(dbClient, action, vreplID, params["state"], message)
}

// IsUnrecoverableError returns true if vreplication cannot recover from the given error and
// isUnrecoverableError returns true if vreplication cannot recover from the given error and
// should completely terminate.
func IsUnrecoverableError(err error) bool {
func isUnrecoverableError(err error) bool {
if err == nil {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestIsUnrecoverableError(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := IsUnrecoverableError(tc.err)
result := isUnrecoverableError(tc.err)
require.Equal(t, tc.expected, result)
})
}
Expand Down

0 comments on commit 7c5c747

Please sign in to comment.