diff --git a/cli/cmd/replicaset.go b/cli/cmd/replicaset.go index 5068def08..49b0c5764 100644 --- a/cli/cmd/replicaset.go +++ b/cli/cmd/replicaset.go @@ -51,6 +51,35 @@ var ( " To specify relative path without `unix://` use `./`." ) +// newUpgradeCmd creates a "replicaset upgrade" command. +func newUpgradeCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "upgrade ( | ) [flags]\n\n" + + replicasetUriHelp, + DisableFlagsInUseLine: true, + Short: "Upgrade tarantool cluster", + Long: "Upgrade tarantool cluster.\n\n" + + libconnect.EnvCredentialsHelp + "\n\n", + Run: func(cmd *cobra.Command, args []string) { + cmdCtx.CommandName = cmd.Name() + err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo, + internalReplicasetUpgradeModule, args) + util.HandleCmdErr(cmd, err) + }, + Args: cobra.ExactArgs(1), + } + + cmd.Flags().StringArrayVarP(&replicasetcmd.ChosenReplicasetAliases, "replicaset", "r", + []string{}, "specify the replicaset name(s) to upgrade") + + cmd.Flags().IntVarP(&replicasetcmd.LsnTimeout, "timeout", "t", 5, + "timeout for waiting the LSN synchronization (in seconds)") + + addOrchestratorFlags(cmd) + addTarantoolConnectFlags(cmd) + return cmd +} + // newStatusCmd creates a "replicaset status" command. func newStatusCmd() *cobra.Command { cmd := &cobra.Command{ @@ -340,6 +369,7 @@ func NewReplicasetCmd() *cobra.Command { Aliases: []string{"rs"}, } + cmd.AddCommand(newUpgradeCmd()) cmd.AddCommand(newStatusCmd()) cmd.AddCommand(newPromoteCmd()) cmd.AddCommand(newDemoteCmd()) @@ -489,6 +519,23 @@ func replicasetFillCtx(cmdCtx *cmdcontext.CmdCtx, ctx *replicasetCtx, args []str return nil } +// internalReplicasetUpgradeModule is a "upgrade" command for the replicaset module. 
+func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) error { + var ctx replicasetCtx + if err := replicasetFillCtx(cmdCtx, &ctx, args, false); err != nil { + return err + } + if ctx.IsInstanceConnect { + defer ctx.Conn.Close() + } + return replicasetcmd.Upgrade(replicasetcmd.DiscoveryCtx{ + IsApplication: ctx.IsApplication, + RunningCtx: ctx.RunningCtx, + Conn: ctx.Conn, + Orchestrator: ctx.Orchestrator, + }) +} + // internalReplicasetPromoteModule is a "promote" command for the replicaset module. func internalReplicasetPromoteModule(cmdCtx *cmdcontext.CmdCtx, args []string) error { var ctx replicasetCtx @@ -560,7 +607,7 @@ func internalReplicasetStatusModule(cmdCtx *cmdcontext.CmdCtx, args []string) er if ctx.IsInstanceConnect { defer ctx.Conn.Close() } - return replicasetcmd.Status(replicasetcmd.StatusCtx{ + return replicasetcmd.Status(replicasetcmd.DiscoveryCtx{ IsApplication: ctx.IsApplication, RunningCtx: ctx.RunningCtx, Conn: ctx.Conn, diff --git a/cli/replicaset/cmd/lua/upgrade.lua b/cli/replicaset/cmd/lua/upgrade.lua new file mode 100644 index 000000000..d755e4c1d --- /dev/null +++ b/cli/replicaset/cmd/lua/upgrade.lua @@ -0,0 +1,11 @@ +local ok, err +ok, err = pcall(box.schema.upgrade) +if ok then + ok, err = pcall(box.snapshot) +end + +return { + lsn = box.info.lsn, + iid = box.info.id, + err = (not ok) and tostring(err) or nil, +} \ No newline at end of file diff --git a/cli/replicaset/cmd/status.go b/cli/replicaset/cmd/status.go index 99a8c41c1..687843fc6 100644 --- a/cli/replicaset/cmd/status.go +++ b/cli/replicaset/cmd/status.go @@ -10,41 +10,43 @@ import ( "github.com/tarantool/tt/cli/running" ) -// StatusCtx contains information about replicaset status command execution -// context. -type StatusCtx struct { +// DiscoveryCtx contains information about replicaset discovery. +type DiscoveryCtx struct { // IsApplication true if an application passed. IsApplication bool // RunningCtx is an application running context. 
RunningCtx running.RunningCtx // Conn is an active connection to a passed instance. Conn connector.Connector - // Orchestrator is a forced orchestator choice. + // Orchestrator is a forced orchestrator choice. Orchestrator replicaset.Orchestrator } -// Status shows a replicaset status. -func Status(statusCtx StatusCtx) error { - orchestratorType, err := getOrchestratorType(statusCtx.Orchestrator, - statusCtx.Conn, statusCtx.RunningCtx) +// GetReplicasets discovers and returns the list of replicasets. +func GetReplicasets(ctx DiscoveryCtx) (replicaset.Replicasets, error) { + orchestratorType, err := getOrchestratorType(ctx.Orchestrator, ctx.Conn, ctx.RunningCtx) if err != nil { - return err + return replicaset.Replicasets{}, err } var orchestrator replicasetOrchestrator - if statusCtx.IsApplication { - if orchestrator, err = makeApplicationOrchestrator( - orchestratorType, statusCtx.RunningCtx, nil, nil); err != nil { - return err - } + if ctx.IsApplication { + orchestrator, err = makeApplicationOrchestrator(orchestratorType, + ctx.RunningCtx, nil, nil) } else { - if orchestrator, err = makeInstanceOrchestrator( - orchestratorType, statusCtx.Conn); err != nil { - return err - } + orchestrator, err = makeInstanceOrchestrator(orchestratorType, ctx.Conn) + } + + if err != nil { + return replicaset.Replicasets{}, err } - replicasets, err := orchestrator.Discovery(replicaset.SkipCache) + return orchestrator.Discovery(replicaset.SkipCache) +} + +// Status shows a replicaset status. 
var (
	// ChosenReplicasetAliases lists the replicaset aliases selected with the
	// "--replicaset" flag. FilterReplicasetsByAliases treats an empty list as
	// "all replicasets".
	ChosenReplicasetAliases []string
	// LsnTimeout is the value of the "--timeout" flag: how long WaitLSN keeps
	// polling a replica, in seconds.
	LsnTimeout int
)

// InstanceMeta pairs an instance running context with a connection to that
// instance.
type InstanceMeta struct {
	run  running.InstanceCtx
	conn connector.Connector
}

// upgradeMasterLua is the Lua script evaluated on the master instance. It
// calls box.schema.upgrade and, on success, box.snapshot, and returns a table
// with the "lsn", "iid" and "err" keys.
//
//go:embed lua/upgrade.lua
var upgradeMasterLua string

// SyncInfo is the decoded result of the upgrade script run on a master.
type SyncInfo struct {
	// LSN is box.info.lsn of the master after the script has run.
	LSN uint64 `mapstructure:"lsn"`
	// IID is box.info.id of the master; WaitLSN uses it as the vclock
	// component to watch on replicas.
	IID uint32 `mapstructure:"iid"`
	// Err is the error text reported by the script, nil when it succeeded.
	Err *string `mapstructure:"err"`
}
+func FilterReplicasetsByAliases(replicasets replicaset.Replicasets) ([]replicaset.Replicaset, + error) { + // If no aliases are provided, return all replicasets. + if len(ChosenReplicasetAliases) == 0 { + return replicasets.Replicasets, nil + } + + // Create a map for fast lookup of replicasets by alias + replicasetMap := make(map[string]replicaset.Replicaset) + for _, rs := range replicasets.Replicasets { + replicasetMap[rs.Alias] = rs + } + + var chosenReplicasets []replicaset.Replicaset + for _, alias := range ChosenReplicasetAliases { + rs, exists := replicasetMap[alias] + if !exists { + return nil, fmt.Errorf("replicaset with alias %q doesn't exist", alias) + } + chosenReplicasets = append(chosenReplicasets, rs) + } + + return chosenReplicasets, nil +} + +func Upgrade(discoveryCtx DiscoveryCtx) error { + replicasets, err := GetReplicasets(discoveryCtx) + if err != nil { + // This may be a single-instance application without Tarantool-3 config + // or instances.yml file. + if len(discoveryCtx.RunningCtx.Instances) == 1 { + // Create a dummy replicaset + var replicasetList []replicaset.Replicaset + var dummyReplicaset replicaset.Replicaset + var instance replicaset.Instance + + instance.InstanceCtx = discoveryCtx.RunningCtx.Instances[0] + instance.Alias = running.GetAppInstanceName(instance.InstanceCtx) + instance.InstanceCtxFound = true + + dummyReplicaset.Alias = instance.Alias + dummyReplicaset.Instances = append(dummyReplicaset.Instances, instance) + replicasetList = append(replicasetList, dummyReplicaset) + + return internalUpgrade(replicasetList) + } + return err + } + + replicasets = FillAliases(replicasets) + replicasetsToUpgrade, err := FilterReplicasetsByAliases(replicasets) + if err != nil { + return err + } + + return internalUpgrade(replicasetsToUpgrade) +} + +func internalUpgrade(replicasets []replicaset.Replicaset) error { + for _, replicaset := range replicasets { + err := upgradeReplicaset(replicaset) + if err != nil { + fmt.Printf("• %s: 
error\n", replicaset.Alias) + return fmt.Errorf("replicaset %s: %w", replicaset.Alias, err) + } + fmt.Printf("• %s: ok\n", replicaset.Alias) + } + return nil +} + +func getInstanceConnector(instance replicaset.Instance) (connector.Connector, error) { + run := instance.InstanceCtx + fullInstanceName := running.GetAppInstanceName(run) + if fullInstanceName == "" { + fullInstanceName = instance.Alias + } + if fullInstanceName == "" { + fullInstanceName = "unknown" + } + + // Try to connect via unix socket + conn, err := connector.Connect(connector.ConnectOpts{ + Network: "unix", + Address: run.ConsoleSocket, + }) + + if err != nil { + // try to connect via TCP [experimental] + conn, err = connector.Connect(connector.ConnectOpts{ + Network: "tcp", + Address: instance.URI, + Username: "client", // should be opt + Password: "secret", // should be opt + }) + if err != nil { + return nil, fmt.Errorf("instance %s failed to connect via both TCP "+ + "and UNIX socket [%s]: %w", fullInstanceName, instance.URI, err) + } + } + return conn, nil +} + +func СollectRWROInfo(replicaset replicaset.Replicaset, master **InstanceMeta, + replicas *[]InstanceMeta) error { + for _, instance := range replicaset.Instances { + run := instance.InstanceCtx + fullInstanceName := running.GetAppInstanceName(run) + conn, err := getInstanceConnector(instance) + + if err != nil { + return err + } + + var isRW bool + if instance.Mode.String() != "unknown" { + isRW = instance.Mode.String() == "rw" + } else { + res, err := conn.Eval( + "return (type(box.cfg) == 'function') or box.info.ro", + []any{}, connector.RequestOpts{}) + if err != nil { + return fmt.Errorf("[%s]: %w", fullInstanceName, err) + } + isRW = !res[0].(bool) + } + + if isRW && *master != nil { + return fmt.Errorf("%s and %s are both masters", + running.GetAppInstanceName((*master).run), fullInstanceName) + } else if isRW { + *master = &InstanceMeta{run, conn} + } else { + *replicas = append(*replicas, InstanceMeta{run, conn}) + } + } + 
return nil +} + +func WaitLSN(conn connector.Connector, masterIID uint32, masterLSN uint64) error { + var lastError error + query := fmt.Sprintf("return box.info.vclock[%d]", masterIID) + + deadline := time.Now().Add(time.Duration(LsnTimeout) * time.Second) + for { + res, err := conn.Eval(query, []any{}, connector.RequestOpts{}) + if err != nil { + lastError = fmt.Errorf("failed to evaluate LSN query: %w", err) + } else if len(res) == 0 { + lastError = errors.New("empty result from LSN query") + } else { + var lsn uint64 + if err := mapstructure.Decode(res[0], &lsn); err != nil { + lastError = fmt.Errorf("failed to decode LSN: %w", err) + } else if lsn >= masterLSN { + return nil + } else { + lastError = fmt.Errorf("current LSN %d is behind required "+ + "master LSN %d", lsn, masterLSN) + } + } + + if time.Now().After(deadline) { + break + } + + time.Sleep(1 * time.Second) + } + + return lastError +} + +func upgradeMaster(master *InstanceMeta) (SyncInfo, error) { + var upgradeInfo SyncInfo + fullMasterName := running.GetAppInstanceName(master.run) + res, err := master.conn.Eval(upgradeMasterLua, []any{}, connector.RequestOpts{}) + if err != nil { + return upgradeInfo, fmt.Errorf( + "failed to execute upgrade script on master instance - %s: %w", + fullMasterName, err) + } + + if err := mapstructure.Decode(res[0], &upgradeInfo); err != nil { + return upgradeInfo, fmt.Errorf( + "failed to decode response from master instance - %s: %w", + fullMasterName, err) + } + + if upgradeInfo.Err != nil { + return upgradeInfo, fmt.Errorf( + "master instance upgrade failed - %s: %w", + fullMasterName, err) + } + return upgradeInfo, nil +} + +func Snapshot(instance *InstanceMeta) error { + res, err := instance.conn.Eval("return box.snapshot()", []any{}, + connector.RequestOpts{}) + if err != nil { + return fmt.Errorf("failed to execute snapshot on replica: %w", err) + } + if len(res) == 0 { + return fmt.Errorf("snapshot command on %s returned an empty result, "+ + "'ok' - 
expected", running.GetAppInstanceName(instance.run)) + } + return nil +} + +func upgradeReplicaset(replicaset replicaset.Replicaset) error { + var master *InstanceMeta = nil + replicas := []InstanceMeta{} + + err := СollectRWROInfo(replicaset, &master, &replicas) + if err != nil { + return err + } + + // upgrade master instance, collect LSN and IID from master instance + upgradeInfo, err := upgradeMaster(master) + if err != nil { + return err + } + + // upgrade replica instances + masterLSN := upgradeInfo.LSN + masterIID := upgradeInfo.IID + + for _, replica := range replicas { + fullReplicaName := running.GetAppInstanceName(replica.run) + err := WaitLSN(replica.conn, masterIID, masterLSN) + if err != nil { + return fmt.Errorf("can't ensure that upgrade operations performed on %s "+ + "are replicated to %s to perform snapshotting on it: error "+ + "waiting LSN %d in vclock component %d: %w", + running.GetAppInstanceName(master.run), fullReplicaName, + masterLSN, masterIID, err) + } + err = Snapshot(&replica) + if err != nil { + return err + } + } + return nil +} diff --git a/test/integration/replicaset/single-t2-app/00000000000000000004.snap b/test/integration/replicaset/single-t2-app/00000000000000000004.snap new file mode 100644 index 000000000..80e614a6e Binary files /dev/null and b/test/integration/replicaset/single-t2-app/00000000000000000004.snap differ diff --git a/test/integration/replicaset/single-t2-app/init.lua b/test/integration/replicaset/single-t2-app/init.lua new file mode 100644 index 000000000..8d5d71d86 --- /dev/null +++ b/test/integration/replicaset/single-t2-app/init.lua @@ -0,0 +1,12 @@ +local fiber = require('fiber') +local fio = require('fio') + +box.cfg({}) + +fh = fio.open('ready', {'O_WRONLY', 'O_CREAT'}, tonumber('644',8)) +fh:close() + +while true do + fiber.sleep(5) +end + diff --git a/test/integration/replicaset/single-t2-app/tt.yaml b/test/integration/replicaset/single-t2-app/tt.yaml new file mode 100644 index 000000000..ee312214a --- 
def run_command_on_instance(tt_cmd, tmpdir, full_inst_name, cmd):
    """Run `cmd` on an instance through `tt connect` and return its output.

    Starts `<tt_cmd> connect <full_inst_name> -f -` in `tmpdir`, feeds `cmd`
    to its stdin and returns everything the process printed (stderr is merged
    into stdout).
    """
    con_cmd = [tt_cmd, "connect", full_inst_name, "-f", "-"]
    # The context manager closes the pipes, and communicate() waits for the
    # process to exit, so no child process or descriptor is left behind.
    with subprocess.Popen(
        con_cmd,
        cwd=tmpdir,
        stderr=subprocess.STDOUT,
        stdout=subprocess.PIPE,
        stdin=subprocess.PIPE,
        text=True,
    ) as instance_process:
        output, _ = instance_process.communicate(input=cmd)
    return output
+ start_cmd = [tt_cmd, "start", app_name] + rc, out = run_command_and_get_output(start_cmd, cwd=tmpdir) + assert rc == 0 + + for i in range(1, 6): + file = wait_file( + os.path.join(tmpdir, app_name), f"ready-instance-00{i}", [] + ) + assert file != "" + + _ = run_command_on_instance( + tt_cmd, tmpdir, "test_ccluster_app:instance-004", "box.cfg{read_only=true}" + ) + + status_cmd = [tt_cmd, "replicaset", "upgrade", app_name] + + rc, out = run_command_and_get_output(status_cmd, cwd=tmpdir) + assert rc == 0 + + upgrade_out = out.strip().split("\n") + assert len(upgrade_out) == len(replicasets) + + for i in range(len(replicasets)): + match = re.search(r"•\s*(.*?):\s*(.*)", upgrade_out[i]) + assert match.group(1) in replicasets + assert match.group(2) == "ok" + + finally: + stop_application(tt_cmd, app_name, tmpdir, []) + + +@pytest.mark.skipif( + tarantool_major_version < 3, reason="skip centralized config test for Tarantool < 3" +) +def test_upgrade_multi_master(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg + app_name = "test_ccluster_app" + app_path = os.path.join(tmpdir, app_name) + shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path) + try: + # Start a cluster. 
@pytest.mark.skipif(
    tarantool_major_version < 3, reason="skip centralized config test for Tarantool < 3"
)
def test_upgrade_t2_app_dummy_replicaset(tt_cmd):
    """Upgrade a single-instance application started from a 2.11.4 snapshot.

    The application is copied into a temporary directory together with the
    snapshot, started, checked for the expected record, and then upgraded
    with `tt replicaset upgrade`.
    """
    app_name = "single-t2-app"
    src_dir = os.path.join(os.path.dirname(__file__), app_name)

    # snapshot from tarantool 2.11.4 app
    snap_src = os.path.join(src_dir, "00000000000000000004.snap")

    with tempfile.TemporaryDirectory() as workdir:
        app_dir = os.path.join(workdir, app_name)
        shutil.copytree(src_dir, app_dir)

        data_dir = os.path.join(app_dir, "var", "lib", app_name)
        os.makedirs(data_dir, exist_ok=True)
        shutil.copy(snap_src, data_dir)

        try:
            rc, out = run_command_and_get_output(
                [tt_cmd, "start", app_name], cwd=app_dir
            )
            assert rc == 0

            ready_file = wait_file(app_dir, "ready", [])
            assert ready_file != ""

            # The record must be readable from the restored snapshot before
            # the upgrade is attempted.
            select_out = run_command_on_instance(
                tt_cmd,
                app_dir,
                app_name,
                "return box.space.example_space:select{2}",
            )
            assert "[2, 'Second record']" in select_out

            rc, out = run_command_and_get_output(
                [tt_cmd, "replicaset", "upgrade", app_name], cwd=app_dir
            )
            assert rc == 0
            assert out == "• single-t2-app: ok\n"
        finally:
            stop_application(tt_cmd, app_name, app_dir, [])