diff --git a/cmd/main.go b/cmd/main.go
index 7f8992f..6d736cc 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/doitintl/kubeip/internal/address"
 	"github.com/doitintl/kubeip/internal/config"
+	"github.com/doitintl/kubeip/internal/lease"
 	nd "github.com/doitintl/kubeip/internal/node"
 	"github.com/doitintl/kubeip/internal/types"
 	"github.com/pkg/errors"
@@ -23,8 +24,10 @@ import (
 type contextKey string
 
 const (
-	developModeKey  contextKey = "develop-mode"
-	unassignTimeout            = 5 * time.Minute
+	developModeKey       contextKey = "develop-mode"
+	unassignTimeout                 = 5 * time.Minute
+	kubeipLockName                  = "kubeip-lock"
+	defaultLeaseDuration            = 5
 )
 
 var (
@@ -79,7 +82,7 @@ func prepareLogger(level string, json bool) *logrus.Entry {
 	return log
 }
 
-func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
+func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Interface, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
 	ctx, cancel := context.WithCancel(c)
 	defer cancel()
 
@@ -87,6 +90,9 @@ func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assign
 	ticker := time.NewTicker(cfg.RetryInterval)
 	defer ticker.Stop()
 
+	// create new cluster wide lock
+	lock := lease.NewKubeLeaseLock(client, kubeipLockName, "default", node.Instance, cfg.LeaseDuration)
+
 	for retryCounter := 0; retryCounter <= cfg.RetryAttempts; retryCounter++ {
 		log.WithFields(logrus.Fields{
 			"node":           node.Name,
@@ -95,7 +101,20 @@ func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assign
 			"retry-counter":  retryCounter,
 			"retry-attempts": cfg.RetryAttempts,
 		}).Debug("assigning static public IP address to node")
-		err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy)
+		err := func(ctx context.Context) error {
+			if err := lock.Lock(ctx); err != nil {
+				return errors.Wrap(err, "failed to acquire lock")
+			}
+			log.Debug("lock acquired")
+			defer func() {
+				lock.Unlock(ctx) //nolint:errcheck
+				log.Debug("lock released")
+			}()
+			if err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy); err != nil {
+				return err //nolint:wrapcheck
+			}
+			return nil
+		}(c)
 		if err == nil || errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
 			return nil
 		}
@@ -152,7 +171,7 @@ func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
 	errorCh := make(chan error, 1) // buffered channel to avoid goroutine leak
 	go func() {
 		defer close(errorCh) // close the channel when the goroutine exits to avoid goroutine leak
-		e := assignAddress(ctx, log, assigner, n, cfg)
+		e := assignAddress(ctx, log, clientset, assigner, n, cfg)
 		if e != nil {
 			errorCh <- e
 		}
@@ -267,6 +286,13 @@ func main() {
 			EnvVars:  []string{"RETRY_ATTEMPTS"},
 			Category: "Configuration",
 		},
+		&cli.IntFlag{
+			Name:     "lease-duration",
+			Usage:    "duration of the kubernetes lease",
+			Value:    defaultLeaseDuration,
+			EnvVars:  []string{"LEASE_DURATION"},
+			Category: "Configuration",
+		},
 		&cli.BoolFlag{
 			Name:  "release-on-exit",
 			Usage: "release the static public IP address on exit",
diff --git a/cmd/main_test.go b/cmd/main_test.go
index b8bba4f..68439eb 100644
--- a/cmd/main_test.go
+++ b/cmd/main_test.go
@@ -11,6 +11,7 @@ import (
 	mocks "github.com/doitintl/kubeip/mocks/address"
 	"github.com/pkg/errors"
 	tmock "github.com/stretchr/testify/mock"
+	"k8s.io/client-go/kubernetes/fake"
 )
 
 func Test_assignAddress(t *testing.T) {
@@ -45,6 +46,7 @@ func Test_assignAddress(t *testing.T) {
 					OrderBy:       "test-order-by",
 					RetryAttempts: 3,
 					RetryInterval: time.Millisecond,
+					LeaseDuration: 1,
 				},
 			},
 		},
@@ -70,6 +72,7 @@ func Test_assignAddress(t *testing.T) {
 					OrderBy:       "test-order-by",
 					RetryAttempts: 3,
 					RetryInterval: time.Millisecond,
+					LeaseDuration: 1,
 				},
 			},
 		},
@@ -93,6 +96,7 @@ func Test_assignAddress(t *testing.T) {
 					OrderBy:       "test-order-by",
 					RetryAttempts: 3,
 					RetryInterval: time.Millisecond,
+					LeaseDuration: 1,
 				},
 			},
 			wantErr: true,
@@ -125,6 +129,7 @@ func Test_assignAddress(t *testing.T) {
 					OrderBy:       "test-order-by",
 					RetryAttempts: 10,
 					RetryInterval: 5 * time.Millisecond,
+					LeaseDuration: 1,
 				},
 			},
 			wantErr: true,
@@ -152,6 +157,7 @@ func Test_assignAddress(t *testing.T) {
 					OrderBy:       "test-order-by",
 					RetryAttempts: 3,
 					RetryInterval: 15 * time.Millisecond,
+					LeaseDuration: 1,
 				},
 			},
 			wantErr: true,
@@ -161,7 +167,8 @@ func Test_assignAddress(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			log := prepareLogger("debug", false)
 			assigner := tt.args.assignerFn(t)
-			if err := assignAddress(tt.args.c, log, assigner, tt.args.node, tt.args.cfg); (err != nil) != tt.wantErr {
+			client := fake.NewSimpleClientset()
+			if err := assignAddress(tt.args.c, log, client, assigner, tt.args.node, tt.args.cfg); (err != nil) != tt.wantErr {
 				t.Errorf("assignAddress() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
diff --git a/examples/aws/eks.tf b/examples/aws/eks.tf
index 63db025..c4f3ff3 100644
--- a/examples/aws/eks.tf
+++ b/examples/aws/eks.tf
@@ -167,6 +167,11 @@ resource "kubernetes_cluster_role" "kubeip_cluster_role" {
     resources  = ["nodes"]
     verbs      = ["get"]
   }
+  rule {
+    api_groups = ["coordination.k8s.io"]
+    resources  = ["leases"]
+    verbs      = ["create", "delete", "get", "list", "update"]
+  }
   depends_on = [
     kubernetes_service_account.kubeip_service_account,
     module.eks
diff --git a/examples/gcp/gke.tf b/examples/gcp/gke.tf
index 40276be..faab4ee 100644
--- a/examples/gcp/gke.tf
+++ b/examples/gcp/gke.tf
@@ -227,6 +227,11 @@ resource "kubernetes_cluster_role" "kubeip_cluster_role" {
     resources  = ["nodes"]
     verbs      = ["get"]
   }
+  rule {
+    api_groups = ["coordination.k8s.io"]
+    resources  = ["leases"]
+    verbs      = ["create", "delete", "get", "list", "update"]
+  }
   depends_on = [
     kubernetes_service_account.kubeip_service_account,
     google_container_cluster.kubeip_cluster
@@ -303,6 +308,10 @@ resource "kubernetes_daemonset" "kubeip_daemonset" {
             name  = "LOG_JSON"
            value = "true"
           }
+          env {
+            name  = "LEASE_DURATION"
+            value = "20"
+          }
           resources {
             requests = {
               cpu = "100m"
diff --git a/internal/address/gcp.go b/internal/address/gcp.go
index 13afa99..8927ee1 100644
--- a/internal/address/gcp.go
+++ b/internal/address/gcp.go
@@ -3,7 +3,6 @@ package address
 import (
 	"context"
 	"fmt"
-	"math/rand"
 	"strings"
 	"time"
 
@@ -27,7 +26,6 @@ const (
 	accessConfigKind    = "compute#accessConfig"
 	defaultPrefixLength = 96
 	maxRetries          = 10 // number of retries for assigning ephemeral public IP address
-	maxWaitListTime     = 10 // max time to wait before listing addresses
 )
 
 var (
@@ -223,11 +221,6 @@ func (a *gcpAssigner) Assign(ctx context.Context, instanceID, zone string, filte
 		return errors.Wrapf(err, "check if static public IP is already assigned to instance %s", instanceID)
 	}
 
-	// add random sleep to reduce the chance of multiple kubeip instances getting the same address list
-	waitTime := time.Duration(rand.Intn(maxWaitListTime)) * time.Second //nolint:gosec
-	a.logger.WithField("waitTime", waitTime).Debug("waiting before listing addresses")
-	time.Sleep(waitTime)
-
 	// get available reserved public IP addresses
 	addresses, err := a.listAddresses(filter, orderBy, reservedStatus)
 	if err != nil {
diff --git a/internal/config/config.go b/internal/config/config.go
index 5900bf3..ddd4e1d 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -29,6 +29,8 @@ type Config struct {
 	RetryAttempts int `json:"retry-attempts"`
 	// ReleaseOnExit releases the IP address on exit
 	ReleaseOnExit bool `json:"release-on-exit"`
+	// LeaseDuration is the duration of the kubernetes lease
+	LeaseDuration int `json:"lease-duration"`
 }
 
 func NewConfig(c *cli.Context) *Config {
@@ -44,5 +46,6 @@ func NewConfig(c *cli.Context) *Config {
 	cfg.Region = c.String("region")
 	cfg.IPv6 = c.Bool("ipv6")
 	cfg.ReleaseOnExit = c.Bool("release-on-exit")
+	cfg.LeaseDuration = c.Int("lease-duration")
 	return &cfg
 }
diff --git a/internal/lease/lock.go b/internal/lease/lock.go
new file mode 100644
index 0000000..352f09c
--- /dev/null
+++ b/internal/lease/lock.go
@@ -0,0 +1,135 @@
+package lease
+
+import (
+	"context"
+	"time"
+
+	coordinationv1 "k8s.io/api/coordination/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/utils/ptr"
+)
+
+type KubeLock interface {
+	Lock(ctx context.Context) error
+	Unlock(ctx context.Context) error
+}
+
+type kubeLeaseLock struct {
+	client         kubernetes.Interface
+	leaseName      string
+	namespace      string
+	holderIdentity string
+	leaseDuration  int // seconds
+	cancelFunc     context.CancelFunc
+}
+
+func NewKubeLeaseLock(client kubernetes.Interface, leaseName, namespace, holderIdentity string, leaseDurationSeconds int) KubeLock {
+	return &kubeLeaseLock{
+		client:         client,
+		leaseName:      leaseName,
+		namespace:      namespace,
+		holderIdentity: holderIdentity,
+		leaseDuration:  leaseDurationSeconds,
+	}
+}
+
+func (k *kubeLeaseLock) Lock(ctx context.Context) error {
+	backoff := wait.Backoff{
+		Duration: time.Second, // start with 1 second
+		Factor:   1.5,         //nolint:gomnd // multiply by 1.5 on each retry
+		Jitter:   0.5,         //nolint:gomnd // add 50% jitter to wait time on each retry
+		Steps:    100,         //nolint:gomnd // retry 100 times
+		Cap:      time.Hour,   // but never wait more than 1 hour
+	}
+
+	return wait.ExponentialBackoff(backoff, func() (bool, error) { //nolint:wrapcheck
+		timestamp := metav1.MicroTime{Time: time.Now()}
+		lease := &coordinationv1.Lease{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      k.leaseName,
+				Namespace: k.namespace,
+			},
+			Spec: coordinationv1.LeaseSpec{
+				HolderIdentity:       &k.holderIdentity,
+				LeaseDurationSeconds: ptr.To(int32(k.leaseDuration)),
+				AcquireTime:          &timestamp,
+				RenewTime:            &timestamp,
+			},
+		}
+
+		_, err := k.client.CoordinationV1().Leases(k.namespace).Create(ctx, lease, metav1.CreateOptions{})
+		if err != nil {
+			if errors.IsAlreadyExists(err) {
+				// If the lease already exists, check if it's held by another holder
+				existingLease, getErr := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
+				if getErr != nil {
+					return false, getErr //nolint:wrapcheck
+				}
+				// check if the lease is expired
+				if existingLease.Spec.RenewTime != nil && time.Since(existingLease.Spec.RenewTime.Time) > time.Duration(k.leaseDuration)*time.Second {
+					// If the lease is expired, delete it and retry
+					delErr := k.client.CoordinationV1().Leases(k.namespace).Delete(ctx, k.leaseName, metav1.DeleteOptions{})
+					if delErr != nil {
+						return false, delErr //nolint:wrapcheck
+					}
+					return false, nil
+				}
+				// check if the lease is held by another holder
+				if existingLease.Spec.HolderIdentity != nil && *existingLease.Spec.HolderIdentity != k.holderIdentity {
+					// If the lease is held by another holder, return false to retry
+					return false, nil
+				}
+				return true, nil
+			}
+			return false, err //nolint:wrapcheck
+		}
+
+		// Create a child context with cancellation
+		ctx, k.cancelFunc = context.WithCancel(ctx)
+		go k.renewLeasePeriodically(ctx)
+
+		return true, nil
+	})
+}
+
+func (k *kubeLeaseLock) renewLeasePeriodically(ctx context.Context) {
+	// let's renew the lease every 1/2 of the lease duration; use milliseconds for ticker
+	ticker := time.NewTicker(time.Duration(k.leaseDuration*500) * time.Millisecond) //nolint:gomnd
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			lease, err := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
+			if err != nil || lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != k.holderIdentity {
+				return
+			}
+
+			lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
+			k.client.CoordinationV1().Leases(k.namespace).Update(ctx, lease, metav1.UpdateOptions{}) //nolint:errcheck
+		case <-ctx.Done():
+			// Exit the goroutine when the context is cancelled
+			return
+		}
+	}
+}
+
+func (k *kubeLeaseLock) Unlock(ctx context.Context) error {
+	// Call the cancel function to stop the lease renewal process
+	if k.cancelFunc != nil {
+		k.cancelFunc()
+	}
+	lease, err := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
+	if err != nil {
+		return err //nolint:wrapcheck
+	}
+
+	if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != k.holderIdentity {
+		return nil
+	}
+
+	return k.client.CoordinationV1().Leases(k.namespace).Delete(ctx, k.leaseName, metav1.DeleteOptions{}) //nolint:wrapcheck
+}
diff --git a/internal/lease/lock_test.go b/internal/lease/lock_test.go
new file mode 100644
index 0000000..bf6444b
--- /dev/null
+++ b/internal/lease/lock_test.go
@@ -0,0 +1,136 @@
+package lease
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"k8s.io/api/coordination/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/utils/ptr"
+)
+
+func TestLockAndUnlock(t *testing.T) {
+	tests := []struct {
+		name               string
+		leaseExists        bool
+		holderIdentity     string
+		leaseCurrentHolder *string
+		leaseDuration      int
+		skipLock           bool
+		expectLockErr      bool
+		expectUnlockErr    bool
+	}{
+		{
+			name:           "Lock acquires lease when none exists",
+			holderIdentity: "test-holder",
+			leaseExists:    false,
+			leaseDuration:  1,
+		},
+		{
+			name:               "Lock acquires lease when held by another and expires",
+			leaseExists:        true,
+			holderIdentity:     "test-holder",
+			leaseCurrentHolder: ptr.To("another-holder"),
+			leaseDuration:      2,
+		},
+		{
+			name:               "Unlock releases lease",
+			leaseExists:        true,
+			holderIdentity:     "test-holder",
+			leaseCurrentHolder: ptr.To("test-holder"),
+			leaseDuration:      1,
+		},
+		{
+			name:               "Unlock does not release lease when locked by another holder",
+			leaseExists:        true,
+			leaseCurrentHolder: ptr.To("another-holder"),
+			holderIdentity:     "test-holder",
+			leaseDuration:      1,
+			skipLock:           true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			client := fake.NewSimpleClientset()
+			if tt.leaseExists {
+				timestamp := metav1.MicroTime{Time: time.Now()}
+				lease := &v1.Lease{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "test-lease",
+						Namespace: "test-namespace",
+					},
+					Spec: v1.LeaseSpec{
+						HolderIdentity:       tt.leaseCurrentHolder,
+						LeaseDurationSeconds: ptr.To(int32(1)),
+						AcquireTime:          &timestamp,
+						RenewTime:            &timestamp,
+					},
+				}
+
+				_, err := client.CoordinationV1().Leases("test-namespace").Create(context.Background(), lease, metav1.CreateOptions{})
+				require.NoError(t, err)
+			}
+
+			lock := NewKubeLeaseLock(client, "test-lease", "test-namespace", tt.holderIdentity, tt.leaseDuration)
+
+			if !tt.skipLock {
+				err := lock.Lock(context.Background())
+				if tt.expectLockErr {
+					assert.Error(t, err)
+				} else {
+					assert.NoError(t, err)
+				}
+			}
+
+			err := lock.Unlock(context.Background())
+			if tt.expectUnlockErr {
+				assert.Error(t, err)
+			} else {
+				assert.NoError(t, err)
+			}
+		})
+	}
+}
+
+func TestConcurrentLock(t *testing.T) {
+	client := fake.NewSimpleClientset()
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	// Goroutine 1: Acquire the lock and hold it for 5 seconds
+	go func() {
+		defer wg.Done()
+		lock := NewKubeLeaseLock(client, "test-lease", "test-namespace", "test-holder-1", 5)
+		err := lock.Lock(context.Background())
+		assert.NoError(t, err)
+		fmt.Println("Lock acquired by goroutine 1")
+		time.Sleep(2 * time.Second)
+		err = lock.Unlock(context.Background())
+		assert.NoError(t, err)
+		fmt.Println("Lock released by goroutine 1")
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+
+	// Goroutine 2: Try to acquire the lock and wait until it succeeds
+	go func() {
+		defer wg.Done()
+		lock := NewKubeLeaseLock(client, "test-lease", "test-namespace", "test-holder-2", 5)
+		err := lock.Lock(context.Background())
+		assert.NoError(t, err)
+		fmt.Println("Lock acquired by goroutine 2")
+		err = lock.Unlock(context.Background())
+		assert.NoError(t, err)
+		fmt.Println("Lock released by goroutine 2")
+	}()
+
+	wg.Wait()
+}
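
Reviewer note (not part of the diff): a minimal sketch of how the new lease.KubeLock is meant to wrap a critical section, mirroring the closure added in assignAddress. It assumes the package path introduced above; the holder identity string and the placeholder "protected work" are illustrative only, and the fake clientset stands in for the real one built in run().

```go
package main

import (
	"context"
	"fmt"

	"github.com/doitintl/kubeip/internal/lease"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Fake clientset for illustration; kubeip passes the clientset created in run().
	client := fake.NewSimpleClientset()

	// Same arguments as in assignAddress: lock name, namespace, holder identity
	// (the node's instance ID there) and the lease duration in seconds.
	lock := lease.NewKubeLeaseLock(client, "kubeip-lock", "default", "example-holder", 5)

	ctx := context.Background()
	if err := lock.Lock(ctx); err != nil {
		fmt.Println("failed to acquire lock:", err)
		return
	}
	// While the lease is held it is renewed in the background; Unlock stops the
	// renewal goroutine and deletes the lease if we still hold it.
	defer lock.Unlock(ctx) //nolint:errcheck

	fmt.Println("lock acquired; protected work (IP assignment in kubeip) runs here")
}
```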