Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay repair of a rebooting unreachable machine #732

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This project employs a versioning scheme described in [RELEASE.md](RELEASE.md#ve

### Added

- Add sabakan-triggered automatic repair functionality in [#725](https://github.com/cybozu-go/cke/pull/725)
- Add sabakan-triggered automatic repair functionality in [#725](https://github.com/cybozu-go/cke/pull/725) and [#732](https://github.com/cybozu-go/cke/pull/732)

### Fixed

Expand Down
2 changes: 2 additions & 0 deletions constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Constraints struct {
MaximumWorkers int `json:"maximum-workers"`
RebootMaximumUnreachable int `json:"maximum-unreachable-nodes-for-reboot"`
MaximumRepairs int `json:"maximum-repair-queue-entries"`
RepairRebootingSeconds int `json:"wait-seconds-to-repair-rebooting"`
}

// Check checks the cluster satisfies the constraints
Expand Down Expand Up @@ -43,5 +44,6 @@ func DefaultConstraints() *Constraints {
MaximumWorkers: 0,
RebootMaximumUnreachable: 0,
MaximumRepairs: 0,
RepairRebootingSeconds: 0,
}
}
1 change: 1 addition & 0 deletions docs/ckecli.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ Set a constraint on the cluster configuration.
- `maximum-workers`
- `maximum-unreachable-nodes-for-reboot`
- `maximum-repair-queue-entries`
- `wait-seconds-to-repair-rebooting`

### `ckecli constraints show`

Expand Down
1 change: 1 addition & 0 deletions docs/constraints.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ Cluster should satisfy these constraints.
| `maximum-workers` | int | 0 | The maximum number of worker nodes. 0 means unlimited. |
| `maximum-unreachable-nodes-for-reboot` | int | 0 | The maximum number of unreachable nodes allowed for operating reboot. |
| `maximum-repair-queue-entries` | int | 0 | The maximum number of repair queue entries |
| `wait-seconds-to-repair-rebooting` | int | 0 | The wait time in seconds before repairing a rebooting machine. |
6 changes: 6 additions & 0 deletions docs/sabakan-triggered-repair.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ The maximum number of recent repair queue entries and new failure reports is [co

As stated above, CKE considers all persisting queue entries as "recent" for simplicity.

### Limiter for planned reboot

A machine may become "UNREACHABLE" very quickly even if it is [being rebooted in a planned manner](reboot.md).
CKE should wait for a while before starting repair operations for a rebooting machine.

A user can [configure the wait time](ckecli.md#ckecli-constraints-set-name-value) as a [constraint `wait-seconds-to-repair-rebooting`](constraints.md).

[sabakan]: https://github.com/cybozu-go/sabakan
[schema]: https://github.com/cybozu-go/sabakan/blob/main/gql/graph/schema.graphqls
5 changes: 5 additions & 0 deletions pkg/ckecli/cmd/constraints_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ NAME is one of:
maximum-workers
maximum-unreachable-nodes-for-reboot
maximum-repair-queue-entries
wait-seconds-to-repair-rebooting

VALUE is an integer.`,

Expand Down Expand Up @@ -58,6 +59,10 @@ VALUE is an integer.`,
cstrSet = func(cstr *cke.Constraints) {
cstr.MaximumRepairs = val
}
case "wait-seconds-to-repair-rebooting":
cstrSet = func(cstr *cke.Constraints) {
cstr.RepairRebootingSeconds = val
}
default:
return errors.New("no such constraint: " + args[0])
}
Expand Down
8 changes: 7 additions & 1 deletion sabakan/integrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,18 @@ func (ig integrator) runRepairer(ctx context.Context, clusterStatus *cke.Cluster
return nil
}

rebootEntries, err := st.GetRebootsEntries(ctx)
if err != nil {
return err
}
rebootEntries = cke.DedupRebootQueueEntries(rebootEntries)

constraints, err := st.GetConstraints(ctx)
if err != nil {
return err
}

entries := Repairer(machines, clusterStatus.RepairQueue.Entries, constraints)
entries := Repairer(machines, clusterStatus.RepairQueue.Entries, rebootEntries, constraints, time.Now().UTC())

for _, entry := range entries {
err := st.RegisterRepairsEntry(ctx, entry)
Expand Down
24 changes: 21 additions & 3 deletions sabakan/repairer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,27 @@ package sabakan

import (
"strings"
"time"

"github.com/cybozu-go/cke"
"github.com/cybozu-go/log"
)

func Repairer(machines []Machine, entries []*cke.RepairQueueEntry, constraints *cke.Constraints) []*cke.RepairQueueEntry {
func Repairer(machines []Machine, repairEntries []*cke.RepairQueueEntry, rebootEntries []*cke.RebootQueueEntry, constraints *cke.Constraints, now time.Time) []*cke.RepairQueueEntry {
recent := make(map[string]bool)
for _, entry := range entries {
for _, entry := range repairEntries {
// entry.Operation is ignored when checking duplication
recent[entry.Address] = true
}

rebootLimit := now.Add(time.Duration(-constraints.RepairRebootingSeconds) * time.Second)
rebootingSince := make(map[string]time.Time)
for _, entry := range rebootEntries {
if entry.Status == cke.RebootStatusRebooting {
rebootingSince[entry.Node] = entry.LastTransitionTime // entry.Node denotes IP address
}
}

newMachines := make([]Machine, 0, len(machines))
for _, machine := range machines {
if len(machine.Spec.IPv4) == 0 {
Expand All @@ -31,10 +40,19 @@ func Repairer(machines []Machine, entries []*cke.RepairQueueEntry, constraints *
continue
}

since, ok := rebootingSince[machine.Spec.IPv4[0]]
if ok && since.After(rebootLimit) && machine.Status.State == StateUnreachable {
log.Info("ignore rebooting unreachable machine", map[string]interface{}{
"serial": machine.Spec.Serial,
"address": machine.Spec.IPv4[0],
})
continue
}

newMachines = append(newMachines, machine)
}

if len(entries)+len(newMachines) > constraints.MaximumRepairs {
if len(repairEntries)+len(newMachines) > constraints.MaximumRepairs {
log.Warn("ignore too many repair requests", nil)
return nil
}
Expand Down
56 changes: 52 additions & 4 deletions sabakan/repairer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sabakan

import (
"testing"
"time"

"github.com/cybozu-go/cke"
"github.com/google/go-cmp/cmp"
Expand All @@ -10,75 +11,122 @@ import (

func TestRepairer(t *testing.T) {
constraints := &cke.Constraints{
MaximumRepairs: 3,
MaximumRepairs: 3,
RepairRebootingSeconds: 300,
}

machines := []Machine{
{Spec: MachineSpec{Serial: "0000"}, Status: MachineStatus{State: StateUnhealthy}},
{Spec: MachineSpec{Serial: "1111", IPv4: []string{"1.1.1.1"}, BMC: BMC{Type: "type1"}}, Status: MachineStatus{State: StateUnhealthy}},
{Spec: MachineSpec{Serial: "1111", IPv4: []string{"1.1.1.1"}, BMC: BMC{Type: "type1"}}, Status: MachineStatus{State: StateUnreachable}},
{Spec: MachineSpec{Serial: "2222", IPv4: []string{"2.2.2.2"}, BMC: BMC{Type: "type2"}}, Status: MachineStatus{State: StateUnhealthy}},
{Spec: MachineSpec{Serial: "3333", IPv4: []string{"3.3.3.3"}, BMC: BMC{Type: "type3"}}, Status: MachineStatus{State: StateUnreachable}},
{Spec: MachineSpec{Serial: "4444", IPv4: []string{"4.4.4.4"}, BMC: BMC{Type: "type4"}}, Status: MachineStatus{State: StateUnreachable}},
}

entries := []*cke.RepairQueueEntry{
nil,
cke.NewRepairQueueEntry("unhealthy", "type1", "1.1.1.1"),
cke.NewRepairQueueEntry("unreachable", "type1", "1.1.1.1"),
cke.NewRepairQueueEntry("unhealthy", "type2", "2.2.2.2"),
cke.NewRepairQueueEntry("unreachable", "type3", "3.3.3.3"),
cke.NewRepairQueueEntry("unreachable", "type4", "4.4.4.4"),
}

now := time.Now().UTC()
recent := now.Add(-30 * time.Second)
stale := now.Add(-3000 * time.Second)
rebootEntries := []*cke.RebootQueueEntry{
nil,
{Node: "1.1.1.1", Status: cke.RebootStatusRebooting, LastTransitionTime: recent},
{Node: "2.2.2.2", Status: cke.RebootStatusRebooting, LastTransitionTime: recent},
{Node: "3.3.3.3", Status: cke.RebootStatusRebooting, LastTransitionTime: stale},
{Node: "4.4.4.4", Status: cke.RebootStatusDraining, LastTransitionTime: recent},
}

tests := []struct {
name string
failedMachines []Machine
queuedEntries []*cke.RepairQueueEntry
rebootEntries []*cke.RebootQueueEntry
expectedEntries []*cke.RepairQueueEntry
}{
{
name: "NoFailedMachine",
failedMachines: []Machine{},
queuedEntries: []*cke.RepairQueueEntry{entries[2]},
rebootEntries: nil,
expectedEntries: []*cke.RepairQueueEntry{},
},
{
name: "OneFailedMachine",
failedMachines: []Machine{machines[1]},
queuedEntries: []*cke.RepairQueueEntry{entries[2]},
rebootEntries: nil,
expectedEntries: []*cke.RepairQueueEntry{entries[1]},
},
{
name: "IgnoreNoIPAddress",
failedMachines: []Machine{machines[0], machines[1]},
queuedEntries: []*cke.RepairQueueEntry{entries[2]},
rebootEntries: nil,
expectedEntries: []*cke.RepairQueueEntry{entries[1]},
},
{
name: "IgnoreRecentlyRepaired",
failedMachines: []Machine{machines[1], machines[2], machines[3]},
queuedEntries: []*cke.RepairQueueEntry{entries[2]},
rebootEntries: nil,
expectedEntries: []*cke.RepairQueueEntry{entries[1], entries[3]},
},
{
name: "IgnoreRecentlyRepairedWithDifferentOperation",
failedMachines: []Machine{machines[1], machines[2], machines[3]},
queuedEntries: []*cke.RepairQueueEntry{cke.NewRepairQueueEntry("unreachable", "type2", "2.2.2.2")},
rebootEntries: nil,
expectedEntries: []*cke.RepairQueueEntry{entries[1], entries[3]},
},
{
name: "IgnoreTooManyFailedMachines",
failedMachines: []Machine{machines[1], machines[2], machines[3]},
queuedEntries: []*cke.RepairQueueEntry{entries[2], entries[4]},
rebootEntries: nil,
expectedEntries: []*cke.RepairQueueEntry{},
},
{
name: "IgnoreRebootingUnreachableMachine",
failedMachines: []Machine{machines[1]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[1]},
expectedEntries: []*cke.RepairQueueEntry{},
},
{
name: "RebootingButUnhealthy",
failedMachines: []Machine{machines[2]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[2]},
expectedEntries: []*cke.RepairQueueEntry{entries[2]},
},
{
name: "RebootingButStale",
failedMachines: []Machine{machines[3]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[3]},
expectedEntries: []*cke.RepairQueueEntry{entries[3]},
},
{
name: "QueuedButNotRebooting",
failedMachines: []Machine{machines[4]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[4]},
expectedEntries: []*cke.RepairQueueEntry{entries[4]},
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

entries := Repairer(tt.failedMachines, tt.queuedEntries, constraints)
entries := Repairer(tt.failedMachines, tt.queuedEntries, tt.rebootEntries, constraints, now)
if !cmp.Equal(entries, tt.expectedEntries, cmpopts.EquateEmpty()) {
t.Errorf("!cmp.Equal(entries, tt.newEntries), actual: %v, expected: %v", entries, tt.expectedEntries)
}
Expand Down
2 changes: 1 addition & 1 deletion server/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,8 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain

// Sort/filter entries to limit the number of concurrent repairs.
// - Entries being deleted are dequeued unconditionally.
// - Entries just repaired are moved to succeeded status unconditionally.
// - Succeeded/failed entries are left unchanged.
// - Entries just repaired are moved to succeeded status.
// - Entries already being processed have higher priority than newly queued entries.
// - Entries waiting for unexpired drain-retry-timeout are filtered out.
// - Other types of timeout-wait are considered as "being processed" and
Expand Down