From feda026a9b634dce50f171aa4521b6ef5793b235 Mon Sep 17 00:00:00 2001 From: tanyinloo Date: Tue, 28 Jan 2025 11:48:20 +0800 Subject: [PATCH 1/3] support retry --- coredb/engine_ctx.go | 90 ++++++++++++++++++++++++++++++++++++ coredb/txengine/tx_engine.go | 45 ++++++++++++++++++ 2 files changed, 135 insertions(+) diff --git a/coredb/engine_ctx.go b/coredb/engine_ctx.go index eb07dbc..e523f5e 100644 --- a/coredb/engine_ctx.go +++ b/coredb/engine_ctx.go @@ -3,10 +3,56 @@ package coredb import ( "context" "database/sql" + "errors" "fmt" "log" + "strings" + "time" ) +// RetryConfig encapsulates retry parameters. +type RetryConfig struct { + MaxRetries int + InitialBackoff time.Duration +} + +// DefaultRetryConfig provides a reasonable default configuration +var DefaultRetryConfig = RetryConfig{ + MaxRetries: 5, + InitialBackoff: 200 * time.Millisecond, +} + +// IsNonRetryableError checks if an error is non-retryable. +func IsNonRetryableError(err error) bool { + if err == nil { + return false + } + // Example (Replace with your database's non-retryable errors) + + // SQL specific errors that are not retryable + if errors.Is(err, sql.ErrNoRows) { + return true + } + + // Example: Invalid SQL syntax + if strings.Contains(err.Error(), "syntax error") { + return true + } + + if strings.Contains(err.Error(), "1146") { // Table doesn't exists + return true + } + if strings.Contains(err.Error(), "1064") { // Invalid SQL statement + return true + } + // Example: Authentication issues + if strings.Contains(err.Error(), "Access denied") { + return true + } + + return false // Default is retryable +} + // FetchByPKCtx returns a row of T type with given primary key value func FetchByPKCtx[T any](ctx context.Context, dbname string, tableName string, pkName []string, val ...any) (*T, error) { sql := "WHERE `" + pkName[0] + "` = ?" @@ -57,6 +103,50 @@ func ExecCtx(ctx context.Context, dbname string, query string, params ...any) (s return mydb.ExecContext(ctx, query, params...) } +// ExecWithRetry executes a query with retry logic on failure. +func ExecWithRetry(ctx context.Context, dbname string, query string, retryConfig RetryConfig, params ...any) (sql.Result, error) { + // Set defaults for invalid config + if retryConfig.MaxRetries <= 0 { + retryConfig.MaxRetries = DefaultRetryConfig.MaxRetries + } + + if retryConfig.InitialBackoff <= 0 { + retryConfig.InitialBackoff = DefaultRetryConfig.InitialBackoff + } + + var result sql.Result + var err error + retryCount := 0 + currentBackoff := retryConfig.InitialBackoff + + for { + select { + case <-ctx.Done(): + return result, fmt.Errorf("context cancelled during retry: %w", ctx.Err()) + default: + result, err = ExecCtx(ctx, dbname, query, params...) + if err == nil { + return result, nil // Success! + } + + if IsNonRetryableError(err) { + return result, err // Fail immediately for non-retryable errors + } + + retryCount++ + if retryCount > retryConfig.MaxRetries { + log.Printf("Max retries (%d) exceeded for: %s, last error: %v", retryConfig.MaxRetries, query, err) + return result, fmt.Errorf("max retries exceeded, last error: %w", err) + } + + delay := currentBackoff + log.Printf("Retrying attempt %d with delay %v. Last error: %v", retryCount, delay, err) + time.Sleep(delay) + currentBackoff *= 2 + } + } +} + // FindOneCtx returns a row from given table type with where query. // If no rows found, *T will be nil. No error will be returned. func FindOneCtx[T any](ctx context.Context, dbname string, tableName string, where WhereQuery) (*T, error) { diff --git a/coredb/txengine/tx_engine.go b/coredb/txengine/tx_engine.go index 8b7f079..c0234ae 100644 --- a/coredb/txengine/tx_engine.go +++ b/coredb/txengine/tx_engine.go @@ -40,6 +40,51 @@ func RunTransaction(ctx context.Context, dbName string, fn func(ctx context.Cont return } +// RunTxWithRetry runs a transaction with retry logic on failure. +func RunTxWithRetry(ctx context.Context, dbName string, retryConfig coredb.RetryConfig, fn func(ctx context.Context, sqlTx *sql.Tx) error) (err error) { + // Set defaults for invalid config + if retryConfig.MaxRetries <= 0 { + retryConfig.MaxRetries = coredb.DefaultRetryConfig.MaxRetries + } + + if retryConfig.InitialBackoff <= 0 { + retryConfig.InitialBackoff = coredb.DefaultRetryConfig.InitialBackoff + } + + var resultErr error + retryCount := 0 + currentBackoff := retryConfig.InitialBackoff + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled during retry: %w", ctx.Err()) + default: + resultErr = RunTransaction(ctx, dbName, fn) + if resultErr == nil { + return nil // Success! + } + + if coredb.IsNonRetryableError(resultErr) { + log.Printf("Non-retryable error: %v", resultErr) + return resultErr // Fail immediately for non-retryable errors + } + + retryCount++ + if retryCount > retryConfig.MaxRetries { + log.Printf("Max retries (%d) exceeded, last error: %v", retryConfig.MaxRetries, resultErr) + return fmt.Errorf("max retries exceeded, last error: %w", resultErr) + + } + + delay := currentBackoff + log.Printf("Retrying attempt %d with delay %v. Last error: %v", retryCount, delay, resultErr) + time.Sleep(delay) + currentBackoff *= 2 + } + } +} + func runTransaction(ctx context.Context, tx *sql.Tx, conn *sql.Conn, fn func(ctx context.Context, sqlTx *sql.Tx) error) (err error) { if tx == nil && conn == nil { return errors.New("wrong usage. tx and conn cannot both be nil") From afe71bb7c04a61044bca75fd6416d300f8dc4d2f Mon Sep 17 00:00:00 2001 From: tanyinloo Date: Tue, 28 Jan 2025 12:10:55 +0800 Subject: [PATCH 2/3] expose IsNonRetryableErrorFunc --- coredb/engine_ctx.go | 20 +++++++++++++++----- coredb/txengine/tx_engine.go | 7 ++++++- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/coredb/engine_ctx.go b/coredb/engine_ctx.go index e523f5e..4c45f0a 100644 --- a/coredb/engine_ctx.go +++ b/coredb/engine_ctx.go @@ -10,16 +10,20 @@ import ( "time" ) +type IsNonRetryableErrorFunc func(err error) bool + // RetryConfig encapsulates retry parameters. type RetryConfig struct { - MaxRetries int - InitialBackoff time.Duration + MaxRetries int + InitialBackoff time.Duration + IsNonRetryableErrorFunc IsNonRetryableErrorFunc } // DefaultRetryConfig provides a reasonable default configuration var DefaultRetryConfig = RetryConfig{ - MaxRetries: 5, - InitialBackoff: 200 * time.Millisecond, + MaxRetries: 5, + InitialBackoff: 200 * time.Millisecond, + IsNonRetryableErrorFunc: IsNonRetryableError, } // IsNonRetryableError checks if an error is non-retryable. @@ -114,6 +118,12 @@ func ExecWithRetry(ctx context.Context, dbname string, query string, retryConfig retryConfig.InitialBackoff = DefaultRetryConfig.InitialBackoff } + // Use the default if NonRetryableErrorFunc is nil + nonRetryableErrorFunc := retryConfig.IsNonRetryableErrorFunc + if nonRetryableErrorFunc == nil { + nonRetryableErrorFunc = IsNonRetryableError + } + var result sql.Result var err error retryCount := 0 @@ -129,7 +139,7 @@ func ExecWithRetry(ctx context.Context, dbname string, query string, retryConfig return result, nil // Success! } - if IsNonRetryableError(err) { + if nonRetryableErrorFunc(err) { return result, err // Fail immediately for non-retryable errors } diff --git a/coredb/txengine/tx_engine.go b/coredb/txengine/tx_engine.go index c0234ae..d52a2a3 100644 --- a/coredb/txengine/tx_engine.go +++ b/coredb/txengine/tx_engine.go @@ -51,6 +51,11 @@ func RunTxWithRetry(ctx context.Context, dbName string, retryConfig coredb.Retry retryConfig.InitialBackoff = coredb.DefaultRetryConfig.InitialBackoff } + nonRetryableErrorFunc := retryConfig.IsNonRetryableErrorFunc + if nonRetryableErrorFunc == nil { + nonRetryableErrorFunc = coredb.IsNonRetryableError + } + var resultErr error retryCount := 0 currentBackoff := retryConfig.InitialBackoff @@ -65,7 +70,7 @@ func RunTxWithRetry(ctx context.Context, dbName string, retryConfig coredb.Retry return nil // Success! } - if coredb.IsNonRetryableError(resultErr) { + if nonRetryableErrorFunc(resultErr) { log.Printf("Non-retryable error: %v", resultErr) return resultErr // Fail immediately for non-retryable errors } From 8ecea052065114500f01527286929ec3cf2708d7 Mon Sep 17 00:00:00 2001 From: tanyinloo Date: Tue, 28 Jan 2025 13:31:09 +0800 Subject: [PATCH 3/3] add simple test --- coredb/engine_ctx.go | 5 ++++- tests/retry_test.go | 33 +++++++++++++++++++++++++++++++++ tests/user_test.go | 16 +++++++++------- 3 files changed, 46 insertions(+), 8 deletions(-) create mode 100644 tests/retry_test.go diff --git a/coredb/engine_ctx.go b/coredb/engine_ctx.go index 4c45f0a..6336d36 100644 --- a/coredb/engine_ctx.go +++ b/coredb/engine_ctx.go @@ -46,7 +46,10 @@ func IsNonRetryableError(err error) bool { if strings.Contains(err.Error(), "1146") { // Table doesn't exists return true } - if strings.Contains(err.Error(), "1064") { // Invalid SQL statement + if strings.Contains(err.Error(), "1064") { // No database selected + return true + } + if strings.Contains(err.Error(), "1149") { // Invalid SQL statement return true } // Example: Authentication issues diff --git a/tests/retry_test.go b/tests/retry_test.go new file mode 100644 index 0000000..a5b7f5a --- /dev/null +++ b/tests/retry_test.go @@ -0,0 +1,33 @@ +package tests + +import ( + "context" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/olachat/gola/v2/coredb" +) + +func TestExecWithRetry_Success(t *testing.T) { + ctx := context.Background() + + _, err := coredb.ExecWithRetry(ctx, testDBName, "INSERT INTO test_table (name, email) VALUES (?, ?)", coredb.DefaultRetryConfig, "test", "test@example.com") + if err != nil { + t.Fatalf("Expected success, but got error: %v", err) + } +} + +func TestExecWithRetry_Fail(t *testing.T) { + ctx := context.Background() + now := time.Now() + _, err := coredb.ExecWithRetry(ctx, testDBName, "INSERT INTO no_such_table (name, email) VALUES (?, ?)", coredb.DefaultRetryConfig, "test", "test@example.com") + if err == nil { + t.Fatalf("Expected error, but got success") + } + elapsed := time.Since(now) + if elapsed < (200+400+800+1600+3200)*time.Millisecond { + t.Fatalf("Expected retry to take at least 100ms, but took: %v", elapsed) + } + t.Logf("retry took: %v", elapsed) +} diff --git a/tests/user_test.go b/tests/user_test.go index 4f87757..af42cbd 100644 --- a/tests/user_test.go +++ b/tests/user_test.go @@ -54,16 +54,18 @@ func init() { panic(err) } - // realdb, err := open() - - // if err != nil { - // panic(err) - // } - coredb.Setup(func(dbname string, mode coredb.DBMode) *sql.DB { - return db + if dbname == testDBName { + return db + } + return nil }) + _, err = db.Exec("CREATE TABLE IF NOT EXISTS test_table (name VARCHAR(255), email VARCHAR(255))") + if err != nil { + panic(err) + } + // create tables for _, tableName := range tableNames { query, _ := testdata.Fixtures.ReadFile(tableName + ".sql")