diff --git a/cli/cmd/root.go b/cli/cmd/root.go index 1b69240cc..823f53f26 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -194,6 +194,7 @@ After that tt will be able to manage the application using 'replicaset_example' NewKillCmd(), NewLogCmd(), NewEnableCmd(), + NewUpgradeCmd(), ) if err := injectCmds(rootCmd); err != nil { panic(err.Error()) diff --git a/cli/cmd/upgrade.go b/cli/cmd/upgrade.go new file mode 100644 index 000000000..a8541b4a4 --- /dev/null +++ b/cli/cmd/upgrade.go @@ -0,0 +1,83 @@ +package cmd + +import ( + "github.com/spf13/cobra" + "github.com/tarantool/tt/cli/cmd/internal" + "github.com/tarantool/tt/cli/cmdcontext" + "github.com/tarantool/tt/cli/modules" + "github.com/tarantool/tt/cli/replicaset" + replicasetcmd "github.com/tarantool/tt/cli/replicaset/cmd" + "github.com/tarantool/tt/cli/running" + "github.com/tarantool/tt/cli/upgrade" + "github.com/tarantool/tt/cli/util" +) + +var ( + Timeout *int + pendingReplicasetAliases *[]string +) + +func NewUpgradeCmd() *cobra.Command { + var upgradeCmd = &cobra.Command{ + Use: "upgrade []", + Short: "upgrade tarantool schema", + Example: `tt upgrade - Upgrade all active replicasets + tt upgrade - Upgrade replicasets of the specified app + tt upgrade -r - Upgrade specific replicaset of app `, + Run: func(cmd *cobra.Command, args []string) { + cmdCtx.CommandName = cmd.Name() + err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo, + upgradeReplicasets, args) + util.HandleCmdErr(cmd, err) + }, + ValidArgsFunction: func( + cmd *cobra.Command, + args []string, + toComplete string) ([]string, cobra.ShellCompDirective) { + return internal.ValidArgsFunction( + cliOpts, &cmdCtx, cmd, toComplete, + running.ExtractAppNames, + running.ExtractInstanceNames) + }, + } + + pendingReplicasetAliases = upgradeCmd.Flags().StringArrayP("replicaset", "r", + []string{}, "specify the replicaset name(s) to upgrade") + + Timeout = upgradeCmd.Flags().IntP("timeout", "t", 5, + "timeout for waiting the LSN synchronization (in seconds)") + + return upgradeCmd +} + +func prepareReplicasets(cmdCtx *cmdcontext.CmdCtx, args []string) (replicaset.Replicasets, error) { + var ctx replicasetCtx + if err := replicasetFillCtx(cmdCtx, &ctx, args, false); err != nil { + return replicaset.Replicasets{}, err + } + if ctx.IsInstanceConnect { + defer ctx.Conn.Close() + } + statusCtx := replicasetcmd.StatusCtx{ + IsApplication: ctx.IsApplication, + RunningCtx: ctx.RunningCtx, + Conn: ctx.Conn, + Orchestrator: ctx.Orchestrator, + } + + replicasets, err := replicasetcmd.GetReplicasets(statusCtx) + return replicasets, err +} + +func upgradeReplicasets(cmdCtx *cmdcontext.CmdCtx, args []string) error { + if !isConfigExist(cmdCtx) { + return errNoConfig + } + + replicasets, err := prepareReplicasets(cmdCtx, args) + if err != nil { + return err + } + + return upgrade.Upgrade(replicasets, *pendingReplicasetAliases, *Timeout) +} diff --git a/cli/replicaset/cmd/status.go b/cli/replicaset/cmd/status.go index 99a8c41c1..a58959cc6 100644 --- a/cli/replicaset/cmd/status.go +++ b/cli/replicaset/cmd/status.go @@ -23,28 +23,37 @@ type StatusCtx struct { Orchestrator replicaset.Orchestrator } -// Status shows a replicaset status. -func Status(statusCtx StatusCtx) error { +func GetReplicasets(statusCtx StatusCtx) (replicaset.Replicasets, error) { + var replicasets replicaset.Replicasets orchestratorType, err := getOrchestratorType(statusCtx.Orchestrator, statusCtx.Conn, statusCtx.RunningCtx) if err != nil { - return err + return replicasets, err } var orchestrator replicasetOrchestrator if statusCtx.IsApplication { if orchestrator, err = makeApplicationOrchestrator( orchestratorType, statusCtx.RunningCtx, nil, nil); err != nil { - return err + return replicasets, err } } else { if orchestrator, err = makeInstanceOrchestrator( orchestratorType, statusCtx.Conn); err != nil { - return err + return replicasets, err } } - replicasets, err := orchestrator.Discovery(replicaset.SkipCache) + replicasets, err = orchestrator.Discovery(replicaset.SkipCache) + if err != nil { + return replicasets, err + } + return replicasets, err +} + +// Status shows a replicaset status. +func Status(statusCtx StatusCtx) error { + replicasets, err := GetReplicasets(statusCtx) if err != nil { return err } diff --git a/cli/upgrade/lua/upgrade.lua b/cli/upgrade/lua/upgrade.lua new file mode 100644 index 000000000..d755e4c1d --- /dev/null +++ b/cli/upgrade/lua/upgrade.lua @@ -0,0 +1,11 @@ +local ok, err +ok, err = pcall(box.schema.upgrade) +if ok then + ok, err = pcall(box.snapshot) +end + +return { + lsn = box.info.lsn, + iid = box.info.id, + err = (not ok) and tostring(err) or nil, +} \ No newline at end of file diff --git a/cli/upgrade/upgrade.go b/cli/upgrade/upgrade.go new file mode 100644 index 000000000..73c78b873 --- /dev/null +++ b/cli/upgrade/upgrade.go @@ -0,0 +1,183 @@ +package upgrade + +import ( + _ "embed" + "errors" + "fmt" + "time" + + "github.com/mitchellh/mapstructure" + "github.com/tarantool/tt/cli/connector" + "github.com/tarantool/tt/cli/replicaset" + "github.com/tarantool/tt/cli/running" +) + +//go:embed lua/upgrade.lua +var upgradeLuaScript string + +type SyncInfo struct { + LSN uint32 `mapstructure:"lsn"` + IID uint32 `mapstructure:"iid"` + Err *string `mapstructure:"err"` +} + +func internalUpgrade(conn connector.Connector) (SyncInfo, error) { + var upgradeInfo SyncInfo + res, err := conn.Eval(upgradeLuaScript, []any{}, connector.RequestOpts{}) + if err != nil { + return upgradeInfo, err + } + if err := mapstructure.Decode(res[0], &upgradeInfo); err != nil { + return upgradeInfo, err + } + if upgradeInfo.Err != nil { + return upgradeInfo, errors.New(*upgradeInfo.Err) + } + return upgradeInfo, nil +} + +func WaitLSN(conn connector.Connector, masterUpgradeInfo SyncInfo, timeout int) error { + for i := 0; i < timeout; i++ { + var res []interface{} + res, err := conn.Eval( + fmt.Sprintf("return box.info.vclock[%d]", masterUpgradeInfo.IID), + []any{}, connector.RequestOpts{}) + if err != nil || len(res) == 0 { + return err + } + var lsn uint32 + switch v := res[0].(type) { + case uint16: + lsn = uint32(v) + case uint32: + lsn = v + } + if lsn >= masterUpgradeInfo.LSN { + return nil + } + time.Sleep(time.Second) + } + return errors.New("") +} + +func GetAllowedReplicasets(allReplicasets replicaset.Replicasets, + pendingReplicasetAliases []string) ([]replicaset.Replicaset, error) { + if len(pendingReplicasetAliases) == 0 { + return allReplicasets.Replicasets, nil + } + + replicasetMap := make(map[string]replicaset.Replicaset) + for _, rs := range allReplicasets.Replicasets { + replicasetMap[rs.Alias] = rs + } + + var allowedReplicasets []replicaset.Replicaset + + for _, alias := range pendingReplicasetAliases { + replicaset, exists := replicasetMap[alias] + if !exists { + return nil, fmt.Errorf("replicaset with alias %q doesn't exist", alias) + } + allowedReplicasets = append(allowedReplicasets, replicaset) + } + + return allowedReplicasets, nil +} + +func Upgrade(replicasets replicaset.Replicasets, pendingReplicasetAliases []string, + lsnTimeout int) error { + allowedReplicasets, err := GetAllowedReplicasets(replicasets, pendingReplicasetAliases) + if err != nil { + return err + } + + printReplicasetStatus := func(alias, status string) { + fmt.Printf("• %s: %s\n", alias, status) + } + + for _, rs := range allowedReplicasets { + var masterRun *running.InstanceCtx = nil + var replicRun []running.InstanceCtx + + for _, inst := range rs.Instances { + run := inst.InstanceCtx + fullInstanceName := running.GetAppInstanceName(run) + conn, err := connector.Connect(connector.ConnectOpts{ + Network: "unix", + Address: run.ConsoleSocket, + }) + if err != nil { + printReplicasetStatus(rs.Alias, "error") + return fmt.Errorf("[%s][%s]: %s", rs.Alias, fullInstanceName, err) + } + + res, err := conn.Eval( + "return (type(box.cfg) == 'function') or box.info.ro", + []any{}, connector.RequestOpts{}) + if err != nil || len(res) == 0 { + printReplicasetStatus(rs.Alias, "error") + return fmt.Errorf("[%s][%s]: %s", rs.Alias, fullInstanceName, err) + } + + if !res[0].(bool) { + if masterRun != nil { + printReplicasetStatus(rs.Alias, "error") + return fmt.Errorf("[%s]: %s and %s are both masters", + rs.Alias, running.GetAppInstanceName(*masterRun), + fullInstanceName) + } + masterRun = &run + } else { + replicRun = append(replicRun, run) + } + } + if masterRun == nil { + printReplicasetStatus(rs.Alias, "error") + return fmt.Errorf("[%s]: has not master instance", rs.Alias) + } + var conn connector.Connector + conn, err = connector.Connect(connector.ConnectOpts{ + Network: "unix", + Address: masterRun.ConsoleSocket, + }) + + if err != nil { + printReplicasetStatus(rs.Alias, "error") + return fmt.Errorf("[%s][%s]: %s", rs.Alias, + running.GetAppInstanceName(*masterRun), err) + } + masterUpgradeInfo, err := internalUpgrade(conn) + if err != nil { + printReplicasetStatus(rs.Alias, "error") + return fmt.Errorf("[%s][%s]: %s", rs.Alias, + running.GetAppInstanceName(*masterRun), err) + } + for _, run := range replicRun { + fullInstanceName := running.GetAppInstanceName(run) + conn, err = connector.Connect(connector.ConnectOpts{ + Network: "unix", + Address: run.ConsoleSocket, + }) + if err != nil { + printReplicasetStatus(rs.Alias, "error") + return fmt.Errorf("[%s][%s]: %s", rs.Alias, fullInstanceName, err) + } + err = WaitLSN(conn, masterUpgradeInfo, lsnTimeout) + if err != nil { + printReplicasetStatus(rs.Alias, "error") + return fmt.Errorf("[%s]: LSN wait timeout: error waiting LSN %d "+ + "in vclock component %d on %s: time quota %d seconds "+ + "exceeded", rs.Alias, masterUpgradeInfo.LSN, + masterUpgradeInfo.IID, fullInstanceName, lsnTimeout) + } + res, err := conn.Eval("return box.snapshot()", []any{}, + connector.RequestOpts{}) + if err != nil || len(res) == 0 { + printReplicasetStatus(rs.Alias, "error") + return fmt.Errorf("[%s][%s]: %s", rs.Alias, fullInstanceName, err) + } + } + printReplicasetStatus(rs.Alias, "ok") + } + return nil +}