Skip to content

Commit

Permalink
update/aws-tests (#123)
Browse files Browse the repository at this point in the history
* more tests for AWS flow
* build pr-xxx docker
* var for kubeip-agent image version
  • Loading branch information
alexei-led authored Oct 28, 2023
1 parent 749801e commit ce43fbb
Show file tree
Hide file tree
Showing 10 changed files with 619 additions and 100 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ jobs:
runs-on: ubuntu-latest
needs: validate
# build only on master branch and tags
if: ${{ !contains(github.event.head_commit.message,'[skip ci]') && (github.event_name == 'push' && (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/tags/'))) }}
if: ${{
!contains(github.event.head_commit.message, '[skip ci]') &&
(
(github.event_name == 'push' && (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/tags/'))) ||
(github.event_name == 'pull_request' && github.event.pull_request.draft == false)
)
}}
steps:
- name: checkout
uses: actions/checkout@v4
Expand All @@ -82,7 +88,6 @@ jobs:
uses: docker/setup-buildx-action@v3

- name: login to DockerHub
if: ${{ github.event_name != 'pull_request' }}
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
Expand Down
7 changes: 5 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assign

for {
err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy)
if err != nil && errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
log.Infof("static public IP address already assigned to node instance %s", node.Instance)
return nil
}
if err != nil {
log.WithError(err).Errorf("failed to assign static public IP address to node %s", node.Name)
if retryCounter < cfg.RetryAttempts {
Expand All @@ -105,9 +109,8 @@ func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assign
return errors.Wrap(err, "context is done")
}
}
break
return nil
}
return nil
}

func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
Expand Down
2 changes: 1 addition & 1 deletion examples/aws/eks.tf
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ resource "kubernetes_daemonset" "kubeip_daemonset" {
priority_class_name = "system-node-critical"
container {
name = "kubeip-agent"
image = "doitintl/kubeip-agent"
image = "doitintl/kubeip-agent:${var.kubeip_version}"
env {
name = "NODE_NAME"
value_from {
Expand Down
5 changes: 5 additions & 0 deletions examples/aws/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ variable "public_cidr_ranges" {
variable "kubernetes_version" {
type = string
default = "1.28"
}

variable "kubeip_version" {
type = string
default = "latest"
}
2 changes: 1 addition & 1 deletion examples/gcp/gke.tf
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ resource "kubernetes_daemonset" "kubeip_daemonset" {
priority_class_name = "system-node-critical"
container {
name = "kubeip-agent"
image = "doitintl/kubeip-agent"
image = "doitintl/kubeip-agent:${var.kubeip_version}"
env {
name = "NODE_NAME"
value_from {
Expand Down
5 changes: 5 additions & 0 deletions examples/gcp/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ variable "machine_type" {
variable "ipv6_support" {
type = bool
default = false
}

variable "kubeip_version" {
type = string
default = "latest"
}
4 changes: 3 additions & 1 deletion internal/address/assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
)

var (
ErrUnknownCloudProvider = errors.New("unknown cloud provider")
ErrUnknownCloudProvider = errors.New("unknown cloud provider")
ErrStaticIPAlreadyAssigned = errors.New("static public IP already assigned")
ErrNoStaticIPAssigned = errors.New("no static public IP assigned")
)

type Assigner interface {
Expand Down
184 changes: 104 additions & 80 deletions internal/address/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,54 +165,77 @@ func (a *awsAssigner) forceCheckAddressAssigned(ctx context.Context, allocationI
return false, nil
}

//nolint:funlen,gocyclo
func (a *awsAssigner) Assign(ctx context.Context, instanceID, _ string, filter []string, orderBy string) error {
// get elastic IP attached to the instance
filters := make(map[string][]string)
filters["instance-id"] = []string{instanceID}
addresses, err := a.eipLister.List(ctx, filters, true)
err := a.checkElasticIPAssigned(ctx, instanceID)
if err != nil {
return errors.Wrapf(err, "failed to list elastic IPs attached to instance %s", instanceID)
return errors.Wrapf(err, "check if elastic IP is already assigned to instance %s", instanceID)
}
if len(addresses) > 0 {
a.logger.Infof("elastic IP %s is already attached to instance %s", *addresses[0].PublicIp, instanceID)
return nil

// get available elastic IPs based on filter and orderBy
addresses, err := a.getAvailableElasticIPs(ctx, filter, orderBy)
if err != nil {
return errors.Wrap(err, "failed to get available elastic IPs")
}

// get available elastic IPs
filters = make(map[string][]string)
for _, f := range filter {
name, values, err2 := parseShorthandFilter(f)
if err2 != nil {
return errors.Wrapf(err2, "failed to parse filter %s", f)
}
filters[name] = values
// get EC2 instance
instance, err := a.instanceGetter.Get(ctx, instanceID, a.region)
if err != nil {
return errors.Wrapf(err, "failed to get instance %s", instanceID)
}
addresses, err = a.eipLister.List(context.Background(), filters, false)
// get primary network interface ID with public IP address (DeviceIndex == 0)
networkInterfaceID, err := a.getNetworkInterfaceID(instance)
if err != nil {
return errors.Wrap(err, "failed to list available elastic IPs")
return errors.Wrapf(err, "failed to get network interface ID for instance %s", instanceID)
}

// if no available elastic IPs, return error
if len(addresses) == 0 {
return errors.Errorf("no available elastic IPs")
// try to assign available addresses until succeeds
// due to concurrency, it is possible that another kubeip instance will assign the same address
for i := range addresses {
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
"networkInterfaceID": networkInterfaceID,
}).Debug("assigning elastic IP to the instance")
err = a.tryAssignAddress(ctx, &addresses[i], networkInterfaceID, instanceID)
if err != nil {
a.logger.WithError(err).Warn("failed to assign elastic IP address")
a.logger.Debug("retrying with another address")
} else {
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
}).Info("elastic IP assigned to the instance")
break // break if address assigned successfully
}
}

// log available addresses IPs
ips := make([]string, 0, len(addresses))
for _, address := range addresses {
ips = append(ips, *address.PublicIp)
if err != nil {
return errors.Wrap(err, "failed to assign elastic IP address")
}
a.logger.WithField("addresses", ips).Debugf("found %d available addresses", len(addresses))
return nil
}

// get EC2 instance
instance, err := a.instanceGetter.Get(ctx, instanceID, a.region)
func (a *awsAssigner) tryAssignAddress(ctx context.Context, address *types.Address, networkInterfaceID, instanceID string) error {
// force check if address is already assigned (reduce the chance of assigning the same address by multiple kubeip instances)
addressAssigned, err := a.forceCheckAddressAssigned(ctx, *address.AllocationId)
if err != nil {
return errors.Wrapf(err, "failed to get instance %s", instanceID)
return errors.Wrapf(err, "failed to check if address %s is assigned", *address.PublicIp)
}
if addressAssigned {
return errors.Errorf("address %s is already assigned", *address.PublicIp)
}
if err = a.eipAssigner.Assign(ctx, networkInterfaceID, *address.AllocationId); err != nil {
return errors.Wrapf(err, "failed to assign elastic IP %s to the instance %s", *address.PublicIp, instanceID)
}
return nil
}

func (a *awsAssigner) getNetworkInterfaceID(instance *types.Instance) (string, error) {
// get network interface ID
if instance.NetworkInterfaces == nil || len(instance.NetworkInterfaces) == 0 {
return errors.Errorf("no network interfaces found for instance %s", instanceID)
return "", errors.Errorf("no network interfaces found for instance %s", *instance.InstanceId)
}
// get primary network interface ID with public IP address (DeviceIndex == 0)
networkInterfaceID := ""
Expand All @@ -224,72 +247,73 @@ func (a *awsAssigner) Assign(ctx context.Context, instanceID, _ string, filter [
}
}
if networkInterfaceID == "" {
return errors.Errorf("no network interfaces with public IP address found for instance %s", instanceID)
return "", errors.Errorf("no network interfaces with public IP address found for instance %s", *instance.InstanceId)
}
return networkInterfaceID, nil
}

// sort addresses by orderBy field
sortAddressesByField(addresses, orderBy)

// try to assign all available addresses until one succeeds
// due to concurrency, it is possible that another kubeip instance will assign the same address
for i := range addresses {
// force check if address is already assigned (reduce the chance of assigning the same address by multiple kubeip instances)
var addressAssigned bool
addressAssigned, err = a.forceCheckAddressAssigned(ctx, *addresses[i].AllocationId)
if err != nil {
a.logger.WithError(err).Errorf("failed to check if address %s is assigned", *addresses[i].PublicIp)
a.logger.Debug("trying next address")
continue
}
if addressAssigned {
a.logger.WithField("address", addresses[i].PublicIp).Debug("address is already assigned")
a.logger.Debug("trying next address")
continue
}
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
"networkInterfaceID": networkInterfaceID,
}).Debug("assigning elastic IP to the instance")
if err = a.eipAssigner.Assign(ctx, networkInterfaceID, *addresses[i].AllocationId); err != nil {
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
"networkInterfaceID": networkInterfaceID,
}).Debug("failed to assign elastic IP to the instance")
a.logger.Debug("trying next address")
continue
}
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
}).Info("elastic IP assigned to the instance")
break
}
func (a *awsAssigner) checkElasticIPAssigned(ctx context.Context, instanceID string) error {
filters := make(map[string][]string)
filters["instance-id"] = []string{instanceID}
addresses, err := a.eipLister.List(ctx, filters, true)
if err != nil {
return errors.Wrap(err, "failed to assign elastic IP address")
return errors.Wrapf(err, "failed to list elastic IPs attached to instance %s", instanceID)
}
if len(addresses) > 0 {
return ErrStaticIPAlreadyAssigned
}
return nil
}

func (a *awsAssigner) Unassign(ctx context.Context, instanceID, _ string) error {
func (a *awsAssigner) getAssignedElasticIP(ctx context.Context, instanceID string) (*types.Address, error) {
// get elastic IP attached to the instance
filters := make(map[string][]string)
filters["instance-id"] = []string{instanceID}
addresses, err := a.eipLister.List(ctx, filters, true)
if err != nil {
return errors.Wrapf(err, "failed to list elastic IPs attached to instance %s", instanceID)
return nil, errors.Wrapf(err, "failed to list elastic IPs attached to instance %s", instanceID)
}
if len(addresses) == 0 {
return nil, ErrNoStaticIPAssigned
}
return &addresses[0], nil
}

func (a *awsAssigner) getAvailableElasticIPs(ctx context.Context, filter []string, orderBy string) ([]types.Address, error) {
filters := make(map[string][]string)
for _, f := range filter {
name, values, err := parseShorthandFilter(f)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse filter %s", f)
}
filters[name] = values
}
addresses, err := a.eipLister.List(ctx, filters, false)
if err != nil {
return nil, errors.Wrap(err, "failed to list available elastic IPs")
}
if len(addresses) == 0 {
a.logger.Infof("no elastic IP attached to instance %s", instanceID)
return nil
return nil, errors.Errorf("no available elastic IPs")
}
// sort addresses by orderBy field
sortAddressesByField(addresses, orderBy)
// log available addresses IPs
ips := make([]string, 0, len(addresses))
for _, address := range addresses {
ips = append(ips, *address.PublicIp)
}
a.logger.WithField("addresses", ips).Debugf("Found %d available addresses", len(addresses))

return addresses, nil
}

func (a *awsAssigner) Unassign(ctx context.Context, instanceID, _ string) error {
// get elastic IP attached to the instance
address, err := a.getAssignedElasticIP(ctx, instanceID)
if err != nil {
return errors.Wrapf(err, "check if elastic IP is assigned to instance %s", instanceID)
}
// unassign elastic IP from the instance
address := addresses[0]
if err = a.eipAssigner.Unassign(ctx, *address.AssociationId); err != nil {
return errors.Wrap(err, "failed to unassign elastic IP")
}
Expand Down
Loading

0 comments on commit ce43fbb

Please sign in to comment.