Skip to content

Commit

Permalink
feat: ibmcloud be to use local setup rather than k8 kind cluster
Browse files Browse the repository at this point in the history
Signed-off-by: aavarghese <[email protected]>
  • Loading branch information
aavarghese committed Dec 3, 2024
1 parent 4ee6eab commit 84fdc30
Show file tree
Hide file tree
Showing 28 changed files with 395 additions and 149 deletions.
1 change: 1 addition & 0 deletions cmd/subcommands/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ func init() {
cmd.AddCommand(component.Minio())
cmd.AddCommand(component.Worker())
cmd.AddCommand(component.WorkStealer())
cmd.AddCommand(component.RunLocally())
}
42 changes: 42 additions & 0 deletions cmd/subcommands/component/run-locally.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package component

import (
"context"

"github.com/spf13/cobra"
"lunchpail.io/cmd/options"
"lunchpail.io/pkg/build"
"lunchpail.io/pkg/runtime"
)

// RunLocallyOptions carries the command-line flag values for the
// "component run-locally" subcommand.
type RunLocallyOptions struct {
	// Component is the serialized (base64-encoded JSON) component to
	// run; the ibmcloud backend produces it via util.ToJsonB64.
	Component string
	// LLIR is the serialized (base64-encoded JSON) low-level
	// intermediate representation that the component belongs to.
	LLIR string
	// LogOptions controls logging verbosity for the run.
	build.LogOptions
}

// AddRunLocallyOptions registers the flags required by the
// "run-locally" subcommand on cmd and returns the struct the parsed
// flag values will be stored into.
func AddRunLocallyOptions(cmd *cobra.Command) *RunLocallyOptions {
	options := RunLocallyOptions{}
	// StringVar (not StringVarP) since neither flag has a shorthand;
	// give each flag real usage text so --help is meaningful.
	cmd.Flags().StringVar(&options.Component, "component", "", "Base64-encoded JSON of the component to run")
	cmd.Flags().StringVar(&options.LLIR, "llir", "", "Base64-encoded JSON of the low-level intermediate representation")
	// MarkFlagRequired only errors if the named flag was not
	// registered; both are registered just above, so a failure here is
	// a programming error and worth surfacing loudly.
	if err := cmd.MarkFlagRequired("component"); err != nil {
		panic(err)
	}
	if err := cmd.MarkFlagRequired("llir"); err != nil {
		panic(err)
	}
	return &options
}

// RunLocally builds the "run-locally" subcommand, which executes a
// single serialized component (see RunLocallyOptions) on this host.
func RunLocally() *cobra.Command {
	c := &cobra.Command{
		Use:   "run-locally",
		Short: "Commands for running a component locally",
		Long:  "Commands for running a component locally",
	}

	// Register flags before wiring RunE so the closure can capture
	// the parsed option values.
	opts := AddRunLocallyOptions(c)
	options.AddLogOptions(c)

	c.RunE = func(cc *cobra.Command, args []string) error {
		return runtime.RunLocally(context.Background(), opts.Component, opts.LLIR, opts.LogOptions)
	}

	return c
}
2 changes: 1 addition & 1 deletion cmd/subcommands/queue/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Drain() *cobra.Command {
return err
}

return queue.Drain(ctx, backend, run.ForStep(step), *opts.Log)
return queue.Drain(ctx, backend, run.ForStep(step), opts.Queue, *opts.Log)
}

return cmd
Expand Down
2 changes: 1 addition & 1 deletion cmd/subcommands/queue/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Ls() *cobra.Command {
return err
}

files, errors, err := queue.Ls(ctx, backend, runContext.ForStep(step), path, *opts.Log)
files, errors, err := queue.Ls(ctx, backend, runContext.ForStep(step), path, opts.Queue, *opts.Log)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/subcommands/queue/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func Upload() *cobra.Command {
run = rrun.Name
}

return queue.UploadFiles(ctx, backend, q.RunContext{RunName: run}, []upload.Upload{upload.Upload{LocalPath: args[0], Bucket: args[1]}}, *opts.Log)
return queue.UploadFiles(ctx, backend, q.RunContext{RunName: run}, []upload.Upload{upload.Upload{LocalPath: args[0], Bucket: args[1]}}, opts.Queue, *opts.Log)
}

return cmd
Expand Down
2 changes: 1 addition & 1 deletion pkg/be/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Backend interface {
InstanceCount(ctx context.Context, c lunchpail.Component, run queue.RunContext) (int, error)

// Queue properties for a given run, plus ensure access to the endpoint from this client
AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error)
AccessQueue(ctx context.Context, run queue.RunContext, rclone string, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error)

// Return a streamer
Streamer(ctx context.Context, run queue.RunContext) streamer.Streamer
Expand Down
200 changes: 99 additions & 101 deletions pkg/be/ibmcloud/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"fmt"
"math"
"net/http"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -17,11 +17,9 @@ import (
"github.com/elotl/cloud-init/config"
"golang.org/x/crypto/ssh"
"golang.org/x/sync/errgroup"
q "lunchpail.io/pkg/ir/queue"

"lunchpail.io/pkg/be/kubernetes"
"lunchpail.io/pkg/be/kubernetes/common"
"lunchpail.io/pkg/ir/llir"
"lunchpail.io/pkg/lunchpail"
"lunchpail.io/pkg/util"
)

Expand Down Expand Up @@ -51,7 +49,7 @@ func (i *intCounter) inc() {
i.lock.Unlock()
}

func createInstance(vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, c llir.ShellComponent, resourceGroupID string, vpcID string, keyID string, zone string, profile string, subnetID string, secGroupID string, imageID string, namespace string, copts llir.Options) (*vpcv1.Instance, error) {
func createInstance(vpcService *vpcv1.VpcV1, name string, resourceGroupID string, vpcID string, keyID string, zone string, profile string, subnetID string, secGroupID string, imageID string, namespace string, copts llir.Options, cc *config.CloudConfig) (*vpcv1.Instance, error) {
networkInterfacePrototypeModel := &vpcv1.NetworkInterfacePrototype{
Name: &name,
Subnet: &vpcv1.SubnetIdentityByID{
Expand All @@ -62,29 +60,6 @@ func createInstance(vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, c llir.S
}},
}

// TODO pass through actual Cli Options?
opts := common.Options{Options: copts}

appYamlString, err := kubernetes.MarshalComponentAsStandalone(ir, c, namespace, opts)
if err != nil {
return nil, fmt.Errorf("failed to marshall yaml: %v", err)
}
cc := &config.CloudConfig{
WriteFiles: []config.File{
{
Path: "/app.yaml",
Content: appYamlString,
Owner: "root:root",
RawFilePermissions: "0644",
}},
RunCmd: []string{"sleep 10", //Minimum of 10 seconds needed for cluster to be able to run `apply`
"while ! kind get clusters | grep lunchpail; do sleep 2; done",
"echo 'Kind cluster is ready'",
"env HOME=/root kubectl create ns " + namespace,
"n=0; until [ $n -ge 60 ]; do env HOME=/root kubectl get serviceaccount default -o name -n " + namespace + " && break; n=$((n + 1)); sleep 1; done",
"env HOME=/root kubectl create -f /app.yaml -n " + namespace},
}

instancePrototypeModel := &vpcv1.InstancePrototypeInstanceByImage{
Name: &name,
ResourceGroup: &vpcv1.ResourceGroupIdentity{
Expand Down Expand Up @@ -273,123 +248,146 @@ func createVPC(vpcService *vpcv1.VpcV1, name string, appName string, resourceGro
return *vpc.ID, nil
}

func createAndInitVM(ctx context.Context, vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, resourceGroupID string, keyType string, publicKey string, zone string, profile string, imageID string, namespace string, opts llir.Options) error {
// createImage creates a VPC custom image named name, sourced from the
// boot volume identified by vmID, within the given resource group.
// It returns the new image's ID.
func createImage(vpcService *vpcv1.VpcV1, name string, resourceGroupID string, vmID string) (string, error) {
	// Build the image prototype separately for readability; the
	// source of the image is an existing volume.
	prototype := &vpcv1.ImagePrototype{
		Name: &name,
		ResourceGroup: &vpcv1.ResourceGroupIdentity{
			ID: &resourceGroupID,
		},
		SourceVolume: &vpcv1.VolumeIdentityByID{
			ID: &vmID,
		},
	}

	image, response, err := vpcService.CreateImage(&vpcv1.CreateImageOptions{ImagePrototype: prototype})
	if err != nil {
		return "", fmt.Errorf("failed to create an Image: %v and the response is: %s", err, response)
	}
	return *image.ID, nil
}

func createResources(ctx context.Context, vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, resourceGroupID string, keyType string, publicKey string, zone string, profile string, imageID string, namespace string, opts llir.Options) (string, error) {
var instanceID string
t1s := time.Now()
vpcID, err := createVPC(vpcService, name, ir.AppName, resourceGroupID)
if err != nil {
return err
return "", err
}
t1e := time.Now()

t2s := t1e
keyID, err := createSSHKey(vpcService, name, resourceGroupID, keyType, publicKey)
if err != nil {
return err
return "", err
}
t2e := time.Now()

t3s := t2e
subnetID, err := createSubnet(vpcService, name, resourceGroupID, vpcID, zone)
if err != nil {
return err
return "", err
}
t3e := time.Now()

t4s := t3e
secGroupID, err := createSecurityGroup(vpcService, name, resourceGroupID, vpcID)
if err != nil {
return err
return "", err
}
t4e := time.Now()

t5s := t4e
if err = createSecurityGroupRule(vpcService, secGroupID); err != nil {
return err
return "", err
}
t5e := time.Now()

group, _ := errgroup.WithContext(ctx)
t6s := time.Now()
// One Component for WorkStealer, one for Dispatcher, and each per WorkerPool
poolCount := intCounter{}
if err = createVMForComponents(ctx, vpcService, name, ir, resourceGroupID, zone, profile, imageID, namespace, vpcID, keyID, subnetID, secGroupID, opts); err != nil {
return "", err
}
t6e := time.Now()

if opts.Log.Verbose {
fmt.Fprintf(os.Stderr, "Setup done %s\n", util.RelTime(t1s, t6e))
fmt.Fprintf(os.Stderr, " - VPC %s\n", util.RelTime(t1s, t1e))
fmt.Fprintf(os.Stderr, " - SSH %s\n", util.RelTime(t2s, t2e))
fmt.Fprintf(os.Stderr, " - Subnet %s\n", util.RelTime(t3s, t3e))
fmt.Fprintf(os.Stderr, " - SecurityGroup %s\n", util.RelTime(t4s, t4e))
fmt.Fprintf(os.Stderr, " - SecurityGroupRule %s\n", util.RelTime(t5s, t5e))
fmt.Fprintf(os.Stderr, " - VMs %s\n", util.RelTime(t6s, t6e))
}
return instanceID, nil
}

func createVMForComponents(ctx context.Context, vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, resourceGroupID string, zone string, profile string, imageID string, namespace string, vpcID string, keyID string, subnetID string, secGroupID string, opts llir.Options) error {
group, _ := errgroup.WithContext(ctx)
var verboseFlag string

for _, c := range ir.Components {
instanceName := name + "-" + string(c.C())
group.Go(func() error {
if c.C() == lunchpail.DispatcherComponent || c.C() == lunchpail.WorkStealerComponent {
instance, err := createInstance(vpcService, instanceName, ir, c, resourceGroupID, vpcID, keyID, zone, profile, subnetID, secGroupID, imageID, namespace, opts)
if err != nil {
return err
}
if opts.Log.Verbose {
fmt.Fprintf(os.Stderr, "Creating VM %s\n", instanceName)
}

//TODO VSI instances other than jumpbox or main pod should not have floatingIP. Remove below after testing
floatingIPID, err := createFloatingIP(vpcService, instanceName, resourceGroupID, zone)
if err != nil {
return err
}
componentB64, err := util.ToJsonB64(c)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
return err
}

options := &vpcv1.AddInstanceNetworkInterfaceFloatingIPOptions{
ID: &floatingIPID,
InstanceID: instance.ID,
NetworkInterfaceID: instance.PrimaryNetworkInterface.ID,
}
_, response, err := vpcService.AddInstanceNetworkInterfaceFloatingIP(options)
if err != nil {
return fmt.Errorf("failed to add floating IP to network interface: %v and the response is: %s", err, response)
}
} else if c.C() == lunchpail.WorkersComponent {
poolCount.inc()
workerCount := c.Workers()
poolName := instanceName + strconv.Itoa(poolCount.counter) //multiple worker pools, maybe

//Compute number of VSIs to be provisioned and job parallelism for each VSI
parallelism, numInstances, err := computeParallelismAndInstanceCount(vpcService, profile, int32(workerCount))
if err != nil {
return fmt.Errorf("failed to compute number of instances and job parallelism: %v", err)
}
llirB64, err := util.ToJsonB64(ir)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
return err
}

for i := 0; i < numInstances; i++ {
workerName := poolName + "-" + strconv.Itoa(i) //multiple worker instances
c = c.SetWorkers(int(parallelism[i]))
instance, err := createInstance(vpcService, workerName, ir, c, resourceGroupID, vpcID, keyID, zone, profile, subnetID, secGroupID, imageID, namespace, opts)
if err != nil {
return err
}

floatingIPID, err := createFloatingIP(vpcService, workerName, resourceGroupID, zone)
if err != nil {
return err
}

options := &vpcv1.AddInstanceNetworkInterfaceFloatingIPOptions{
ID: &floatingIPID,
InstanceID: instance.ID,
NetworkInterfaceID: instance.PrimaryNetworkInterface.ID,
}
_, response, err := vpcService.AddInstanceNetworkInterfaceFloatingIP(options)
if err != nil {
return fmt.Errorf("failed to add floating IP to network interface: %v and the response is: %s", err, response)
}
}
if opts.Log.Verbose {
verboseFlag = "--verbose"
}

cc := &config.CloudConfig{
RunCmd: []string{"curl https://dl.min.io/client/mc/release/linux-amd64/mc --create-dirs -o /minio-binaries/mc",
"chmod +x /minio-binaries/mc",
"export PATH=$PATH:/minio-binaries/",
"mc alias set myminio " + ir.Context.Queue.Endpoint + " " + ir.Context.Queue.AccessKey + " " + ir.Context.Queue.SecretKey, // setting mc config
"apt-get install jq -y",
"exec=$(mc stat myminio/" + ir.Context.Queue.Bucket + "/" + ir.Context.Run.AsFile(q.Blobs) + "/ --json | jq -r '.name')",
"mc get myminio/" + ir.Context.Queue.Bucket + "/" + ir.Context.Run.AsFile(q.Blobs) + "/$exec /lunchpail", //use mc client to download binary
"chmod +x /lunchpail",
"env HOME=/root /lunchpail component run-locally --component " + string(componentB64) + " --llir " + string(llirB64) + " " + verboseFlag},
}

//TODO: Compute number of VSIs to be provisioned and job parallelism for each VSI based on number of workers and workerpools
group.Go(func() error {
instance, err := createInstance(vpcService, instanceName, resourceGroupID, vpcID, keyID, zone, profile, subnetID, secGroupID, imageID, namespace, opts, cc)
if err != nil {
return err
}

floatingIPID, err := createFloatingIP(vpcService, instanceName, resourceGroupID, zone)
if err != nil {
return err
}

options := &vpcv1.AddInstanceNetworkInterfaceFloatingIPOptions{
ID: &floatingIPID,
InstanceID: instance.ID,
NetworkInterfaceID: instance.PrimaryNetworkInterface.ID,
}
_, response, err := vpcService.AddInstanceNetworkInterfaceFloatingIP(options)
if err != nil {
return fmt.Errorf("failed to add floating IP to network interface: %v and the response is: %s", err, response)
}
return nil
})
}
if err := group.Wait(); err != nil {
return err
}
t6e := time.Now()

fmt.Printf("Setup done %s\n", util.RelTime(t1s, t6e))
fmt.Printf(" - VPC %s\n", util.RelTime(t1s, t1e))
fmt.Printf(" - SSH %s\n", util.RelTime(t2s, t2e))
fmt.Printf(" - Subnet %s\n", util.RelTime(t3s, t3e))
fmt.Printf(" - SecurityGroup %s\n", util.RelTime(t4s, t4e))
fmt.Printf(" - SecurityGroupRule %s\n", util.RelTime(t5s, t5e))
fmt.Printf(" - VMs %s\n", util.RelTime(t6s, t6e))
return nil
}

func (backend Backend) SetAction(ctx context.Context, opts llir.Options, ir llir.LLIR, action Action) error {
runname := ir.RunName()

Expand All @@ -406,7 +404,7 @@ func (backend Backend) SetAction(ctx context.Context, opts llir.Options, ir llir
}
zone = randomZone
}
if err := createAndInitVM(ctx, backend.vpcService, runname, ir, backend.config.ResourceGroup.GUID, backend.sshKeyType, backend.sshPublicKey, zone, opts.Profile, opts.ImageID, backend.namespace, opts); err != nil {
if _, err := createResources(ctx, backend.vpcService, runname, ir, backend.config.ResourceGroup.GUID, backend.sshKeyType, backend.sshPublicKey, zone, opts.Profile, opts.ImageID, backend.namespace, opts); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 84fdc30

Please sign in to comment.