From 2116cf71f800e797374bbdbf5a11d11ea5f2b2c0 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Fri, 14 Feb 2025 16:31:20 +0100 Subject: [PATCH 1/3] Cherry-pick 420342fddb0ea91df012f7be4d1ae0b59e71448a with conflicts --- go/test/endtoend/vreplication/fk_ext_test.go | 5 ++++- go/test/endtoend/vreplication/fk_test.go | 15 +++++++++++++++ .../tabletmanager/vreplication/vcopier_atomic.go | 16 ++++++++++++---- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/vreplication/fk_ext_test.go b/go/test/endtoend/vreplication/fk_ext_test.go index 401b99360d8..fdfc936ebd5 100644 --- a/go/test/endtoend/vreplication/fk_ext_test.go +++ b/go/test/endtoend/vreplication/fk_ext_test.go @@ -90,7 +90,10 @@ func TestFKExt(t *testing.T) { setSidecarDBName("_vt") // Ensure that there are multiple copy phase cycles per table. - extraVTTabletArgs = append(extraVTTabletArgs, "--vstream_packet_size=256", "--queryserver-config-schema-change-signal") + extraVTTabletArgs = append(extraVTTabletArgs, + "--vstream_packet_size=256", + "--queryserver-config-schema-change-signal", + parallelInsertWorkers) extraVTGateArgs = append(extraVTGateArgs, "--schema_change_signal=true", "--planner-version", "Gen4") defer func() { extraVTTabletArgs = nil }() initFKExtConfig(t) diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index 3a104004726..c9be86f74d4 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -40,6 +40,7 @@ const testWorkflowFlavor = workflowFlavorRandom // It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without, // i.e. with foreign_key_checks=0. func TestFKWorkflow(t *testing.T) { + setSidecarDBName("_vt") extraVTTabletArgs = []string{ // Ensure that there are multiple copy phase cycles per table. "--vstream_packet_size=256", @@ -129,6 +130,20 @@ func TestFKWorkflow(t *testing.T) { <-ch } mt.Complete() +<<<<<<< HEAD +======= + vtgateConn, closeConn := getVTGateConn() + defer closeConn() + + if withLoad { + t11Count := getRowCount(t, vtgateConn, "t11") + t12Count := getRowCount(t, vtgateConn, "t12") + require.Greater(t, t11Count, 1) + require.Greater(t, t12Count, 1) + require.Equal(t, t11Count, t12Count) + } + +>>>>>>> 420342fddb (VReplication Atomic Copy Workflows: fix bugs around concurrent inserts (#17772)) } func insertInitialFKData(t *testing.T) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go index 6e4204c46a5..8a7a60c6dbd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go @@ -85,12 +85,17 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings rowsCopiedTicker := time.NewTicker(rowsCopiedUpdateInterval) defer rowsCopiedTicker.Stop() +<<<<<<< HEAD parallelism := getInsertParallelism() // For now do not support concurrent inserts for atomic copies. if parallelism > 1 { parallelism = 1 log.Infof("Disabling concurrent inserts for atomic copies") } +======= + parallelism := int(math.Max(1, float64(vc.vr.workflowConfig.ParallelInsertWorkers))) + +>>>>>>> 420342fddb (VReplication Atomic Copy Workflows: fix bugs around concurrent inserts (#17772)) copyWorkerFactory := vc.newCopyWorkerFactory(parallelism) var copyWorkQueue *vcopierCopyWorkQueue @@ -114,7 +119,6 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings resp.TableName, len(resp.Fields), len(resp.Rows), resp.Gtid, resp.Lastpk) tableName := resp.TableName gtid = resp.Gtid - updateRowsCopied := func() error { updateRowsQuery := binlogplayer.GenerateUpdateRowsCopied(vc.vr.id, vc.vr.stats.CopyRowCount.Get()) _, err := vc.vr.dbClient.Execute(updateRowsQuery) @@ -204,6 +208,10 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings log.Infof("copying table %s with lastpk %v", tableName, lastpkbv) // Prepare a vcopierCopyTask for the current batch of work. currCh := make(chan *vcopierCopyTaskResult, 1) + + if parallelism > 1 { + resp = resp.CloneVT() + } currT := newVCopierCopyTask(newVCopierCopyTaskArgs(resp.Rows, resp.Lastpk)) // Send result to the global resultCh and currCh. resultCh is used by @@ -291,12 +299,12 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings log.Infof("Copy of %v stopped", state.currentTableName) return fmt.Errorf("CopyAll was interrupted due to context expiration") default: - if err := vc.deleteCopyState(state.currentTableName); err != nil { - return err - } if copyWorkQueue != nil { copyWorkQueue.close() } + if err := vc.deleteCopyState(state.currentTableName); err != nil { + return err + } if err := vc.updatePos(ctx, gtid); err != nil { return err } From a0b8631e1483af7661c5516ed4b32d453dbefa2d Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 15 Feb 2025 23:09:25 +0100 Subject: [PATCH 2/3] Fix conflicts Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/fk_test.go | 14 -------------- .../tabletmanager/vreplication/vcopier_atomic.go | 9 --------- 2 files changed, 23 deletions(-) diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index c9be86f74d4..64587a21da8 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -130,20 +130,6 @@ func TestFKWorkflow(t *testing.T) { <-ch } mt.Complete() -<<<<<<< HEAD -======= - vtgateConn, closeConn := getVTGateConn() - defer closeConn() - - if withLoad { - t11Count := getRowCount(t, vtgateConn, "t11") - t12Count := getRowCount(t, vtgateConn, "t12") - require.Greater(t, t11Count, 1) - require.Greater(t, t12Count, 1) - require.Equal(t, t11Count, t12Count) - } - ->>>>>>> 420342fddb (VReplication Atomic Copy Workflows: fix bugs around concurrent inserts (#17772)) } func insertInitialFKData(t *testing.T) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go index 8a7a60c6dbd..424996255e2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go @@ -85,17 +85,8 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings rowsCopiedTicker := time.NewTicker(rowsCopiedUpdateInterval) defer rowsCopiedTicker.Stop() -<<<<<<< HEAD parallelism := getInsertParallelism() - // For now do not support concurrent inserts for atomic copies. - if parallelism > 1 { - parallelism = 1 - log.Infof("Disabling concurrent inserts for atomic copies") - } -======= - parallelism := int(math.Max(1, float64(vc.vr.workflowConfig.ParallelInsertWorkers))) ->>>>>>> 420342fddb (VReplication Atomic Copy Workflows: fix bugs around concurrent inserts (#17772)) copyWorkerFactory := vc.newCopyWorkerFactory(parallelism) var copyWorkQueue *vcopierCopyWorkQueue From 187c1ae71cb3df1876c41f6d895a400c8889a919 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 15 Feb 2025 23:09:58 +0100 Subject: [PATCH 3/3] Trigger rebuild Signed-off-by: Rohit Nayak