Skip to content

Commit

Permalink
Merge pull request #715 from cybozu-go/change-cancel
Browse files Browse the repository at this point in the history
change priority of reboot queue cancel
  • Loading branch information
YZ775 authored Apr 1, 2024
2 parents 7d544a4 + 2d19ad3 commit e50e9fd
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 15 deletions.
19 changes: 19 additions & 0 deletions op/reboot.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,25 @@ func (c rebootDequeueCommand) Command() cke.Command {
}
}

//

// rebootCancelOp is the operator type returned by RebootCancelOp. It embeds
// rebootDequeueOp and overrides Name to report "reboot-cancel".
type rebootCancelOp struct {
rebootDequeueOp
}

// RebootCancelOp builds the Operator that dequeues cancelled reboot entries.
// The given entries are stored in the embedded rebootDequeueOp.
func RebootCancelOp(entries []*cke.RebootQueueEntry) cke.Operator {
	cancelOp := new(rebootCancelOp)
	cancelOp.entries = entries
	return cancelOp
}

// Name returns the operation name, "reboot-cancel". Defining it on
// rebootCancelOp takes precedence over any Name method promoted from the
// embedded rebootDequeueOp.
func (o *rebootCancelOp) Name() string {
	const opName = "reboot-cancel"
	return opName
}

func listProtectedNamespaces(ctx context.Context, cs *kubernetes.Clientset, ls *metav1.LabelSelector) (map[string]bool, error) {
selector, err := metav1.LabelSelectorAsSelector(ls)
if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion op/reboot_decide.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ func CheckRebootDequeue(ctx context.Context, c *cke.Cluster, rqEntries []*cke.Re
for _, entry := range rqEntries {
switch {
case !entry.ClusterMember(c):
case entry.Status == cke.RebootStatusCancelled:
case entry.Status == cke.RebootStatusRebooting && rebootCompleted(ctx, c, entry):
default:
continue
Expand All @@ -257,6 +256,18 @@ func CheckRebootDequeue(ctx context.Context, c *cke.Cluster, rqEntries []*cke.Re
return dequeued
}

// CheckRebootCancelled returns the entries in rqEntries whose status is
// cke.RebootStatusCancelled, in their original order. The result is always a
// non-nil slice, even when nothing matches. ctx and c are not used.
func CheckRebootCancelled(ctx context.Context, c *cke.Cluster, rqEntries []*cke.RebootQueueEntry) []*cke.RebootQueueEntry {
	result := make([]*cke.RebootQueueEntry, 0)
	for i := range rqEntries {
		if rqEntries[i].Status != cke.RebootStatusCancelled {
			continue
		}
		result = append(result, rqEntries[i])
	}
	return result
}

func rebootCompleted(ctx context.Context, c *cke.Cluster, entry *cke.RebootQueueEntry) bool {
if c.Reboot.CommandTimeoutSeconds != nil && *c.Reboot.CommandTimeoutSeconds != 0 {
var cancel context.CancelFunc
Expand Down
12 changes: 7 additions & 5 deletions server/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,13 +346,15 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
newlyDrained := op.ChooseDrainedNodes(cluster, apiServers, rqEntries)
drainCompleted, drainTimedout, _ := op.CheckDrainCompletion(ctx, inf, nf.HealthyAPIServer(), cluster, rqEntries)
rebootDequeued := op.CheckRebootDequeue(ctx, cluster, rqEntries)
rebootCancelled := op.CheckRebootCancelled(ctx, cluster, rqEntries)

ops, phase := DecideOps(cluster, status, constraints, rcs, DecideOpsRebootArgs{
RQEntries: rqEntries,
NewlyDrained: newlyDrained,
DrainCompleted: drainCompleted,
DrainTimedout: drainTimedout,
RebootDequeued: rebootDequeued,
RQEntries: rqEntries,
NewlyDrained: newlyDrained,
DrainCompleted: drainCompleted,
DrainTimedout: drainTimedout,
RebootDequeued: rebootDequeued,
RebootCancelled: rebootCancelled,
}, c.config)

st := &cke.ServerStatus{
Expand Down
16 changes: 11 additions & 5 deletions server/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
)

type DecideOpsRebootArgs struct {
RQEntries []*cke.RebootQueueEntry
NewlyDrained []*cke.RebootQueueEntry
DrainCompleted []*cke.RebootQueueEntry
DrainTimedout []*cke.RebootQueueEntry
RebootDequeued []*cke.RebootQueueEntry
RQEntries []*cke.RebootQueueEntry
NewlyDrained []*cke.RebootQueueEntry
DrainCompleted []*cke.RebootQueueEntry
DrainTimedout []*cke.RebootQueueEntry
RebootDequeued []*cke.RebootQueueEntry
RebootCancelled []*cke.RebootQueueEntry
}

// DecideOps returns the next operations to do and the operation phase.
Expand Down Expand Up @@ -883,6 +884,11 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp
return nil, false
}

if len(rebootArgs.RebootCancelled) > 0 {
phaseReboot = true
ops = append(ops, op.RebootCancelOp(rebootArgs.RebootCancelled))
return ops, phaseReboot
}
if len(rebootArgs.NewlyDrained) > 0 {
phaseReboot = true
sshCheckNodes := make([]*cke.Node, 0, len(nf.cluster.Nodes))
Expand Down
13 changes: 9 additions & 4 deletions server/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,11 @@ func (d testData) withRebootDequeued(entries []*cke.RebootQueueEntry) testData {
return d
}

// withRebootCancelled sets RebootArgs.RebootCancelled on the receiver value
// to entries and returns it, so calls can be chained when building test data.
func (d testData) withRebootCancelled(entries []*cke.RebootQueueEntry) testData {
	next := d
	next.RebootArgs.RebootCancelled = entries
	return next
}

func (d testData) withDisableProxy() testData {
d.Cluster.Options.Proxy.Disable = true
return d
Expand Down Expand Up @@ -1274,14 +1279,14 @@ func TestDecideOps(t *testing.T) {
Node: nodeNames[2],
Status: cke.RebootStatusCancelled,
},
}).withRebootDequeued([]*cke.RebootQueueEntry{
}).withRebootCancelled([]*cke.RebootQueueEntry{
{
Index: 1,
Node: nodeNames[2],
Status: cke.RebootStatusCancelled,
},
}),
ExpectedOps: []opData{{"reboot-dequeue", 1}},
ExpectedOps: []opData{{"reboot-cancel", 1}},
},
{
Name: "UserResourceAdd",
Expand Down Expand Up @@ -2700,15 +2705,15 @@ func TestDecideOps(t *testing.T) {
Node: nodeNames[4],
Status: cke.RebootStatusCancelled,
},
}).withRebootDequeued([]*cke.RebootQueueEntry{
}).withRebootCancelled([]*cke.RebootQueueEntry{
{
Index: 1,
Node: nodeNames[4],
Status: cke.RebootStatusCancelled,
},
}),
ExpectedOps: []opData{
{"reboot-dequeue", 1},
{"reboot-cancel", 1},
},
},
}
Expand Down

0 comments on commit e50e9fd

Please sign in to comment.