fix/concurrency-issue #139

Merged
merged 3 commits on Mar 29, 2024
36 changes: 31 additions & 5 deletions cmd/main.go
@@ -9,6 +9,7 @@ import (

"github.com/doitintl/kubeip/internal/address"
"github.com/doitintl/kubeip/internal/config"
"github.com/doitintl/kubeip/internal/lease"
nd "github.com/doitintl/kubeip/internal/node"
"github.com/doitintl/kubeip/internal/types"
"github.com/pkg/errors"
@@ -23,8 +24,10 @@ import (
type contextKey string

const (
developModeKey contextKey = "develop-mode"
unassignTimeout = 5 * time.Minute
developModeKey contextKey = "develop-mode"
unassignTimeout = 5 * time.Minute
kubeipLockName = "kubeip-lock"
defaultLeaseDuration = 5
)

var (
@@ -79,14 +82,17 @@ func prepareLogger(level string, json bool) *logrus.Entry {
return log
}

func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Interface, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
ctx, cancel := context.WithCancel(c)
defer cancel()

// ticker for retry interval
ticker := time.NewTicker(cfg.RetryInterval)
defer ticker.Stop()

// create new cluster wide lock
lock := lease.NewKubeLeaseLock(client, kubeipLockName, "default", node.Instance, cfg.LeaseDuration)

for retryCounter := 0; retryCounter <= cfg.RetryAttempts; retryCounter++ {
log.WithFields(logrus.Fields{
"node": node.Name,
@@ -95,7 +101,20 @@
"retry-counter": retryCounter,
"retry-attempts": cfg.RetryAttempts,
}).Debug("assigning static public IP address to node")
err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy)
err := func(ctx context.Context) error {
if err := lock.Lock(ctx); err != nil {
return errors.Wrap(err, "failed to acquire lock")
}
log.Debug("lock acquired")
defer func() {
lock.Unlock(ctx) //nolint:errcheck
log.Debug("lock released")
}()
if err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy); err != nil {
return err //nolint:wrapcheck
}
return nil
}(c)
if err == nil || errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
return nil
}
@@ -152,7 +171,7 @@ func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
errorCh := make(chan error, 1) // buffered channel to avoid goroutine leak
go func() {
defer close(errorCh) // close the channel when the goroutine exits to avoid goroutine leak
e := assignAddress(ctx, log, assigner, n, cfg)
e := assignAddress(ctx, log, clientset, assigner, n, cfg)
if e != nil {
errorCh <- e
}
@@ -267,6 +286,13 @@ func main() {
EnvVars: []string{"RETRY_ATTEMPTS"},
Category: "Configuration",
},
&cli.IntFlag{
Name: "lease-duration",
Usage: "duration of the kubernetes lease",
Value: defaultLeaseDuration,
EnvVars: []string{"LEASE_DURATION"},
Category: "Configuration",
},
&cli.BoolFlag{
Name: "release-on-exit",
Usage: "release the static public IP address on exit",
9 changes: 8 additions & 1 deletion cmd/main_test.go
@@ -11,6 +11,7 @@ import (
mocks "github.com/doitintl/kubeip/mocks/address"
"github.com/pkg/errors"
tmock "github.com/stretchr/testify/mock"
"k8s.io/client-go/kubernetes/fake"
)

func Test_assignAddress(t *testing.T) {
@@ -45,6 +46,7 @@
OrderBy: "test-order-by",
RetryAttempts: 3,
RetryInterval: time.Millisecond,
LeaseDuration: 1,
},
},
},
@@ -70,6 +72,7 @@
OrderBy: "test-order-by",
RetryAttempts: 3,
RetryInterval: time.Millisecond,
LeaseDuration: 1,
},
},
},
@@ -93,6 +96,7 @@
OrderBy: "test-order-by",
RetryAttempts: 3,
RetryInterval: time.Millisecond,
LeaseDuration: 1,
},
},
wantErr: true,
@@ -125,6 +129,7 @@
OrderBy: "test-order-by",
RetryAttempts: 10,
RetryInterval: 5 * time.Millisecond,
LeaseDuration: 1,
},
},
wantErr: true,
@@ -152,6 +157,7 @@
OrderBy: "test-order-by",
RetryAttempts: 3,
RetryInterval: 15 * time.Millisecond,
LeaseDuration: 1,
},
},
wantErr: true,
@@ -161,7 +167,8 @@
t.Run(tt.name, func(t *testing.T) {
log := prepareLogger("debug", false)
assigner := tt.args.assignerFn(t)
if err := assignAddress(tt.args.c, log, assigner, tt.args.node, tt.args.cfg); (err != nil) != tt.wantErr {
client := fake.NewSimpleClientset()
if err := assignAddress(tt.args.c, log, client, assigner, tt.args.node, tt.args.cfg); (err != nil) != tt.wantErr {
t.Errorf("assignAddress() error = %v, wantErr %v", err, tt.wantErr)
}
})
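
A possible follow-up check for these tests (not part of this PR, and sketched on the assumption that assignAddress always releases the lease before returning): after a run, the fake clientset should contain no leftover Lease objects. Something along these lines could sit in the subtest after the assignAddress call, given extra imports of "context" and metav1 "k8s.io/apimachinery/pkg/apis/meta/v1":

	// hypothetical assertion: the kubeip-lock lease must be gone once assignAddress returns
	leases, lerr := client.CoordinationV1().Leases("default").List(context.Background(), metav1.ListOptions{})
	if lerr != nil {
		t.Fatalf("listing leases: %v", lerr)
	}
	if len(leases.Items) != 0 {
		t.Errorf("expected the kubeip-lock lease to be released, found %d left over", len(leases.Items))
	}
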
5 changes: 5 additions & 0 deletions examples/aws/eks.tf
@@ -167,6 +167,11 @@ resource "kubernetes_cluster_role" "kubeip_cluster_role" {
resources = ["nodes"]
verbs = ["get"]
}
rule {
api_groups = ["coordination.k8s.io"]
resources = ["leases"]
verbs = ["create", "delete", "get", "list", "update"]
}
depends_on = [
kubernetes_service_account.kubeip_service_account,
module.eks
9 changes: 9 additions & 0 deletions examples/gcp/gke.tf
@@ -227,6 +227,11 @@ resource "kubernetes_cluster_role" "kubeip_cluster_role" {
resources = ["nodes"]
verbs = ["get"]
}
rule {
api_groups = ["coordination.k8s.io"]
resources = ["leases"]
verbs = ["create", "delete", "get", "list", "update"]
}
depends_on = [
kubernetes_service_account.kubeip_service_account,
google_container_cluster.kubeip_cluster
@@ -303,6 +308,10 @@ resource "kubernetes_daemonset" "kubeip_daemonset" {
name = "LOG_JSON"
value = "true"
}
env {
name = "LEASE_DURATION"
value = "20"
}
resources {
requests = {
cpu = "100m"
7 changes: 0 additions & 7 deletions internal/address/gcp.go
@@ -3,7 +3,6 @@ package address
import (
"context"
"fmt"
"math/rand"
"strings"
"time"

@@ -27,7 +26,6 @@
accessConfigKind = "compute#accessConfig"
defaultPrefixLength = 96
maxRetries = 10 // number of retries for assigning ephemeral public IP address
maxWaitListTime = 10 // max time to wait before listing addresses
)

var (
@@ -223,11 +221,6 @@ func (a *gcpAssigner) Assign(ctx context.Context, instanceID, zone string, filte
return errors.Wrapf(err, "check if static public IP is already assigned to instance %s", instanceID)
}

// add random sleep to reduce the chance of multiple kubeip instances getting the same address list
waitTime := time.Duration(rand.Intn(maxWaitListTime)) * time.Second //nolint:gosec
a.logger.WithField("waitTime", waitTime).Debug("waiting before listing addresses")
time.Sleep(waitTime)

// get available reserved public IP addresses
addresses, err := a.listAddresses(filter, orderBy, reservedStatus)
if err != nil {
3 changes: 3 additions & 0 deletions internal/config/config.go
@@ -29,6 +29,8 @@ type Config struct {
RetryAttempts int `json:"retry-attempts"`
// ReleaseOnExit releases the IP address on exit
ReleaseOnExit bool `json:"release-on-exit"`
// LeaseDuration is the duration of the kubernetes lease
LeaseDuration int `json:"lease-duration"`
}

func NewConfig(c *cli.Context) *Config {
@@ -44,5 +46,6 @@
cfg.Region = c.String("region")
cfg.IPv6 = c.Bool("ipv6")
cfg.ReleaseOnExit = c.Bool("release-on-exit")
cfg.LeaseDuration = c.Int("lease-duration")
return &cfg
}
135 changes: 135 additions & 0 deletions internal/lease/lock.go
@@ -0,0 +1,135 @@
package lease

import (
"context"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/ptr"
)

type KubeLock interface {
Lock(ctx context.Context) error
Unlock(ctx context.Context) error
}

type kubeLeaseLock struct {
client kubernetes.Interface
leaseName string
namespace string
holderIdentity string
leaseDuration int // seconds
cancelFunc context.CancelFunc
}

func NewKubeLeaseLock(client kubernetes.Interface, leaseName, namespace, holderIdentity string, leaseDurationSeconds int) KubeLock {
return &kubeLeaseLock{
client: client,
leaseName: leaseName,
namespace: namespace,
holderIdentity: holderIdentity,
leaseDuration: leaseDurationSeconds,
}
}

func (k *kubeLeaseLock) Lock(ctx context.Context) error {
backoff := wait.Backoff{
Duration: time.Second, // start with 1 second
Factor: 1.5, //nolint:gomnd // multiply by 1.5 on each retry
Jitter: 0.5, //nolint:gomnd // add 50% jitter to wait time on each retry
Steps: 100, //nolint:gomnd // retry 100 times
Cap: time.Hour, // but never wait more than 1 hour
}

return wait.ExponentialBackoff(backoff, func() (bool, error) { //nolint:wrapcheck
timestamp := metav1.MicroTime{Time: time.Now()}
lease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: k.leaseName,
Namespace: k.namespace,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: &k.holderIdentity,
LeaseDurationSeconds: ptr.To(int32(k.leaseDuration)),
AcquireTime: &timestamp,
RenewTime: &timestamp,
},
}

_, err := k.client.CoordinationV1().Leases(k.namespace).Create(ctx, lease, metav1.CreateOptions{})
if err != nil {
if errors.IsAlreadyExists(err) {
// If the lease already exists, check if it's held by another holder
existingLease, getErr := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
if getErr != nil {
return false, getErr //nolint:wrapcheck
}
// check if the lease is expired
if existingLease.Spec.RenewTime != nil && time.Since(existingLease.Spec.RenewTime.Time) > time.Duration(k.leaseDuration)*time.Second {
// If the lease is expired, delete it and retry
delErr := k.client.CoordinationV1().Leases(k.namespace).Delete(ctx, k.leaseName, metav1.DeleteOptions{})
if delErr != nil {
return false, delErr //nolint:wrapcheck
}
return false, nil
}
// check if the lease is held by another holder
if existingLease.Spec.HolderIdentity != nil && *existingLease.Spec.HolderIdentity != k.holderIdentity {
// If the lease is held by another holder, return false to retry
return false, nil
}
return true, nil
}
return false, err //nolint:wrapcheck
}

// Create a child context with cancellation
ctx, k.cancelFunc = context.WithCancel(ctx)
go k.renewLeasePeriodically(ctx)

return true, nil
})
}

func (k *kubeLeaseLock) renewLeasePeriodically(ctx context.Context) {
// let's renew the lease every 1/2 of the lease duration; use milliseconds for ticker
ticker := time.NewTicker(time.Duration(k.leaseDuration*500) * time.Millisecond) //nolint:gomnd
defer ticker.Stop()

for {
select {
case <-ticker.C:
lease, err := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
if err != nil || lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != k.holderIdentity {
return
}

lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
k.client.CoordinationV1().Leases(k.namespace).Update(ctx, lease, metav1.UpdateOptions{}) //nolint:errcheck
case <-ctx.Done():
// Exit the goroutine when the context is cancelled
return
}
}
}

func (k *kubeLeaseLock) Unlock(ctx context.Context) error {
// Call the cancel function to stop the lease renewal process
if k.cancelFunc != nil {
k.cancelFunc()
}
lease, err := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
if err != nil {
return err //nolint:wrapcheck
}

if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != k.holderIdentity {
return nil
}

return k.client.CoordinationV1().Leases(k.namespace).Delete(ctx, k.leaseName, metav1.DeleteOptions{}) //nolint:wrapcheck
}
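
To make the intended use of KubeLock concrete, here is a minimal sketch (not part of the PR). It has to live inside the kubeip module, since internal/lease is only importable from within it; the fake clientset and the node-a/node-b holder identities are illustrative stand-ins for the in-cluster clientset and node instance names that cmd/main.go passes in:

package main

import (
	"context"
	"fmt"

	"github.com/doitintl/kubeip/internal/lease"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()
	client := fake.NewSimpleClientset() // stand-in for the real in-cluster clientset

	// every kubeip instance contends for the same Lease object ("kubeip-lock" in "default")
	lockA := lease.NewKubeLeaseLock(client, "kubeip-lock", "default", "node-a", 5)
	lockB := lease.NewKubeLeaseLock(client, "kubeip-lock", "default", "node-b", 5)

	if err := lockA.Lock(ctx); err != nil { // node-a creates the lease and starts renewing it
		panic(err)
	}
	fmt.Println("node-a holds the lease; node-b's Lock() would back off until release or expiry")

	if err := lockA.Unlock(ctx); err != nil { // stops the renewal goroutine and deletes the lease
		panic(err)
	}
	if err := lockB.Lock(ctx); err != nil { // succeeds once the lease is gone
		panic(err)
	}
	_ = lockB.Unlock(ctx)
}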