Skip to content

Commit

Permalink
tt: add command tt upgrade
Browse files Browse the repository at this point in the history
Part of tarantool#924

@TarantoolBot document
Title: `tt upgrade` upgrades database schema.

This patch adds `tt upgrade` command to automate the process of
upgrading the database schema across both master and replica instances.
The command follows the required steps in the correct order, ensuring
proper synchronization between instances, and minimizes the need for
manual intervention.

```
tt upgrade [<APP_NAME>] [flags]
```

`tt upgrade` command steps:

- For each replicaset:
  - On the master instance:
    1. Execute the following commands sequentially:
       ```
       box.schema.upgrade()
       box.snapshot()
       ```
  - On each replica:
    1. Wait until the replica applies all transactions produced by
       `box.schema.upgrade()` on the master by comparing the vector
       clocks (vclock).
    2. Once synchronization is confirmed, execute the following
       commands sequentially on the replica:
       ```
       box.schema.upgrade()
       box.snapshot()
       ```

> If any errors occur during the upgrade process, the process will
  stop and an error report will be generated.

There are flags supported by this command:
- `--replicaset (-r) stringArray`: specify the replicaset name(s) to
  upgrade;
- `--timeout (-t) int`: timeout for waiting the LSN synchronization
  in seconds (default 5);

Usage examples:
- Update all active replicasets managed by `tt`:
  ```
  $tt upgrade
  ```
- Update active replicasets of application <APP_NAME>:
  ```
  $tt upgrade <APP_NAME>
  ```
- Update the specified replicaset(s) of application <APP_NAME>:
  ```
  $tt upgrade <APP_NAME> -r <RS_NAME_1> -r <RS_NAME_2> ...
  ```
  • Loading branch information
mandesero committed Sep 11, 2024
1 parent 3e3e081 commit fe0f6a5
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 6 deletions.
1 change: 1 addition & 0 deletions cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ After that tt will be able to manage the application using 'replicaset_example'
NewKillCmd(),
NewLogCmd(),
NewEnableCmd(),
NewUpgradeCmd(),
)
if err := injectCmds(rootCmd); err != nil {
panic(err.Error())
Expand Down
83 changes: 83 additions & 0 deletions cli/cmd/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package cmd

import (
"github.com/spf13/cobra"
"github.com/tarantool/tt/cli/cmd/internal"
"github.com/tarantool/tt/cli/cmdcontext"
"github.com/tarantool/tt/cli/modules"
"github.com/tarantool/tt/cli/replicaset"
replicasetcmd "github.com/tarantool/tt/cli/replicaset/cmd"
"github.com/tarantool/tt/cli/running"
"github.com/tarantool/tt/cli/upgrade"
"github.com/tarantool/tt/cli/util"
)

var (
Timeout *int
pendingReplicasetAliases *[]string
)

func NewUpgradeCmd() *cobra.Command {
var upgradeCmd = &cobra.Command{
Use: "upgrade [<APP_NAME>]",
Short: "upgrade tarantool schema",
Example: `tt upgrade - Upgrade all active replicasets
tt upgrade <APP_NAME> - Upgrade replicasets of the specified app <APP_NAME>
tt upgrade <APP_NAME> -r <RS_NAME> - Upgrade specific replicaset <RS_NAME> of app <APP_NAME>`,
Run: func(cmd *cobra.Command, args []string) {
cmdCtx.CommandName = cmd.Name()
err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo,
upgradeReplicasets, args)
util.HandleCmdErr(cmd, err)
},
ValidArgsFunction: func(
cmd *cobra.Command,
args []string,
toComplete string) ([]string, cobra.ShellCompDirective) {
return internal.ValidArgsFunction(
cliOpts, &cmdCtx, cmd, toComplete,
running.ExtractAppNames,
running.ExtractInstanceNames)
},
}

pendingReplicasetAliases = upgradeCmd.Flags().StringArrayP("replicaset", "r",
[]string{}, "specify the replicaset name(s) to upgrade")

Timeout = upgradeCmd.Flags().IntP("timeout", "t", 5,
"timeout for waiting the LSN synchronization (in seconds)")

return upgradeCmd
}

func prepareReplicasets(cmdCtx *cmdcontext.CmdCtx, args []string) (replicaset.Replicasets, error) {
var ctx replicasetCtx
if err := replicasetFillCtx(cmdCtx, &ctx, args, false); err != nil {
return replicaset.Replicasets{}, err
}
if ctx.IsInstanceConnect {
defer ctx.Conn.Close()
}
statusCtx := replicasetcmd.StatusCtx{
IsApplication: ctx.IsApplication,
RunningCtx: ctx.RunningCtx,
Conn: ctx.Conn,
Orchestrator: ctx.Orchestrator,
}

replicasets, err := replicasetcmd.GetReplicasets(statusCtx)
return replicasets, err
}

func upgradeReplicasets(cmdCtx *cmdcontext.CmdCtx, args []string) error {
if !isConfigExist(cmdCtx) {
return errNoConfig
}

replicasets, err := prepareReplicasets(cmdCtx, args)
if err != nil {
return err
}

return upgrade.Upgrade(replicasets, *pendingReplicasetAliases, *Timeout)
}
21 changes: 15 additions & 6 deletions cli/replicaset/cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,37 @@ type StatusCtx struct {
Orchestrator replicaset.Orchestrator
}

// Status shows a replicaset status.
func Status(statusCtx StatusCtx) error {
func GetReplicasets(statusCtx StatusCtx) (replicaset.Replicasets, error) {
var replicasets replicaset.Replicasets
orchestratorType, err := getOrchestratorType(statusCtx.Orchestrator,
statusCtx.Conn, statusCtx.RunningCtx)
if err != nil {
return err
return replicasets, err
}

var orchestrator replicasetOrchestrator
if statusCtx.IsApplication {
if orchestrator, err = makeApplicationOrchestrator(
orchestratorType, statusCtx.RunningCtx, nil, nil); err != nil {
return err
return replicasets, err
}
} else {
if orchestrator, err = makeInstanceOrchestrator(
orchestratorType, statusCtx.Conn); err != nil {
return err
return replicasets, err
}
}

replicasets, err := orchestrator.Discovery(replicaset.SkipCache)
replicasets, err = orchestrator.Discovery(replicaset.SkipCache)
if err != nil {
return replicasets, err
}
return replicasets, err
}

// Status shows a replicaset status.
func Status(statusCtx StatusCtx) error {
replicasets, err := GetReplicasets(statusCtx)
if err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions cli/upgrade/lua/upgrade.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
local ok, err
ok, err = pcall(box.schema.upgrade)
if ok then
ok, err = pcall(box.snapshot)
end

return {
lsn = box.info.lsn,
iid = box.info.id,
err = (not ok) and tostring(err) or nil,
}
183 changes: 183 additions & 0 deletions cli/upgrade/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package upgrade

import (
_ "embed"
"errors"
"fmt"
"time"

"github.com/mitchellh/mapstructure"
"github.com/tarantool/tt/cli/connector"
"github.com/tarantool/tt/cli/replicaset"
"github.com/tarantool/tt/cli/running"
)

//go:embed lua/upgrade.lua
var upgradeLuaScript string

type SyncInfo struct {
LSN uint32 `mapstructure:"lsn"`
IID uint32 `mapstructure:"iid"`
Err *string `mapstructure:"err"`
}

func internalUpgrade(conn connector.Connector) (SyncInfo, error) {
var upgradeInfo SyncInfo
res, err := conn.Eval(upgradeLuaScript, []any{}, connector.RequestOpts{})
if err != nil {
return upgradeInfo, err
}
if err := mapstructure.Decode(res[0], &upgradeInfo); err != nil {
return upgradeInfo, err
}
if upgradeInfo.Err != nil {
return upgradeInfo, errors.New(*upgradeInfo.Err)
}
return upgradeInfo, nil
}

func WaitLSN(conn connector.Connector, masterUpgradeInfo SyncInfo, timeout int) error {
for i := 0; i < timeout; i++ {
var res []interface{}
res, err := conn.Eval(
fmt.Sprintf("return box.info.vclock[%d]", masterUpgradeInfo.IID),
[]any{}, connector.RequestOpts{})
if err != nil || len(res) == 0 {
return err
}
var lsn uint32
switch v := res[0].(type) {
case uint16:
lsn = uint32(v)
case uint32:
lsn = v
}
if lsn >= masterUpgradeInfo.LSN {
return nil
}
time.Sleep(time.Second)
}
return errors.New("")
}

func GetAllowedReplicasets(allReplicasets replicaset.Replicasets,
pendingReplicasetAliases []string) ([]replicaset.Replicaset, error) {
if len(pendingReplicasetAliases) == 0 {
return allReplicasets.Replicasets, nil
}

replicasetMap := make(map[string]replicaset.Replicaset)
for _, rs := range allReplicasets.Replicasets {
replicasetMap[rs.Alias] = rs
}

var allowedReplicasets []replicaset.Replicaset

for _, alias := range pendingReplicasetAliases {
replicaset, exists := replicasetMap[alias]
if !exists {
return nil, fmt.Errorf("replicaset with alias %q doesn't exist", alias)
}
allowedReplicasets = append(allowedReplicasets, replicaset)
}

return allowedReplicasets, nil
}

func Upgrade(replicasets replicaset.Replicasets, pendingReplicasetAliases []string,
lsnTimeout int) error {
allowedReplicasets, err := GetAllowedReplicasets(replicasets, pendingReplicasetAliases)
if err != nil {
return err
}

printReplicasetStatus := func(alias, status string) {
fmt.Printf("• %s: %s\n", alias, status)
}

for _, rs := range allowedReplicasets {
var masterRun *running.InstanceCtx = nil
var replicRun []running.InstanceCtx

for _, inst := range rs.Instances {
run := inst.InstanceCtx
fullInstanceName := running.GetAppInstanceName(run)
conn, err := connector.Connect(connector.ConnectOpts{
Network: "unix",
Address: run.ConsoleSocket,
})
if err != nil {
printReplicasetStatus(rs.Alias, "error")
return fmt.Errorf("[%s][%s]: %s", rs.Alias, fullInstanceName, err)
}

res, err := conn.Eval(
"return (type(box.cfg) == 'function') or box.info.ro",
[]any{}, connector.RequestOpts{})
if err != nil || len(res) == 0 {
printReplicasetStatus(rs.Alias, "error")
return fmt.Errorf("[%s][%s]: %s", rs.Alias, fullInstanceName, err)
}

if !res[0].(bool) {
if masterRun != nil {
printReplicasetStatus(rs.Alias, "error")
return fmt.Errorf("[%s]: %s and %s are both masters",
rs.Alias, running.GetAppInstanceName(*masterRun),
fullInstanceName)
}
masterRun = &run
} else {
replicRun = append(replicRun, run)
}
}
if masterRun == nil {
printReplicasetStatus(rs.Alias, "error")
return fmt.Errorf("[%s]: has not master instance", rs.Alias)
}
var conn connector.Connector
conn, err = connector.Connect(connector.ConnectOpts{
Network: "unix",
Address: masterRun.ConsoleSocket,
})

if err != nil {
printReplicasetStatus(rs.Alias, "error")
return fmt.Errorf("[%s][%s]: %s", rs.Alias,
running.GetAppInstanceName(*masterRun), err)
}
masterUpgradeInfo, err := internalUpgrade(conn)
if err != nil {
printReplicasetStatus(rs.Alias, "error")
return fmt.Errorf("[%s][%s]: %s", rs.Alias,
running.GetAppInstanceName(*masterRun), err)
}
for _, run := range replicRun {
fullInstanceName := running.GetAppInstanceName(run)
conn, err = connector.Connect(connector.ConnectOpts{
Network: "unix",
Address: run.ConsoleSocket,
})
if err != nil {
printReplicasetStatus(rs.Alias, "error")
return fmt.Errorf("[%s][%s]: %s", rs.Alias, fullInstanceName, err)
}
err = WaitLSN(conn, masterUpgradeInfo, lsnTimeout)
if err != nil {
printReplicasetStatus(rs.Alias, "error")
return fmt.Errorf("[%s]: LSN wait timeout: error waiting LSN %d "+
"in vclock component %d on %s: time quota %d seconds "+
"exceeded", rs.Alias, masterUpgradeInfo.LSN,
masterUpgradeInfo.IID, fullInstanceName, lsnTimeout)
}
res, err := conn.Eval("return box.snapshot()", []any{},
connector.RequestOpts{})
if err != nil || len(res) == 0 {
printReplicasetStatus(rs.Alias, "error")
return fmt.Errorf("[%s][%s]: %s", rs.Alias, fullInstanceName, err)
}
}
printReplicasetStatus(rs.Alias, "ok")
}
return nil
}

0 comments on commit fe0f6a5

Please sign in to comment.