Skip to content

Commit

Permalink
Fix: Import data not considering the initiated cutover during resumpt…
Browse files Browse the repository at this point in the history
…ion (#2369)

Fix - During the resumption of import data, check if the cutover is initiated before continuing the streaming. If cutover is initiated, but all the events are not processed by importer yet, on resumption, need to check if cutover event is processed by importer or not (by storing that state in MSR)
  • Loading branch information
priyanshi-yb authored Mar 7, 2025
1 parent c8306e4 commit a35486f
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 9 deletions.
1 change: 1 addition & 0 deletions yb-voyager/cmd/cutover.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,4 @@ func ExitIfAlreadyCutover(importerOrExporterRole string) {
panic(fmt.Sprintf("invalid role %s", importerOrExporterRole))
}
}

2 changes: 1 addition & 1 deletion yb-voyager/cmd/eventQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"time"

"github.com/goccy/go-json"

log "github.com/sirupsen/logrus"

"github.com/yugabyte/yb-voyager/yb-voyager/src/metadb"
"github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb"
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils"
Expand Down
1 change: 1 addition & 0 deletions yb-voyager/cmd/exportDataDebezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func isOracleJDBCWalletLocationSet(s srcdb.Source) (bool, error) {
// ---------------------------------------------- Export Data ---------------------------------------//

func debeziumExportData(ctx context.Context, config *dbzm.Config, tableNameToApproxRowCountMap map[string]int64) error {

if config.SnapshotMode != "never" {
err := metaDB.UpdateMigrationStatusRecord(func(record *metadb.MigrationStatusRecord) {
record.SnapshotMechanism = "debezium"
Expand Down
40 changes: 39 additions & 1 deletion yb-voyager/cmd/live_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,36 @@ func init() {
MAX_INTERVAL_BETWEEN_BATCHES = utils.GetEnvAsInt("MAX_INTERVAL_BETWEEN_BATCHES", 2000)
}

func cutoverInitiatedAndCutoverEventProcessed() (bool, error) {
msr, err := metaDB.GetMigrationStatusRecord()
if err != nil {
return false, fmt.Errorf("getting migration status record: %v", err)
}
switch importerRole {
case TARGET_DB_IMPORTER_ROLE:
return msr.CutoverToTargetRequested && msr.CutoverDetectedByTargetImporter, nil
case SOURCE_REPLICA_DB_IMPORTER_ROLE:
return msr.CutoverToSourceReplicaRequested && msr.CutoverDetectedBySourceReplicaImporter, nil
case SOURCE_DB_IMPORTER_ROLE:
return msr.CutoverToSourceRequested && msr.CutoverDetectedBySourceImporter, nil
}

return false, nil
}

func streamChanges(state *ImportDataState, tableNames []sqlname.NameTuple) error {
ok, err := cutoverInitiatedAndCutoverEventProcessed()
if err != nil {
return err
}
if ok {
log.Info("cutover is initiated and the event is detected..")
return nil
}
log.Infof("NUM_EVENT_CHANNELS: %d, EVENT_CHANNEL_SIZE: %d, MAX_EVENTS_PER_BATCH: %d, MAX_INTERVAL_BETWEEN_BATCHES: %d",
NUM_EVENT_CHANNELS, EVENT_CHANNEL_SIZE, MAX_EVENTS_PER_BATCH, MAX_INTERVAL_BETWEEN_BATCHES)
// re-initilizing name registry in case it hadn't picked up the names registered on source/target/source-replica
err := namereg.NameReg.Init()
err = namereg.NameReg.Init()
if err != nil {
return fmt.Errorf("init name registry again: %v", err)
}
Expand Down Expand Up @@ -178,6 +203,19 @@ func streamChangesFromSegment(
event.IsCutoverToSourceReplica() && importerRole == SOURCE_REPLICA_DB_IMPORTER_ROLE ||
event.IsCutoverToSource() && importerRole == SOURCE_DB_IMPORTER_ROLE { // cutover or fall-forward command

err := metaDB.UpdateMigrationStatusRecord(func(record *metadb.MigrationStatusRecord) {
switch importerRole {
case TARGET_DB_IMPORTER_ROLE:
record.CutoverDetectedByTargetImporter = true
case SOURCE_REPLICA_DB_IMPORTER_ROLE:
record.CutoverDetectedBySourceReplicaImporter = true
case SOURCE_DB_IMPORTER_ROLE:
record.CutoverDetectedBySourceImporter = true
}
})
if err != nil {
return fmt.Errorf("error updating the migration status record for cutover detected case: %v", err)
}
updateCallhomeImportPhase(event)

eventQueue.EndOfQueue = true
Expand Down
23 changes: 16 additions & 7 deletions yb-voyager/src/metadb/migrationStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,27 @@ type MigrationStatusRecord struct {

SourceDBConf *srcdb.Source `json:"SourceDBConf"`

CutoverToTargetRequested bool `json:"CutoverToTargetRequested"`
//All the cutover requested flags by initiate cutover command
CutoverToTargetRequested bool `json:"CutoverToTargetRequested"`
CutoverToSourceRequested bool `json:"CutoverToSourceRequested"`
CutoverToSourceReplicaRequested bool `json:"CutoverToSourceReplicaRequested"`

//All the cutover detected by importer flags (marked when the cutover event is recieved by the importer)
CutoverDetectedByTargetImporter bool `json:"CutoverDetectedByTargetImporter"`
CutoverDetectedBySourceImporter bool `json:"CutoverDetectedBySourceImporter"`
CutoverDetectedBySourceReplicaImporter bool `json:"CutoverDetectedBySourceReplicaImporter"`

//All the cutover processed by importer/exporter flags - indicating that the cutover is completed by that command.
CutoverProcessedBySourceExporter bool `json:"CutoverProcessedBySourceExporter"`
CutoverProcessedByTargetImporter bool `json:"CutoverProcessedByTargetImporter"`
ExportFromTargetFallForwardStarted bool `json:"ExportFromTargetFallForwardStarted"`
CutoverToSourceReplicaRequested bool `json:"CutoverToSourceReplicaRequested"`
CutoverToSourceProcessedByTargetExporter bool `json:"CutoverToSourceProcessedByTargetExporter"`
CutoverToSourceReplicaProcessedByTargetExporter bool `json:"CutoverToSourceReplicaProcessedByTargetExporter"`
CutoverProcessedByTargetImporter bool `json:"CutoverProcessedByTargetImporter"`
CutoverToSourceReplicaProcessedBySRImporter bool `json:"CutoverToSourceReplicaProcessedBySRImporter"`
ExportFromTargetFallBackStarted bool `json:"ExportFromTargetFallBackStarted"`
CutoverToSourceRequested bool `json:"CutoverToSourceRequested"`
CutoverToSourceProcessedByTargetExporter bool `json:"CutoverToSourceProcessedByTargetExporter"`
CutoverToSourceProcessedBySourceImporter bool `json:"CutoverToSourceProcessedBySourceImporter"`

ExportFromTargetFallForwardStarted bool `json:"ExportFromTargetFallForwardStarted"`
ExportFromTargetFallBackStarted bool `json:"ExportFromTargetFallBackStarted"`

ExportSchemaDone bool `json:"ExportSchemaDone"`
ExportDataDone bool `json:"ExportDataDone"` // to be interpreted as export of snapshot data from source is complete
ExportDataSourceDebeziumStarted bool `json:"ExportDataSourceDebeziumStarted"`
Expand Down

0 comments on commit a35486f

Please sign in to comment.