Skip to content

Commit

Permalink
Workload pvcs (#72)
Browse files Browse the repository at this point in the history
* pvc support for workloads, owner references for deletion propagation

* fixed warnings

* minor refactoring

* minor refactoring

* log level update

* missing : in templates

* add storage by default, override to remove

* add storage class name to pvc only if specified

* error if storageclass not specified and default does not exist

* fix volume mount conflict

* fix entrypoint files

* fix verbosity level for submit and serve commands

* default storage class and amount in namespace labels, docs update

* fix linter error

* pr fixes

* more PR review changes

* And more PR changes

* try volumeclaimtemplates in rayjob

* update workloads to use ephemeral storage via storage class and volume claim templates

---------

Co-authored-by: Antti-Ville Suni <[email protected]>
Co-authored-by: AV Suni <[email protected]>
  • Loading branch information
3 people authored Feb 4, 2025
1 parent 46a8118 commit fd0cc4c
Show file tree
Hide file tree
Showing 25 changed files with 494 additions and 102 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Please delete options that are not relevant.
# Checklist:

- [ ] My code follows the style guidelines of this project. See [contributing-guidelines.md](./../contributing-guidelines.md)
- [ ] Existing workload examples run after my changes (if applicable)
- [ ] Existing workload examples run to completion after my changes (if applicable)
- [ ] I have performed a self-review of my code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
Expand Down
20 changes: 10 additions & 10 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}",
"args": ["submit", "-p", "workloads/training/LLMs/lora-supervised-finetuning/lora-sft-zero3-single-multinode", "--ray", "-g", "4", "--dry-run"],
"program": "${workspaceFolder}/cmd/cli/main.go",
"args": ["submit", "-p", "${workspaceFolder}/workloads/training/LLMs/lora-supervised-finetuning/lora-sft-zero3-single-multinode", "--ray", "-g", "4", "--dry-run", "--storage=100Gi,longhorn"],
"env": {
"GO111MODULE": "on"
},
Expand All @@ -18,8 +18,8 @@
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}",
"args": ["serve", "-p", "workloads/inference/LLMs/online-inference/vllm-online-single-multinode", "--ray", "--replicas", "1", "--gpus-per-replica", "4", "--dry-run"],
"program": "${workspaceFolder}/cmd/cli/main.go",
"args": ["serve", "-p", "${workspaceFolder}/workloads/inference/LLMs/online-inference/vllm-online-single-multinode", "--ray", "--replicas", "1", "--gpus-per-replica", "4", "--dry-run"],
"env": {
"GO111MODULE": "on"
},
Expand All @@ -30,8 +30,8 @@
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}",
"args": ["submit", "-p", "workloads/training/LLMs/bert/hf-accelerate-bert", "-g", "4", "--dry-run"],
"program": "${workspaceFolder}/cmd/cli/main.go",
"args": ["submit", "-p", "${workspaceFolder}/workloads/training/LLMs/bert/hf-accelerate-bert", "-g", "4", "--dry-run"],
"env": {
"GO111MODULE": "on"
},
Expand All @@ -42,7 +42,7 @@
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}",
"program": "${workspaceFolder}/cmd/cli/main.go",
"args": ["submit", "-i", "ghcr.io/silogen/rocm-ray:v0.4", "-g", "4"],
"env": {
"GO111MODULE": "on"
Expand All @@ -54,8 +54,8 @@
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}",
"args": ["submit", "-p", "workloads/training/LLMs/lora-supervised-finetuning/ds-zero3-single-multinode", "--ray", "-g", "4"],
"program": "${workspaceFolder}/cmd/cli/main.go",
"args": ["submit", "-p", "${workspaceFolder}/workloads/training/LLMs/lora-supervised-finetuning/ds-zero3-single-multinode", "--ray", "-g", "4"],
"env": {
"GO111MODULE": "on"
},
Expand All @@ -66,7 +66,7 @@
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}",
"program": "${workspaceFolder}/cmd/cli/main.go",
"args": ["monitor", "deployment/avsuni-gpu-monitoring", "-n", "av-test"],
"env": {
"GO111MODULE": "on"
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,22 @@ You can access this in the template via
{{ .Custom.parent.child }}
```

### Storage

You can use the Kaiwo CLI to instruct a workload to use storage from a given storage class. If you do not provide any input for the CLI, the following default values are used:

* The storage class name is read from the specified namespace's label `kaiwo-cli/default-storage-class`
* The storage amount is read from the specified namespace's label `kaiwo-cli/default-storage-quantity`

If these values do not exist, an error is returned. If you are using the cluster-admins examples from this repository, you can modify the namespace at [cluster-admins/kueue/cluster-queue.yaml](cluster-admins/kueue/cluster-queue.yaml) and add these values. If you want to skip adding storage, you must explicitly add the `--no-storage` flag.

To specify storage, you can use the flags:

* `--storage=2Gi` to specify the amount of storage and to use the default storage class name from the namespace labels
* `--storage=2Gi,mystorageclass` to specify both the amount of storage and the storage class name

Note that the storage created is ephemeral and meant for caching, which means that it gets removed when the underlying pods get removed. However, the ephemeral storage is provisioned via a storage class, which ensures that the space requested is available and reserved for all pods before the workload starts.

## Interacting with workloads

While Kaiwo's primary purpose is to deploy workloads, it can also be used as a light tool to discover and interact with running workloads.
Expand Down
4 changes: 1 addition & 3 deletions cluster-admins/kueue/cluster-queue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
spec:
namespaceSelector: {} # match all.
resourceGroups:
- coveredResources: ["cpu", "memory", "amd.com/gpu", "ephemeral-storage"]
- coveredResources: ["cpu", "memory", "amd.com/gpu"]
flavors:
- name: base-gpu-flavour
resources:
Expand All @@ -15,5 +15,3 @@ spec:
nominalQuota: 1800Gi
- name: "amd.com/gpu"
nominalQuota: 16
- name: "ephemeral-storage"
nominalQuota: 2000Gi
97 changes: 89 additions & 8 deletions pkg/cli/apply/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"strconv"
"strings"

"k8s.io/apimachinery/pkg/api/errors"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -84,15 +89,50 @@ func RunApply(workload workloads.Workload, workloadMeta any) error {
}

// Prepare scheduling flags
dynamicClient, err := k8s.GetClient()
clients, err := k8s.GetKubernetesClients()
if err != nil {
return fmt.Errorf("error fetching Kubernetes client: %w", err)
return fmt.Errorf("error getting k8s clients: %w", err)
}

ctx := context.TODO()
schedulingFlags := GetSchedulingFlags()
schedulingFlags, err := GetSchedulingFlags()
if err != nil {
return fmt.Errorf("error getting scheduling flags: %w", err)
}

if schedulingFlags.Storage != nil {
if schedulingFlags.Storage.StorageClassName == "" || schedulingFlags.Storage.Quantity == "" {
logrus.Info("Storage class name and / or quantity not provided, checking namespace labels for defaults")

defaultStorageFlags, err := findDefaultStorageFlags(ctx, *clients.Clientset, metaFlags.Namespace)
if err != nil {
return fmt.Errorf("error checking for storage defaults: %w", err)
}

if schedulingFlags.Storage.StorageClassName == "" {
if defaultStorageFlags.StorageClassName == "" {
return fmt.Errorf("storage requested, but no storage class name provided and no default exists in the namespace '%s' label '%s'", metaFlags.Namespace, workloads.KaiwoDefaultStorageClassNameLabel)
}
schedulingFlags.Storage.StorageClassName = defaultStorageFlags.StorageClassName
}
if schedulingFlags.Storage.Quantity == "" {
if defaultStorageFlags.Quantity == "" {
return fmt.Errorf("storage requested, but no quantity provided and no default exists in the namespace '%s' label '%s'", metaFlags.Namespace, workloads.KaiwoDefaultStorageQuantityLabel)
}
schedulingFlags.Storage.Quantity = defaultStorageFlags.Quantity
}
}

storageClassExists, err := doesStorageClassExist(ctx, *clients.Clientset, schedulingFlags.Storage.StorageClassName)
if err != nil {
return fmt.Errorf("error checking if storage class exists: %w", err)
}
if !storageClassExists {
return fmt.Errorf("storage class '%s' does not exist", schedulingFlags.Storage.StorageClassName)
}
}

if err := fillSchedulingFlags(ctx, dynamicClient, &schedulingFlags, execFlags.ResourceFlavorGpuNodeLabelKey, metaFlags.EnvVars); err != nil {
if err := fillSchedulingFlags(ctx, clients.Client, schedulingFlags, execFlags.ResourceFlavorGpuNodeLabelKey, metaFlags.EnvVars); err != nil {
return fmt.Errorf("error filling scheduling flags: %w", err)
}
logrus.Debugf("Successfully loaded scheduling info from Kubernetes")
Expand All @@ -106,18 +146,59 @@ func RunApply(workload workloads.Workload, workloadMeta any) error {
WorkloadMeta: workloadMeta,
Workload: workloadConfig,
Meta: metaFlags,
Scheduling: schedulingFlags,
Scheduling: *schedulingFlags,
Custom: customConfig,
}

// Apply the workload
if err := workloads.ApplyWorkload(ctx, dynamicClient, workload, execFlags, templateContext); err != nil {
if err := workloads.ApplyWorkload(ctx, clients.Client, workload, execFlags, templateContext); err != nil {
return fmt.Errorf("error applying workload: %w", err)
}

return nil
}

// findDefaultStorageFlags looks up the namespace's labels for the default
// storage class name and default storage quantity and returns them as
// storage scheduling flags. A label that is absent simply leaves the
// corresponding field empty; a missing namespace is reported as an error.
func findDefaultStorageFlags(ctx context.Context, clientset kubernetes.Clientset, namespace string) (*workloads.StorageSchedulingFlags, error) {
	ns, err := clientset.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
	switch {
	case errors.IsNotFound(err):
		logrus.Warnf("Namespace does not exist, cannot check for storage defaults. Either ensure that the namespace exists and has default values, specify the storage class and amount explicitly, or specify --no-storage to skip adding storage.")
		return nil, fmt.Errorf("failed to find default storage class or quantity for namespace that does not exist: %s", namespace)
	case err != nil:
		return nil, fmt.Errorf("error getting namespace: %w", err)
	}

	defaults := &workloads.StorageSchedulingFlags{}

	if name, found := ns.Labels[workloads.KaiwoDefaultStorageClassNameLabel]; found {
		logrus.Debugf("Default storage class discovered: %s", name)
		defaults.StorageClassName = name
	} else {
		logrus.Debugf("Default storage class not found")
	}

	if quantity, found := ns.Labels[workloads.KaiwoDefaultStorageQuantityLabel]; found {
		logrus.Debugf("Default storage quantity discovered: %s", quantity)
		defaults.Quantity = quantity
	} else {
		logrus.Debugf("Default storage quantity not found")
	}

	return defaults, nil
}

// doesStorageClassExist reports whether a StorageClass with the given name
// exists in the cluster. A NotFound response from the API server maps to
// (false, nil); any other lookup failure is returned as an error.
func doesStorageClassExist(ctx context.Context, clientset kubernetes.Clientset, storageClassName string) (bool, error) {
	if _, err := clientset.StorageV1().StorageClasses().Get(ctx, storageClassName, metav1.GetOptions{}); err != nil {
		if errors.IsNotFound(err) {
			return false, nil
		}
		return false, fmt.Errorf("error getting storage class %s: %w", storageClassName, err)
	}
	return true, nil
}

// loadCustomConfig loads custom configuration data from a file
func loadCustomConfig(path string) (any, error) {
logrus.Debugln("Loading custom config")
Expand Down Expand Up @@ -196,11 +277,11 @@ func fillSchedulingFlags(

if schedulingFlags.RequestedReplicas > 0 && schedulingFlags.RequestedGPUsPerReplica > 0 {
if schedulingFlags.RequestedGPUsPerReplica > schedulingFlags.GPUsAvailablePerNode {
return fmt.Errorf("You requested %d GPUs per replica, but there are only %d GPUs available per node",
return fmt.Errorf("you requested %d GPUs per replica, but there are only %d GPUs available per node",
schedulingFlags.RequestedGPUsPerReplica, schedulingFlags.GPUsAvailablePerNode)
}
if schedulingFlags.TotalRequestedGPUs > 0 {
return fmt.Errorf("Cannot set requested gpus with --gpus when --replicas and --gpus-per-replica are set")
return fmt.Errorf("cannot set requested gpus with --gpus when --replicas and --gpus-per-replica are set")
}
schedulingFlags.CalculatedNumReplicas = schedulingFlags.RequestedReplicas
schedulingFlags.CalculatedGPUsPerReplica = schedulingFlags.RequestedGPUsPerReplica
Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/apply/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func BuildServeCmd() *cobra.Command {
Use: "serve",
Short: "Serve a deployment process",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.Parent().PersistentPreRunE(cmd, args); err != nil {
return err
}
return PreRunLoadConfig(cmd, args)
},
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/apply/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func BuildSubmitCmd() *cobra.Command {
Use: "submit",
Short: "Submit a job",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.Parent().PersistentPreRunE(cmd, args); err != nil {
return err
}
return PreRunLoadConfig(cmd, args)
},
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down
71 changes: 67 additions & 4 deletions pkg/cli/apply/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"fmt"
"os"
"path/filepath"
"strings"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -67,7 +70,7 @@ func GetExecFlags() workloads.ExecFlags {

const (
defaultNamespace = "kaiwo"
defaultImage = "ghcr.io/silogen/rocm-ray:v0.5"
defaultImage = "ghcr.io/silogen/rocm-ray:v0.6"
)

var (
Expand Down Expand Up @@ -103,22 +106,82 @@ var (
gpus int
replicas int
gpusPerReplica int
storage string
noStorage bool
)

// AddSchedulingFlags registers the (Kueue) scheduling-related flags on cmd:
// GPU counts, replica counts, and the workload storage request.
func AddSchedulingFlags(cmd *cobra.Command) {
	flags := cmd.Flags()
	flags.IntVarP(&gpus, "gpus", "g", 0, "Number of GPUs requested for the workload")
	flags.IntVar(&replicas, "replicas", 0, "Number of replicas requested for the workload")
	flags.IntVar(&gpusPerReplica, "gpus-per-replica", 0, "Number of GPUs requested per replica")

	storageUsage := "Storage requested for the workload, use: --storage=storageQuantity,storageClassName, --storage=storageQuantity to use the default storage class, or --storage=default (the default) to use defaults for both storage class and amount. " +
		fmt.Sprintf("The default storage class and amount can be configured in the namespace's labels (keys %s and %s). ", workloads.KaiwoDefaultStorageClassNameLabel, workloads.KaiwoDefaultStorageQuantityLabel) +
		"If you do not want to include storage, you must pass --no-storage explicitly."
	flags.StringVar(&storage, "storage", "default", storageUsage)
	flags.BoolVar(&noStorage, "no-storage", false, "Don't use storage for the workload")
}

// GetSchedulingFlags initializes the scheduling flags with the number of GPUs requested
func GetSchedulingFlags() workloads.SchedulingFlags {
return workloads.SchedulingFlags{
// GetSchedulingFlags builds the workload scheduling flags from the CLI
// inputs: GPU counts, replica counts, and the storage request.
//
// Storage semantics:
//   - --no-storage: no storage is attached (Storage stays nil)
//   - --storage=default (the default): both the quantity and the class name
//     are left empty so that namespace-label defaults are applied later
//   - --storage=<quantity>[,<storageClassName>]: explicit quantity with an
//     optional storage class name
//
// Returns an error for conflicting flags or a malformed storage specifier.
func GetSchedulingFlags() (*workloads.SchedulingFlags, error) {
	flags := &workloads.SchedulingFlags{
		TotalRequestedGPUs:      gpus,
		RequestedReplicas:       replicas,
		RequestedGPUsPerReplica: gpusPerReplica,
	}

	if storage != "default" && noStorage {
		return nil, fmt.Errorf("you must specify --storage or --no-storage, not both")
	}

	if noStorage {
		logrus.Info("No storage requested for workload")
		return flags, nil
	}

	if storage == "" {
		return nil, fmt.Errorf("you must specify --storage or --no-storage")
	}

	requestedStorage := ""
	storageClassName := ""

	if storage != "default" {
		// Accept "<quantity>" or "<quantity>,<storageClassName>".
		split := strings.Split(storage, ",")
		if len(split) > 2 {
			return nil, fmt.Errorf("invalid storage specifier %s", storage)
		}

		if len(split) == 2 {
			storageClassName = split[1]
			logrus.Infof("Requested storage class name %s", storageClassName)
		} else {
			logrus.Info("You did not pass a storage class name, the default storage class will be used if it exists")
		}

		// strings.Split always yields at least one element, so the quantity
		// part is always present; validate it eagerly so bad input fails
		// before any cluster interaction. (The previous "no quantity" branch
		// was unreachable dead code and logged an empty placeholder.)
		requestedStorage = split[0]
		if _, err := resource.ParseQuantity(requestedStorage); err != nil {
			return nil, fmt.Errorf("invalid storage quantity %s", requestedStorage)
		}
		logrus.Infof("Requested storage %s", requestedStorage)
	}

	flags.Storage = &workloads.StorageSchedulingFlags{
		Quantity:         requestedStorage,
		StorageClassName: storageClassName,
	}

	return flags, nil
}

type Config struct {
Expand Down Expand Up @@ -192,7 +255,7 @@ func ApplyConfigToFlags(cmd *cobra.Command, config *Config) {
setFlag("gpus-per-replica", fmt.Sprintf("%d", config.RequestedGPUsPerReplica))
}

func PreRunLoadConfig(cmd *cobra.Command, args []string) error {
func PreRunLoadConfig(cmd *cobra.Command, _ []string) error {
if path == "" {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tui/list/pod/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func runSelectAndDoAction(_ context.Context, _ k8s.KubernetesClients, state *tui

var (
viewLogsAction runAction = "View logs"
monitorAction runAction = "Monitor"
monitorAction runAction = "Monitor GPUs"
commandAction runAction = "Run command"
)

Expand Down
Loading

0 comments on commit fd0cc4c

Please sign in to comment.