From dc16c994a6071c78ed989512004a746591c4f505 Mon Sep 17 00:00:00 2001 From: Maksim Tiushev Date: Fri, 4 Oct 2024 09:45:41 +0000 Subject: [PATCH] tt replicaset: add subcommand `upgrade` Part of tarantool#924 @TarantoolBot document Title: `tt replicaset upgrade` upgrades database schema. The `tt replicaset upgrade` command allows for a automate upgrade of each replicaset in a Tarantool cluster. The process is performed sequentially on the master instance and its replicas to ensure data consistency. Below are the steps involved: For Each Replicaset: - **On the Master Instance**: 1. Run the following commands in sequence to upgrade the schema and take a snapshot: ```lua box.schema.upgrade() box.snapshot() ``` - **On Each Replica**: 1. Wait for the replica to apply all transactions produced by the `box.schema.upgrade()` command executed on the master. This is done by monitoring the vector clocks (vclock) to ensure synchronization. 2. Once the repica has caught up, run the following command to take a snapshot: ```lua box.snapshot() ``` > **Error Handling**: If any errors occur during the upgrade process, the operation will halt, and an error report will be generated. --- - Timeout for Synchronization Replicas will wait for synchronization for a maximum of `Timeout` seconds. The default timeout is set to 5 seconds, but this can be adjusted manually using the `--timeout` option. **Example:** ```bash $ tt replicaset upgrade [] --timeout 10 ``` - Selecting Replicasets for Upgrade You can specify which replicaset(s) to upgrade by using the `--replicaset` or `-r` option to target specific replicaset names. **Example:** ```bash $ tt replicaset upgrade [ | ] --replicaset -r ... ``` This provides flexibility in upgrading only the desired parts of the cluster without affecting the entire system. --- cli/cmd/replicaset.go | 49 +++++- cli/replicaset/cmd/lua/upgrade.lua | 11 ++ cli/replicaset/cmd/status.go | 46 +++--- cli/replicaset/cmd/upgrade.go | 242 +++++++++++++++++++++++++++++ 4 files changed, 325 insertions(+), 23 deletions(-) create mode 100644 cli/replicaset/cmd/lua/upgrade.lua create mode 100644 cli/replicaset/cmd/upgrade.go diff --git a/cli/cmd/replicaset.go b/cli/cmd/replicaset.go index 5068def08..49b0c5764 100644 --- a/cli/cmd/replicaset.go +++ b/cli/cmd/replicaset.go @@ -51,6 +51,35 @@ var ( " To specify relative path without `unix://` use `./`." ) +// newUpgradeCmd creates a "replicaset upgrade" command. +func newUpgradeCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "upgrade ( | ) [flags]\n\n" + + replicasetUriHelp, + DisableFlagsInUseLine: true, + Short: "Upgrade tarantool cluster", + Long: "Upgrade tarantool cluster.\n\n" + + libconnect.EnvCredentialsHelp + "\n\n", + Run: func(cmd *cobra.Command, args []string) { + cmdCtx.CommandName = cmd.Name() + err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo, + internalReplicasetUpgradeModule, args) + util.HandleCmdErr(cmd, err) + }, + Args: cobra.ExactArgs(1), + } + + cmd.Flags().StringArrayVarP(&replicasetcmd.ChosenReplicasetAliases, "replicaset", "r", + []string{}, "specify the replicaset name(s) to upgrade") + + cmd.Flags().IntVarP(&replicasetcmd.LsnTimeout, "timeout", "t", 5, + "timeout for waiting the LSN synchronization (in seconds)") + + addOrchestratorFlags(cmd) + addTarantoolConnectFlags(cmd) + return cmd +} + // newStatusCmd creates a "replicaset status" command. func newStatusCmd() *cobra.Command { cmd := &cobra.Command{ @@ -340,6 +369,7 @@ func NewReplicasetCmd() *cobra.Command { Aliases: []string{"rs"}, } + cmd.AddCommand(newUpgradeCmd()) cmd.AddCommand(newStatusCmd()) cmd.AddCommand(newPromoteCmd()) cmd.AddCommand(newDemoteCmd()) @@ -489,6 +519,23 @@ func replicasetFillCtx(cmdCtx *cmdcontext.CmdCtx, ctx *replicasetCtx, args []str return nil } +// internalReplicasetUpgradeModule is a "upgrade" command for the replicaset module. +func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) error { + var ctx replicasetCtx + if err := replicasetFillCtx(cmdCtx, &ctx, args, false); err != nil { + return err + } + if ctx.IsInstanceConnect { + defer ctx.Conn.Close() + } + return replicasetcmd.Upgrade(replicasetcmd.DiscoveryCtx{ + IsApplication: ctx.IsApplication, + RunningCtx: ctx.RunningCtx, + Conn: ctx.Conn, + Orchestrator: ctx.Orchestrator, + }) +} + // internalReplicasetPromoteModule is a "promote" command for the replicaset module. func internalReplicasetPromoteModule(cmdCtx *cmdcontext.CmdCtx, args []string) error { var ctx replicasetCtx @@ -560,7 +607,7 @@ func internalReplicasetStatusModule(cmdCtx *cmdcontext.CmdCtx, args []string) er if ctx.IsInstanceConnect { defer ctx.Conn.Close() } - return replicasetcmd.Status(replicasetcmd.StatusCtx{ + return replicasetcmd.Status(replicasetcmd.DiscoveryCtx{ IsApplication: ctx.IsApplication, RunningCtx: ctx.RunningCtx, Conn: ctx.Conn, diff --git a/cli/replicaset/cmd/lua/upgrade.lua b/cli/replicaset/cmd/lua/upgrade.lua new file mode 100644 index 000000000..d755e4c1d --- /dev/null +++ b/cli/replicaset/cmd/lua/upgrade.lua @@ -0,0 +1,11 @@ +local ok, err +ok, err = pcall(box.schema.upgrade) +if ok then + ok, err = pcall(box.snapshot) +end + +return { + lsn = box.info.lsn, + iid = box.info.id, + err = (not ok) and tostring(err) or nil, +} \ No newline at end of file diff --git a/cli/replicaset/cmd/status.go b/cli/replicaset/cmd/status.go index 99a8c41c1..687843fc6 100644 --- a/cli/replicaset/cmd/status.go +++ b/cli/replicaset/cmd/status.go @@ -10,41 +10,43 @@ import ( "github.com/tarantool/tt/cli/running" ) -// StatusCtx contains information about replicaset status command execution -// context. -type StatusCtx struct { +// DiscoveryCtx contains information about replicaset discovery. +type DiscoveryCtx struct { // IsApplication true if an application passed. IsApplication bool // RunningCtx is an application running context. RunningCtx running.RunningCtx // Conn is an active connection to a passed instance. Conn connector.Connector - // Orchestrator is a forced orchestator choice. + // Orchestrator is a forced orchestrator choice. Orchestrator replicaset.Orchestrator } -// Status shows a replicaset status. -func Status(statusCtx StatusCtx) error { - orchestratorType, err := getOrchestratorType(statusCtx.Orchestrator, - statusCtx.Conn, statusCtx.RunningCtx) +// GetReplicasets discovers and returns the list of replicasets. +func GetReplicasets(ctx DiscoveryCtx) (replicaset.Replicasets, error) { + orchestratorType, err := getOrchestratorType(ctx.Orchestrator, ctx.Conn, ctx.RunningCtx) if err != nil { - return err + return replicaset.Replicasets{}, err } var orchestrator replicasetOrchestrator - if statusCtx.IsApplication { - if orchestrator, err = makeApplicationOrchestrator( - orchestratorType, statusCtx.RunningCtx, nil, nil); err != nil { - return err - } + if ctx.IsApplication { + orchestrator, err = makeApplicationOrchestrator(orchestratorType, + ctx.RunningCtx, nil, nil) } else { - if orchestrator, err = makeInstanceOrchestrator( - orchestratorType, statusCtx.Conn); err != nil { - return err - } + orchestrator, err = makeInstanceOrchestrator(orchestratorType, ctx.Conn) + } + + if err != nil { + return replicaset.Replicasets{}, err } - replicasets, err := orchestrator.Discovery(replicaset.SkipCache) + return orchestrator.Discovery(replicaset.SkipCache) +} + +// Status shows a replicaset status. +func Status(discoveryCtx DiscoveryCtx) error { + replicasets, err := GetReplicasets(discoveryCtx) if err != nil { return err } @@ -61,7 +63,7 @@ func statusReplicasets(replicasets replicaset.Replicasets) error { fmt.Println("Orchestrator: ", replicasets.Orchestrator) fmt.Println("Replicasets state:", replicasets.State) - replicasets = fillAliases(replicasets) + replicasets = FillAliases(replicasets) replicasets = sortAliases(replicasets) if len(replicasets.Replicasets) > 0 { @@ -73,9 +75,9 @@ func statusReplicasets(replicasets replicaset.Replicasets) error { return nil } -// fillAliases fills missed aliases with UUID. The case: Tarantool 1.10 without +// FillAliases fills missed aliases with UUID. The case: Tarantool 1.10 without // an orchestrator. -func fillAliases(replicasets replicaset.Replicasets) replicaset.Replicasets { +func FillAliases(replicasets replicaset.Replicasets) replicaset.Replicasets { for i := range replicasets.Replicasets { replicaset := &replicasets.Replicasets[i] if replicaset.Alias == "" { diff --git a/cli/replicaset/cmd/upgrade.go b/cli/replicaset/cmd/upgrade.go new file mode 100644 index 000000000..ef51b8fea --- /dev/null +++ b/cli/replicaset/cmd/upgrade.go @@ -0,0 +1,242 @@ +package replicasetcmd + +import ( + _ "embed" + "errors" + "fmt" + "time" + + "github.com/mitchellh/mapstructure" + "github.com/tarantool/tt/cli/connector" + "github.com/tarantool/tt/cli/replicaset" + "github.com/tarantool/tt/cli/running" +) + +var ( + ChosenReplicasetAliases []string + LsnTimeout int +) + +type InstanceMeta struct { + run running.InstanceCtx + conn connector.Connector +} + +//go:embed lua/upgrade.lua +var upgradeMasterLua string + +type SyncInfo struct { + LSN uint64 `mapstructure:"lsn"` + IID uint32 `mapstructure:"iid"` + Err *string `mapstructure:"err"` +} + +// "FilterReplicasetsByAliases" filters the given replicaset list by chosen aliases +// and returns the allowed replicasets. If a non-existent alias is found, it returns an error. +func FilterReplicasetsByAliases(replicasets replicaset.Replicasets) ([]replicaset.Replicaset, + error) { + // If no aliases are provided, return all replicasets. + if len(ChosenReplicasetAliases) == 0 { + return replicasets.Replicasets, nil + } + + // Create a map for fast lookup of replicasets by alias + replicasetMap := make(map[string]replicaset.Replicaset) + for _, rs := range replicasets.Replicasets { + replicasetMap[rs.Alias] = rs + } + + var allowedReplicasets []replicaset.Replicaset + for _, alias := range ChosenReplicasetAliases { + rs, exists := replicasetMap[alias] + if !exists { + return nil, fmt.Errorf("replicaset with alias %q doesn't exist", alias) + } + allowedReplicasets = append(allowedReplicasets, rs) + } + + return allowedReplicasets, nil +} + +func Upgrade(discoveryCtx DiscoveryCtx) error { + replicasets, err := GetReplicasets(discoveryCtx) + if err != nil { + return err + } + + replicasets = FillAliases(replicasets) + replicasetsToUpgrade, err := FilterReplicasetsByAliases(replicasets) + if err != nil { + return err + } + + return internalUpgrade(replicasetsToUpgrade) +} + +func internalUpgrade(replicasets []replicaset.Replicaset) error { + for _, replicaset := range replicasets { + err := upgradeReplicaset(replicaset) + if err != nil { + fmt.Printf("• %s: error\n", replicaset.Alias) + return fmt.Errorf("replicaset %s: %w", replicaset.Alias, err) + } + fmt.Printf("• %s: ok\n", replicaset.Alias) + } + return nil +} + +func getInstanceConnector(instance replicaset.Instance) (connector.Connector, error) { + run := instance.InstanceCtx + fullInstanceName := running.GetAppInstanceName(run) + if fullInstanceName == "" { + fullInstanceName = instance.Alias + } + if fullInstanceName == "" { + fullInstanceName = "unknown" + } + + // Try to connect via unix socket + conn, err := connector.Connect(connector.ConnectOpts{ + Network: "unix", + Address: run.ConsoleSocket, + }) + + if err != nil { + // try to connect via TCP [experimental] + conn, err = connector.Connect(connector.ConnectOpts{ + Network: "tcp", + Address: instance.URI, + Username: "client", // should be opt + Password: "secret", // should be opt + }) + if err != nil { + return nil, fmt.Errorf("instance %s failed to connect via both TCP "+ + "and UNIX socket [%s]: %w", fullInstanceName, instance.URI, err) + } + } + return conn, nil +} + +func СollectRWROInfo(replicaset replicaset.Replicaset, master **InstanceMeta, + replicas *[]InstanceMeta) error { + for _, instance := range replicaset.Instances { + run := instance.InstanceCtx + fullInstanceName := running.GetAppInstanceName(run) + conn, err := getInstanceConnector(instance) + + if err != nil { + return err + } + + var isRW bool + if instance.Mode.String() != "unknown" { + isRW = instance.Mode.String() == "rw" + } else { + res, err := conn.Eval( + "return (type(box.cfg) == 'function') or box.info.ro", + []any{}, connector.RequestOpts{}) + if err != nil { + return fmt.Errorf("[%s]: %w", fullInstanceName, err) + } + isRW = !res[0].(bool) + } + + if isRW && *master != nil { + return fmt.Errorf("%s and %s are both masters", + running.GetAppInstanceName((*master).run), fullInstanceName) + } else if isRW { + *master = &InstanceMeta{run, conn} + } else { + *replicas = append(*replicas, InstanceMeta{run, conn}) + } + } + return nil +} + +func WaitLSN(conn connector.Connector, masterIID uint32, masterLSN uint64) error { + var lastError error + query := fmt.Sprintf("return box.info.vclock[%d]", masterIID) + + deadline := time.Now().Add(time.Duration(LsnTimeout) * time.Second) + for { + res, err := conn.Eval(query, []any{}, connector.RequestOpts{}) + if err != nil { + lastError = fmt.Errorf("failed to evaluate LSN query: %w", err) + } else if len(res) == 0 { + lastError = errors.New("empty result from LSN query") + } else { + var lsn uint64 + if err := mapstructure.Decode(res[0], &lsn); err != nil { + lastError = fmt.Errorf("failed to decode LSN: %w", err) + } else if lsn >= masterLSN { + return nil + } + } + + if time.Now().After(deadline) { + break + } + + time.Sleep(1 * time.Second) + } + + if lastError != nil { + return lastError + } + return errors.New("timeout reached before LSN synced") +} + +func upgradeReplicaset(replicaset replicaset.Replicaset) error { + var master *InstanceMeta = nil + replicas := []InstanceMeta{} + + err := СollectRWROInfo(replicaset, &master, &replicas) + if err != nil { + return err + } + + // upgrade master instance, collect LSN and IID from master instance + var upgradeInfo SyncInfo + fullMasterName := running.GetAppInstanceName(master.run) + res, err := master.conn.Eval(upgradeMasterLua, []any{}, connector.RequestOpts{}) + if err != nil { + return fmt.Errorf("failed to execute upgrade script on master instance - %s: %w", + fullMasterName, err) + } + + if err := mapstructure.Decode(res[0], &upgradeInfo); err != nil { + return fmt.Errorf("failed to decode response from master instance - %s: %w", + fullMasterName, err) + } + + if upgradeInfo.Err != nil { + return fmt.Errorf("master instance upgrade failed - %s: %w", + fullMasterName, err) + } + + // upgrade replica instances + masterLSN := upgradeInfo.LSN + masterIID := upgradeInfo.IID + + for _, replica := range replicas { + fullReplicaName := running.GetAppInstanceName(replica.run) + err := WaitLSN(replica.conn, masterIID, masterLSN) + if err != nil { + return fmt.Errorf("can't ensure that upgrade operations performed on %s "+ + "are replicated to %s to perform snapshotting on it: error "+ + "waiting LSN %d in vclock component %d: %w", + fullMasterName, fullReplicaName, + masterLSN, masterIID, err) + } + res, err := replica.conn.Eval("return box.snapshot()", []any{}, + connector.RequestOpts{}) + if err != nil { + return fmt.Errorf("failed to execute snapshot on replica: %w", err) + } + if len(res) == 0 { + return fmt.Errorf("snapshot command on %s returned an empty result, "+ + "'ok' - expected", fullReplicaName) + } + } + return nil +}