Skip to content

Commit

Permalink
Merge branch 'master' into meiji163/generated-col-error
Browse files Browse the repository at this point in the history
  • Loading branch information
meiji163 authored Dec 18, 2024
2 parents 4960711 + 690b1e1 commit 1c88601
Show file tree
Hide file tree
Showing 156 changed files with 3,908 additions and 1,774 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
/libexec/
/.vendor/
.idea/
*.tmp
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Please see [Coding gh-ost](doc/coding-ghost.md) for a guide to getting started d

`gh-ost` is a Go project; it is built with Go `1.15` and above. To build on your own, use either:
- [script/build](https://github.com/github/gh-ost/blob/master/script/build) - this is the same build script used by CI hence the authoritative; artifact is `./bin/gh-ost` binary.
- [build.sh](https://github.com/github/gh-ost/blob/master/build.sh) for building `tar.gz` artifacts in `/tmp/gh-ost`
- [build.sh](https://github.com/github/gh-ost/blob/master/build.sh) for building `tar.gz` artifacts in `/tmp/gh-ost-release`

Generally speaking, `master` branch is stable, but only [releases](https://github.com/github/gh-ost/releases) are to be used in production.

Expand Down
9 changes: 5 additions & 4 deletions build.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
#!/bin/bash
#
#

RELEASE_VERSION=
buildpath=
Expand Down Expand Up @@ -72,11 +70,14 @@ main() {
build macOS osx darwin amd64
build macOS osx darwin arm64

bin_files=$(find $buildpath/gh-ost* -type f -maxdepth 1)
echo "Binaries found in:"
find $buildpath/gh-ost* -type f -maxdepth 1
echo "$bin_files"

echo "Checksums:"
(cd $buildpath && shasum -a256 gh-ost* 2>/dev/null)
(shasum -a256 $bin_files 2>/dev/null)

echo "Build Success!"
}

main "$@"
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ require (
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.34.0
golang.org/x/net v0.24.0
golang.org/x/term v0.19.0
golang.org/x/text v0.14.0
golang.org/x/sync v0.10.0
golang.org/x/term v0.27.0
golang.org/x/text v0.21.0
)

require (
Expand Down Expand Up @@ -65,7 +66,7 @@ require (
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/sys v0.28.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand All @@ -199,6 +199,8 @@ golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -209,15 +211,15 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
5 changes: 5 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ type MigrationContext struct {
AzureMySQL bool
AttemptInstantDDL bool

// SkipPortValidation allows skipping the port validation in `ValidateConnection`
// This is useful when connecting to a MySQL instance where the external port
// may not match the internal port.
SkipPortValidation bool

config ContextConfig
configMutex *sync.Mutex
ConfigFile string
Expand Down
11 changes: 10 additions & 1 deletion go/base/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,27 @@ func StringContainsAll(s string, substrings ...string) bool {

func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) {
versionQuery := `select @@global.version`
var port, extraPort int

var version string
if err := db.QueryRow(versionQuery).Scan(&version); err != nil {
return "", err
}

if migrationContext.SkipPortValidation {
return version, nil
}

var extraPort int

extraPortQuery := `select @@global.extra_port`
if err := db.QueryRow(extraPortQuery).Scan(&extraPort); err != nil { //nolint:staticcheck
// swallow this error. not all servers support extra_port
}

// AliyunRDS set users port to "NULL", replace it by gh-ost param
// GCP set users port to "NULL", replace it by gh-ost param
// Azure MySQL set users port to a different value by design, replace it by gh-ost para
var port int
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL {
port = connectionConfig.Key.Port
} else {
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func main() {
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")

maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
Expand Down
93 changes: 71 additions & 22 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"

"github.com/openark/golib/log"
"context"
"database/sql/driver"

"github.com/github/gh-ost/go/mysql"
drivermysql "github.com/go-sql-driver/mysql"
"github.com/openark/golib/sqlutils"
)

Expand Down Expand Up @@ -77,7 +80,8 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {

func (this *Applier) InitDBConnections() (err error) {
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
uriWithMulti := fmt.Sprintf("%s&multiStatements=true", applierUri)
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, uriWithMulti); err != nil {
return err
}
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
Expand Down Expand Up @@ -251,6 +255,15 @@ func (this *Applier) ValidateOrDropExistingTables() error {
func (this *Applier) AttemptInstantDDL() error {
query := this.generateInstantDDLQuery()
this.migrationContext.Log.Infof("INSTANT DDL query is: %s", query)

// Reuse cut-over-lock-timeout from regular migration process to reduce risk
// in situations where there may be long-running transactions.
tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
lockTimeoutQuery := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := this.db.Exec(lockTimeoutQuery); err != nil {
return err
}
// We don't need a trx, because for instant DDL the SQL mode doesn't matter.
_, err := this.db.Exec(query)
return err
Expand Down Expand Up @@ -1207,44 +1220,80 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
var totalDelta int64
ctx := context.Background()

err := func() error {
tx, err := this.db.Begin()
conn, err := this.db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()

sessionQuery := "SET /* gh-ost */ SESSION time_zone = '+00:00'"
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery())
if _, err := conn.ExecContext(ctx, sessionQuery); err != nil {
return err
}

tx, err := conn.BeginTx(ctx, nil)
if err != nil {
return err
}
rollback := func(err error) error {
tx.Rollback()
return err
}

sessionQuery := "SET /* gh-ost */ SESSION time_zone = '+00:00'"
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery())

if _, err := tx.Exec(sessionQuery); err != nil {
return rollback(err)
}
buildResults := make([]*dmlBuildResult, 0, len(dmlEvents))
nArgs := 0
for _, dmlEvent := range dmlEvents {
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
if buildResult.err != nil {
return rollback(buildResult.err)
}
result, err := tx.Exec(buildResult.query, buildResult.args...)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args)
return rollback(err)
}
nArgs += len(buildResult.args)
buildResults = append(buildResults, buildResult)
}
}

rowsAffected, err := result.RowsAffected()
if err != nil {
log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err)
rowsAffected = 1
// We batch together the DML queries into multi-statements to minimize network trips.
// We have to use the raw driver connection to access the rows affected
// for each statement in the multi-statement.
execErr := conn.Raw(func(driverConn any) error {
ex := driverConn.(driver.ExecerContext)
nvc := driverConn.(driver.NamedValueChecker)

multiArgs := make([]driver.NamedValue, 0, nArgs)
multiQueryBuilder := strings.Builder{}
for _, buildResult := range buildResults {
for _, arg := range buildResult.args {
nv := driver.NamedValue{Value: driver.Value(arg)}
nvc.CheckNamedValue(&nv)
multiArgs = append(multiArgs, nv)
}
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
totalDelta += buildResult.rowsDelta * rowsAffected

multiQueryBuilder.WriteString(buildResult.query)
multiQueryBuilder.WriteString(";\n")
}

res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
return err
}

mysqlRes := res.(drivermysql.Result)

// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
totalDelta += buildResults[i].rowsDelta * rowsAffected
}
return nil
})

if execErr != nil {
return rollback(execErr)
}
if err := tx.Commit(); err != nil {
return err
Expand Down
Loading

0 comments on commit 1c88601

Please sign in to comment.