Skip to content

Commit

Permalink
replicaset: bootstrap for Cartridge
Browse files Browse the repository at this point in the history
Part of #316

@TarantoolBot document
Title: `tt replicaset bootstrap` bootstraps a Cartridge app or an instance

This patch adds new subcommand for the replicaset module.
`tt replicaset bootstrap [flags] <APP_NAME|APP_NAME:INSTANCE_NAME>` bootstraps a Cartridge cluster
or an instance.

The command combines two old cartridge-cli commands: `setup` and `join`.
Examples:

`tt replicaset bootstrap cartridge_app` bootstraps the cluster from the default
config file ("replicasets.yml" in the app directory), the file can be specified with `--file` option.

`tt replicaset bootstrap --bootstrap-vshard cartridge_app` bootstraps the cluster and vshard.

`tt replicaset bootstrap --replicaset replicaset cartridge_app:inst` joins the instance "inst" to the replicaset "replicaset".
  • Loading branch information
askalt authored and psergee committed Jul 16, 2024
1 parent 2e988d2 commit 8b5a6f3
Show file tree
Hide file tree
Showing 11 changed files with 592 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
versions >= 3.2.
- `tt enable`: create a symbolic link in 'instances_enabled' directory to a script or
an application directory.
- `tt replicaset bootstrap`: command to bootstrap a Cartridge cluster or an instance.

### Fixed

Expand Down
344 changes: 331 additions & 13 deletions cli/replicaset/cartridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,26 @@ package replicaset
import (
_ "embed"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/apex/log"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"gopkg.in/yaml.v2"

"github.com/tarantool/tt/cli/connector"
"github.com/tarantool/tt/cli/running"
"github.com/tarantool/tt/cli/util"
"github.com/tarantool/tt/cli/version"
)

const (
defaultCartridgeReplicasetsFilename = "replicasets.yml"
defaultCartridgeInstancesFilename = "instances.yml"
)

var (
//go:embed lua/cartridge/get_topology_replicasets_body.lua
cartridgeGetTopologyReplicasetsBody string
Expand Down Expand Up @@ -293,9 +304,312 @@ func (c *CartridgeApplication) Demote(ctx DemoteCtx) error {
return newErrDemoteByAppNotSupported(OrchestratorCartridge)
}

// Bootstrap is not supported for an application by the Cartridge orchestrator.
func (c *CartridgeApplication) Bootstrap(BootstrapCtx) error {
return newErrBootstrapByAppNotSupported(OrchestratorCartridge)
// cartridgeReplicasetConfig describes a replicaset config for the cartridge application.
type cartridgeReplicasetConfig struct {
Alias string `yaml:"alias,omitempty"`
Instances []string `yaml:"instances"`
Roles []string `yaml:"roles"`
Weight *float64 `yaml:"weight,omitempty"`
AllRW *bool `yaml:"all_rw,omitempty"`
VShardGroup *string `yaml:"vshard_group,omitempty"`
}

// cartridgeInstanceConfig describes a instance config for the cartridge application.
type cartridgeInstanceConfig struct {
URI string `yaml:"advertise_uri"`
}

// parseYaml parses YAML to the specified type.
func parseYaml[T any](filename string) (T, error) {
content, err := os.ReadFile(filename)
var ret T
if err != nil {
return ret, err
}
err = yaml.Unmarshal(content, &ret)
return ret, err
}

// getCartridgeReplicasetsConfig extracts a replicasets config.
func getCartridgeReplicasetsConfig(appDir,
filename string) (map[string]cartridgeReplicasetConfig, string, error) {
if filename == "" {
filename = filepath.Join(appDir, defaultCartridgeReplicasetsFilename)
}
filename, err := util.GetYamlFileName(filename, true)
if err != nil {
return nil, "", err
}
cfg, err := parseYaml[map[string]cartridgeReplicasetConfig](filename)
if err != nil {
return nil, filename, err
}
return cfg, filename, nil
}

// getCartridgeInstancesConfig extracts a instances config.
func getCartridgeInstancesConfig(appName,
appDir string) (map[string]cartridgeInstanceConfig, error) {
filename, err := util.GetYamlFileName(
filepath.Join(appDir, defaultCartridgeInstancesFilename), true)
if err != nil {
return nil, err
}
rawCfg, err := parseYaml[map[string]cartridgeInstanceConfig](filename)
if err != nil {
return nil, err
}

cfg := map[string]cartridgeInstanceConfig{}
appPrefix := fmt.Sprintf("%s.", appName)
for key, instCfg := range rawCfg {
instName, found := strings.CutPrefix(key, appPrefix)
if found {
cfg[instName] = instCfg
}
}
return cfg, nil
}

// Bootstrap bootstraps replicasets or a certain instance by the Cartridge orchestrator.
func (c *CartridgeApplication) Bootstrap(ctx BootstrapCtx) error {
if len(c.runningCtx.Instances) == 0 {
return fmt.Errorf("failed to bootstrap: there are no running instances")
}
var (
appDir = c.runningCtx.Instances[0].AppDir
appName = c.runningCtx.Instances[0].AppName
)
instancesCfg, err := getCartridgeInstancesConfig(appName, appDir)
if err != nil {
return fmt.Errorf("failed to get instances config: %w", err)
}
discovered, err := c.Discovery(SkipCache)
if err != nil {
return fmt.Errorf("failed to discovery: %w", err)
}

var (
eval InstanceEvalFunc
instances = filterDiscovered(c.runningCtx.Instances, discovered)
)
if ctx.Instance != "" {
// Bootstrap an instance.
eval = func(_ running.InstanceCtx, evaler connector.Evaler) (bool, error) {
return true,
c.bootstrapInstance(ctx.Instance, ctx.Replicaset, evaler, discovered,
instancesCfg, ctx.Timeout)
}
} else {
// Bootstrap a replicasets from the config.
replicasetCfg, replicasetscCfgPath, err := getCartridgeReplicasetsConfig(appDir,
ctx.ReplicasetsFile)
if err != nil {
return fmt.Errorf("failed to get replicasets config: %w", err)
}
log.Infof("Bootstrap replicasets described in %s", replicasetscCfgPath)
eval = func(_ running.InstanceCtx, evaler connector.Evaler) (bool, error) {
return true,
c.bootstrapReplicasets(evaler, discovered, replicasetCfg, instancesCfg, ctx.Timeout)
}
if discovered.State != StateBootstrapped {
// Initial bootstrapping, use some instance from the config.
cfgInstNames := map[string]struct{}{}
for _, replicaset := range replicasetCfg {
for _, inst := range replicaset.Instances {
cfgInstNames[inst] = struct{}{}
}
}
instances = filterInstances(c.runningCtx.Instances, func(
inst running.InstanceCtx) bool {
_, ok := cfgInstNames[inst.InstName]
return ok
})
}
}

if len(instances) == 0 {
return fmt.Errorf("not found any instance to perform bootstrapping")
}
err = EvalForeach(instances, InstanceEvalFunc(eval))
if err != nil {
return err
}

if ctx.BootstrapVShard {
// VShard bootstrapping takes the instances from the discovery cache, so re-discovery.
_, err = c.Discovery(SkipCache)
if err != nil {
return fmt.Errorf("failed to re-discovery: %w", err)
}
// Bootstrap vshard.
err = c.BootstrapVShard(VShardBootstrapCtx{Timeout: ctx.Timeout})
}
return err
}

// bootstrapInstance bootstrap an instance.
func (c *CartridgeApplication) bootstrapInstance(instanceName, replicasetName string,
evaler connector.Evaler, discovered Replicasets,
instancesCfg map[string]cartridgeInstanceConfig, timeout int) error {
if replicasetName == "" {
return fmt.Errorf("a replicaset name is empty")
}
var (
replicasetUUID string
found bool
)
for _, replicaset := range discovered.Replicasets {
if replicaset.Alias == replicasetName {
found = true
replicasetUUID = replicaset.UUID
break
}
for _, instance := range replicaset.Instances {
if instance.Alias == instanceName {
return fmt.Errorf("instance %q is bootstrapped already", instanceName)
}
}
}
if !found {
return fmt.Errorf("a replicaset %q not found in the bootstrapped cluster", replicasetName)
}
instancesUUID := map[string]string{}
joinOpts, err := getCartridgeJoinServersOpts(instancesCfg, []string{instanceName},
instancesUUID)
if err != nil {
return err
}
opts := []cartridgeEditReplicasetsOpts{{
UUID: &replicasetUUID,
JoinServers: joinOpts,
}}
return cartridgeEditReplicasets(evaler, opts, timeout)
}

// bootstrapReplicasets bootstraps replicasets from the config.
func (c *CartridgeApplication) bootstrapReplicasets(evaler connector.Evaler, discovered Replicasets,
replicasetsCfg map[string]cartridgeReplicasetConfig,
instancesCfg map[string]cartridgeInstanceConfig,
timeout int) error {

majorVer, err := getCartridgeMajorVersion(evaler)
if err != nil {
return fmt.Errorf("failed to get cartridge major version: %w", err)
}
if majorVer < 2 && discovered.State != StateBootstrapped {
if len(replicasetsCfg) == 0 {
return fmt.Errorf("empty replicasets config")
}
for rname, cfg := range replicasetsCfg {
// Create first replicaset with single instance, since in the old Cartridge
// bootstrapping cluster from scratch should be performed
// on a single-server replicaset only.
if len(cfg.Instances) == 0 {
return fmt.Errorf("replicaset %q is empty", rname)
}
instances := cfg.Instances
initialReplicasetCfg := cfg
initialReplicasetCfg.Instances, cfg.Instances = instances[:1], instances[1:]

initialCfg := map[string]cartridgeReplicasetConfig{}
initialCfg[rname] = initialReplicasetCfg
if len(cfg.Instances) == 0 {
// There are no more instances to bootstrap.
delete(replicasetsCfg, rname)
} else {
replicasetsCfg[rname] = cfg
}

if err := updateCartridgeReplicasets(evaler, discovered, initialCfg,
instancesCfg, timeout); err != nil {
return err
}
break
}
}

if err := updateCartridgeReplicasets(evaler, discovered, replicasetsCfg,
instancesCfg, timeout); err != nil {
return err
}

return err
}

// getCartridgeJoinServersOpts returns opts to join new servers.
// It lookups for an instance in the UUID map and if an instance is not found,
// generates new UUID and adds the instance to the join options.
func getCartridgeJoinServersOpts(instancesCfg map[string]cartridgeInstanceConfig,
instances []string, instancesUUID map[string]string) ([]cartridgeJoinServersOpts, error) {
opts := make([]cartridgeJoinServersOpts, 0)
for _, instance := range instances {
if _, UUIDExists := instancesUUID[instance]; UUIDExists {
continue
}
cfg, found := instancesCfg[instance]
if !found {
return nil, fmt.Errorf("instance %q not found in the instance config", instance)
}
instanceUUID := uuid.New().String()
instancesUUID[instance] = instanceUUID
opts = append(opts, cartridgeJoinServersOpts{
URI: cfg.URI,
UUID: &instanceUUID,
})
}
return opts, nil
}

// updateCartridgeReplicasets updates replicasets using the config.
// If some instance was not discovered, creates it.
func updateCartridgeReplicasets(evaler connector.Evaler, discovered Replicasets,
replicasetCfg map[string]cartridgeReplicasetConfig,
instancesCfg map[string]cartridgeInstanceConfig,
timeout int) error {
instanceUUID := map[string]string{}
replicasetUUID := map[string]string{}
for _, replicaset := range discovered.Replicasets {
replicasetUUID[replicaset.Alias] = replicaset.UUID
for _, instance := range replicaset.Instances {
instanceUUID[instance.Alias] = instance.UUID
}
}

var editOpts []cartridgeEditReplicasetsOpts
for rname, rcfg := range replicasetCfg {
replicasetName := rname
opts := cartridgeEditReplicasetsOpts{
Alias: &replicasetName,
Roles: rcfg.Roles,
AllRW: rcfg.AllRW,
Weight: rcfg.Weight,
VshardGroup: rcfg.VShardGroup,
}
if uuid, found := replicasetUUID[replicasetName]; found {
// Link opts to the existing replicaset.
// admin_edit_topology() recognizes replicasets by UUID.
opts.UUID = &uuid
}
var err error
opts.JoinServers, err = getCartridgeJoinServersOpts(instancesCfg,
rcfg.Instances, instanceUUID)
if err != nil {
return err
}
var failoverPriority []string
for _, inst := range rcfg.Instances {
uuid, ok := instanceUUID[inst]
if !ok {
return fmt.Errorf("instance %q uuid not found", inst)
}
failoverPriority = append(failoverPriority, uuid)
}
opts.FailoverPriority = failoverPriority
editOpts = append(editOpts, opts)
}

return cartridgeEditReplicasets(evaler, editOpts, timeout)
}

// Expel expels an instance from a Cartridge replicasets.
Expand Down Expand Up @@ -447,18 +761,22 @@ func cartridgeHealthCheckIsNeeded(evaler connector.Evaler) (bool, error) {
return major < 2, nil
}

// cartridgeJoinServersOpts describes options for server joining.
type cartridgeJoinServersOpts struct {
URI string `msgpack:"uri"`
UUID *string `msgpack:"uuid,omitempty"`
}

// cartridgeEditReplicasetsOpts describes options for replicaset editing.
type cartridgeEditReplicasetsOpts struct {
UUID *string `msgpack:"uuid,omitempty"`
Alias *string `msgpack:"alias,omitempty"`
Roles []string `msgpack:"roles,omitempty"`
AllRW *bool `msgpack:"all_rw,omitempty"`
Weight *float64 `msgpack:"weight,omitempty"`
VshardGroup *string `msgpack:"vshard_group,omitempty"`
JoinServers []struct {
URI string `msgpack:"uri"`
} `msgpack:"join_servers,omitempty"`
FailoverPriority []string `msgpack:"failover_priority,omitempty"`
UUID *string `msgpack:"uuid,omitempty"`
Alias *string `msgpack:"alias,omitempty"`
Roles []string `msgpack:"roles,omitempty"`
AllRW *bool `msgpack:"all_rw,omitempty"`
Weight *float64 `msgpack:"weight,omitempty"`
VshardGroup *string `msgpack:"vshard_group,omitempty"`
JoinServers []cartridgeJoinServersOpts `msgpack:"join_servers,omitempty"`
FailoverPriority []string `msgpack:"failover_priority,omitempty"`
}

// cartridgeEditInstancesOpts describes options for instances editing.
Expand Down
2 changes: 1 addition & 1 deletion cli/replicaset/cartridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestCartridgeApplication_Bootstrap(t *testing.T) {
app := replicaset.NewCartridgeApplication(running.RunningCtx{})
err := app.Bootstrap(replicaset.BootstrapCtx{})
assert.EqualError(t, err,
`bootstrap is not supported for an application by "cartridge" orchestrator`)
`failed to bootstrap: there are no running instances`)
}

func TestCartridgeInstance_Demote(t *testing.T) {
Expand Down
Loading

0 comments on commit 8b5a6f3

Please sign in to comment.