Skip to content

Commit

Permalink
Added checks for checking test containers running or not to avoid tes…
Browse files Browse the repository at this point in the history
…t flakeness (#2303)

* Enhance srcdb.Connect() method to be verify if already existing and check status using Ping()
* Add database ping check in Start() for all DB test container types

* Call source db Connect() func to ensure database is connected
  • Loading branch information
sanyamsinghal authored Feb 18, 2025
1 parent 0b0953d commit 3dcd367
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 58 deletions.
12 changes: 12 additions & 0 deletions yb-voyager/src/srcdb/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ func newMySQL(s *Source) *MySQL {
}

func (ms *MySQL) Connect() error {
if ms.db != nil {
err := ms.db.Ping()
if err == nil {
log.Infof("Already connected to the source database")
return nil
} else {
log.Infof("Failed to ping the source database: %s", err)
ms.Disconnect()
}
log.Info("Reconnecting to the source database")
}

db, err := sql.Open("mysql", ms.getConnectionUri())
db.SetMaxOpenConns(ms.source.NumConnections)
db.SetConnMaxIdleTime(5 * time.Minute)
Expand Down
1 change: 1 addition & 0 deletions yb-voyager/src/srcdb/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestMysqlGetAllTableNames(t *testing.T) {
testMySQLSource.Source.DBName = "test" // used in query of GetAllTableNames()

// Test GetAllTableNames
_ = testMySQLSource.DB().Connect()
actualTables := testMySQLSource.DB().GetAllTableNames()
expectedTables := []*sqlname.SourceName{
sqlname.NewSourceName("test", "foo"),
Expand Down
12 changes: 12 additions & 0 deletions yb-voyager/src/srcdb/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ func newOracle(s *Source) *Oracle {
}

func (ora *Oracle) Connect() error {
if ora.db != nil {
err := ora.db.Ping()
if err == nil {
log.Infof("Already connected to the source database")
return nil
} else {
log.Infof("Failed to ping the source database: %s", err)
ora.Disconnect()
}
log.Info("Reconnecting to the source database")
}

db, err := sql.Open("godror", ora.getConnectionUri())
db.SetMaxOpenConns(ora.source.NumConnections)
db.SetConnMaxIdleTime(5 * time.Minute)
Expand Down
2 changes: 2 additions & 0 deletions yb-voyager/src/srcdb/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func TestOracleGetTableToUniqueKeyColumnsMap(t *testing.T) {
tableList := []sqlname.NameTuple{
{CurrentName: objectName},
}
_ = testOracleSource.DB().Connect()
uniqueKeys, err := testOracleSource.DB().GetTableToUniqueKeyColumnsMap(tableList)
if err != nil {
t.Fatalf("Error retrieving unique keys: %v", err)
Expand All @@ -72,6 +73,7 @@ func TestOracleGetTableToUniqueKeyColumnsMap(t *testing.T) {
}

func TestOracleGetNonPKTables(t *testing.T) {
_ = testOracleSource.DB().Connect()
actualTables, err := testOracleSource.DB().GetNonPKTables()
assert.NilError(t, err, "Expected nil but non nil error: %v", err)

Expand Down
12 changes: 12 additions & 0 deletions yb-voyager/src/srcdb/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ func newPostgreSQL(s *Source) *PostgreSQL {
}

func (pg *PostgreSQL) Connect() error {
if pg.db != nil {
err := pg.db.Ping()
if err == nil {
log.Infof("Already connected to the source database")
log.Infof("Already connected to the source database")
return nil
} else {
log.Infof("Failed to ping the source database: %s", err)
pg.Disconnect()
}
log.Info("Reconnecting to the source database")
}
db, err := sql.Open("pgx", pg.getConnectionUri())
db.SetMaxOpenConns(pg.source.NumConnections)
db.SetConnMaxIdleTime(5 * time.Minute)
Expand Down
5 changes: 5 additions & 0 deletions yb-voyager/src/srcdb/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestPostgresGetAllTableNames(t *testing.T) {
testPostgresSource.Source.Schema = "test_schema" // used in query of GetAllTableNames()

// Test GetAllTableNames
_ = testPostgresSource.DB().Connect()
actualTables := testPostgresSource.DB().GetAllTableNames()
expectedTables := []*sqlname.SourceName{
sqlname.NewSourceName("test_schema", "foo"),
Expand Down Expand Up @@ -86,6 +87,8 @@ func TestPostgresGetTableToUniqueKeyColumnsMap(t *testing.T) {
{CurrentName: sqlname.NewObjectName("postgresql", "test_schema", "test_schema", "another_unique_table")},
}

// Test GetTableToUniqueKeyColumnsMap
_ = testPostgresSource.DB().Connect()
actualUniqKeys, err := testPostgresSource.DB().GetTableToUniqueKeyColumnsMap(uniqueTablesList)
if err != nil {
t.Fatalf("Error retrieving unique keys: %v", err)
Expand Down Expand Up @@ -128,6 +131,8 @@ func TestPostgresGetNonPKTables(t *testing.T) {
);`)
defer testPostgresSource.TestContainer.ExecuteSqls(`DROP SCHEMA test_schema CASCADE;`)

// Test GetNonPKTables
_ = testPostgresSource.DB().Connect()
actualTables, err := testPostgresSource.DB().GetNonPKTables()
assert.NilError(t, err, "Expected nil but non nil error: %v", err)

Expand Down
11 changes: 11 additions & 0 deletions yb-voyager/src/srcdb/yugabytedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ func newYugabyteDB(s *Source) *YugabyteDB {
}

func (yb *YugabyteDB) Connect() error {
if yb.db != nil {
err := yb.db.Ping()
if err == nil {
log.Infof("Already connected to the source database")
return nil
} else {
log.Infof("Failed to ping the source database: %s", err)
yb.Disconnect()
}
log.Info("Reconnecting to the source database")
}
db, err := sql.Open("pgx", yb.getConnectionUri())
db.SetMaxOpenConns(yb.source.NumConnections)
db.SetConnMaxIdleTime(5 * time.Minute)
Expand Down
5 changes: 5 additions & 0 deletions yb-voyager/src/srcdb/yugbaytedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestYugabyteGetAllTableNames(t *testing.T) {
testYugabyteDBSource.Source.Schema = "test_schema"

// Test GetAllTableNames
_ = testYugabyteDBSource.DB().Connect()
actualTables := testYugabyteDBSource.DB().GetAllTableNames()
expectedTables := []*sqlname.SourceName{
sqlname.NewSourceName("test_schema", "foo"),
Expand Down Expand Up @@ -86,6 +87,8 @@ func TestYugabyteGetTableToUniqueKeyColumnsMap(t *testing.T) {
{CurrentName: sqlname.NewObjectName("postgresql", "test_schema", "test_schema", "another_unique_table")},
}

// Test GetTableToUniqueKeyColumnsMap
_ = testYugabyteDBSource.DB().Connect()
actualUniqKeys, err := testYugabyteDBSource.DB().GetTableToUniqueKeyColumnsMap(uniqueTablesList)
if err != nil {
t.Fatalf("Error retrieving unique keys: %v", err)
Expand Down Expand Up @@ -128,6 +131,8 @@ func TestYugabyteGetNonPKTables(t *testing.T) {
);`)
defer testYugabyteDBSource.TestContainer.ExecuteSqls(`DROP SCHEMA test_schema CASCADE;`)

// Test GetNonPKTables
_ = testYugabyteDBSource.DB().Connect()
actualTables, err := testYugabyteDBSource.DB().GetNonPKTables()
assert.NilError(t, err, "Expected nil but non nil error: %v", err)

Expand Down
31 changes: 31 additions & 0 deletions yb-voyager/test/containers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package testcontainers

import (
"context"
"database/sql"
_ "embed"
"fmt"
"io"
"time"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -58,3 +60,32 @@ func printContainerLogs(container testcontainers.Container) {

fmt.Printf("=== Logs for container %s ===\n%s\n=== End of Logs for container %s ===\n", containerID, string(logData), containerID)
}

// pingDatabase tries to connect to the database using the driver and connection string.
// It retries for a few times with a delay before giving up.
func pingDatabase(driver string, connStr string) error {
var err error
maxRetries := 3
retryDelay := 5 * time.Second

for i := 0; i < maxRetries; i++ {
db, openErr := sql.Open(driver, connStr)
if openErr != nil {
err = openErr
} else {
pingErr := db.Ping()
closeErr := db.Close()
if pingErr == nil && closeErr == nil {
return nil // success
}

if pingErr != nil {
err = pingErr
} else {
err = closeErr
}
}
time.Sleep(retryDelay)
}
return fmt.Errorf("pingDatabase failed even after '%d' retries: %w", maxRetries, err)
}
38 changes: 13 additions & 25 deletions yb-voyager/test/containers/mysql_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ import (
type MysqlContainer struct {
ContainerConfig
container testcontainers.Container
db *sql.DB
}

func (ms *MysqlContainer) Start(ctx context.Context) (err error) {
if ms.container != nil {
if ms.container != nil && ms.container.IsRunning() {
utils.PrintAndLog("Mysql-%s container already running", ms.DBVersion)
return nil
}
Expand Down Expand Up @@ -62,24 +61,15 @@ func (ms *MysqlContainer) Start(ctx context.Context) (err error) {
Started: true,
})
printContainerLogs(ms.container)
if err != nil {
return err
}

dsn := ms.GetConnectionString()
db, err := sql.Open("mysql", dsn)
if err != nil {
return fmt.Errorf("failed to open mysql connection: %w", err)
return fmt.Errorf("failed to start mysql container: %w", err)
}

if err = db.Ping(); err != nil {
db.Close()
return fmt.Errorf("failed to ping mysql after connection: %w", err)
err = pingDatabase("mysql", ms.GetConnectionString())
if err != nil {
return fmt.Errorf("failed to ping mysql container: %w", err)
}

// Store the DB connection for reuse
ms.db = db

return nil
}

Expand All @@ -88,13 +78,6 @@ func (ms *MysqlContainer) Terminate(ctx context.Context) {
return
}

// Close the DB connection if it exists
if ms.db != nil {
if err := ms.db.Close(); err != nil {
log.Errorf("failed to close mysql db connection: %v", err)
}
}

err := ms.container.Terminate(ctx)
if err != nil {
log.Errorf("failed to terminate mysql container: %v", err)
Expand Down Expand Up @@ -136,12 +119,17 @@ func (ms *MysqlContainer) GetConnectionString() string {
}

func (ms *MysqlContainer) ExecuteSqls(sqls ...string) {
if ms.db == nil {
utils.ErrExit("db connection not initialized for mysql container")
if ms == nil {
utils.ErrExit("mysql container is not started: nil")
}

db, err := sql.Open("mysql", ms.GetConnectionString())
if err != nil {
utils.ErrExit("failed to connect to mysql for executing sqls: %w", err)
}

for _, sqlStmt := range sqls {
_, err := ms.db.Exec(sqlStmt)
_, err := db.Exec(sqlStmt)
if err != nil {
utils.ErrExit("failed to execute sql '%s': %w", sqlStmt, err)
}
Expand Down
22 changes: 19 additions & 3 deletions yb-voyager/test/containers/oracle_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type OracleContainer struct {
}

func (ora *OracleContainer) Start(ctx context.Context) (err error) {
if ora.container != nil {
if ora.container != nil && ora.container.IsRunning() {
utils.PrintAndLog("Oracle-%s container already running", ora.DBVersion)
return nil
}
Expand Down Expand Up @@ -61,7 +61,15 @@ func (ora *OracleContainer) Start(ctx context.Context) (err error) {
Started: true,
})
printContainerLogs(ora.container)
return err
if err != nil {
return fmt.Errorf("failed to start oracle container: %w", err)
}

err = pingDatabase("godror", ora.GetConnectionString())
if err != nil {
return fmt.Errorf("failed to ping oracle container: %w", err)
}
return nil
}

func (ora *OracleContainer) Terminate(ctx context.Context) {
Expand Down Expand Up @@ -99,7 +107,15 @@ func (ora *OracleContainer) GetConfig() ContainerConfig {
}

func (ora *OracleContainer) GetConnectionString() string {
panic("GetConnectionString() not implemented yet for oracle")
config := ora.GetConfig()
host, port, err := ora.GetHostPort()
if err != nil {
utils.ErrExit("failed to get host port for oracle connection string: %v", err)
}

connectString := fmt.Sprintf(`(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = %s)(PORT = %d))(CONNECT_DATA = (SERVICE_NAME = %s)))`,
host, port, config.DBName)
return fmt.Sprintf(`user="%s" password="%s" connectString="%s"`, config.User, config.Password, connectString)
}

func (ora *OracleContainer) ExecuteSqls(sqls ...string) {
Expand Down
Loading

0 comments on commit 3dcd367

Please sign in to comment.