Skip to content

Commit

Permalink
Ephemerial Pools, first attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Feb 2, 2024
1 parent 1b28511 commit 2c38fe7
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 13 deletions.
57 changes: 47 additions & 10 deletions pkg/aws/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,25 @@ func (configMapInfo AwsDynamicConfig) GetInstanceAddress(kubeClient client.Clien
}
if len(res.Reservations) > 0 {
if len(res.Reservations[0].Instances) > 0 {
if res.Reservations[0].Instances[0].PublicDnsName != nil && *res.Reservations[0].Instances[0].PublicDnsName != "" {
instance := res.Reservations[0].Instances[0]
return configMapInfo.checkInstanceConnectivity(&instance, log)
}
}
return "", nil
}

server, _ := net.ResolveTCPAddr("tcp", *res.Reservations[0].Instances[0].PublicDnsName+":22")
conn, err := net.DialTCP("tcp", nil, server)
if err != nil {
log.Error(err, "failed to connect to AWS instance")
return "", err
}
defer conn.Close()
func (configMapInfo AwsDynamicConfig) checkInstanceConnectivity(instance *types.Instance, log *logr.Logger) (string, error) {
if instance.PublicDnsName != nil && *instance.PublicDnsName != "" {

return *res.Reservations[0].Instances[0].PublicDnsName, nil
}
server, _ := net.ResolveTCPAddr("tcp", *instance.PublicDnsName+":22")
conn, err := net.DialTCP("tcp", nil, server)
if err != nil {
log.Error(err, "failed to connect to AWS instance")
return "", err
}
defer conn.Close()

return *instance.PublicDnsName, nil
}
return "", nil
}
Expand All @@ -164,6 +170,37 @@ func (configMapInfo AwsDynamicConfig) TerminateInstance(kubeClient client.Client
return err
}

func (configMapInfo AwsDynamicConfig) ListInstances(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceTag string) ([]cloud.CloudVMInstance, error) {
log.Info("attempting to list AWS instances")
cfg, err := config.LoadDefaultConfig(ctx,
config.WithCredentialsProvider(SecretCredentialsProvider{Name: configMapInfo.Secret, Namespace: configMapInfo.SystemNamespace, Client: kubeClient}),
config.WithRegion(configMapInfo.Region))
if err != nil {
return nil, err
}

// Create an EC2 client
ec2Client := ec2.NewFromConfig(cfg)
res, err := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{Filters: []types.Filter{{Name: aws.String("tag:" + cloud.InstanceTag), Values: []string{instanceTag}}, {Name: aws.String("tag:" + MultiPlatformManaged), Values: []string{"true"}}}})
if err != nil {
log.Error(err, "failed to describe instance")
return nil, err
}
ret := []cloud.CloudVMInstance{}
for _, res := range res.Reservations {
for _, inst := range res.Instances {
if inst.State.Name != types.InstanceStateNameTerminated {
address, err := configMapInfo.checkInstanceConnectivity(&inst, log)

Check failure

Code scanning / gosec

Implicit memory aliasing in for loop. Error

Implicit memory aliasing in for loop.
if err == nil {
ret = append(ret, cloud.CloudVMInstance{InstanceId: cloud.InstanceIdentifier(*inst.InstanceId), StartTime: *inst.LaunchTime, Address: address})
log.Info(fmt.Sprintf("counting instance %s towards running count", *inst.InstanceId))
}
}
}
}
return ret, nil
}

type SecretCredentialsProvider struct {
Name string
Namespace string
Expand Down
8 changes: 8 additions & 0 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)

const InstanceTag = "multi-platform-instance"
Expand All @@ -13,7 +14,14 @@ type CloudProvider interface {
TerminateInstance(kubeClient client.Client, log *logr.Logger, ctx context.Context, instance InstanceIdentifier) error
GetInstanceAddress(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceId InstanceIdentifier) (string, error)
CountInstances(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceTag string) (int, error)
ListInstances(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceTag string) ([]CloudVMInstance, error)
SshUser() string
}

// CloudVMInstance describes a single VM as reported by
// CloudProvider.ListInstances.
type CloudVMInstance struct {
// InstanceId is the provider's identifier for the VM.
InstanceId InstanceIdentifier
// StartTime is the launch/creation time reported by the provider.
StartTime time.Time
// Address is the address returned by the provider's address lookup for the VM.
Address string
}

// InstanceIdentifier is the provider-specific ID of a VM instance, as passed to
// TerminateInstance and GetInstanceAddress.
type InstanceIdentifier string
4 changes: 4 additions & 0 deletions pkg/ibm/ibmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/md5" //#nosec
"encoding/base64"
"encoding/json"
"fmt"
"github.com/IBM/go-sdk-core/v4/core"
"github.com/go-logr/logr"
"github.com/google/uuid"
Expand Down Expand Up @@ -137,6 +138,9 @@ func (r IBMPowerDynamicConfig) GetInstanceAddress(kubeClient client.Client, log
return checkAddressLive(ip, log)
}

// ListInstances is not supported for IBM Power yet. It ignores its arguments
// and always returns a nil slice together with a "not implemented" error.
func (r IBMPowerDynamicConfig) ListInstances(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceTag string) ([]cloud.CloudVMInstance, error) {
	return nil, fmt.Errorf("not implemented")
}
func (r IBMPowerDynamicConfig) TerminateInstance(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceId cloud.InstanceIdentifier) error {
log.Info("attempting to terminate power server %s", "instance", instanceId)
service, err := r.authenticate(kubeClient, ctx)
Expand Down
29 changes: 29 additions & 0 deletions pkg/ibm/ibmz.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,35 @@ func (r IBMZDynamicConfig) CountInstances(kubeClient client.Client, log *logr.Lo
return count, nil
}

// ListInstances returns the instances in the configured VPC whose name starts
// with instanceTag.
//
// For every matching instance the address is resolved through
// GetInstanceAddress. When that lookup fails the failure is logged and the
// instance is left out of the result; it does not abort the listing.
//
// An error is returned when authentication, the VPC lookup or the instance
// listing itself fails.
func (r IBMZDynamicConfig) ListInstances(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceTag string) ([]cloud.CloudVMInstance, error) {
	vpcService, err := r.authenticate(kubeClient, ctx)
	if err != nil {
		return nil, err
	}

	vpc, err := r.lookupVpc(vpcService)
	if err != nil {
		return nil, err
	}

	listOptions := &vpcv1.ListInstancesOptions{ResourceGroupID: vpc.ResourceGroup.ID, VPCName: &r.Vpc}
	listing, _, err := vpcService.ListInstances(listOptions)
	if err != nil {
		return nil, err
	}

	found := make([]cloud.CloudVMInstance, 0)
	for _, candidate := range listing.Instances {
		// Only instances named after this tag belong to the pool.
		if !strings.HasPrefix(*candidate.Name, instanceTag) {
			continue
		}
		id := cloud.InstanceIdentifier(*candidate.ID)
		address, addrErr := r.GetInstanceAddress(kubeClient, log, ctx, id)
		if addrErr != nil {
			log.Error(addrErr, "not listing instance as address cannot be assigned yet", "instance", *candidate.ID)
			continue
		}
		found = append(found, cloud.CloudVMInstance{
			InstanceId: id,
			Address:    address,
			StartTime:  time.Time(*candidate.CreatedAt),
		})
	}
	return found, nil
}

func (r IBMZDynamicConfig) lookupSubnet(vpcService *vpcv1.VpcV1) (*vpcv1.Subnet, error) {
subnets, _, err := vpcService.ListSubnets(&vpcv1.ListSubnetsOptions{})
if err != nil {
Expand Down
125 changes: 125 additions & 0 deletions pkg/reconciler/taskrun/dynamicpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package taskrun

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"errors"
	"time"

	"github.com/go-logr/logr"
	"github.com/redhat-appstudio/multi-platform-controller/pkg/cloud"
	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// DynamicHostPool is a host pool whose members are discovered from a cloud
// provider on every Allocate/Deallocate call instead of being configured
// statically.
type DynamicHostPool struct {
// CloudProvider lists, counts, launches and terminates the pool's instances.
CloudProvider cloud.CloudProvider
// SshSecret is copied into Host.Secret for every discovered host.
SshSecret string
// Platform is copied into Host.Platform for every discovered host.
Platform string
// MaxInstances caps the number of instances; Allocate does not launch a new
// one while the count of non-expired instances has reached this value.
MaxInstances int
// Concurrency is copied into Host.Concurrency for every discovered host.
Concurrency int
// MaxAge is the age after which an instance is treated as old: old
// instances are not added to the pool built by buildHostPool.
MaxAge time.Duration
// InstanceTag is the tag Deallocate passes to the provider when listing
// instances.
InstanceTag string
}

func (a DynamicHostPool) buildHostPool(r *ReconcileTaskRun, ctx context.Context, log *logr.Logger, instanceTag string) (*HostPool, int, error) {
ret := map[string]*Host{}
instances, err := a.CloudProvider.ListInstances(r.client, log, ctx, instanceTag)
if err != nil {
return nil, 0, err

Check warning on line 29 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L25-L29

Added lines #L25 - L29 were not covered by tests
}
oldInstanceCount := 0
for _, inst := range instances {
if inst.StartTime.Add(a.MaxAge).Before(time.Now()) {

Check warning on line 33 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L31-L33

Added lines #L31 - L33 were not covered by tests
// These are shut down on deallocation
// TODO: scan for active tasks, and shut down here if not found
oldInstanceCount++
} else {
ret[string(inst.InstanceId)] = &Host{Name: string(inst.InstanceId), Address: inst.Address, User: a.CloudProvider.SshUser(), Concurrency: a.Concurrency, Platform: a.Platform, Secret: a.SshSecret, StartTime: &inst.StartTime}

Check failure

Code scanning / gosec

Implicit memory aliasing in for loop. Error

Implicit memory aliasing in for loop.

Check warning on line 38 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L36-L38

Added lines #L36 - L38 were not covered by tests
}
}
return &HostPool{hosts: ret}, oldInstanceCount, nil

Check warning on line 41 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L41

Added line #L41 was not covered by tests
}

func (a DynamicHostPool) Deallocate(r *ReconcileTaskRun, ctx context.Context, log *logr.Logger, tr *v1.TaskRun, secretName string, selectedHost string) error {

Check warning on line 44 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L44

Added line #L44 was not covered by tests

hostPool, oldInstanceCount, err := a.buildHostPool(r, ctx, log, a.InstanceTag)
if err != nil {
return err

Check warning on line 48 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L46-L48

Added lines #L46 - L48 were not covered by tests
}
err = hostPool.Deallocate(r, ctx, log, tr, secretName, selectedHost)
if err != nil {
return err

Check warning on line 52 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L50-L52

Added lines #L50 - L52 were not covered by tests
}
if oldInstanceCount > 0 {

Check warning on line 54 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L54

Added line #L54 was not covered by tests
// Maybe this is an old instance
startTime := hostPool.hosts[selectedHost].StartTime
if startTime.Add(a.MaxAge).Before(time.Now()) {

Check warning on line 57 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L56-L57

Added lines #L56 - L57 were not covered by tests
// Old host, check if other tasks are using it
trs := v1.TaskRunList{}
err := r.client.List(ctx, &trs, client.MatchingLabels{AssignedHost: selectedHost})
if err != nil {
return err

Check warning on line 62 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L59-L62

Added lines #L59 - L62 were not covered by tests
}
if len(trs.Items) == 0 {
log.Info("deallocating old instance", "instance", selectedHost)

Check warning on line 65 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}
err = a.CloudProvider.TerminateInstance(r.client, log, ctx, cloud.InstanceIdentifier(selectedHost))
if err != nil {
return err

Check warning on line 69 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L67-L69

Added lines #L67 - L69 were not covered by tests
}
}
}
return nil

Check warning on line 73 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L73

Added line #L73 was not covered by tests
}

func (a DynamicHostPool) Allocate(r *ReconcileTaskRun, ctx context.Context, log *logr.Logger, tr *v1.TaskRun, secretName string, instanceTag string) (reconcile.Result, error) {

Check warning on line 76 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L76

Added line #L76 was not covered by tests

hostPool, oldInstanceCount, err := a.buildHostPool(r, ctx, log, instanceTag)
if err != nil {
return reconcile.Result{}, err

Check warning on line 80 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L78-L80

Added lines #L78 - L80 were not covered by tests
}
_, err = hostPool.Allocate(r, ctx, log, tr, secretName, instanceTag)
if err != nil {
return reconcile.Result{}, err

Check warning on line 84 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L82-L84

Added lines #L82 - L84 were not covered by tests
}
if tr.Labels == nil || tr.Labels[WaitingForPlatformLabel] == "" {

Check warning on line 86 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L86

Added line #L86 was not covered by tests
//We only need to launch an instance if the task run is waiting for a label
return reconcile.Result{}, err

Check warning on line 88 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L88

Added line #L88 was not covered by tests
}

// Count will handle instances that are not ready yet
count, err := a.CloudProvider.CountInstances(r.client, log, ctx, instanceTag)
if err != nil {
return reconcile.Result{}, err

Check warning on line 94 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L92-L94

Added lines #L92 - L94 were not covered by tests
}
// We don't count old instances towards the total, as they will shut down soon
if count-oldInstanceCount >= a.MaxInstances {
log.Info("cannot provision new instances")

Check warning on line 98 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L97-L98

Added lines #L97 - L98 were not covered by tests
// Too many instances, we just have to wait
return reconcile.Result{RequeueAfter: time.Minute}, err

Check warning on line 100 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L100

Added line #L100 was not covered by tests
}
name, err := getRandomString(8)
if err != nil {
return reconcile.Result{}, err

Check warning on line 104 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L102-L104

Added lines #L102 - L104 were not covered by tests
}

// Counter intuitively we don't need the instance id
// It will be picked up on the list call
inst, err := a.CloudProvider.LaunchInstance(r.client, log, ctx, name, instanceTag)
if err != nil {
return reconcile.Result{}, err

Check warning on line 111 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L109-L111

Added lines #L109 - L111 were not covered by tests
}

log.Info("allocated instance", "instance", inst)
return reconcile.Result{RequeueAfter: time.Minute}, err

Check warning on line 115 in pkg/reconciler/taskrun/dynamicpool.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/dynamicpool.go#L114-L115

Added lines #L114 - L115 were not covered by tests

}

// getRandomString returns a string of exactly length lowercase hexadecimal
// characters generated from crypto/rand.
//
// A length of zero yields the empty string. A negative length is rejected with
// an error instead of panicking in make or in the slice expression. Any error
// from the random source is returned unchanged.
func getRandomString(length int) (string, error) {
	if length < 0 {
		return "", errors.New("random string length must not be negative")
	}
	// Every byte encodes to two hex characters, so round up for odd lengths
	// and trim the surplus character afterwards.
	buf := make([]byte, (length+1)/2)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf)[:length], nil
}
38 changes: 35 additions & 3 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ const (

ServiceAccountName = "multi-platform-controller"

PlatformParam = "PLATFORM"
DynamicPlatforms = "dynamic-platforms"
AllowedNamespaces = "allowed-namespaces"
PlatformParam = "PLATFORM"
DynamicPlatforms = "dynamic-platforms"
DynamicPoolPlatforms = "dynamic-pool-platforms"
AllowedNamespaces = "allowed-namespaces"
)

type ReconcileTaskRun struct {
Expand Down Expand Up @@ -495,6 +496,36 @@ func (r *ReconcileTaskRun) readConfiguration(ctx context.Context, log *logr.Logg
}
}

dynamicPool := cm.Data[DynamicPoolPlatforms]
for _, platform := range strings.Split(dynamicPool, ",") {
platformConfigName := strings.ReplaceAll(platform, "/", "-")
if platform == targetPlatform {

typeName := cm.Data["dynamic-pool."+platformConfigName+".type"]
allocfunc := r.cloudProviders[typeName]
if allocfunc == nil {
return nil, "", errors2.New("unknown dynamic provisioning type " + typeName)

Check warning on line 507 in pkg/reconciler/taskrun/taskrun.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/taskrun.go#L504-L507

Added lines #L504 - L507 were not covered by tests
}
maxInstances, err := strconv.Atoi(cm.Data["dynamic-pool."+platformConfigName+".max-instances"])
if err != nil {
return nil, "", err

Check warning on line 511 in pkg/reconciler/taskrun/taskrun.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/taskrun.go#L509-L511

Added lines #L509 - L511 were not covered by tests
}
concurrency, err := strconv.Atoi(cm.Data["dynamic-pool."+platformConfigName+".concurrency"])
if err != nil {
return nil, "", err

Check warning on line 515 in pkg/reconciler/taskrun/taskrun.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/taskrun.go#L513-L515

Added lines #L513 - L515 were not covered by tests
}
return DynamicHostPool{
CloudProvider: allocfunc(platformConfigName, cm.Data, r.operatorNamespace),
SshSecret: cm.Data["dynamic-pool."+platformConfigName+".ssh-secret"],
Platform: platform,
MaxInstances: maxInstances,
MaxAge: time.Hour * 24, //TODO Configurable
Concurrency: concurrency,
InstanceTag: cm.Data["instance-tag"],
}, cm.Data["instance-tag"], nil

Check warning on line 525 in pkg/reconciler/taskrun/taskrun.go

View check run for this annotation

Codecov / codecov/patch

pkg/reconciler/taskrun/taskrun.go#L517-L525

Added lines #L517 - L525 were not covered by tests
}
}

ret := HostPool{hosts: map[string]*Host{}, targetPlatform: targetPlatform}
for k, v := range cm.Data {
if !strings.HasPrefix(k, "host.") {
Expand Down Expand Up @@ -596,6 +627,7 @@ type Host struct {
Concurrency int
Platform string
Secret string
StartTime *time.Time // Only used for the dynamic pool
}

func platformLabel(platform string) string {
Expand Down
5 changes: 5 additions & 0 deletions pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,11 @@ type MockCloud struct {
Addressses map[cloud.InstanceIdentifier]string
}

// ListInstances is an unimplemented stub matching the signature of
// cloud.CloudProvider.ListInstances. It ignores its arguments and panics when
// called.
func (m *MockCloud) ListInstances(kubeClient runtimeclient.Client, log *logr.Logger, ctx context.Context, instanceTag string) ([]cloud.CloudVMInstance, error) {
//TODO implement me
panic("implement me")
}

// CountInstances reports the mock's Running field as the instance count. It
// ignores all of its arguments and never returns an error.
func (m *MockCloud) CountInstances(kubeClient runtimeclient.Client, log *logr.Logger, ctx context.Context, instanceTag string) (int, error) {
return m.Running, nil
}
Expand Down

0 comments on commit 2c38fe7

Please sign in to comment.