Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wait for node to report address before removing taint #158

Merged
merged 3 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 71 additions & 9 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func prepareLogger(level string, json bool) *logrus.Entry {
return log
}

func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Interface, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Interface, assigner address.Assigner, node *types.Node, cfg *config.Config) (string, error) {
ctx, cancel := context.WithCancel(c)
defer cancel()

Expand All @@ -101,22 +101,23 @@ func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Inter
"retry-counter": retryCounter,
"retry-attempts": cfg.RetryAttempts,
}).Debug("assigning static public IP address to node")
err := func(ctx context.Context) error {
assignedAddress, err := func(ctx context.Context) (string, error) {
if err := lock.Lock(ctx); err != nil {
return errors.Wrap(err, "failed to acquire lock")
return "", errors.Wrap(err, "failed to acquire lock")
}
log.Debug("lock acquired")
defer func() {
lock.Unlock(ctx) //nolint:errcheck
log.Debug("lock released")
}()
if err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy); err != nil {
return err //nolint:wrapcheck
assignedAddress, err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy)
if err != nil {
return "", err //nolint:wrapcheck
}
return nil
return assignedAddress, nil
}(c)
if err == nil || errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
return nil
return assignedAddress, nil
}

log.WithError(err).WithFields(logrus.Fields{
Expand All @@ -130,7 +131,64 @@ func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Inter
continue
case <-ctx.Done():
// If the context is done, return an error indicating that the operation was cancelled
return errors.Wrap(ctx.Err(), "context cancelled while assigning addresses")
return "", errors.Wrap(ctx.Err(), "context cancelled while assigning addresses")
}
}
return "", errors.New("reached maximum number of retries")
}

func waitForAddressToBeReported(c context.Context, log *logrus.Entry, explorer nd.Explorer, node *types.Node, assignedAddress string, cfg *config.Config) error {
ctx, cancel := context.WithCancel(c)
defer cancel()

// ticker for retry interval
ticker := time.NewTicker(cfg.RetryInterval)
defer ticker.Stop()

for retryCounter := 0; retryCounter <= cfg.RetryAttempts; retryCounter++ {
log.WithFields(logrus.Fields{
"node": node.Name,
"instance": node.Instance,
"address": assignedAddress,
"retry-counter": retryCounter,
"retry-attempts": cfg.RetryAttempts,
}).Debug("Waiting for node to report assigned address")

nodeInfo, err := explorer.GetNode(ctx, node.Name)
if err == nil {
for _, ip := range nodeInfo.ExternalIPs {
if ip.String() == assignedAddress {
log.WithFields(logrus.Fields{
"node": node.Name,
"instance": node.Instance,
"address": assignedAddress,
"retry-counter": retryCounter,
"retry-attempts": cfg.RetryAttempts,
}).Info("Node is reporting assigned address")
return nil
}
}
log.WithError(err).WithFields(logrus.Fields{
"node": node.Name,
"instance": node.Instance,
"address": assignedAddress,
}).Warn("Node is not yet reporting the assigned address")
} else {
log.WithError(err).WithFields(logrus.Fields{
"node": node.Name,
"instance": node.Instance,
"address": assignedAddress,
}).Error("failed to check if node is reporting the assigned address")
}

log.Infof("retrying after %v", cfg.RetryInterval)

select {
case <-ticker.C:
continue
case <-ctx.Done():
// If the context is done, return an error indicating that the operation was cancelled
return errors.Wrap(ctx.Err(), "context cancelled while waiting for node to report assigned address")
}
}
return errors.New("reached maximum number of retries")
Expand Down Expand Up @@ -169,12 +227,16 @@ func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
return errors.Wrap(err, "initializing assigner")
}

err = assignAddress(ctx, log, clientset, assigner, n, cfg)
assignedAddress, err := assignAddress(ctx, log, clientset, assigner, n, cfg)
if err != nil {
return errors.Wrap(err, "assigning static public IP address")
}

if cfg.TaintKey != "" {
if err := waitForAddressToBeReported(ctx, log, explorer, n, assignedAddress, cfg); err != nil {
return errors.Wrap(err, "waiting for node to report assigned address")
}

logger := log.WithField("taint-key", cfg.TaintKey)
tainter := nd.NewTainter(clientset)

Expand Down
Loading
Loading