From d7cdd22c34fb41cc8513c405e2e43ec67859a9a4 Mon Sep 17 00:00:00 2001 From: Nicholas Baker Date: Fri, 30 Aug 2024 16:43:18 -0700 Subject: [PATCH] nodeadm: add retry until daemon is detected running --- nodeadm/cmd/nodeadm/init/init.go | 7 ++- nodeadm/internal/aws/ecr/ecr.go | 3 +- nodeadm/internal/containerd/daemon.go | 21 ++++++- nodeadm/internal/containerd/sandbox.go | 3 +- nodeadm/internal/daemon/interface.go | 11 ++-- nodeadm/internal/daemon/systemd.go | 2 +- nodeadm/internal/kubelet/daemon.go | 21 ++++++- nodeadm/internal/util/retry.go | 79 ++++++++++++++++++++++---- 8 files changed, 122 insertions(+), 25 deletions(-) diff --git a/nodeadm/cmd/nodeadm/init/init.go b/nodeadm/cmd/nodeadm/init/init.go index febb1159c..9a4b666f7 100644 --- a/nodeadm/cmd/nodeadm/init/init.go +++ b/nodeadm/cmd/nodeadm/init/init.go @@ -2,6 +2,8 @@ package init import ( "context" + "os" + "os/signal" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -46,6 +48,9 @@ func (c *initCmd) Flaggy() *flaggy.Subcommand { } func (c *initCmd) Run(log *zap.Logger, opts *cli.GlobalOptions) error { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + log.Info("Checking user is root..") root, err := cli.IsRunningAsRoot() if err != nil { @@ -126,7 +131,7 @@ func (c *initCmd) Run(log *zap.Logger, opts *cli.GlobalOptions) error { nameField := zap.String("name", daemon.Name()) log.Info("Ensuring daemon is running..", nameField) - if err := daemon.EnsureRunning(); err != nil { + if err := daemon.EnsureRunning(ctx); err != nil { return err } log.Info("Daemon is running", nameField) diff --git a/nodeadm/internal/aws/ecr/ecr.go b/nodeadm/internal/aws/ecr/ecr.go index c70018980..3e317c314 100644 --- a/nodeadm/internal/aws/ecr/ecr.go +++ b/nodeadm/internal/aws/ecr/ecr.go @@ -6,7 +6,6 @@ import ( "go.uber.org/zap" "net" "strings" - "time" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/ecr" @@ -23,7 +22,7 @@ func GetAuthorizationToken(awsRegion string) (string, error) { } ecrClient := ecr.NewFromConfig(awsConfig) var token *ecr.GetAuthorizationTokenOutput - err = util.RetryExponentialBackoff(3, 2*time.Second, func() error { + err = util.NewRetrier(util.WithBackoffExponential()).Retry(context.TODO(), func() error { token, err = ecrClient.GetAuthorizationToken(context.Background(), &ecr.GetAuthorizationTokenInput{}) return err }) diff --git a/nodeadm/internal/containerd/daemon.go b/nodeadm/internal/containerd/daemon.go index 6a0ddd740..02b2fe31a 100644 --- a/nodeadm/internal/containerd/daemon.go +++ b/nodeadm/internal/containerd/daemon.go @@ -1,8 +1,13 @@ package containerd import ( + "context" + "fmt" + "time" + "github.com/awslabs/amazon-eks-ami/nodeadm/internal/api" "github.com/awslabs/amazon-eks-ami/nodeadm/internal/daemon" + "github.com/awslabs/amazon-eks-ami/nodeadm/internal/util" ) const ContainerdDaemonName = "containerd" @@ -23,8 +28,20 @@ func (cd *containerd) Configure(c *api.NodeConfig) error { return writeContainerdConfig(c) } -func (cd *containerd) EnsureRunning() error { - return cd.daemonManager.StartDaemon(ContainerdDaemonName) +func (cd *containerd) EnsureRunning(ctx context.Context) error { + if err := cd.daemonManager.StartDaemon(ContainerdDaemonName); err != nil { + return err + } + return util.NewRetrier(util.WithRetryAlways(), util.WithBackoffFixed(250*time.Millisecond)).Retry(ctx, func() error { + status, err := cd.daemonManager.GetDaemonStatus(ContainerdDaemonName) + if err != nil { + return err + } + if status != daemon.DaemonStatusRunning { + return fmt.Errorf("%s status is not %q", ContainerdDaemonName, daemon.DaemonStatusRunning) + } + return nil + }) } func (cd *containerd) PostLaunch(c *api.NodeConfig) error { diff --git a/nodeadm/internal/containerd/sandbox.go b/nodeadm/internal/containerd/sandbox.go index 41c97e318..42c58c1e9 100644 --- a/nodeadm/internal/containerd/sandbox.go +++ b/nodeadm/internal/containerd/sandbox.go @@ -1,6 +1,7 @@ package containerd import ( + "context" "fmt" "os/exec" "regexp" @@ -44,7 +45,7 @@ func cacheSandboxImage(cfg *api.NodeConfig) error { imageSpec := &v1.ImageSpec{Image: sandboxImage} authConfig := &v1.AuthConfig{Auth: ecrUserToken} - return util.RetryExponentialBackoff(3, 2*time.Second, func() error { + return util.NewRetrier(util.WithBackoffExponential()).Retry(context.TODO(), func() error { zap.L().Info("Pulling sandbox image..", zap.String("image", sandboxImage)) imageRef, err := client.PullImage(imageSpec, authConfig, nil) if err != nil { diff --git a/nodeadm/internal/daemon/interface.go b/nodeadm/internal/daemon/interface.go index 4ec53db8f..596acde0f 100644 --- a/nodeadm/internal/daemon/interface.go +++ b/nodeadm/internal/daemon/interface.go @@ -1,20 +1,21 @@ package daemon -import "github.com/awslabs/amazon-eks-ami/nodeadm/internal/api" +import ( + "context" + + "github.com/awslabs/amazon-eks-ami/nodeadm/internal/api" +) type Daemon interface { // Configure configures the daemon. Configure(*api.NodeConfig) error - // EnsureRunning ensures that the daemon is running. // If the daemon is not running, it will be started. // If the daemon is already running, and has been re-configured, it will be restarted. - EnsureRunning() error - + EnsureRunning(context.Context) error // PostLaunch runs any additional step that needs to occur after the service // daemon as been started PostLaunch(*api.NodeConfig) error - // Name returns the name of the daemon. Name() string } diff --git a/nodeadm/internal/daemon/systemd.go b/nodeadm/internal/daemon/systemd.go index a76db56a2..54010cca2 100644 --- a/nodeadm/internal/daemon/systemd.go +++ b/nodeadm/internal/daemon/systemd.go @@ -55,7 +55,7 @@ func (m *systemdDaemonManager) GetDaemonStatus(name string) (DaemonStatus, error if err != nil { return DaemonStatusUnknown, err } - switch status.Value.String() { + switch status.Value.Value().(string) { case "active": return DaemonStatusRunning, nil case "inactive": diff --git a/nodeadm/internal/kubelet/daemon.go b/nodeadm/internal/kubelet/daemon.go index f8fd77039..f807052fa 100644 --- a/nodeadm/internal/kubelet/daemon.go +++ b/nodeadm/internal/kubelet/daemon.go @@ -1,8 +1,13 @@ package kubelet import ( + "context" + "fmt" + "time" + "github.com/awslabs/amazon-eks-ami/nodeadm/internal/api" "github.com/awslabs/amazon-eks-ami/nodeadm/internal/daemon" + "github.com/awslabs/amazon-eks-ami/nodeadm/internal/util" ) const KubeletDaemonName = "kubelet" @@ -44,8 +49,20 @@ func (k *kubelet) Configure(cfg *api.NodeConfig) error { return nil } -func (k *kubelet) EnsureRunning() error { - return k.daemonManager.StartDaemon(KubeletDaemonName) +func (k *kubelet) EnsureRunning(ctx context.Context) error { + if err := k.daemonManager.StartDaemon(KubeletDaemonName); err != nil { + return err + } + return util.NewRetrier(util.WithRetryAlways(), util.WithBackoffFixed(250*time.Millisecond)).Retry(ctx, func() error { + status, err := k.daemonManager.GetDaemonStatus(KubeletDaemonName) + if err != nil { + return err + } + if status != daemon.DaemonStatusRunning { + return fmt.Errorf("%s status is not %q", KubeletDaemonName, daemon.DaemonStatusRunning) + } + return nil + }) } func (k *kubelet) PostLaunch(_ *api.NodeConfig) error { diff --git a/nodeadm/internal/util/retry.go b/nodeadm/internal/util/retry.go index 902395f23..f3e76e1eb 100644 --- a/nodeadm/internal/util/retry.go +++ b/nodeadm/internal/util/retry.go @@ -1,16 +1,73 @@ package util -import "time" - -func RetryExponentialBackoff(attempts int, initial time.Duration, f func() error) error { - var err error - wait := initial - for i := 0; i < attempts; i++ { - if err = f(); err == nil { - return nil +import ( + "context" + "time" +) + +type Retrier struct { + ConditionFn func(*Retrier) bool + BackoffFn func(*Retrier) time.Duration + + LastErr error + LastWait time.Duration + LastIter int +} + +func (r *Retrier) Retry(ctx context.Context, fn func() error) error { + for r.LastIter = 0; r.ConditionFn(r); r.LastIter++ { + if r.LastErr = fn(); r.LastErr == nil { + return r.LastErr + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + time.Sleep(r.LastWait) + r.LastWait = r.BackoffFn(r) } - time.Sleep(wait) - wait *= 2 } - return err + return r.LastErr +} + +type fnOpt func(*Retrier) + +func NewRetrier(fnOpts ...fnOpt) *Retrier { + retrier := Retrier{ + LastErr: nil, + LastIter: 0, + LastWait: time.Second, + } + for _, fn := range append([]fnOpt{ + WithRetryCount(5), + WithBackoffExponential(), + }, fnOpts...) { + fn(&retrier) + } + return &retrier +} + +func WithRetryCount(maxAttempts int) fnOpt { + return func(r *Retrier) { + r.ConditionFn = func(r *Retrier) bool { return r.LastIter < maxAttempts } + } +} + +func WithRetryAlways() fnOpt { + return func(r *Retrier) { + r.ConditionFn = func(r *Retrier) bool { return true } + } +} + +func WithBackoffFixed(interval time.Duration) fnOpt { + return func(r *Retrier) { + r.LastWait = interval + r.BackoffFn = func(r *Retrier) time.Duration { return r.LastWait } + } +} + +func WithBackoffExponential() fnOpt { + return func(r *Retrier) { + r.BackoffFn = func(r *Retrier) time.Duration { return r.LastWait * 2 } + } }