diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index d45804f..f4de3d8 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -17,7 +17,7 @@ Please delete options that are not relevant. # Checklist: - [ ] My code follows the style guidelines of this project. See [contributing-guidelines.md](./../contributing-guidelines.md) -- [ ] Existing workload examples run after my changes (if applicable) +- [ ] Existing workload examples run to completion after my changes (if applicable) - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation diff --git a/.github/workflows/compile-release.yaml b/.github/workflows/compile-release.yaml new file mode 100644 index 0000000..31430b7 --- /dev/null +++ b/.github/workflows/compile-release.yaml @@ -0,0 +1,46 @@ +name: compile-release +on: + push: + tags: + - "v*" + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: Extract version from tag + shell: bash + run: | + # Assign locally first: values written to GITHUB_ENV only become visible in later steps + VERSION="${GITHUB_REF#refs/tags/}" + echo "VERSION=${VERSION}" >> $GITHUB_ENV + echo "Using version: ${VERSION}" + + - name: Run build script + shell: bash + run: | + set -e + chmod +x build_cli_all_arch.sh + ./build_cli_all_arch.sh "$VERSION" + + - name: Compress workloads + shell: bash + run: | + zip -r workloads.zip ./workloads + + - name: Create draft release and upload assets + uses: softprops/action-gh-release@v2 + with: + files: | + builds/* + workloads.zip + token: '${{ secrets.GITHUB_TOKEN }}' + draft: true + prerelease: true diff --git a/.vscode/launch.json b/.vscode/launch.json index f687d18..5e68728 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -6,8 +6,8 @@ "type": "go", "request": "launch", "mode": "debug", - "program": "${workspaceFolder}", - "args": ["submit", "-p", "workloads/training/LLMs/lora-supervised-finetuning/lora-sft-zero3-single-multinode", "--ray", "-g", "4", "--dry-run"], + "program": "${workspaceFolder}/cmd/cli/main.go", + "args": ["submit", "-p", "${workspaceFolder}/workloads/training/LLMs/lora-supervised-finetuning/lora-sft-zero3-single-multinode", "--ray", "-g", "4", "--dry-run", "--storage=100Gi,longhorn"], "env": { "GO111MODULE": "on" }, @@ -18,8 +18,8 @@ "type": "go", "request": "launch", "mode": "debug", - "program": "${workspaceFolder}", - "args": ["serve", "-p", "workloads/inference/LLMs/online-inference/vllm-online-single-multinode", "--ray", "--replicas", "1", "--gpus-per-replica", "4", "--dry-run"], + "program": "${workspaceFolder}/cmd/cli/main.go", + "args": ["serve", "-p", "${workspaceFolder}/workloads/inference/LLMs/online-inference/vllm-online-single-multinode", "--ray", "--replicas", "1", "--gpus-per-replica", "4", "--dry-run"], "env": { "GO111MODULE": "on" }, @@ -30,8 +30,8 @@ "type": "go", "request": "launch", "mode": "debug", - "program": "${workspaceFolder}", - "args": ["submit", "-p", "workloads/training/LLMs/bert/hf-accelerate-bert", "-g", "4", "--dry-run"], + "program": "${workspaceFolder}/cmd/cli/main.go", + "args": ["submit", "-p", "${workspaceFolder}/workloads/training/LLMs/bert/hf-accelerate-bert", "-g", "4", "--dry-run"], "env": { "GO111MODULE": "on" }, @@ -42,7 +42,7 @@ "type": "go", "request": "launch", "mode": "debug", - "program": "${workspaceFolder}", + "program": "${workspaceFolder}/cmd/cli/main.go", "args":
["submit", "-i", "ghcr.io/silogen/rocm-ray:v0.4", "-g", "4"], "env": { "GO111MODULE": "on" @@ -54,8 +54,8 @@ "type": "go", "request": "launch", "mode": "debug", - "program": "${workspaceFolder}", - "args": ["submit", "-p", "workloads/training/LLMs/lora-supervised-finetuning/ds-zero3-single-multinode", "--ray", "-g", "4"], + "program": "${workspaceFolder}/cmd/cli/main.go", + "args": ["submit", "-p", "${workspaceFolder}/workloads/training/LLMs/lora-supervised-finetuning/ds-zero3-single-multinode", "--ray", "-g", "4"], "env": { "GO111MODULE": "on" }, @@ -66,7 +66,7 @@ "type": "go", "request": "launch", "mode": "debug", - "program": "${workspaceFolder}", + "program": "${workspaceFolder}/cmd/cli/main.go", "args": ["monitor", "deployment/avsuni-gpu-monitoring", "-n", "av-test"], "env": { "GO111MODULE": "on" diff --git a/README.md b/README.md index 8071cbc..5030d3a 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,22 @@ You can access this in the template via {{ .Custom.parent.child }} ``` +### Storage + +You can use the Kaiwo CLI to instruct a workload to use storage from a given storage class. If you do not provide any input for the CLI, the following default values are used: + +* The storage class name is read from the specified namespace's label `kaiwo-cli/default-storage-class` +* The storage amount is read from the specified namespace's label `kaiwo-cli/default-storage-quantity` + +If these values do not exist, an exception is raised. If you are using the cluster-admins examples from this repository, you can modify the namespace at [cluster-admins/kueue/cluster-queue.yaml](cluster-admins/kueue/cluster-queue.yaml) and add these values. If you want to skip adding storage, you must explicitly add the `--no-storage` flag. + +To specify storage, you can use the flags: + +* `--storage=2Gi` to specify the amount of storage and to use the default storage class name from the namespace labels +* `--storage=2Gi,mystorageclass` to specify both the amount of storage and the storage class name + +Note that the storage created is ephemeral and meant for caching, which means that it gets removed when the underlying pods get removed. However, the ephemeral storage is provisioned via a storage class, which ensures that the space requested is available and reserved for all pods before the workload starts. + ## Interacting with workloads While Kaiwo's primary purpose is to deploy workloads, it can also be used as a light tool to discover and interact with running workloads. diff --git a/cluster-admins/kueue/cluster-queue.yaml b/cluster-admins/kueue/cluster-queue.yaml index 768e137..a6cc4ac 100644 --- a/cluster-admins/kueue/cluster-queue.yaml +++ b/cluster-admins/kueue/cluster-queue.yaml @@ -5,7 +5,7 @@ metadata: spec: namespaceSelector: {} # match all. 
+ ## Interacting with workloads While Kaiwo's primary purpose is to deploy workloads, it can also be used as a light tool to discover and interact with running workloads. diff --git a/cluster-admins/kueue/cluster-queue.yaml b/cluster-admins/kueue/cluster-queue.yaml index 768e137..a6cc4ac 100644 --- a/cluster-admins/kueue/cluster-queue.yaml +++ b/cluster-admins/kueue/cluster-queue.yaml @@ -5,7 +5,7 @@ metadata: spec: namespaceSelector: {} # match all. resourceGroups: - - coveredResources: ["cpu", "memory", "amd.com/gpu", "ephemeral-storage"] + - coveredResources: ["cpu", "memory", "amd.com/gpu"] flavors: - name: base-gpu-flavour resources: @@ -15,5 +15,3 @@ spec: nominalQuota: 1800Gi - name: "amd.com/gpu" nominalQuota: 16 - - name: "ephemeral-storage" - nominalQuota: 2000Gi diff --git a/pkg/cli/apply/run.go b/pkg/cli/apply/run.go index 2c6524e..6ad29d1 100644 --- a/pkg/cli/apply/run.go +++ b/pkg/cli/apply/run.go @@ -22,6 +22,11 @@ import ( "strconv" "strings" + "k8s.io/apimachinery/pkg/api/errors" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -45,16 +50,21 @@ func RunApply(workload workloads.Workload, workloadMeta any) error { return nil } + // Discover workload files + if err := loadFileList(&execFlags); err != nil { + return fmt.Errorf("error loading file list: %w", err) + } + // Generate workload configuration workloadConfig, err := workload.GenerateTemplateContext(execFlags) if err != nil { return fmt.Errorf("error generating workload config: %w", err) } - // Load custom configuration, if provided + // Load custom configuration, if available var customConfig any - if execFlags.CustomConfigPath != "" { - customConfig, err = loadCustomConfig(execFlags.CustomConfigPath) + if customConfigFile, ok := execFlags.WorkloadFiles[workloads.CustomTemplateValuesFilename]; ok { + customConfig, err = loadCustomConfig(customConfigFile) if err != nil { return fmt.Errorf("error loading custom config: %w", err) } @@ -72,27 +82,56 @@ func RunApply(workload workloads.Workload, workloadMeta any) error { logrus.Debugf("No explicit name provided, using name: %s", metaFlags.Name) } - // Parse environment variables - if execFlags.EnvFilePath == "" { - envFilePath = filepath.Join(execFlags.Path, workloads.EnvFilename) - } else { - envFilePath = execFlags.EnvFilePath - } - - if err := parseEnvFile(envFilePath, &metaFlags); err != nil { + // Parse environment variables, if any + if err := parseEnvFile(execFlags.WorkloadFiles[workloads.EnvFilename], &metaFlags); err != nil { return fmt.Errorf("error parsing environment: %w", err) } // Prepare scheduling flags - dynamicClient, err := k8s.GetClient() + clients, err := k8s.GetKubernetesClients() if err != nil { - return fmt.Errorf("error fetching Kubernetes client: %w", err) + return fmt.Errorf("error getting k8s clients: %w", err) } ctx := context.TODO() - schedulingFlags := GetSchedulingFlags() + schedulingFlags, err := GetSchedulingFlags() + if err != nil { + return fmt.Errorf("error getting scheduling flags: %w", err) + } + + if schedulingFlags.Storage != nil { + if schedulingFlags.Storage.StorageClassName == "" || schedulingFlags.Storage.Quantity == "" { + logrus.Info("Storage class name and/or quantity not provided, checking namespace labels for defaults") + + defaultStorageFlags, err := findDefaultStorageFlags(ctx, *clients.Clientset, metaFlags.Namespace) + if err != nil { + return fmt.Errorf("error checking for storage defaults: %w", err) + } + + if schedulingFlags.Storage.StorageClassName == "" { + if defaultStorageFlags.StorageClassName == "" { + return fmt.Errorf("storage requested, but no storage class name provided and no default exists in the namespace '%s' label '%s'", metaFlags.Namespace, workloads.KaiwoDefaultStorageClassNameLabel) + } + schedulingFlags.Storage.StorageClassName = defaultStorageFlags.StorageClassName + } + if schedulingFlags.Storage.Quantity
== "" { + if defaultStorageFlags.Quantity == "" { + return fmt.Errorf("storage requested, but no quantity provided and no default exists in the namespace '%s' label '%s'", metaFlags.Namespace, workloads.KaiwoDefaultStorageQuantityLabel) + } + schedulingFlags.Storage.Quantity = defaultStorageFlags.Quantity + } + } + + storageClassExists, err := doesStorageClassExist(ctx, *clients.Clientset, schedulingFlags.Storage.StorageClassName) + if err != nil { + return fmt.Errorf("error checking if storage class exists: %w", err) + } + if !storageClassExists { + return fmt.Errorf("storage class '%s' does not exist", schedulingFlags.Storage.StorageClassName) + } + } - if err := fillSchedulingFlags(ctx, dynamicClient, &schedulingFlags, execFlags.ResourceFlavorGpuNodeLabelKey, metaFlags.EnvVars); err != nil { + if err := fillSchedulingFlags(ctx, clients.Client, schedulingFlags, execFlags.ResourceFlavorGpuNodeLabelKey, metaFlags.EnvVars); err != nil { return fmt.Errorf("error filling scheduling flags: %w", err) } logrus.Debugf("Successfully loaded scheduling info from Kubernetes") @@ -106,18 +145,143 @@ func RunApply(workload workloads.Workload, workloadMeta any) error { WorkloadMeta: workloadMeta, Workload: workloadConfig, Meta: metaFlags, - Scheduling: schedulingFlags, + Scheduling: *schedulingFlags, Custom: customConfig, } // Apply the workload - if err := workloads.ApplyWorkload(ctx, dynamicClient, workload, execFlags, templateContext); err != nil { + if err := workloads.ApplyWorkload(ctx, clients.Client, workload, execFlags, templateContext); err != nil { return fmt.Errorf("error applying workload: %w", err) } return nil } +func loadFileList(execFlags *workloads.ExecFlags) error { + if execFlags.Path == "" && execFlags.OverlayPath != "" { + return fmt.Errorf("cannot load workload with an overlay path without base path") + } + if execFlags.Path == "" { + return nil + } + + files := map[string]string{} + + entries, err := os.ReadDir(execFlags.Path) + if err != nil { + return fmt.Errorf("error reading directory: %w", err) + } + + for _, entry := range entries { + if !entry.IsDir() { + files[entry.Name()] = filepath.Join(execFlags.Path, entry.Name()) + } + } + + if execFlags.OverlayPath != "" { + overlayFiles, err := os.ReadDir(execFlags.OverlayPath) + if err != nil { + return fmt.Errorf("error reading directory: %w", err) + } + for _, overlayFile := range overlayFiles { + if !overlayFile.IsDir() { + files[overlayFile.Name()] = filepath.Join(execFlags.OverlayPath, overlayFile.Name()) + } + } + } + + for k, v := range files { + logrus.Debugf("Discovered file %s from %s", k, v) + } + + execFlags.WorkloadFiles = files + + return nil +} + +func findDefaultStorageFlags(ctx context.Context, clientset kubernetes.Clientset, namespace string) (*workloads.StorageSchedulingFlags, error) { + namespaceObject, err := clientset.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + logrus.Warnf("Namespace does not exist, cannot check for storage defaults. 
+ +func findDefaultStorageFlags(ctx context.Context, clientset kubernetes.Clientset, namespace string) (*workloads.StorageSchedulingFlags, error) { + namespaceObject, err := clientset.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + logrus.Warnf("Namespace does not exist, cannot check for storage defaults. Either ensure that the namespace exists and has default values, specify the storage class and amount explicitly, or specify --no-storage to skip adding storage.") + return nil, fmt.Errorf("failed to find default storage class or quantity for namespace that does not exist: %s", namespace) + } + return nil, fmt.Errorf("error getting namespace: %w", err) + } + + flags := &workloads.StorageSchedulingFlags{} + + defaultStorageClassName, ok := namespaceObject.Labels[workloads.KaiwoDefaultStorageClassNameLabel] + if ok { + logrus.Debugf("Default storage class discovered: %s", defaultStorageClassName) + flags.StorageClassName = defaultStorageClassName + } else { + logrus.Debugf("Default storage class not found") + } + defaultStorageQuantity, ok := namespaceObject.Labels[workloads.KaiwoDefaultStorageQuantityLabel] + if ok { + logrus.Debugf("Default storage quantity discovered: %s", defaultStorageQuantity) + flags.Quantity = defaultStorageQuantity + } else { + logrus.Debugf("Default storage quantity not found") + } + + return flags, nil +} + +func doesStorageClassExist(ctx context.Context, clientset kubernetes.Clientset, storageClassName string) (bool, error) { + _, err := clientset.StorageV1().StorageClasses().Get(ctx, storageClassName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, fmt.Errorf("error getting storage class %s: %w", storageClassName, err) + } + return true, nil +} + // loadCustomConfig loads custom configuration data from a file func loadCustomConfig(path string) (any, error) { logrus.Debugln("Loading custom config") @@ -196,11 +360,11 @@ func fillSchedulingFlags( if schedulingFlags.RequestedReplicas > 0 && schedulingFlags.RequestedGPUsPerReplica > 0 { if schedulingFlags.RequestedGPUsPerReplica > schedulingFlags.GPUsAvailablePerNode { - return fmt.Errorf("You requested %d GPUs per replica, but there are only %d GPUs available per node", + return fmt.Errorf("you requested %d GPUs per replica, but there are only %d GPUs available per node", schedulingFlags.RequestedGPUsPerReplica, schedulingFlags.GPUsAvailablePerNode) } if schedulingFlags.TotalRequestedGPUs > 0 { - return fmt.Errorf("Cannot set requested gpus with --gpus when --replicas and --gpus-per-replica are set") + return fmt.Errorf("cannot set requested gpus with --gpus when --replicas and --gpus-per-replica are set") } schedulingFlags.CalculatedNumReplicas = schedulingFlags.RequestedReplicas schedulingFlags.CalculatedGPUsPerReplica =
schedulingFlags.RequestedGPUsPerReplica diff --git a/pkg/cli/apply/serve.go b/pkg/cli/apply/serve.go index d36bcb6..91ad40e 100644 --- a/pkg/cli/apply/serve.go +++ b/pkg/cli/apply/serve.go @@ -33,6 +33,9 @@ func BuildServeCmd() *cobra.Command { Use: "serve", Short: "Serve a deployment process", PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + if err := cmd.Parent().PersistentPreRunE(cmd, args); err != nil { + return err + } return PreRunLoadConfig(cmd, args) }, RunE: func(cmd *cobra.Command, args []string) error { diff --git a/pkg/cli/apply/submit.go b/pkg/cli/apply/submit.go index 41fe365..c3b39ef 100644 --- a/pkg/cli/apply/submit.go +++ b/pkg/cli/apply/submit.go @@ -35,6 +35,9 @@ func BuildSubmitCmd() *cobra.Command { Use: "submit", Short: "Submit a job", PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + if err := cmd.Parent().PersistentPreRunE(cmd, args); err != nil { + return err + } return PreRunLoadConfig(cmd, args) }, RunE: func(cmd *cobra.Command, args []string) error { diff --git a/pkg/cli/apply/utils.go b/pkg/cli/apply/utils.go index a533631..c5405d8 100644 --- a/pkg/cli/apply/utils.go +++ b/pkg/cli/apply/utils.go @@ -30,13 +30,12 @@ const defaultGpuNodeLabelKey = "beta.amd.com/gpu.family.AI" // Exec flags var ( - dryRun bool - createNamespace bool - template string - path string - gpuNodeLabelKey string - customConfigPath string - envFilePath string + dryRun bool + createNamespace bool + template string + path string + overlayPath string + gpuNodeLabelKey string ) // AddExecFlags adds flags that are needed for the execution of apply functions @@ -44,11 +43,10 @@ func AddExecFlags(cmd *cobra.Command) { cmd.Flags().BoolVarP(&createNamespace, "create-namespace", "", false, "Create namespace if it does not exist") cmd.Flags().BoolVarP(&dryRun, "dry-run", "d", false, "Print the generated workload manifest without submitting it") cmd.Flags().StringVarP(&path, "path", "p", "", "Path to directory for workload code, entrypoint/serveconfig, env-file, etc. Either image or path is mandatory") + cmd.Flags().StringVarP(&overlayPath, "overlay-path", "o", "", "Additional overlay path. Files from path and overlay-path are combined; if a file exists in both, the one from overlay-path is used") // TODO: remove gpuNodeLabelKey and have this logic be handled by the operator cmd.Flags().StringVarP(&gpuNodeLabelKey, "gpu-node-label-key", "", defaultGpuNodeLabelKey, fmt.Sprintf("Optional node label key used to specify the resource flavor GPU count. Defaults to %s", defaultGpuNodeLabelKey)) cmd.Flags().StringVarP(&template, "template", "t", "", "Optional path to a custom template to use for the workload. If not provided, a default template will be used unless template file found in workload directory") - cmd.Flags().StringVarP(&customConfigPath, "custom-config", "c", "", "Optional path to a custom YAML configuration file whose contents are made available in the template") - cmd.Flags().StringVarP(&envFilePath, "env-file", "", "", "Optional path to env file.
Defaults to 'env' in workload code directory") } func GetExecFlags() workloads.ExecFlags { @@ -57,9 +55,8 @@ DryRun: dryRun, Template: template, Path: path, + OverlayPath: overlayPath, ResourceFlavorGpuNodeLabelKey: gpuNodeLabelKey, - CustomConfigPath: customConfigPath, - EnvFilePath: envFilePath, } } @@ -67,7 +64,7 @@ const ( defaultNamespace = "kaiwo" - defaultImage = "ghcr.io/silogen/rocm-ray:v0.5" + defaultImage = "ghcr.io/silogen/rocm-ray:v0.6" ) var ( @@ -103,6 +100,8 @@ gpus int replicas int gpusPerReplica int + storage string + noStorage bool ) // AddSchedulingFlags adds flags related to (Kueue) scheduling @@ -110,25 +109,82 @@ func AddSchedulingFlags(cmd *cobra.Command) { cmd.Flags().IntVarP(&gpus, "gpus", "g", 0, "Number of GPUs requested for the workload") cmd.Flags().IntVarP(&replicas, "replicas", "", 0, "Number of replicas requested for the workload") cmd.Flags().IntVarP(&gpusPerReplica, "gpus-per-replica", "", 0, "Number of GPUs requested per replica") + cmd.Flags().StringVarP( + &storage, + "storage", + "", + "default", + "Storage requested for the workload, use: --storage=storageQuantity,storageClassName, --storage=storageQuantity to use the default storage class, or --storage=default (the default) to use defaults for both storage class and amount. "+ + fmt.Sprintf("The default storage class and amount can be configured in the namespace's labels (keys %s and %s). ", workloads.KaiwoDefaultStorageClassNameLabel, workloads.KaiwoDefaultStorageQuantityLabel)+ + "If you do not want to include storage, you must pass --no-storage explicitly.", + ) + cmd.Flags().BoolVarP(&noStorage, "no-storage", "", false, "Don't use storage for the workload") } // GetSchedulingFlags initializes the scheduling flags with the number of GPUs requested -func GetSchedulingFlags() workloads.SchedulingFlags { - return workloads.SchedulingFlags{ +func GetSchedulingFlags() (*workloads.SchedulingFlags, error) { + flags := &workloads.SchedulingFlags{ TotalRequestedGPUs: gpus, RequestedReplicas: replicas, RequestedGPUsPerReplica: gpusPerReplica, } + + if storage != "default" && noStorage { + return nil, fmt.Errorf("you must specify --storage or --no-storage, not both") + } + + if noStorage { + logrus.Info("No storage requested for workload") + return flags, nil + } + + if storage == "" { + return nil, fmt.Errorf("you must specify --storage or --no-storage") + } + + requestedStorage := "" + storageClassName := "" + + if storage != "default" { + split := strings.Split(storage, ",") + + if len(split) > 2 { + return nil, fmt.Errorf("invalid storage specifier %s", storage) + } + if len(split) > 1 { + storageClassName = split[1] + logrus.Infof("Requested storage class name %s", storageClassName) + } else { + logrus.Info("You did not pass a storage class name, the default storage class will be used if it exists") + } + + // strings.Split always returns at least one element, so the quantity is always present + requestedStorage = split[0] + + if _, err := resource.ParseQuantity(requestedStorage); err != nil { + return nil, fmt.Errorf("invalid storage quantity %s", requestedStorage) + } + + logrus.Infof("Requested storage %s", requestedStorage) + } + + flags.Storage = &workloads.StorageSchedulingFlags{ + Quantity: requestedStorage, + StorageClassName: storageClassName, + } + + return flags, nil }
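+
+// Illustrative examples of the accepted --storage forms (a sketch; the class
+// and quantity values below are hypothetical):
+//
+//   --storage=default      -> Quantity "" and StorageClassName "", both are
+//                             resolved from the namespace labels at apply time
+//   --storage=2Gi          -> Quantity "2Gi", class resolved from the label
+//   --storage=2Gi,longhorn -> Quantity "2Gi", StorageClassName "longhorn"
+//   --no-storage           -> SchedulingFlags.Storage stays nil
+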
`yaml:"createNamespace"` Path string `yaml:"path"` + OverlayPath string `yaml:"overlayPath"` GpuNodeLabelKey string `yaml:"gpuNodeLabelKey"` Template string `yaml:"template"` - CustomConfig string `yaml:"customConfig"` - EnvFile string `yaml:"envFile"` Name string `yaml:"name"` Namespace string `yaml:"namespace"` Image string `yaml:"image"` @@ -174,10 +230,9 @@ func ApplyConfigToFlags(cmd *cobra.Command, config *Config) { setFlag("dry-run", fmt.Sprintf("%v", config.DryRun)) setFlag("create-namespace", fmt.Sprintf("%v", config.CreateNamespace)) setFlag("path", config.Path) + setFlag("overlay-path", config.OverlayPath) setFlag("gpu-node-label-key", config.GpuNodeLabelKey) setFlag("template", config.Template) - setFlag("custom-config", config.CustomConfig) - setFlag("env-file", config.EnvFile) // MetaFlags setFlag("name", config.Name) @@ -192,16 +247,26 @@ func ApplyConfigToFlags(cmd *cobra.Command, config *Config) { setFlag("gpus-per-replica", fmt.Sprintf("%d", config.RequestedGPUsPerReplica)) } -func PreRunLoadConfig(cmd *cobra.Command, args []string) error { - if path == "" { +func PreRunLoadConfig(cmd *cobra.Command, _ []string) error { + if path == "" && overlayPath == "" { return nil } - config, err := LoadConfigFromPath(path) + // First try to load config from the overlay path + config, err := LoadConfigFromPath(overlayPath) if err != nil { return fmt.Errorf("failed to load config: %w", err) } + if config == nil { + // If no overlay, try the main path + config, err = LoadConfigFromPath(path) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + logrus.Infof("Loaded config from %s", path) + } + if config != nil { ApplyConfigToFlags(cmd, config) logrus.Infof("Configuration loaded from %s", filepath.Join(path, workloads.KaiwoconfigFilename)) diff --git a/pkg/k8s/utils.go b/pkg/k8s/utils.go index 22801b9..e42d769 100644 --- a/pkg/k8s/utils.go +++ b/pkg/k8s/utils.go @@ -19,66 +19,13 @@ import ( "fmt" "os" "path/filepath" - "slices" "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) -func isBinaryFile(content []byte) bool { - return bytes.Contains(content, []byte{0}) -} - -// GenerateConfigMapFromDir generates a ConfigMap from a directory -func GenerateConfigMapFromDir(dir string, name string, namespace string, skipFiles []string) (*corev1.ConfigMap, error) { - files, err := os.ReadDir(dir) - if err != nil { - return nil, fmt.Errorf("failed to read directory: %w", err) - } - - data := make(map[string]string) - - for _, file := range files { - if file.IsDir() { - continue - } - - if slices.Contains(skipFiles, file.Name()) { - continue - } - - filePath := filepath.Join(dir, file.Name()) - content, err := os.ReadFile(filePath) - if err != nil { - return nil, fmt.Errorf("failed to read file %s: %w", filePath, err) - } - - // Skip binary files - if isBinaryFile(content) { - logrus.Warnf("Skipping binary file: %s", file.Name()) - continue - } - data[file.Name()] = string(content) - } - - if len(data) == 0 { - return nil, nil - } - - configMap := &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - Data: data, - } - - return configMap, nil -} - type SecretVolume struct { Name string SecretName string diff --git a/pkg/tui/list/pod/select.go b/pkg/tui/list/pod/select.go index f62a33e..e944719 100644 --- a/pkg/tui/list/pod/select.go +++ b/pkg/tui/list/pod/select.go @@ -148,7 +148,7 @@ func runSelectAndDoAction(_ 
context.Context, _ k8s.KubernetesClients, state *tui var ( viewLogsAction runAction = "View logs" - monitorAction runAction = "Monitor" + monitorAction runAction = "Monitor GPUs" commandAction runAction = "Run command" ) diff --git a/pkg/tui/list/workload/delete.go b/pkg/tui/list/workload/delete.go index 8229172..677bf63 100644 --- a/pkg/tui/list/workload/delete.go +++ b/pkg/tui/list/workload/delete.go @@ -32,7 +32,9 @@ import ( func runDeleteWorkload(ctx context.Context, clients k8s.KubernetesClients, state *tuicomponents.RunState) (tuicomponents.StepResult, tuicomponents.RunStep[tuicomponents.RunState], error) { confirmDelete := false - resourceDescription := fmt.Sprintf("Confirm that you want to delete the %s workload '%s' in namespace %s", state.WorkloadType, state.WorkloadReference.GetName(), state.Namespace) + resourceDescription := fmt.Sprintf("Confirm that you want to delete the %s workload '%s' in namespace %s. "+ + "This will also remove any linked resources, such as automatically created PVCs and ConfigMaps", + state.WorkloadType, state.WorkloadReference.GetName(), state.Namespace) f := huh.NewForm(huh.NewGroup(huh.NewConfirm().Title(resourceDescription).Value(&confirmDelete))) diff --git a/pkg/workloads/apply.go b/pkg/workloads/apply.go index 7dda876..4e747c5 100644 --- a/pkg/workloads/apply.go +++ b/pkg/workloads/apply.go @@ -15,11 +15,13 @@ package workloads import ( + "bytes" "context" "errors" "fmt" "os" "path" + "slices" "strings" "text/template" @@ -49,8 +51,12 @@ ) error { var resources []runtime.Object + var namespaceResource *corev1.Namespace + var configMapResource *corev1.ConfigMap + var err error + if execFlags.CreateNamespace { - namespaceResource, err := generateNamespaceManifestIfNotExists(ctx, k8sClient, templateContext.Meta.Namespace) + namespaceResource, err = generateNamespaceManifestIfNotExists(ctx, k8sClient, templateContext.Meta.Namespace) if err != nil { return fmt.Errorf("failed to generate namespace resource: %w", err) } @@ -60,7 +66,7 @@ } if execFlags.Path != "" { - configMapResource, err := generateConfigMapManifest(execFlags.Path, workload, templateContext.Meta) + configMapResource, err = generateConfigMapManifest(execFlags.WorkloadFiles, workload, templateContext.Meta) if err != nil { return fmt.Errorf("failed to generate configmap resource: %w", err) } @@ -68,6 +74,8 @@ resources = append(resources, configMapResource) templateContext.Meta.HasConfigMap = true } + } else if execFlags.OverlayPath != "" { + return fmt.Errorf("overlay path set without a base path") } workloadTemplate, err := getWorkloadTemplate(execFlags, workload) @@ -75,14 +83,17 @@ return fmt.Errorf("failed to get workload template: %w", err) } - templateResources, err := generateManifests(k8sClient, workloadTemplate, templateContext, workload) + workloadResource, err := generateWorkloadManifest(workloadTemplate, templateContext, workload) if err != nil { - return fmt.Errorf("Check workload type. Failed to generate manifests: %w", err) + return fmt.Errorf("failed to generate workload manifest (check the workload type): %w", err) } - if len(templateResources) == 0 { - return fmt.Errorf("failed to generate manifests: no resources found") + + additionalWorkloadManifests, err := workload.GenerateAdditionalResourceManifests(k8sClient, templateContext) + if err != nil { + return fmt.Errorf("failed to generate additional resource manifests: %w", err) } - resources = append(resources, templateResources...)
+ resources = append(resources, workloadResource) + resources = append(resources, additionalWorkloadManifests...) s, err := k8s.GetScheme() if err != nil { @@ -91,11 +102,42 @@ if execFlags.DryRun { printResources(&s, resources) - } else { - if err := applyResources(resources, ctx, k8sClient); err != nil { - return fmt.Errorf("failed to apply resources: %w", err) + return nil + } + + if err := applyResources(resources, ctx, k8sClient); err != nil { + return fmt.Errorf("failed to apply resources: %w", err) + } + + if configMapResource != nil { + logrus.Debug("Config map is set, linking it to the workload") + + // Re-fetch the workload so that its UID is populated for the owner reference + owner := workloadResource.DeepCopyObject().(client.Object) + err := k8sClient.Get(ctx, client.ObjectKey{Name: owner.GetName(), Namespace: owner.GetNamespace()}, owner) + if err != nil { + return fmt.Errorf("failed to fetch owner resource %s/%s: %w", owner.GetNamespace(), owner.GetName(), err) + } + + // Ensure the UID is available + if owner.GetUID() == "" { + return fmt.Errorf("owner resource %s/%s has no valid UID", owner.GetNamespace(), owner.GetName()) + } + workloadResource = owner + + // Attach the config map to the workload now that the owner UID is known + logrus.Debug("Updating the config map's owner reference") + if err := updateOwnerReference(ctx, k8sClient, configMapResource, workloadResource, &s); err != nil { + return fmt.Errorf("failed to update owner reference of config map: %w", err) } } + return nil } @@ -161,20 +203,59 @@ }, nil } +func isBinaryFile(content []byte) bool { + return bytes.Contains(content, []byte{0}) +} + // generateConfigMapManifest adds a config map resource -func generateConfigMapManifest(path string, workload Workload, metaConfig MetaFlags) (*corev1.ConfigMap, error) { - configMap, err := k8s.GenerateConfigMapFromDir(path, metaConfig.Name, metaConfig.Namespace, workload.IgnoreFiles()) - if err != nil { - return nil, fmt.Errorf("failed to generate ConfigMap: %w", err) +func generateConfigMapManifest(files map[string]string, workload Workload, metaConfig MetaFlags) (*corev1.ConfigMap, error) { + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: metaConfig.Name, + Namespace: metaConfig.Namespace, + }, + Data: map[string]string{}, + } + + skipFiles := workload.IgnoreFiles() + + for fileName, filePath := range files { + if slices.Contains(skipFiles, fileName) { + continue + } + + info, err := os.Stat(filePath) + if err != nil { + return nil, fmt.Errorf("failed to stat file %s: %w", filePath, err) + } + + if info.Size() > 950e3 { + logrus.Warnf("Skipping file %s in %s as it is too large", fileName, filePath) + continue + } + + content, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("failed to read file %s: %w", filePath, err) + } + + // Skip binary files + if isBinaryFile(content) { + logrus.Warnf("Skipping binary file %s in %s", fileName, filePath) + continue + } + configMap.Data[fileName] = string(content) } - if configMap != nil { + + if len(configMap.Data) > 0 { return configMap, nil } + return nil, nil } -// generateManifests prepares a list of Kubernetes manifests to apply +// generateWorkloadManifest prepares the main workload
manifest +func generateWorkloadManifest(workloadTemplate []byte, templateContext WorkloadTemplateConfig, workload Workload) (client.Object, error) { parsedTemplate, err := template.New("main").Funcs(sprig.TxtFuncMap()).Parse(string(workloadTemplate)) if err != nil { return nil, fmt.Errorf("failed to parse template: %w", err) @@ -203,18 +284,13 @@ return nil, fmt.Errorf("failed to convert manifest, ensure it is of the correct type") } - additionalWorkloadManifests, err := workload.GenerateAdditionalResourceManifests(k8sClient, templateContext) - if err != nil { - return nil, fmt.Errorf("failed to generate additional resource manifests: %w", err) - } - - return append(additionalWorkloadManifests, []runtime.Object{converted}...), nil + return converted, nil } // printResources prints each Kubernetes manifest in an array func printResources(s *runtime.Scheme, resources []runtime.Object) { - for _, resource := range resources { - clientObject := resource.(client.Object) + for _, res := range resources { + clientObject := res.(client.Object) cleanedResource, err := k8s.MinimalizeAndConvertToYAML(s, clientObject) if err != nil { @@ -229,11 +305,11 @@ // applyResources applies (creates or updates if possible) each Kubernetes object within an array func applyResources(resources []runtime.Object, ctx context.Context, k8sClient client.Client) error { - for _, resource := range resources { + for _, res := range resources { // Ensure the resource implements client.Object - obj, ok := resource.(client.Object) + obj, ok := res.(client.Object) if !ok { - return fmt.Errorf("resource does not implement client.Object: %T", resource) + return fmt.Errorf("resource does not implement client.Object: %T", res) } // Access metadata for logging @@ -242,7 +318,7 @@ return fmt.Errorf("failed to access metadata for resource: %w", err) } - logrus.Debugf("Applying resource %T: %s/%s", resource, objMeta.GetNamespace(), objMeta.GetName()) + logrus.Debugf("Applying resource %T: %s/%s", res, objMeta.GetNamespace(), objMeta.GetName()) // Check if the resource exists key := client.ObjectKey{ @@ -250,7 +326,7 @@ Namespace: objMeta.GetNamespace(), Name: objMeta.GetName(), } - existing := resource.DeepCopyObject().(client.Object) + existing := res.DeepCopyObject().(client.Object) err = k8sClient.Get(ctx, key, existing) @@ -267,26 +343,53 @@ return fmt.Errorf("failed to create resource %s/%s: %w", objMeta.GetNamespace(), objMeta.GetName(), err) } - logrus.Infof("resource %s/%s created successfully", objMeta.GetNamespace(), objMeta.GetName()) + logrus.Infof("resource %T: %s/%s created successfully", res, objMeta.GetNamespace(), objMeta.GetName()) + + } + logrus.Info("To monitor and manage your workloads interactively, run $ kaiwo list -n mynamespace") + + return nil +} + +func updateOwnerReference(ctx context.Context, k8sClient client.Client, dependent client.Object, owner client.Object, scheme *runtime.Scheme) error { + // Fetch the latest version of the dependent object (for example, the generated ConfigMap) + existing := dependent.DeepCopyObject().(client.Object) + err := k8sClient.Get(ctx, client.ObjectKey{Name: existing.GetName(), Namespace:
existing.GetNamespace()}, existing) + if err != nil { + return fmt.Errorf("failed to fetch existing resource %s/%s: %w", existing.GetNamespace(), existing.GetName(), err) + } - continue + gvk := owner.GetObjectKind().GroupVersionKind() + if gvk.Empty() { + // Fetch GVK from the scheme if not set + gvks, _, err := scheme.ObjectKinds(owner) + if err != nil || len(gvks) == 0 { + return fmt.Errorf("failed to determine GVK for owner: %w", err) + } + gvk = gvks[0] // Use the first GVK found + } - // TODO: Rethink update logic which now fails with "immutable field" errors - // Resource already exists, update it - // existing, err := c.Resource(gvr).Namespace(namespace).Get(ctx, resource.GetName(), metav1.GetOptions{}) - // if err != nil { - // return fmt.Errorf("failed to get existing %s/%s: %w", resource.GetKind(), resource.GetName(), err) - // } + // Set OwnerReference + ownerRef := metav1.OwnerReference{ + APIVersion: gvk.GroupVersion().String(), + Kind: gvk.Kind, + Name: owner.GetName(), + UID: owner.GetUID(), + Controller: boolPtr(true), + BlockOwnerDeletion: boolPtr(true), + } - resource.SetResourceVersion(existing.GetResourceVersion()) - _, err = c.Resource(gvr).Namespace(namespace).Update(ctx, resource, metav1.UpdateOptions{}) - // if err != nil { - // return fmt.Errorf("failed to update %s/%s: %w", resource.GetKind(), resource.GetName(), err) - // } + existing.SetOwnerReferences([]metav1.OwnerReference{ownerRef}) - // logrus.Infof("%s/%s updated successfully", resource.GetKind(), resource.GetName()) + // Update the dependent resource with new OwnerReference + err = k8sClient.Update(ctx, existing) + if err != nil { + return fmt.Errorf("failed to update owner reference for %s/%s: %w", existing.GetNamespace(), existing.GetName(), err) } - logrus.Info("To monitor and manage your workloads interactively, run $ kaiwo list -n mynamespace") + logrus.Debugf("Updated OwnerReference for %s/%s", existing.GetNamespace(), existing.GetName()) return nil } + +// Helper function for boolean pointer +func boolPtr(b bool) *bool { return &b } diff --git a/pkg/workloads/config.go b/pkg/workloads/config.go index b2e0b12..2071c5e 100644 --- a/pkg/workloads/config.go +++ b/pkg/workloads/config.go @@ -21,9 +21,12 @@ ) const ( - KaiwoconfigFilename = "kaiwoconfig" - EnvFilename = "env" - KaiwoUsernameLabel = "kaiwo-cli/username" + KaiwoconfigFilename = "kaiwoconfig" + EnvFilename = "env" + KaiwoUsernameLabel = "kaiwo-cli/username" + KaiwoDefaultStorageClassNameLabel = "kaiwo-cli/default-storage-class-name" + KaiwoDefaultStorageQuantityLabel = "kaiwo-cli/default-storage-quantity" + CustomTemplateValuesFilename = "custom-template-values.yaml" ) // WorkloadTemplateConfig is the config context that is passed to the workload templates @@ -61,6 +64,13 @@ type SchedulingFlags struct { // CalculatedNumReplicas refers to the number of replicas, calculated from the available GPUs per node CalculatedNumReplicas int + + Storage *StorageSchedulingFlags +} + +type StorageSchedulingFlags struct { + Quantity string + StorageClassName string } // MetaFlags contain flags that are shared by all workloads @@ -105,12 +115,13 @@ type ExecFlags struct { // Path to workload folder Path string + // OverlayPath contains specific files that override files in Path + OverlayPath string + // The key used to store the GPU count per node in the resource flavor ResourceFlavorGpuNodeLabelKey string - // Path to custom config file - CustomConfigPath string - - // Path to env file if not in Path - EnvFilePath string + //
WorkloadFiles lists the files that are considered to be part of the workload after merging Path and OverlayPath + // The map is from the workload path (how the workload would see it) to the true relative path (how the CLI client sees it) + WorkloadFiles map[string]string } diff --git a/pkg/workloads/deployments/deployment.go b/pkg/workloads/deployments/deployment.go index ac0e667..8c07add 100644 --- a/pkg/workloads/deployments/deployment.go +++ b/pkg/workloads/deployments/deployment.go @@ -19,7 +19,6 @@ import ( _ "embed" "fmt" "os" - "path/filepath" "strings" "github.com/sirupsen/logrus" @@ -43,7 +42,7 @@ type DeploymentFlags struct { } func (deployment Deployment) GenerateTemplateContext(execFlags workloads.ExecFlags) (any, error) { - contents, err := os.ReadFile(filepath.Join(execFlags.Path, EntrypointFilename)) + contents, err := os.ReadFile(execFlags.WorkloadFiles[EntrypointFilename]) if err != nil { if os.IsNotExist(err) { logrus.Warnln("No entrypoint file found. Expecting entrypoint in image") @@ -62,7 +61,7 @@ return DeploymentFlags{Entrypoint: entrypoint}, nil } -func (deployment Deployment) ConvertObject(object runtime.Object) (runtime.Object, bool) { +func (deployment Deployment) ConvertObject(object runtime.Object) (client.Object, bool) { obj, ok := object.(*appsv1.Deployment) return obj, ok } diff --git a/pkg/workloads/deployments/deployment.yaml.tmpl b/pkg/workloads/deployments/deployment.yaml.tmpl index 7aa1a26..9bd3af4 100644 --- a/pkg/workloads/deployments/deployment.yaml.tmpl +++ b/pkg/workloads/deployments/deployment.yaml.tmpl @@ -31,6 +31,8 @@ spec: - {{ .Workload.Entrypoint }} {{- end }} env: + - name: HF_HOME + value: /workload/.cache/huggingface {{- if .Meta.EnvVars }} {{- range .Meta.EnvVars }} {{- if .Value }} @@ -59,6 +61,8 @@ spec: amd.com/gpu: "{{ .Scheduling.TotalRequestedGPUs }}" {{ end }} volumeMounts: + - mountPath: /workload + name: {{ .Meta.Name }}-main {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -67,12 +71,28 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - mountPath: /workload - name: workload + - mountPath: /workload/mounted + name: workload-mount {{- end }} - mountPath: /dev/shm name: dshm volumes: + {{- if .Scheduling.Storage }} + - name: {{ .Meta.Name }}-main + ephemeral: + volumeClaimTemplate: + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: {{ .Scheduling.Storage.StorageClassName }} + resources: + requests: + storage: {{ .Scheduling.Storage.Quantity }} + {{- else }} + - name: {{ .Meta.Name }}-main + emptyDir: + medium: Memory + sizeLimit: 10Mi + {{- end }} {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -84,7 +104,7 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - name: workload + - name: workload-mount configMap: name: {{ .Meta.Name }} {{- end }} diff --git a/pkg/workloads/jobs/job.go b/pkg/workloads/jobs/job.go index e0af869..9ef0b8b 100644 --- a/pkg/workloads/jobs/job.go +++ b/pkg/workloads/jobs/job.go @@ -19,7 +19,6 @@ import ( _ "embed" "fmt" "os" - "path/filepath" "strings" "github.com/sirupsen/logrus" @@ -43,7 +42,7 @@ type JobFlags struct { } func (job Job) GenerateTemplateContext(execFlags workloads.ExecFlags) (any, error) { - contents, err := os.ReadFile(filepath.Join(execFlags.Path, EntrypointFilename)) + contents, err := os.ReadFile(execFlags.WorkloadFiles[EntrypointFilename]) if err != nil { if os.IsNotExist(err) { logrus.Warnln("No
entrypoint file found. Expecting entrypoint in image") @@ -68,7 +67,7 @@ func (job Job) DefaultTemplate() ([]byte, error) { return JobTemplate, nil } -func (job Job) ConvertObject(object runtime.Object) (runtime.Object, bool) { +func (job Job) ConvertObject(object runtime.Object) (client.Object, bool) { obj, ok := object.(*batchv1.Job) return obj, ok } diff --git a/pkg/workloads/jobs/job.yaml.tmpl b/pkg/workloads/jobs/job.yaml.tmpl index 02d25f3..f503782 100644 --- a/pkg/workloads/jobs/job.yaml.tmpl +++ b/pkg/workloads/jobs/job.yaml.tmpl @@ -26,6 +26,8 @@ spec: - {{ .Workload.Entrypoint }} {{- end }} env: + - name: HF_HOME + value: /workload/.cache/huggingface {{- if .Meta.EnvVars }} {{- range .Meta.EnvVars }} {{- if .Value }} @@ -50,6 +52,8 @@ spec: cpu: "{{ mul .Scheduling.TotalRequestedGPUs 4 }}" amd.com/gpu: "{{ .Scheduling.TotalRequestedGPUs }}" volumeMounts: + - mountPath: /workload + name: {{ .Meta.Name }}-main {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -58,12 +62,28 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - mountPath: /workload - name: workload + - mountPath: /workload/mounted + name: workload-mount {{- end }} - mountPath: /dev/shm name: dshm volumes: + {{- if .Scheduling.Storage }} + - name: {{ .Meta.Name }}-main + ephemeral: + volumeClaimTemplate: + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: {{ .Scheduling.Storage.StorageClassName }} + resources: + requests: + storage: {{ .Scheduling.Storage.Quantity }} + {{- else }} + - name: {{ .Meta.Name }}-main + emptyDir: + medium: Memory + sizeLimit: 10Mi + {{- end }} {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -75,12 +95,11 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - name: workload + - name: workload-mount configMap: name: {{ .Meta.Name }} {{- end }} - name: dshm emptyDir: medium: Memory - sizeLimit: 200Gi - + sizeLimit: 200Gi \ No newline at end of file diff --git a/pkg/workloads/ray/deployment.go b/pkg/workloads/ray/deployment.go index 010b847..0f81eca 100644 --- a/pkg/workloads/ray/deployment.go +++ b/pkg/workloads/ray/deployment.go @@ -19,7 +19,6 @@ import ( _ "embed" "fmt" "os" - "path/filepath" "strings" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" @@ -43,9 +42,7 @@ var DeploymentTemplate []byte const ServeconfigFilename = "serveconfig" func (deployment Deployment) GenerateTemplateContext(execFlags workloads.ExecFlags) (any, error) { - logrus.Debugf("Loading ray service from %s", execFlags.Path) - - contents, err := os.ReadFile(filepath.Join(execFlags.Path, ServeconfigFilename)) + contents, err := os.ReadFile(execFlags.WorkloadFiles[ServeconfigFilename]) if err != nil { return nil, fmt.Errorf("failed to read serveconfig file: %w", err) } @@ -53,7 +50,36 @@ func (deployment Deployment) GenerateTemplateContext(execFlags workloads.ExecFla return DeploymentFlags{Serveconfig: strings.TrimSpace(string(contents))}, nil } -func (deployment Deployment) ConvertObject(object runtime.Object) (runtime.Object, bool) { +//func (deployment Deployment) BuildObject(flags workloads.WorkloadTemplateConfig) (client.Object, error) { +// obj := &rayv1.RayService{ +// ObjectMeta: metav1.ObjectMeta{ +// Name: flags.Meta.Name, +// Namespace: flags.Meta.Namespace, +// Labels: map[string]string{ +// "kaiwo-cli/username": flags.Meta.User, +// }, +// }, +// Spec: rayv1.RayServiceSpec{ +// ServeConfigV2: "", +// RayClusterSpec: rayv1.RayClusterSpec{ +// // EnableInTreeAutoscaling: true, +// 
HeadGroupSpec: rayv1.HeadGroupSpec{ +// RayStartParams: map[string]string{ +// "dashboard-host": "0.0.0.0", +// }, +// Template: corev1.PodTemplateSpec{}, +// }, +// }, +// }, +// } +// return obj, nil +//} +// +//func buildPodSpec(metaFlags workloads.MetaFlags, ports []corev1.ContainerPort) corev1.PodSpec { +// return corev1.PodSpec{} +//} + +func (deployment Deployment) ConvertObject(object runtime.Object) (client.Object, bool) { obj, ok := object.(*rayv1.RayService) return obj, ok diff --git a/pkg/workloads/ray/deployment.yaml.tmpl b/pkg/workloads/ray/deployment.yaml.tmpl index e5dc9ca..76a2eca 100644 --- a/pkg/workloads/ray/deployment.yaml.tmpl +++ b/pkg/workloads/ray/deployment.yaml.tmpl @@ -24,6 +24,8 @@ spec: image: {{ .Meta.Image }} imagePullPolicy: Always env: + - name: HF_HOME + value: /workload/.cache/huggingface {{- if .Meta.EnvVars }} {{- range .Meta.EnvVars }} {{- if .Value }} @@ -46,6 +48,8 @@ spec: cpu: "2" memory: "16Gi" volumeMounts: + - mountPath: /workload + name: {{ .Meta.Name }}-main {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -54,8 +58,8 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - mountPath: /workload/app - name: workload + - mountPath: /workload/mounted + name: workload-mount {{- end }} - mountPath: /dev/shm name: dshm @@ -69,6 +73,22 @@ spec: - containerPort: 8000 name: serve volumes: + {{- if .Scheduling.Storage }} + - name: {{ .Meta.Name }}-main + ephemeral: + volumeClaimTemplate: + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: {{ .Scheduling.Storage.StorageClassName }} + resources: + requests: + storage: 10Mi + {{- else }} + - name: {{ .Meta.Name }}-main + emptyDir: + medium: Memory + sizeLimit: 10Mi + {{- end }} {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -80,7 +100,7 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - name: workload + - name: workload-mount configMap: name: {{ .Meta.Name }} {{- end }} @@ -109,6 +129,8 @@ spec: exec: command: ["/bin/sh", "-c", "ray stop"] env: + - name: HF_HOME + value: /workload/.cache/huggingface {{- if .Meta.EnvVars }} {{- range .Meta.EnvVars }} {{- if .Value }} @@ -133,6 +155,8 @@ spec: memory: "{{ mul .Scheduling.CalculatedGPUsPerReplica 32 }}Gi" amd.com/gpu: "{{ .Scheduling.CalculatedGPUsPerReplica }}" volumeMounts: + - mountPath: /workload + name: {{ .Meta.Name }}-main {{- if .Meta.SecretVolumes }} {{- range .SecretVolumes }} - name: {{ .Name }} @@ -141,12 +165,28 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - mountPath: /workload/app - name: workload + - mountPath: /workload/mounted + name: workload-mount {{- end }} - mountPath: /dev/shm name: dshm volumes: + {{- if .Scheduling.Storage }} + - name: {{ .Meta.Name }}-main + ephemeral: + volumeClaimTemplate: + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: {{ .Scheduling.Storage.StorageClassName }} + resources: + requests: + storage: {{ .Scheduling.Storage.Quantity }} + {{- else }} + - name: {{ .Meta.Name }}-main + emptyDir: + medium: Memory + sizeLimit: 10Mi + {{- end }} {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -158,7 +198,7 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - name: workload + - name: workload-mount configMap: name: {{ .Meta.Name }} {{- end }} diff --git a/pkg/workloads/ray/job.go b/pkg/workloads/ray/job.go index c986013..be35bd4 100644 --- a/pkg/workloads/ray/job.go +++ b/pkg/workloads/ray/job.go @@ -19,12 +19,12 @@ import ( _ "embed" "fmt" 
"os" - "path/filepath" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/silogen/kaiwo/pkg/workloads" @@ -42,7 +42,7 @@ type JobFlags struct { } func (job Job) GenerateTemplateContext(execFlags workloads.ExecFlags) (any, error) { - contents, err := os.ReadFile(filepath.Join(execFlags.Path, EntrypointFilename)) + contents, err := os.ReadFile(execFlags.WorkloadFiles[EntrypointFilename]) if err != nil { return nil, fmt.Errorf("failed to read entrypoint file: %w", err) } @@ -57,7 +57,7 @@ func (job Job) DefaultTemplate() ([]byte, error) { return JobTemplate, nil } -func (job Job) ConvertObject(object runtime.Object) (runtime.Object, bool) { +func (job Job) ConvertObject(object runtime.Object) (client.Object, bool) { obj, ok := object.(*rayv1.RayJob) return obj, ok } diff --git a/pkg/workloads/ray/job.yaml.tmpl b/pkg/workloads/ray/job.yaml.tmpl index c20156c..ccb0ec4 100644 --- a/pkg/workloads/ray/job.yaml.tmpl +++ b/pkg/workloads/ray/job.yaml.tmpl @@ -17,6 +17,10 @@ spec: rayStartParams: {} template: spec: + securityContext: + runAsUser: 1000 + runAsGroup: 1000 + fsGroup: 1000 {{- if .Meta.ImagePullSecret }} imagePullSecrets: - name: {{ .Meta.ImagePullSecret }} @@ -26,6 +30,8 @@ spec: image: {{ .Meta.Image }} imagePullPolicy: Always env: + - name: HF_HOME + value: /workload/.cache/huggingface {{- if .Meta.EnvVars }} {{- range .Meta.EnvVars }} {{- if .Value }} @@ -55,6 +61,8 @@ spec: - containerPort: 10001 name: client volumeMounts: + - mountPath: /workload + name: {{ .Meta.Name }}-main {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -63,12 +71,28 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - mountPath: /workload - name: workload + - mountPath: /workload/mounted + name: workload-mount {{- end }} - mountPath: /dev/shm name: dshm volumes: + {{- if .Scheduling.Storage }} + - name: {{ .Meta.Name }}-main + ephemeral: + volumeClaimTemplate: + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: {{ .Scheduling.Storage.StorageClassName }} + resources: + requests: + storage: 10Mi + {{- else }} + - name: {{ .Meta.Name }}-main + emptyDir: + medium: Memory + sizeLimit: 10Mi + {{- end }} {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -80,7 +104,7 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - name: workload + - name: workload-mount configMap: name: {{ .Meta.Name }} {{- end }} @@ -97,6 +121,10 @@ spec: rayStartParams: {} template: spec: + securityContext: + runAsUser: 1000 + runAsGroup: 1000 + fsGroup: 1000 {{- if .Meta.ImagePullSecret }} imagePullSecrets: - name: {{ .Meta.ImagePullSecret }} @@ -106,6 +134,8 @@ spec: image: {{ .Meta.Image }} imagePullPolicy: Always env: + - name: HF_HOME + value: /workload/.cache/huggingface {{- if .Meta.EnvVars }} {{- range .Meta.EnvVars }} {{- if .Value }} @@ -137,6 +167,8 @@ spec: memory: "{{ mul .Scheduling.CalculatedGPUsPerReplica 32 }}Gi" amd.com/gpu: "{{ .Scheduling.CalculatedGPUsPerReplica }}" volumeMounts: + - mountPath: /workload + name: {{ .Meta.Name }}-main {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -145,12 +177,28 @@ spec: {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - mountPath: /workload - name: workload + - mountPath: /workload/mounted + name: workload-mount {{- end }} - mountPath: /dev/shm name: dshm volumes: + {{- if 
.Scheduling.Storage }} + - name: {{ .Meta.Name }}-main + ephemeral: + volumeClaimTemplate: + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: {{ .Scheduling.Storage.StorageClassName }} + resources: + requests: + storage: {{ .Scheduling.Storage.Quantity }} + {{- else }} + - name: {{ .Meta.Name }}-main + emptyDir: + medium: Memory + sizeLimit: 10Mi + {{- end }} {{- if .Meta.SecretVolumes }} {{- range .Meta.SecretVolumes }} - name: {{ .Name }} @@ -162,7 +210,7 @@ {{- end }} {{- end }} {{- if .Meta.HasConfigMap }} - - name: workload + - name: workload-mount configMap: name: {{ .Meta.Name }} {{- end }} diff --git a/pkg/workloads/workload.go b/pkg/workloads/workload.go index f803a62..dd88740 100644 --- a/pkg/workloads/workload.go +++ b/pkg/workloads/workload.go @@ -33,7 +33,7 @@ type Workload interface { // DefaultTemplate returns a default template to use for this workload DefaultTemplate() ([]byte, error) - ConvertObject(object runtime.Object) (runtime.Object, bool) + ConvertObject(object runtime.Object) (client.Object, bool) // IgnoreFiles lists the files that should be ignored in the ConfigMap IgnoreFiles() []string diff --git a/workloads/inference/LLMs/offline-inference/vllm-batch-single-multinode/entrypoint b/workloads/inference/LLMs/offline-inference/vllm-batch-single-multinode/entrypoint index 7a84f97..f20e898 100644 --- a/workloads/inference/LLMs/offline-inference/vllm-batch-single-multinode/entrypoint +++ b/workloads/inference/LLMs/offline-inference/vllm-batch-single-multinode/entrypoint @@ -1 +1 @@ -python main.py \ No newline at end of file +python mounted/main.py \ No newline at end of file diff --git a/workloads/inference/LLMs/online-inference/vllm-online-single-multinode/serveconfig b/workloads/inference/LLMs/online-inference/vllm-online-single-multinode/serveconfig index 074db37..289a8b8 100644 --- a/workloads/inference/LLMs/online-inference/vllm-online-single-multinode/serveconfig +++ b/workloads/inference/LLMs/online-inference/vllm-online-single-multinode/serveconfig @@ -1,7 +1,7 @@ applications: - name: llm route_prefix: / - import_path: app:deployment + import_path: mounted:deployment deployments: - name: VLLMDeployment autoscaling_config: diff --git a/workloads/training/LLMs/full-parameter-pretraining/full-param-zero3-single-multinode/entrypoint b/workloads/training/LLMs/full-parameter-pretraining/full-param-zero3-single-multinode/entrypoint index ce82531..77e60f8 100644 --- a/workloads/training/LLMs/full-parameter-pretraining/full-param-zero3-single-multinode/entrypoint +++ b/workloads/training/LLMs/full-parameter-pretraining/full-param-zero3-single-multinode/entrypoint @@ -1,6 +1,6 @@ -python main.py +python mounted/main.py --model-name=meta-llama/Llama-3.1-8B-Instruct ---ds-config=./zero_3_offload_optim_param.json +--ds-config=./mounted/zero_3_offload_optim_param.json --bucket=silogen-dev-ray --num-epochs=2 --num-devices=$NUM_GPUS diff --git a/workloads/training/LLMs/lora-supervised-finetuning/lora-sft-zero3-single-multinode/entrypoint b/workloads/training/LLMs/lora-supervised-finetuning/lora-sft-zero3-single-multinode/entrypoint index 64443b9..a7cdcc7 100644 --- a/workloads/training/LLMs/lora-supervised-finetuning/lora-sft-zero3-single-multinode/entrypoint +++ b/workloads/training/LLMs/lora-supervised-finetuning/lora-sft-zero3-single-multinode/entrypoint @@ -1,6 +1,7 @@ -python main.py +python mounted/main.py --model-name=meta-llama/Llama-3.1-8B-Instruct ---ds-config=./zero_3_offload_optim_param.json +--ds-config=./mounted/zero_3_offload_optim_param.json
+--lora-config=./mounted/lora-llama.json --bucket=silogen-dev-ray --num-epochs=2 --lora