From f2406ee51e5db73128bd6f6824cf680144a54faa Mon Sep 17 00:00:00 2001 From: Maksim Tiushev Date: Tue, 3 Sep 2024 18:22:37 +0000 Subject: [PATCH] tt: add command `tt upgrade` No commit message yet. --- cli/cmd/root.go | 1 + cli/cmd/upgrade.go | 50 +++++++ cli/upgrade/lua/instance.lua | 16 ++ cli/upgrade/lua/upgrade.lua | 11 ++ cli/upgrade/upgrade.go | 279 +++++++++++++++++++++++++++++++++++ 5 files changed, 357 insertions(+) create mode 100644 cli/cmd/upgrade.go create mode 100644 cli/upgrade/lua/instance.lua create mode 100644 cli/upgrade/lua/upgrade.lua create mode 100644 cli/upgrade/upgrade.go diff --git a/cli/cmd/root.go b/cli/cmd/root.go index 1b69240cc..823f53f26 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -194,6 +194,7 @@ After that tt will be able to manage the application using 'replicaset_example' NewKillCmd(), NewLogCmd(), NewEnableCmd(), + NewUpgradeCmd(), ) if err := injectCmds(rootCmd); err != nil { panic(err.Error()) diff --git a/cli/cmd/upgrade.go b/cli/cmd/upgrade.go new file mode 100644 index 000000000..5fe9c5930 --- /dev/null +++ b/cli/cmd/upgrade.go @@ -0,0 +1,50 @@ +package cmd + +import ( + "github.com/spf13/cobra" + "github.com/tarantool/tt/cli/cmd/internal" + "github.com/tarantool/tt/cli/cmdcontext" + "github.com/tarantool/tt/cli/modules" + "github.com/tarantool/tt/cli/running" + "github.com/tarantool/tt/cli/upgrade" + "github.com/tarantool/tt/cli/util" +) + +func NewUpgradeCmd() *cobra.Command { + var upgradeCmd = &cobra.Command{ + Use: "upgrade []", + Short: "upgrade tarantool", + Run: func(cmd *cobra.Command, args []string) { + cmdCtx.CommandName = cmd.Name() + err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo, + upgradeInstance, args) + util.HandleCmdErr(cmd, err) + }, + ValidArgsFunction: func( + cmd *cobra.Command, + args []string, + toComplete string) ([]string, cobra.ShellCompDirective) { + return internal.ValidArgsFunction( + cliOpts, &cmdCtx, cmd, toComplete, + running.ExtractAppNames, + running.ExtractInstanceNames) + }, + } + upgradeCmd.Flags().IntVarP(&upgrade.TIMEOUT, "timeout", "t", 5, "LSN timeout") + + return upgradeCmd +} + +func upgradeInstance(cmdCtx *cmdcontext.CmdCtx, args []string) error { + if !isConfigExist(cmdCtx) { + return errNoConfig + } + + var runningCtx running.RunningCtx + if err := running.FillCtx(cliOpts, cmdCtx, &runningCtx, args); err != nil { + return err + } + + err := upgrade.Upgrade(runningCtx) + return err +} diff --git a/cli/upgrade/lua/instance.lua b/cli/upgrade/lua/instance.lua new file mode 100644 index 000000000..f6f6a4c22 --- /dev/null +++ b/cli/upgrade/lua/instance.lua @@ -0,0 +1,16 @@ +local uuid = box.info.uuid +local replica_uuids = {} + +local isRO = (type(box.cfg) == 'function') or box.info.ro + +for _, elem in ipairs(box.info.replication) do + if elem.uuid ~= uuid then + table.insert(replica_uuids, elem.uuid) + end +end + +return { + RO = isRO, + UUID = uuid, + ReplicUUIDs = replica_uuids, +} diff --git a/cli/upgrade/lua/upgrade.lua b/cli/upgrade/lua/upgrade.lua new file mode 100644 index 000000000..7c0ec5870 --- /dev/null +++ b/cli/upgrade/lua/upgrade.lua @@ -0,0 +1,11 @@ +local ok, err +ok, err = pcall(box.schema.upgrade) +if ok then + ok, err = pcall(box.snapshot) +end + +return { + lsn = box.info.lsn, + iid = box.info.id, + err = (ok == 'ok') and tostring(err) or nil, +} \ No newline at end of file diff --git a/cli/upgrade/upgrade.go b/cli/upgrade/upgrade.go new file mode 100644 index 000000000..4e4497999 --- /dev/null +++ b/cli/upgrade/upgrade.go @@ -0,0 +1,279 @@ +package upgrade + +import ( + _ "embed" + "errors" + "fmt" + "os" + "strings" + "time" + + "github.com/jedib0t/go-pretty/v6/table" + "github.com/mitchellh/mapstructure" + "github.com/tarantool/tt/cli/connector" + "github.com/tarantool/tt/cli/running" +) + +var TIMEOUT int + +//go:embed lua/instance.lua +var instanceInfoLuaScript string + +//go:embed lua/upgrade.lua +var upgradeLuaScript string + +type InstanceRawInfo struct { + RO bool `mapstructure:"RO"` + UUID string `mapstructure:"UUID"` + ReplicUUIDs []string `mapstructure:"ReplicUUIDs"` +} + +type SyncInfo struct { + LSN uint32 `mapstructure:"lsn"` + IID uint32 `mapstructure:"iid"` + Err *string `mapstructure:"err"` +} + +type InstanceInfo struct { + Run running.InstanceCtx + ReplicUUIDs []string +} + +func InternalUpgrade(conn connector.Connector) (SyncInfo, error) { + var upgradeInfo SyncInfo + res, err := conn.Eval(upgradeLuaScript, []any{}, connector.RequestOpts{}) + if err != nil { + return upgradeInfo, err + } + if err := mapstructure.Decode(res[0], &upgradeInfo); err != nil { + return upgradeInfo, err + } + if upgradeInfo.Err != nil { + return upgradeInfo, errors.New(*upgradeInfo.Err) + } + return upgradeInfo, nil +} + +func SplitInsances(runningCtx running.RunningCtx, masters *map[string]InstanceInfo, + replics *map[string]InstanceInfo) error { + + count := 0 + + for _, run := range runningCtx.Instances { + fullInstanceName := running.GetAppInstanceName(run) + conn, err := connector.Connect(connector.ConnectOpts{ + Network: "unix", + Address: run.ConsoleSocket, + }) + if err != nil { + return fmt.Errorf("[%s]: %w", fullInstanceName, err) + } + + var instanceInfo InstanceRawInfo + res, err := conn.Eval(instanceInfoLuaScript, []any{}, connector.RequestOpts{}) + if err != nil { + return fmt.Errorf("[%s]: %w", fullInstanceName, err) + } + + if err := mapstructure.Decode(res[0], &instanceInfo); err != nil { + return fmt.Errorf("[%s]: %w", fullInstanceName, err) + } + instance := InstanceInfo{ + Run: run, + ReplicUUIDs: instanceInfo.ReplicUUIDs, + } + if instanceInfo.RO { + (*replics)[instanceInfo.UUID] = instance + } else { + for _, uuid := range instanceInfo.ReplicUUIDs { + if m, exist := (*masters)[uuid]; exist { + return fmt.Errorf("instances %q and %q are both masters "+ + "in the same replicaset", + fullInstanceName, + running.GetAppInstanceName(m.Run)) + } + } + (*masters)[instanceInfo.UUID] = instance + count += len(instanceInfo.ReplicUUIDs) + 1 + } + } + + var err error + if count != len(runningCtx.Instances) { + replicasetIndex := 1 + usedReplicas := make(map[string]bool) + + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.SetStyle(table.StyleRounded) + + t.AppendHeader(table.Row{"Replicaset", "Instance", "Role"}) + var instanceErrors []error + + for masterUUID, masterInfo := range *masters { + masterName := running.GetAppInstanceName(masterInfo.Run) + t.AppendRow([]interface{}{fmt.Sprintf("Replicaset %d", replicasetIndex), + masterName, "master"}) + usedReplicas[masterUUID] = true + for _, replicaUUID := range masterInfo.ReplicUUIDs { + if replic, exists := (*replics)[replicaUUID]; exists { + replicaName := running.GetAppInstanceName(replic.Run) + t.AppendRow([]interface{}{"", replicaName, "replica"}) + usedReplicas[replicaUUID] = true + } else { + t.AppendRow([]interface{}{"", replicaUUID, "miss"}) + err = fmt.Errorf(`"Replicaset %d" contains multiple `+ + `instances. Upgrade requires all instances of `+ + `replicaset to be included`, replicasetIndex) + instanceErrors = append(instanceErrors, err) + } + } + t.AppendSeparator() + replicasetIndex++ + } + + t.AppendSeparator() + for replicaUUID, replic := range *replics { + if usedReplicas[replicaUUID] { + continue + } + replicaName := running.GetAppInstanceName(replic.Run) + t.AppendRow([]interface{}{fmt.Sprintf("Replicaset %d", replicasetIndex), + replicaName, "replica"}) + usedReplicas[replicaUUID] = true + missed := false + for _, otherReplicUUID := range replic.ReplicUUIDs { + if usedReplicas[otherReplicUUID] { + continue + } + if replic, exists := (*replics)[otherReplicUUID]; exists { + otherReplicaName := running.GetAppInstanceName(replic.Run) + t.AppendRow([]interface{}{"", otherReplicaName, "replica"}) + usedReplicas[otherReplicUUID] = true + } else { + missed = true + t.AppendRow([]interface{}{"", otherReplicUUID, "miss"}) + err = fmt.Errorf(`"Replicaset %d" contains multiple `+ + `instances. Upgrade requires all instances of `+ + `replicaset to be included`, replicasetIndex) + instanceErrors = append(instanceErrors, err) + } + } + if !missed { + err = fmt.Errorf(`"Replicaset %d" has not master instance`, + replicasetIndex) + instanceErrors = append(instanceErrors, err) + } + t.AppendSeparator() + replicasetIndex++ + } + + t.Render() + return errors.New(strings.Join(func() []string { + var errorMessages []string + for _, err := range instanceErrors { + errorMessages = append(errorMessages, err.Error()) + } + return errorMessages + }(), "\n\t")) + } + return nil +} + +func Upgrade(runningCtx running.RunningCtx) error { + var masterInstances = make(map[string]InstanceInfo) + var replicInstances = make(map[string]InstanceInfo) + + var err error + err = SplitInsances(runningCtx, &masterInstances, &replicInstances) + if err != nil { + return err + } + + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.SetStyle(table.StyleRounded) + t.AppendHeader(table.Row{"Replicaset", "Instance", "Role", "Upgrade-status"}) + + replicasetIndex := 1 + for masterUUID, masterInfo := range masterInstances { + run := masterInfo.Run + fullMasterInstanceName := running.GetAppInstanceName(run) + var conn connector.Connector + conn, err = connector.Connect(connector.ConnectOpts{ + Network: "unix", + Address: run.ConsoleSocket, + }) + if err != nil { + err = fmt.Errorf("[%s]: %s", fullMasterInstanceName, err) + break + } + // upgrade master + var masterUpgradeInfo SyncInfo + masterUpgradeInfo, err = InternalUpgrade(conn) + if err != nil { + err = fmt.Errorf("[%s]: %s", fullMasterInstanceName, err) + break + } + t.AppendRow([]interface{}{fmt.Sprintf("Replicaset %d", replicasetIndex), + fullMasterInstanceName, "master", "ok"}) + + for _, replicaUUID := range masterInfo.ReplicUUIDs { + replRun := replicInstances[replicaUUID].Run + fullReplicInstanceName := running.GetAppInstanceName(replRun) + conn, err = connector.Connect(connector.ConnectOpts{ + Network: "unix", + Address: replRun.ConsoleSocket, + }) + if err != nil { + err = fmt.Errorf("[%s]: %s", fullReplicInstanceName, err) + break + } + ok := false + for i := 0; i < TIMEOUT; i++ { + var res []interface{} + res, err = conn.Eval( + fmt.Sprintf("return box.info.vclock[%d]", + masterUpgradeInfo.IID), + []any{}, connector.RequestOpts{}) + if err != nil { + break + } + var lsn uint32 + switch v := res[0].(type) { + case uint16: + lsn = uint32(v) + case uint32: + lsn = v + } + if lsn >= masterUpgradeInfo.LSN { + ok = true + break + } + time.Sleep(time.Second) + } + if !ok { + t.AppendRow([]interface{}{"", fullReplicInstanceName, + "replica", "timeout"}) + err = fmt.Errorf(`[%s]: LSN wait timeout (From: (%s, %s) `+ + `waiting for %d seconds)`, + fullReplicInstanceName, fullMasterInstanceName, + masterUUID, TIMEOUT) + break + } + _, err = InternalUpgrade(conn) + if err != nil { + err = fmt.Errorf("[%s]: %s", fullReplicInstanceName, err) + break + } + t.AppendRow([]interface{}{"", fullReplicInstanceName, "replica", "ok"}) + } + t.AppendSeparator() + if err != nil { + break + } + replicasetIndex++ + } + t.Render() + return err +}