From 2c38fe7c71ddca97bc9c314ad08fab2edd5c5be9 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Fri, 2 Feb 2024 13:45:11 +1100 Subject: [PATCH] Ephemerial Pools, first attempt --- pkg/aws/ec2.go | 57 +++++++++-- pkg/cloud/cloud.go | 8 ++ pkg/ibm/ibmp.go | 4 + pkg/ibm/ibmz.go | 29 ++++++ pkg/reconciler/taskrun/dynamicpool.go | 125 +++++++++++++++++++++++++ pkg/reconciler/taskrun/taskrun.go | 38 +++++++- pkg/reconciler/taskrun/taskrun_test.go | 5 + 7 files changed, 253 insertions(+), 13 deletions(-) create mode 100644 pkg/reconciler/taskrun/dynamicpool.go diff --git a/pkg/aws/ec2.go b/pkg/aws/ec2.go index 834e3e85..bc9a924d 100644 --- a/pkg/aws/ec2.go +++ b/pkg/aws/ec2.go @@ -130,19 +130,25 @@ func (configMapInfo AwsDynamicConfig) GetInstanceAddress(kubeClient client.Clien } if len(res.Reservations) > 0 { if len(res.Reservations[0].Instances) > 0 { - if res.Reservations[0].Instances[0].PublicDnsName != nil && *res.Reservations[0].Instances[0].PublicDnsName != "" { + instance := res.Reservations[0].Instances[0] + return configMapInfo.checkInstanceConnectivity(&instance, log) + } + } + return "", nil +} - server, _ := net.ResolveTCPAddr("tcp", *res.Reservations[0].Instances[0].PublicDnsName+":22") - conn, err := net.DialTCP("tcp", nil, server) - if err != nil { - log.Error(err, "failed to connect to AWS instance") - return "", err - } - defer conn.Close() +func (configMapInfo AwsDynamicConfig) checkInstanceConnectivity(instance *types.Instance, log *logr.Logger) (string, error) { + if instance.PublicDnsName != nil && *instance.PublicDnsName != "" { - return *res.Reservations[0].Instances[0].PublicDnsName, nil - } + server, _ := net.ResolveTCPAddr("tcp", *instance.PublicDnsName+":22") + conn, err := net.DialTCP("tcp", nil, server) + if err != nil { + log.Error(err, "failed to connect to AWS instance") + return "", err } + defer conn.Close() + + return *instance.PublicDnsName, nil } return "", nil } @@ -164,6 +170,37 @@ func (configMapInfo AwsDynamicConfig) TerminateInstance(kubeClient client.Client return err } +func (configMapInfo AwsDynamicConfig) ListInstances(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceTag string) ([]cloud.CloudVMInstance, error) { + log.Info("attempting to list AWS instances") + cfg, err := config.LoadDefaultConfig(ctx, + config.WithCredentialsProvider(SecretCredentialsProvider{Name: configMapInfo.Secret, Namespace: configMapInfo.SystemNamespace, Client: kubeClient}), + config.WithRegion(configMapInfo.Region)) + if err != nil { + return nil, err + } + + // Create an EC2 client + ec2Client := ec2.NewFromConfig(cfg) + res, err := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{Filters: []types.Filter{{Name: aws.String("tag:" + cloud.InstanceTag), Values: []string{instanceTag}}, {Name: aws.String("tag:" + MultiPlatformManaged), Values: []string{"true"}}}}) + if err != nil { + log.Error(err, "failed to describe instance") + return nil, err + } + ret := []cloud.CloudVMInstance{} + for _, res := range res.Reservations { + for _, inst := range res.Instances { + if inst.State.Name != types.InstanceStateNameTerminated { + address, err := configMapInfo.checkInstanceConnectivity(&inst, log) + if err == nil { + ret = append(ret, cloud.CloudVMInstance{InstanceId: cloud.InstanceIdentifier(*inst.InstanceId), StartTime: *inst.LaunchTime, Address: address}) + log.Info(fmt.Sprintf("counting instance %s towards running count", *inst.InstanceId)) + } + } + } + } + return ret, nil +} + type SecretCredentialsProvider struct { Name string Namespace string diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 7e2e7d4e..7762f39b 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -4,6 +4,7 @@ import ( "context" "github.com/go-logr/logr" "sigs.k8s.io/controller-runtime/pkg/client" + "time" ) const InstanceTag = "multi-platform-instance" @@ -13,7 +14,14 @@ type CloudProvider interface { TerminateInstance(kubeClient client.Client, log *logr.Logger, ctx context.Context, instance InstanceIdentifier) error GetInstanceAddress(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceId InstanceIdentifier) (string, error) CountInstances(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceTag string) (int, error) + ListInstances(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceTag string) ([]CloudVMInstance, error) SshUser() string } +type CloudVMInstance struct { + InstanceId InstanceIdentifier + StartTime time.Time + Address string +} + type InstanceIdentifier string diff --git a/pkg/ibm/ibmp.go b/pkg/ibm/ibmp.go index f7656ed6..6bae3f50 100644 --- a/pkg/ibm/ibmp.go +++ b/pkg/ibm/ibmp.go @@ -5,6 +5,7 @@ import ( "crypto/md5" //#nosec "encoding/base64" "encoding/json" + "fmt" "github.com/IBM/go-sdk-core/v4/core" "github.com/go-logr/logr" "github.com/google/uuid" @@ -137,6 +138,9 @@ func (r IBMPowerDynamicConfig) GetInstanceAddress(kubeClient client.Client, log return checkAddressLive(ip, log) } +func (r IBMPowerDynamicConfig) ListInstances(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceTag string) ([]cloud.CloudVMInstance, error) { + return nil, fmt.Errorf("not impelemented") +} func (r IBMPowerDynamicConfig) TerminateInstance(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceId cloud.InstanceIdentifier) error { log.Info("attempting to terminate power server %s", "instance", instanceId) service, err := r.authenticate(kubeClient, ctx) diff --git a/pkg/ibm/ibmz.go b/pkg/ibm/ibmz.go index b86857ff..7042ec6f 100644 --- a/pkg/ibm/ibmz.go +++ b/pkg/ibm/ibmz.go @@ -129,6 +129,35 @@ func (r IBMZDynamicConfig) CountInstances(kubeClient client.Client, log *logr.Lo return count, nil } +func (r IBMZDynamicConfig) ListInstances(kubeClient client.Client, log *logr.Logger, ctx context.Context, instanceTag string) ([]cloud.CloudVMInstance, error) { + vpcService, err := r.authenticate(kubeClient, ctx) + if err != nil { + return nil, err + } + + vpc, err := r.lookupVpc(vpcService) + if err != nil { + return nil, err + } + instances, _, err := vpcService.ListInstances(&vpcv1.ListInstancesOptions{ResourceGroupID: vpc.ResourceGroup.ID, VPCName: &r.Vpc}) + if err != nil { + return nil, err + } + ret := []cloud.CloudVMInstance{} + for _, instance := range instances.Instances { + if strings.HasPrefix(*instance.Name, instanceTag) { + identifier := cloud.InstanceIdentifier(*instance.ID) + addr, err := r.GetInstanceAddress(kubeClient, log, ctx, identifier) + if err != nil { + log.Error(err, "not listing instance as address cannot be assigned yet", "instance", *instance.ID) + } else { + ret = append(ret, cloud.CloudVMInstance{InstanceId: identifier, Address: addr, StartTime: time.Time(*instance.CreatedAt)}) + } + } + } + return ret, nil +} + func (r IBMZDynamicConfig) lookupSubnet(vpcService *vpcv1.VpcV1) (*vpcv1.Subnet, error) { subnets, _, err := vpcService.ListSubnets(&vpcv1.ListSubnetsOptions{}) if err != nil { diff --git a/pkg/reconciler/taskrun/dynamicpool.go b/pkg/reconciler/taskrun/dynamicpool.go new file mode 100644 index 00000000..90554948 --- /dev/null +++ b/pkg/reconciler/taskrun/dynamicpool.go @@ -0,0 +1,125 @@ +package taskrun + +import ( + "context" + "crypto/rand" + "encoding/hex" + "github.com/go-logr/logr" + "github.com/redhat-appstudio/multi-platform-controller/pkg/cloud" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "time" +) + +type DynamicHostPool struct { + CloudProvider cloud.CloudProvider + SshSecret string + Platform string + MaxInstances int + Concurrency int + MaxAge time.Duration + InstanceTag string +} + +func (a DynamicHostPool) buildHostPool(r *ReconcileTaskRun, ctx context.Context, log *logr.Logger, instanceTag string) (*HostPool, int, error) { + ret := map[string]*Host{} + instances, err := a.CloudProvider.ListInstances(r.client, log, ctx, instanceTag) + if err != nil { + return nil, 0, err + } + oldInstanceCount := 0 + for _, inst := range instances { + if inst.StartTime.Add(a.MaxAge).Before(time.Now()) { + // These are shut down on deallocation + // TODO: scan for active tasks, and shut down here if not found + oldInstanceCount++ + } else { + ret[string(inst.InstanceId)] = &Host{Name: string(inst.InstanceId), Address: inst.Address, User: a.CloudProvider.SshUser(), Concurrency: a.Concurrency, Platform: a.Platform, Secret: a.SshSecret, StartTime: &inst.StartTime} + } + } + return &HostPool{hosts: ret}, oldInstanceCount, nil +} + +func (a DynamicHostPool) Deallocate(r *ReconcileTaskRun, ctx context.Context, log *logr.Logger, tr *v1.TaskRun, secretName string, selectedHost string) error { + + hostPool, oldInstanceCount, err := a.buildHostPool(r, ctx, log, a.InstanceTag) + if err != nil { + return err + } + err = hostPool.Deallocate(r, ctx, log, tr, secretName, selectedHost) + if err != nil { + return err + } + if oldInstanceCount > 0 { + // Maybe this is an old instance + startTime := hostPool.hosts[selectedHost].StartTime + if startTime.Add(a.MaxAge).Before(time.Now()) { + // Old host, check if other tasks are using it + trs := v1.TaskRunList{} + err := r.client.List(ctx, &trs, client.MatchingLabels{AssignedHost: selectedHost}) + if err != nil { + return err + } + if len(trs.Items) == 0 { + log.Info("deallocating old instance", "instance", selectedHost) + } + err = a.CloudProvider.TerminateInstance(r.client, log, ctx, cloud.InstanceIdentifier(selectedHost)) + if err != nil { + return err + } + } + } + return nil +} + +func (a DynamicHostPool) Allocate(r *ReconcileTaskRun, ctx context.Context, log *logr.Logger, tr *v1.TaskRun, secretName string, instanceTag string) (reconcile.Result, error) { + + hostPool, oldInstanceCount, err := a.buildHostPool(r, ctx, log, instanceTag) + if err != nil { + return reconcile.Result{}, err + } + _, err = hostPool.Allocate(r, ctx, log, tr, secretName, instanceTag) + if err != nil { + return reconcile.Result{}, err + } + if tr.Labels == nil || tr.Labels[WaitingForPlatformLabel] == "" { + //We only need to launch an instance if the task run is waiting for a label + return reconcile.Result{}, err + } + + // Count will handle instances that are not ready yet + count, err := a.CloudProvider.CountInstances(r.client, log, ctx, instanceTag) + if err != nil { + return reconcile.Result{}, err + } + // We don't count old instances towards the total, as they will shut down soon + if count-oldInstanceCount >= a.MaxInstances { + log.Info("cannot provision new instances") + // Too many instances, we just have to wait + return reconcile.Result{RequeueAfter: time.Minute}, err + } + name, err := getRandomString(8) + if err != nil { + return reconcile.Result{}, err + } + + // Counter intuitively we don't need the instance id + // It will be picked up on the list call + inst, err := a.CloudProvider.LaunchInstance(r.client, log, ctx, name, instanceTag) + if err != nil { + return reconcile.Result{}, err + } + + log.Info("allocated instance", "instance", inst) + return reconcile.Result{RequeueAfter: time.Minute}, err + +} + +func getRandomString(length int) (string, error) { + bytes := make([]byte, length/2+1) + if _, err := rand.Read(bytes); err != nil { + return "", err + } + return hex.EncodeToString(bytes)[0:length], nil +} diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index c4fb58ad..f15d4c03 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -60,9 +60,10 @@ const ( ServiceAccountName = "multi-platform-controller" - PlatformParam = "PLATFORM" - DynamicPlatforms = "dynamic-platforms" - AllowedNamespaces = "allowed-namespaces" + PlatformParam = "PLATFORM" + DynamicPlatforms = "dynamic-platforms" + DynamicPoolPlatforms = "dynamic-pool-platforms" + AllowedNamespaces = "allowed-namespaces" ) type ReconcileTaskRun struct { @@ -495,6 +496,36 @@ func (r *ReconcileTaskRun) readConfiguration(ctx context.Context, log *logr.Logg } } + dynamicPool := cm.Data[DynamicPoolPlatforms] + for _, platform := range strings.Split(dynamicPool, ",") { + platformConfigName := strings.ReplaceAll(platform, "/", "-") + if platform == targetPlatform { + + typeName := cm.Data["dynamic-pool."+platformConfigName+".type"] + allocfunc := r.cloudProviders[typeName] + if allocfunc == nil { + return nil, "", errors2.New("unknown dynamic provisioning type " + typeName) + } + maxInstances, err := strconv.Atoi(cm.Data["dynamic-pool."+platformConfigName+".max-instances"]) + if err != nil { + return nil, "", err + } + concurrency, err := strconv.Atoi(cm.Data["dynamic-pool."+platformConfigName+".concurrency"]) + if err != nil { + return nil, "", err + } + return DynamicHostPool{ + CloudProvider: allocfunc(platformConfigName, cm.Data, r.operatorNamespace), + SshSecret: cm.Data["dynamic-pool."+platformConfigName+".ssh-secret"], + Platform: platform, + MaxInstances: maxInstances, + MaxAge: time.Hour * 24, //TODO Configurable + Concurrency: concurrency, + InstanceTag: cm.Data["instance-tag"], + }, cm.Data["instance-tag"], nil + } + } + ret := HostPool{hosts: map[string]*Host{}, targetPlatform: targetPlatform} for k, v := range cm.Data { if !strings.HasPrefix(k, "host.") { @@ -596,6 +627,7 @@ type Host struct { Concurrency int Platform string Secret string + StartTime *time.Time // Only used for the dynamic pool } func platformLabel(platform string) string { diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index 685f5ceb..c3595242 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -469,6 +469,11 @@ type MockCloud struct { Addressses map[cloud.InstanceIdentifier]string } +func (m *MockCloud) ListInstances(kubeClient runtimeclient.Client, log *logr.Logger, ctx context.Context, instanceTag string) ([]cloud.CloudVMInstance, error) { + //TODO implement me + panic("implement me") +} + func (m *MockCloud) CountInstances(kubeClient runtimeclient.Client, log *logr.Logger, ctx context.Context, instanceTag string) (int, error) { return m.Running, nil }