From 234b9806c58d7d775816dd432159f74d89e94fe0 Mon Sep 17 00:00:00 2001
From: Maksim Tiushev
Date: Tue, 3 Sep 2024 18:22:37 +0000
Subject: [PATCH] tt: add command `tt upgrade`

Part of #924

@TarantoolBot document
Title: `tt upgrade` upgrades database schema.

This patch adds the `tt upgrade` command, which automates upgrading the
database schema on both master and replica instances. The command performs
the required steps in the correct order, ensures proper synchronization
between instances, and minimizes the need for manual intervention.

```
tt upgrade [<APP_NAME>] [flags]
```

`tt upgrade` command steps:
- For each replicaset:
  - On the master instance:
    1. Execute the following commands sequentially:
       ```
       box.schema.upgrade()
       box.snapshot()
       ```
  - On each replica:
    1. Wait until the replica applies all transactions produced by
       `box.schema.upgrade()` on the master by comparing the vector
       clocks (vclock).
    2. Once synchronization is confirmed, execute the following commands
       sequentially on the replica:
       ```
       box.schema.upgrade()
       box.snapshot()
       ```

> If any error occurs during the upgrade, the process stops and an error
> report is generated.

The command supports the following flags:
- `--replicaset (-r) stringArray`: specify the replicaset name(s) to upgrade;
- `--timeout (-t) int`: timeout for waiting the LSN synchronization in
  seconds (default 5);

Usage examples:
- Upgrade all active replicasets managed by `tt`:
```
$ tt upgrade
```
- Upgrade active replicasets of application <APP_NAME>:
```
$ tt upgrade <APP_NAME>
```
- Upgrade the specified replicaset(s) of application <APP_NAME>:
```
$ tt upgrade <APP_NAME> -r <RS_NAME_1> -r <RS_NAME_2> ...
```
---
 cli/cmd/root.go              |   1 +
 cli/cmd/upgrade.go           |  84 +++++++++++
 cli/replicaset/cmd/status.go |  19 ++-
 cli/upgrade/lua/upgrade.lua  |  11 ++
 cli/upgrade/upgrade.go       | 273 +++++++++++++++++++++++++++++++++++
 5 files changed, 382 insertions(+), 6 deletions(-)
 create mode 100644 cli/cmd/upgrade.go
 create mode 100644 cli/upgrade/lua/upgrade.lua
 create mode 100644 cli/upgrade/upgrade.go

diff --git a/cli/cmd/root.go b/cli/cmd/root.go
index 1b69240cc..823f53f26 100644
--- a/cli/cmd/root.go
+++ b/cli/cmd/root.go
@@ -194,6 +194,7 @@ After that tt will be able to manage the application using 'replicaset_example'
 		NewKillCmd(),
 		NewLogCmd(),
 		NewEnableCmd(),
+		NewUpgradeCmd(),
 	)
 	if err := injectCmds(rootCmd); err != nil {
 		panic(err.Error())
diff --git a/cli/cmd/upgrade.go b/cli/cmd/upgrade.go
new file mode 100644
index 000000000..f10340c10
--- /dev/null
+++ b/cli/cmd/upgrade.go
@@ -0,0 +1,84 @@
+package cmd
+
+import (
+	"github.com/spf13/cobra"
+	"github.com/tarantool/tt/cli/cmd/internal"
+	"github.com/tarantool/tt/cli/cmdcontext"
+	"github.com/tarantool/tt/cli/modules"
+	replicasetcmd "github.com/tarantool/tt/cli/replicaset/cmd"
+	"github.com/tarantool/tt/cli/running"
+	"github.com/tarantool/tt/cli/upgrade"
+	"github.com/tarantool/tt/cli/util"
+)
+
+var (
+	// Timeout holds the value of the --timeout flag: how many seconds to wait
+	// for the LSN synchronization of a replica.
+	Timeout *int
+	// pendingReplicasetAliases holds the value of the --replicaset flag: the
+	// names of the replicasets to upgrade.
+	pendingReplicasetAliases *[]string
+)
+
+// NewUpgradeCmd creates the upgrade command.
+func NewUpgradeCmd() *cobra.Command {
+	var upgradeCmd = &cobra.Command{
+		Use:   "upgrade [<APP_NAME>]",
+		Short: "upgrade tarantool schema",
+		Example: `tt upgrade                          - Upgrade all active replicasets
+  tt upgrade <APP_NAME>               - Upgrade replicasets of the specified app <APP_NAME>
+  tt upgrade <APP_NAME> -r <RS_NAME>  - Upgrade specific replicaset <RS_NAME> of app <APP_NAME>`,
+		Run: func(cmd *cobra.Command, args []string) {
+			cmdCtx.CommandName = cmd.Name()
+			err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo,
+				upgradeInstance, args)
+			util.HandleCmdErr(cmd, err)
+		},
+		ValidArgsFunction: func(
+			cmd *cobra.Command,
+			args []string,
+			toComplete string) ([]string, cobra.ShellCompDirective) {
+			return internal.ValidArgsFunction(
+				cliOpts, &cmdCtx, cmd, toComplete,
+				running.ExtractAppNames,
+				running.ExtractInstanceNames)
+		},
+	}
+
+	pendingReplicasetAliases = upgradeCmd.Flags().StringArrayP("replicaset", "r",
+		[]string{}, "specify the replicaset name(s) to upgrade")
+
+	Timeout = upgradeCmd.Flags().IntP("timeout", "t", 5,
+		"timeout for waiting the LSN synchronization (in seconds)")
+
+	return upgradeCmd
+}
+
+// upgradeInstance discovers the replicasets selected by args and upgrades
+// the schema on them.
+func upgradeInstance(cmdCtx *cmdcontext.CmdCtx, args []string) error {
+	if !isConfigExist(cmdCtx) {
+		return errNoConfig
+	}
+
+	var ctx replicasetCtx
+	if err := replicasetFillCtx(cmdCtx, &ctx, args, false); err != nil {
+		return err
+	}
+	if ctx.IsInstanceConnect {
+		defer ctx.Conn.Close()
+	}
+	statusCtx := replicasetcmd.StatusCtx{
+		IsApplication: ctx.IsApplication,
+		RunningCtx:    ctx.RunningCtx,
+		Conn:          ctx.Conn,
+		Orchestrator:  ctx.Orchestrator,
+	}
+
+	replicasets, err := replicasetcmd.GetReplicasets(statusCtx)
+	if err != nil {
+		return err
+	}
+
+	return upgrade.Upgrade(replicasets, *pendingReplicasetAliases, *Timeout)
+}
diff --git a/cli/replicaset/cmd/status.go b/cli/replicaset/cmd/status.go
index 99a8c41c1..a58959cc6 100644
--- a/cli/replicaset/cmd/status.go
+++ b/cli/replicaset/cmd/status.go
@@ -23,28 +23,35 @@ type StatusCtx struct {
 	Orchestrator replicaset.Orchestrator
 }
 
-// Status shows a replicaset status.
-func Status(statusCtx StatusCtx) error {
+// GetReplicasets discovers the replicasets described by statusCtx using the
+// replicaset.SkipCache discovery mode.
+func GetReplicasets(statusCtx StatusCtx) (replicaset.Replicasets, error) {
+	var replicasets replicaset.Replicasets
 	orchestratorType, err := getOrchestratorType(statusCtx.Orchestrator, statusCtx.Conn,
 		statusCtx.RunningCtx)
 	if err != nil {
-		return err
+		return replicasets, err
 	}
 
 	var orchestrator replicasetOrchestrator
 	if statusCtx.IsApplication {
 		if orchestrator, err = makeApplicationOrchestrator(
 			orchestratorType, statusCtx.RunningCtx, nil, nil); err != nil {
-			return err
+			return replicasets, err
 		}
 	} else {
 		if orchestrator, err = makeInstanceOrchestrator(
 			orchestratorType, statusCtx.Conn); err != nil {
-			return err
+			return replicasets, err
 		}
 	}
 
-	replicasets, err := orchestrator.Discovery(replicaset.SkipCache)
+	return orchestrator.Discovery(replicaset.SkipCache)
+}
+
+// Status shows a replicaset status.
+func Status(statusCtx StatusCtx) error {
+	replicasets, err := GetReplicasets(statusCtx)
 	if err != nil {
 		return err
 	}
diff --git a/cli/upgrade/lua/upgrade.lua b/cli/upgrade/lua/upgrade.lua
new file mode 100644
index 000000000..d755e4c1d
--- /dev/null
+++ b/cli/upgrade/lua/upgrade.lua
@@ -0,0 +1,11 @@
+local ok, err
+ok, err = pcall(box.schema.upgrade)
+if ok then
+    ok, err = pcall(box.snapshot)
+end
+
+return {
+    lsn = box.info.lsn,
+    iid = box.info.id,
+    err = (not ok) and tostring(err) or nil,
+}
diff --git a/cli/upgrade/upgrade.go b/cli/upgrade/upgrade.go
new file mode 100644
index 000000000..6d0cef7be
--- /dev/null
+++ b/cli/upgrade/upgrade.go
@@ -0,0 +1,273 @@
+package upgrade
+
+import (
+	_ "embed"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/mitchellh/mapstructure"
+	"github.com/tarantool/tt/cli/connector"
+	"github.com/tarantool/tt/cli/replicaset"
+	"github.com/tarantool/tt/cli/running"
+)
+
+//go:embed lua/upgrade.lua
+var upgradeLuaScript string
+
+// errLSNTimeout is returned by WaitLSN when the time quota is exceeded.
+var errLSNTimeout = errors.New("LSN wait timeout")
+
+// SyncInfo is the result of the upgrade script executed on an instance.
+type SyncInfo struct {
+	// LSN is box.info.lsn of the instance.
+	LSN uint64 `mapstructure:"lsn"`
+	// IID is box.info.id of the instance.
+	IID uint32 `mapstructure:"iid"`
+	// Err is the text of the error raised by the script, nil on success.
+	Err *string `mapstructure:"err"`
+}
+
+// toUint64 converts an integer value decoded from an instance response to
+// uint64. It reports false for negative and non-integer values.
+func toUint64(value any) (uint64, bool) {
+	switch v := value.(type) {
+	case uint:
+		return uint64(v), true
+	case uint8:
+		return uint64(v), true
+	case uint16:
+		return uint64(v), true
+	case uint32:
+		return uint64(v), true
+	case uint64:
+		return v, true
+	case int:
+		return uint64(v), v >= 0
+	case int8:
+		return uint64(v), v >= 0
+	case int16:
+		return uint64(v), v >= 0
+	case int32:
+		return uint64(v), v >= 0
+	case int64:
+		return uint64(v), v >= 0
+	}
+	return 0, false
+}
+
+// internalUpgrade executes the upgrade script on the instance behind conn and
+// returns the synchronization info reported by the script.
+func internalUpgrade(conn connector.Connector) (SyncInfo, error) {
+	var upgradeInfo SyncInfo
+	res, err := conn.Eval(upgradeLuaScript, []any{}, connector.RequestOpts{})
+	if err != nil {
+		return upgradeInfo, err
+	}
+	if len(res) == 0 {
+		return upgradeInfo, errors.New("the upgrade script returned no result")
+	}
+	if err := mapstructure.Decode(res[0], &upgradeInfo); err != nil {
+		return upgradeInfo, err
+	}
+	if upgradeInfo.Err != nil {
+		return upgradeInfo, errors.New(*upgradeInfo.Err)
+	}
+	return upgradeInfo, nil
+}
+
+// WaitLSN polls the instance behind conn once per second until the component
+// masterUpgradeInfo.IID of its vclock reaches masterUpgradeInfo.LSN. It
+// returns errLSNTimeout if that does not happen in timeout seconds.
+func WaitLSN(conn connector.Connector, masterUpgradeInfo SyncInfo, timeout int) error {
+	query := fmt.Sprintf("return box.info.vclock[%d]", masterUpgradeInfo.IID)
+	for elapsed := 0; ; elapsed++ {
+		res, err := conn.Eval(query, []any{}, connector.RequestOpts{})
+		if err != nil {
+			return err
+		}
+		// A missing component means that no rows of the master have been
+		// applied yet, so it is treated as LSN 0.
+		var lsn uint64
+		if len(res) > 0 && res[0] != nil {
+			var ok bool
+			if lsn, ok = toUint64(res[0]); !ok {
+				return fmt.Errorf("unexpected vclock component value: %v", res[0])
+			}
+		}
+		if lsn >= masterUpgradeInfo.LSN {
+			return nil
+		}
+		if elapsed >= timeout {
+			return errLSNTimeout
+		}
+		time.Sleep(time.Second)
+	}
+}
+
+// GetAllowedReplicasets returns the replicasets with the given aliases in the
+// order of pendingReplicasetAliases, or all the replicasets if no aliases are
+// given. It returns an error if some alias is unknown.
+func GetAllowedReplicasets(allReplicasets replicaset.Replicasets,
+	pendingReplicasetAliases []string) ([]replicaset.Replicaset, error) {
+	if len(pendingReplicasetAliases) == 0 {
+		return allReplicasets.Replicasets, nil
+	}
+
+	replicasetMap := make(map[string]replicaset.Replicaset,
+		len(allReplicasets.Replicasets))
+	for _, rs := range allReplicasets.Replicasets {
+		replicasetMap[rs.Alias] = rs
+	}
+
+	allowedReplicasets := make([]replicaset.Replicaset, 0,
+		len(pendingReplicasetAliases))
+	for _, alias := range pendingReplicasetAliases {
+		rs, exists := replicasetMap[alias]
+		if !exists {
+			return nil, fmt.Errorf("replicaset with alias %q doesn't exist", alias)
+		}
+		allowedReplicasets = append(allowedReplicasets, rs)
+	}
+
+	return allowedReplicasets, nil
+}
+
+// connect opens a connection to the console socket of the instance.
+func connect(run running.InstanceCtx) (connector.Connector, error) {
+	return connector.Connect(connector.ConnectOpts{
+		Network: "unix",
+		Address: run.ConsoleSocket,
+	})
+}
+
+// isMaster reports whether the instance is a master, i.e. box is configured
+// on it and box.info.ro is false.
+func isMaster(run running.InstanceCtx) (bool, error) {
+	conn, err := connect(run)
+	if err != nil {
+		return false, err
+	}
+	defer conn.Close()
+
+	res, err := conn.Eval(
+		"return (type(box.cfg) == 'function') or box.info.ro",
+		[]any{}, connector.RequestOpts{})
+	if err != nil {
+		return false, err
+	}
+	if len(res) == 0 {
+		return false, errors.New("the read-only status request returned no result")
+	}
+	// A type assertion without the check would panic on an unexpected reply.
+	readOnly, ok := res[0].(bool)
+	if !ok {
+		return false, fmt.Errorf("unexpected read-only status value: %v", res[0])
+	}
+	return !readOnly, nil
+}
+
+// upgradeMaster executes the upgrade script on the master instance.
+func upgradeMaster(run running.InstanceCtx) (SyncInfo, error) {
+	conn, err := connect(run)
+	if err != nil {
+		return SyncInfo{}, err
+	}
+	defer conn.Close()
+
+	return internalUpgrade(conn)
+}
+
+// upgradeReplica waits until the replica applies the changes made by the
+// upgrade on the master and then executes the upgrade script on the replica.
+func upgradeReplica(run running.InstanceCtx, masterUpgradeInfo SyncInfo,
+	lsnTimeout int) error {
+	conn, err := connect(run)
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	if err := WaitLSN(conn, masterUpgradeInfo, lsnTimeout); err != nil {
+		return err
+	}
+	_, err = internalUpgrade(conn)
+	return err
+}
+
+// upgradeReplicaset upgrades the master of the replicaset and then each of
+// its replicas. The replicaset must have exactly one master.
+func upgradeReplicaset(rs replicaset.Replicaset, lsnTimeout int) error {
+	var (
+		master   *running.InstanceCtx
+		replicas []running.InstanceCtx
+	)
+	for _, inst := range rs.Instances {
+		run := inst.InstanceCtx
+		fullInstanceName := running.GetAppInstanceName(run)
+		writable, err := isMaster(run)
+		if err != nil {
+			return fmt.Errorf("[%s][%s]: %w", rs.Alias, fullInstanceName, err)
+		}
+		if !writable {
+			replicas = append(replicas, run)
+			continue
+		}
+		if master != nil {
+			return fmt.Errorf("[%s]: %s and %s are both masters",
+				rs.Alias, running.GetAppInstanceName(*master),
+				fullInstanceName)
+		}
+		master = &run
+	}
+	if master == nil {
+		return fmt.Errorf("[%s]: has not master instance", rs.Alias)
+	}
+
+	masterUpgradeInfo, err := upgradeMaster(*master)
+	if err != nil {
+		return fmt.Errorf("[%s][%s]: %w", rs.Alias,
+			running.GetAppInstanceName(*master), err)
+	}
+
+	for _, run := range replicas {
+		fullInstanceName := running.GetAppInstanceName(run)
+		err = upgradeReplica(run, masterUpgradeInfo, lsnTimeout)
+		if errors.Is(err, errLSNTimeout) {
+			return fmt.Errorf("[%s]: LSN wait timeout: error waiting LSN %d "+
+				"in vclock component %d on %s: time quota %d seconds "+
+				"exceeded", rs.Alias, masterUpgradeInfo.LSN,
+				masterUpgradeInfo.IID, fullInstanceName, lsnTimeout)
+		}
+		if err != nil {
+			return fmt.Errorf("[%s][%s]: %w", rs.Alias, fullInstanceName, err)
+		}
+	}
+	return nil
+}
+
+// printReplicasetStatus prints the upgrade status of the replicaset.
+func printReplicasetStatus(alias, status string) {
+	fmt.Printf("• %s: %s\n", alias, status)
+}
+
+// Upgrade upgrades the schema on the replicasets with the given aliases, or
+// on all the replicasets if no aliases are given. The replicasets are
+// processed one by one, the first failure stops the process. lsnTimeout is
+// the number of seconds to wait for each replica to synchronize.
+func Upgrade(replicasets replicaset.Replicasets, pendingReplicasetAliases []string,
+	lsnTimeout int) error {
+	allowedReplicasets, err := GetAllowedReplicasets(replicasets, pendingReplicasetAliases)
+	if err != nil {
+		return err
+	}
+
+	for _, rs := range allowedReplicasets {
+		if err := upgradeReplicaset(rs, lsnTimeout); err != nil {
+			printReplicasetStatus(rs.Alias, "error")
+			return err
+		}
+		printReplicasetStatus(rs.Alias, "ok")
+	}
+	return nil
+}