Skip to content

Commit

Permalink
mycdc: fix snapshot consistency issue
Browse files Browse the repository at this point in the history
We need to take the global lock, then we can create our consistent
snapshot. The lock is needed to prevent writes from sneaking in between.

The process is actually explained very nicely by the Debezium documentation.

https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-initial-snapshot-workflow-with-global-read-lock
  • Loading branch information
rockwotj committed Jan 26, 2025
1 parent dc9ac0a commit 85470f6
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 22 deletions.
6 changes: 4 additions & 2 deletions internal/impl/mysql/input_mysql_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,16 @@ func (i *mysqlStreamInput) Connect(ctx context.Context) error {
// canalConfig.Logger

for _, table := range i.tables {
canalConfig.IncludeTableRegex = append(canalConfig.IncludeTableRegex, regexp.QuoteMeta(table))
canalConfig.IncludeTableRegex = append(
canalConfig.IncludeTableRegex,
"^"+regexp.QuoteMeta(i.mysqlConfig.DBName+"."+table)+"$",
)
}

c, err := canal.NewCanal(canalConfig)
if err != nil {
return err
}
c.AddDumpTables(i.mysqlConfig.DBName, i.tables...)

i.canal = c

Expand Down
96 changes: 96 additions & 0 deletions internal/impl/mysql/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,25 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"

_ "github.com/go-sql-driver/mysql"
"github.com/redpanda-data/benthos/v4/public/bloblang"
_ "github.com/redpanda-data/benthos/v4/public/components/io"
_ "github.com/redpanda-data/benthos/v4/public/components/pure"
"github.com/redpanda-data/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/service/integration"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/connect/v4/internal/asyncroutine"
"github.com/redpanda-data/connect/v4/internal/license"
)

Expand Down Expand Up @@ -652,3 +656,95 @@ memory: {}
"json_col": {"foo":-1,"bar":[3,2,1]}
}`, outBatches[1])
}

// TestIntegrationMySQLSnapshotConsistency verifies that no rows are lost or
// duplicated when the mysql_cdc input hands off from the initial snapshot to
// the live stream while a writer is continuously inserting rows.
//
// The writer inserts rows with an auto-increment key, so the consumed ids are
// expected to be exactly 1..N, in order, where N is the number of successful
// inserts.
func TestIntegrationMySQLSnapshotConsistency(t *testing.T) {
	dsn, db := setupTestWithMySQLVersion(t, "8.0")
	_, err := db.Exec(`
    CREATE TABLE IF NOT EXISTS foo (
        a INT AUTO_INCREMENT,
        PRIMARY KEY (a)
    )
`)
	require.NoError(t, err)

	template := strings.NewReplacer("$DSN", dsn).Replace(`
read_until:
  # Stop when we're idle for 3 seconds, which means our writer stopped
  idle_timeout: 3s
  input:
    mysql_cdc:
      dsn: $DSN
      stream_snapshot: true
      snapshot_max_batch_size: 500
      checkpoint_cache: foocache
      tables:
        - foo
`)

	cacheConf := `
label: foocache
file:
  directory: ` + t.TempDir()

	streamOutBuilder := service.NewStreamBuilder()
	require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: DEBUG`))
	require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
	require.NoError(t, streamOutBuilder.AddInputYAML(template))

	var ids []int64
	var batchMu sync.Mutex
	require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, batch service.MessageBatch) error {
		batchMu.Lock()
		defer batchMu.Unlock()
		// This callback does not run on the test goroutine, so we must not call
		// require.* (FailNow) here. Record the failure with assert.* and hand
		// the error back to the stream instead.
		for _, msg := range batch {
			data, err := msg.AsStructured()
			if !assert.NoError(t, err) {
				return err
			}
			row, ok := data.(map[string]any)
			if !ok {
				err := fmt.Errorf("unexpected message payload type %T", data)
				assert.NoError(t, err)
				return err
			}
			v, err := bloblang.ValueAsInt64(row["a"])
			if !assert.NoError(t, err) {
				return err
			}
			ids = append(ids, v)
		}
		return nil
	}))

	streamOut, err := streamOutBuilder.Build()
	require.NoError(t, err)
	license.InjectTestService(streamOut.Resources())

	// Continuously write so there is a chance we skip data between snapshot and stream hand off.
	var count atomic.Int64
	writer := asyncroutine.NewPeriodic(time.Microsecond, func() {
		// Only count rows that were actually written, otherwise the expected
		// id sequence below would not match what is in the table.
		if _, err := db.Exec("INSERT INTO foo (a) VALUES (DEFAULT)"); !assert.NoError(t, err) {
			return
		}
		count.Add(1)
	})
	writer.Start()
	t.Cleanup(writer.Stop)

	// Wait to write some values so there are some values in the snapshot
	time.Sleep(time.Second)

	// The result of Run is sent over the channel and asserted on the test
	// goroutine; this avoids both calling require.* off the test goroutine and
	// racing on a shared err variable.
	streamStopped := make(chan error, 1)
	go func() {
		streamStopped <- streamOut.Run(context.Background())
	}()

	// Let the writer write a little more
	time.Sleep(time.Second * 3)

	writer.Stop()

	// Okay now wait for the stream to finish (the stream auto closes after it gets nothing for 3 seconds)
	select {
	case runErr := <-streamStopped:
		require.NoError(t, runErr)
	case <-time.After(30 * time.Second):
		require.Fail(t, "stream did not complete in time")
	}
	require.NoError(t, streamOut.StopWithin(time.Second*10))

	total := count.Load()
	expected := make([]int64, 0, total)
	for i := range total {
		expected = append(expected, i+1)
	}
	batchMu.Lock()
	defer batchMu.Unlock()
	require.Equal(t, expected, ids)
}
49 changes: 29 additions & 20 deletions internal/impl/mysql/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,6 @@ func (s *Snapshot) prepareSnapshot(ctx context.Context) (*position, error) {
return nil, fmt.Errorf("failed to start transaction: %v", err)
}

/*
START TRANSACTION WITH CONSISTENT SNAPSHOT ensures a consistent view of database state
when reading historical data during CDC initialization. Without it, concurrent writes
could create inconsistencies between binlog position and table snapshots, potentially
missing or duplicating events. The snapshot prevents other transactions from modifying
the data being read, maintaining referential integrity across tables while capturing
the initial state.
*/

// NOTE: this is a little sneaky because we're actually implicitly closing the transaction
// started with `BeginTx` above and replacing it with this one. We have to do this because
// the `database/sql` driver we're using does not support this WITH CONSISTENT SNAPSHOT.
if _, err := s.tx.ExecContext(ctx, "START TRANSACTION WITH CONSISTENT SNAPSHOT"); err != nil {
if rErr := s.tx.Rollback(); rErr != nil {
return nil, rErr
}

return nil, fmt.Errorf("failed to start consistent snapshot: %v", err)
}

/*
FLUSH TABLES WITH READ LOCK is executed before START TRANSACTION WITH CONSISTENT SNAPSHOT to:
1. Force MySQL to flush all data from memory to disk
Expand All @@ -96,6 +76,35 @@ func (s *Snapshot) prepareSnapshot(ctx context.Context) (*position, error) {
return nil, fmt.Errorf("failed to acquire global read lock: %v", err)
}

/*
START TRANSACTION WITH CONSISTENT SNAPSHOT ensures a consistent view of database state
when reading historical data during CDC initialization. Without it, concurrent writes
could create inconsistencies between binlog position and table snapshots, potentially
missing or duplicating events. The snapshot prevents other transactions from modifying
the data being read, maintaining referential integrity across tables while capturing
the initial state.
It's important that we do this AFTER acquiring the READ LOCK and flushing the tables,
otherwise other writes could sneak in between taking our transaction snapshot and
acquiring the lock.
*/

// NOTE: this is a little sneaky because we're actually implicitly closing the transaction
// started with `BeginTx` above and replacing it with this one. We have to do this because
// the `database/sql` driver we're using does not support this WITH CONSISTENT SNAPSHOT.
if _, err := s.tx.ExecContext(ctx, "START TRANSACTION WITH CONSISTENT SNAPSHOT"); err != nil {
// Make sure to release the lock if we fail
if _, eErr := s.lockConn.ExecContext(ctx, "UNLOCK TABLES"); eErr != nil {
return nil, eErr
}

if rErr := s.tx.Rollback(); rErr != nil {
return nil, rErr
}

return nil, fmt.Errorf("failed to start consistent snapshot: %v", err)
}

// Get binary log position (while locked)
pos, err := s.getCurrentBinlogPosition(ctx)
if err != nil {
Expand Down

0 comments on commit 85470f6

Please sign in to comment.