Skip to content

Commit

Permalink
Merge pull request #679 from cgalibern/dev
Browse files Browse the repository at this point in the history
fix om daemon leave regression
  • Loading branch information
cgalibern authored Jan 24, 2025
2 parents c62306b + 7b6f834 commit a2585be
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 43 deletions.
7 changes: 6 additions & 1 deletion core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,17 @@ type (
}
)

var (
// DefaultClientTimeout is the default client timeout value used by New
DefaultClientTimeout = 5 * time.Second
)

// New allocates a new client configuration and returns a reference to it,
// so users are not tempted to use a dereferenced client.Config{}, which would
// make loadContext useless.
func New(opts ...funcopt.O) (*T, error) {
t := &T{
timeout: 5 * time.Second,
timeout: DefaultClientTimeout,
}
if err := funcopt.Apply(t, opts...); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion core/om/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func newCmdDaemonLeave() *cobra.Command {
},
}
flags := cmd.Flags()
flags.DurationVar(&options.Timeout, "timeout", 5*time.Second, "maximum duration to wait for local node removed from cluster")
flags.DurationVar(&options.Timeout, "timeout", 0, "maximum duration to wait for local node removed from cluster")
return cmd
}

Expand Down
98 changes: 57 additions & 41 deletions core/omcmd/daemon_leave.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/opensvc/om3/core/event"
"github.com/opensvc/om3/core/object"
"github.com/opensvc/om3/daemon/api"
"github.com/opensvc/om3/daemon/daemonenv"
"github.com/opensvc/om3/daemon/msgbus"
"github.com/opensvc/om3/util/hostname"
)
Expand All @@ -25,34 +26,52 @@ type (
// APINode is a cluster node where the leave request will be posted
APINode string

cli *client.T
localhost string
evReader event.ReadCloser
peerClient *client.T
localhost string
evReader event.ReadCloser
}
)

func (t *CmdDaemonLeave) Run() (err error) {
var (
tk string

tkCli *client.T
localClient *client.T

deadLine time.Time
)
leaveDeadLine := time.Now().Add(t.Timeout)
t.localhost = hostname.Hostname()
ctx, cancel := context.WithTimeout(context.Background(), t.Timeout)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err = t.checkParams(); err != nil {
return fmt.Errorf("daemon leave: %w", err)
if t.Timeout > 0 {
deadLine = time.Now().Add(t.Timeout)
ctxWithDeadline, deadlineCancel := context.WithDeadline(ctx, deadLine)
defer deadlineCancel()
ctx = ctxWithDeadline
}

if t.APINode, err = t.peerClusterNode(); err != nil {
return fmt.Errorf("daemon leave: unable to find a peer node to announce we are leaving: %w", err)
}

// Create token using local cli
if tkCli, err = client.New(client.WithTimeout(t.Timeout)); err != nil {
return fmt.Errorf("can't create client to get new token: %w", err)
if t.isRunning() {
if err := t.nodeDrain(ctx); err != nil {
return err
}
}

if localClient, err = client.New(); err != nil {
return fmt.Errorf("unable to create leave token: %w", err)
} else {
duration := fmt.Sprintf("%ds", int(leaveDeadLine.Sub(time.Now()).Seconds()))
params := api.PostAuthTokenParams{Duration: &duration, Role: &api.Roles{api.Leave}}
resp, err := tkCli.PostAuthTokenWithResponse(ctx, &params)
// the default token duration should be enough for next steps: post leave and wait for completion
params := api.PostAuthTokenParams{Role: &api.Roles{api.Leave}}
if deadLine, hasDeadline := ctx.Deadline(); hasDeadline {
// ensure token duration can be used until deadline reached
tkDuration := deadLine.Sub(time.Now()).String()
params.Duration = &tkDuration
}
resp, err := localClient.PostAuthTokenWithResponse(ctx, &params)
if err != nil {
return fmt.Errorf("can't get leave token: %w", err)
} else if resp.StatusCode() != http.StatusOK {
Expand All @@ -62,28 +81,22 @@ func (t *CmdDaemonLeave) Run() (err error) {
}
}

if t.isRunning() {
if err := t.nodeDrain(ctx); err != nil {
return err
}
}

t.cli, err = client.New(
client.WithURL(t.APINode),
t.peerClient, err = client.New(
client.WithURL(daemonenv.HTTPNodeURL(t.APINode)),
client.WithBearer(tk),
)
if err != nil {
return
}

if err := t.setEvReader(); err != nil {
if err := t.setEvReader(deadLine.Sub(time.Now())); err != nil {
return err
}
defer func() {
_ = t.evReader.Close()
}()

if err := t.leave(ctx, t.cli); err != nil {
if err := t.leave(ctx, t.peerClient); err != nil {
return err
}
if err := t.waitResult(ctx); err != nil {
Expand All @@ -110,18 +123,22 @@ func (t *CmdDaemonLeave) Run() (err error) {
return nil
}

func (t *CmdDaemonLeave) setEvReader() (err error) {
func (t *CmdDaemonLeave) setEvReader(duration time.Duration) (err error) {
filters := []string{
"LeaveSuccess,removed=" + t.localhost + ",node=" + t.APINode,
"LeaveError,leave-node=" + t.localhost,
"LeaveIgnored,leave-node=" + t.localhost,
}

t.evReader, err = t.cli.NewGetEvents().
getEvents := t.peerClient.NewGetEvents().
SetRelatives(false).
SetFilters(filters).
SetDuration(t.Timeout).
GetReader()
SetFilters(filters)

if duration > 0 {
getEvents.SetDuration(duration)
}

t.evReader, err = getEvents.GetReader()
return
}

Expand Down Expand Up @@ -168,22 +185,21 @@ func (t *CmdDaemonLeave) leave(ctx context.Context, c *client.T) error {
return nil
}

func (t *CmdDaemonLeave) checkParams() error {
func (t *CmdDaemonLeave) peerClusterNode() (string, error) {
if ccfg, err := object.NewCluster(object.WithVolatile(true)); err != nil {
return fmt.Errorf("retrieve cluster config: %w", err)
return "", err
} else if clusterNodes, err := ccfg.Nodes(); err != nil {
return fmt.Errorf("retrieve cluster nodes: %w", err)
} else if len(clusterNodes) < 2 {
return fmt.Errorf("not available on single node cluster")
} else if t.APINode != "" {
return "", err
} else if len(clusterNodes) == 0 {
return "", fmt.Errorf("unexpected cluster nodes: %v", clusterNodes)
} else if len(clusterNodes) == 1 {
return "", fmt.Errorf("not available on single node cluster")
} else {
for _, node := range clusterNodes {
if node != hostname.Hostname() {
t.APINode = node
return nil
if node != "" && node != hostname.Hostname() {
return node, nil
}
}
return fmt.Errorf("unable to find api node to post leave request")
} else {
return nil
return "", fmt.Errorf("unexpected cluster nodes: %v", clusterNodes)
}
}
3 changes: 3 additions & 0 deletions daemon/daemonapi/post_daemon_leave.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ func (a *DaemonAPI) PostDaemonLeave(ctx echo.Context, params api.PostDaemonLeave
if node == "" {
log.Warnf("invalid node value: '%s'", node)
return JSONProblem(ctx, http.StatusBadRequest, "Invalid parameters", "Missing node param")
} else if node == a.localhost {
log.Infof("removal of localhost from cluster node is forbidden")
return JSONProblemf(ctx, http.StatusBadRequest, "Invalid parameters", "node '%s' is localhost", node)
}
log.Infof("publish leave request for node %s", node)
a.EventBus.Pub(&msgbus.LeaveRequest{Node: node}, a.LabelLocalhost, labelOriginAPI)
Expand Down

0 comments on commit a2585be

Please sign in to comment.