Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
CFSNM committed Jan 29, 2025
2 parents 17cced4 + 25f24d5 commit d42d0ed
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 15 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ require (
github.com/matoous/go-nanoid/v2 v2.1.0
github.com/onsi/gomega v1.32.0
github.com/project-codeflare/appwrapper v0.8.0
github.com/project-codeflare/codeflare-common v0.0.0-20241211130338-efe4f3e6f904
github.com/project-codeflare/codeflare-common v0.0.0-20250128135036-f501cd31fe8b
github.com/prometheus/client_golang v1.20.4
github.com/prometheus/common v0.57.0
github.com/ray-project/kuberay/ray-operator v1.1.0-alpha.0
github.com/ray-project/kuberay/ray-operator v1.1.1
k8s.io/api v0.30.8
k8s.io/apimachinery v0.30.8
k8s.io/client-go v0.30.8
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/project-codeflare/appwrapper v0.8.0 h1:vWHNtXUtHutN2EzYb6rryLdESnb8iDXsCokXOuNYXvg=
github.com/project-codeflare/appwrapper v0.8.0/go.mod h1:FMQ2lI3fz6LakUVXgN1FTdpsc3BBkNIZZgtMmM9J5UM=
github.com/project-codeflare/codeflare-common v0.0.0-20241211130338-efe4f3e6f904 h1:brU4j1V4o+z/sw0TGi360Wdjk1TEQ313ynBRGqSTaNU=
github.com/project-codeflare/codeflare-common v0.0.0-20241211130338-efe4f3e6f904/go.mod h1:v7XKwaDoCspsHQlWJNarO7gOpR+iumSS+c1bWs3kJOI=
github.com/project-codeflare/codeflare-common v0.0.0-20250128135036-f501cd31fe8b h1:MOmv/aLx/kcHd7PBErx8XNSTW180s8Slf/uVM0uV4rw=
github.com/project-codeflare/codeflare-common v0.0.0-20250128135036-f501cd31fe8b/go.mod h1:DPSv5khRiRDFUD43SF8da+MrVQTWmxNhuKJmwSLOyO0=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
Expand All @@ -394,8 +394,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/ray-project/kuberay/ray-operator v1.1.0-alpha.0 h1:m3knC3mjkQEmMj61DY73210mKVSWEGtFKn0uQ6RLwao=
github.com/ray-project/kuberay/ray-operator v1.1.0-alpha.0/go.mod h1:ZqyKKvMP5nKDldQoKmur+Wcx7wVlV9Q98phFqHzr+KY=
github.com/ray-project/kuberay/ray-operator v1.1.1 h1:mVOA1ddS9aAsPvhhHrpf0ZXgTzccIAyTbeYeDqtcfAk=
github.com/ray-project/kuberay/ray-operator v1.1.1/go.mod h1:ZqyKKvMP5nKDldQoKmur+Wcx7wVlV9Q98phFqHzr+KY=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
Expand Down
53 changes: 46 additions & 7 deletions tests/kfto/kfto_mnist_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, accelerator Accelerator, image string,
namespace := test.NewTestNamespace()

mnist := ReadFile(test, "resources/mnist.py")
download_mnist_dataset := ReadFile(test, "resources/download_mnist_datasets.py")
requirementsFileName := ReadFile(test, requirementsFile)

if accelerator.isGpu() {
Expand All @@ -69,9 +70,9 @@ func runKFTOPyTorchMnistJob(t *testing.T, accelerator Accelerator, image string,
mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"cpu\""), 1)
}
config := CreateConfigMap(test, namespace.Name, map[string][]byte{
// MNIST Ray Notebook
"mnist.py": mnist,
"requirements.txt": requirementsFileName,
"mnist.py": mnist,
"download_mnist_datasets.py": download_mnist_dataset,
"requirements.txt": requirementsFileName,
})

// Create training PyTorch job
Expand Down Expand Up @@ -117,6 +118,12 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
backend = "gloo"
}

storage_bucket_endpoint, storage_bucket_endpoint_exists := GetStorageBucketDefaultEndpoint()
storage_bucket_access_key_id, storage_bucket_access_key_id_exists := GetStorageBucketAccessKeyId()
storage_bucket_secret_key, storage_bucket_secret_key_exists := GetStorageBucketSecretKey()
storage_bucket_name, storage_bucket_name_exists := GetStorageBucketName()
storage_bucket_mnist_dir, storage_bucket_mnist_dir_exists := GetStorageBucketMnistDir()

tuningJob := &kftov1.PyTorchJob{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Expand Down Expand Up @@ -162,8 +169,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
echo "Downloading MNIST dataset..." && \
python3 -c "from torchvision.datasets import MNIST; from torchvision.transforms import Compose, ToTensor; \
MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \
python3 /mnt/files/download_mnist_datasets.py --dataset_path "/tmp/datasets/mnist" && \
echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
echo -e "\n\n Starting training..." && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
Expand Down Expand Up @@ -247,8 +253,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
echo "Downloading MNIST dataset..." && \
python3 -c "from torchvision.datasets import MNIST; from torchvision.transforms import Compose, ToTensor; \
MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \
python3 /mnt/files/download_mnist_datasets.py --dataset_path "/tmp/datasets/mnist" && \
echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
echo -e "\n\n Starting training..." && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
Expand Down Expand Up @@ -344,6 +349,40 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
}
}

// Use storage bucket to download the MNIST datasets if required environment variables are provided, else use default MNIST mirror references as the fallback
if storage_bucket_endpoint_exists && storage_bucket_access_key_id_exists && storage_bucket_secret_key_exists && storage_bucket_name_exists && storage_bucket_mnist_dir_exists {
storage_bucket_env_vars := []corev1.EnvVar{
{
Name: "AWS_DEFAULT_ENDPOINT",
Value: storage_bucket_endpoint,
},
{
Name: "AWS_ACCESS_KEY_ID",
Value: storage_bucket_access_key_id,
},
{
Name: "AWS_SECRET_ACCESS_KEY",
Value: storage_bucket_secret_key,
},
{
Name: "AWS_STORAGE_BUCKET",
Value: storage_bucket_name,
},
{
Name: "AWS_STORAGE_BUCKET_MNIST_DIR",
Value: storage_bucket_mnist_dir,
},
}

// Append the list of environment variables for the worker container
for _, envVar := range storage_bucket_env_vars {
tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env = upsert(tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env, envVar, withEnvVarName(envVar.Name))
}

} else {
test.T().Logf("Skipped usage of S3 storage bucket, because required environment variables aren't provided!\nRequired environment variables : AWS_DEFAULT_ENDPOINT, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_STORAGE_BUCKET, AWS_STORAGE_BUCKET_MNIST_DIR")
}

tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name)
Expand Down
90 changes: 90 additions & 0 deletions tests/kfto/resources/download_mnist_datasets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os, gzip, shutil
from minio import Minio
from torchvision import datasets
from torchvision.transforms import Compose, ToTensor

def main(dataset_path):
    """Download the MNIST test dataset into dataset_path.

    If all required AWS_* environment variables are set, the raw dataset
    archives are fetched from an S3-compatible storage bucket via MinIO,
    extracted into <dataset_path>/MNIST/raw, and torchvision's own download
    is skipped. Otherwise torchvision downloads the dataset from its
    default mirrors.
    """
    required_env_vars = [
        "AWS_DEFAULT_ENDPOINT",
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_STORAGE_BUCKET",
        "AWS_STORAGE_BUCKET_MNIST_DIR",
    ]
    if all(var in os.environ for var in required_env_vars):
        print("Using provided storage bucket to download datasets...")
        dataset_dir = os.path.join(dataset_path, "MNIST/raw")
        endpoint = os.environ.get("AWS_DEFAULT_ENDPOINT")
        access_key = os.environ.get("AWS_ACCESS_KEY_ID")
        secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
        bucket_name = os.environ.get("AWS_STORAGE_BUCKET")
        print(f"Storage bucket endpoint: {endpoint}")
        print(f"Storage bucket name: {bucket_name}\n")

        # Strip the URL scheme if present: the MinIO client expects a bare
        # host[:port] plus an explicit `secure` flag instead of a scheme.
        secure = True
        if endpoint.startswith("https://"):
            endpoint = endpoint[len("https://"):]
        elif endpoint.startswith("http://"):
            endpoint = endpoint[len("http://"):]
            secure = False

        client = Minio(
            endpoint,
            access_key=access_key,
            secret_key=secret_key,
            cert_check=False,
            secure=secure,
        )
        if not os.path.exists(dataset_dir):
            os.makedirs(dataset_dir)
        else:
            print(f"Directory '{dataset_dir}' already exists")

        # To download datasets from the storage bucket's specific directory,
        # use prefix to provide the directory name.
        prefix = os.environ.get("AWS_STORAGE_BUCKET_MNIST_DIR")
        print(f"Storage bucket MNIST directory prefix: {prefix}\n")

        # Download all files under the prefix folder of the bucket recursively.
        for item in client.list_objects(bucket_name, prefix=prefix, recursive=True):
            file_name = item.object_name[len(prefix) + 1:]
            dataset_file_path = os.path.join(dataset_dir, file_name)
            if os.path.exists(dataset_file_path):
                print(f"File-path '{dataset_file_path}' already exists")
                continue
            print(f"Downloading dataset file {file_name} to {dataset_file_path}..")
            client.fget_object(bucket_name, item.object_name, dataset_file_path)

            # Extract the gzipped archive next to it, then delete the archive.
            # Sample archive path: ../data/MNIST/raw/t10k-images-idx3-ubyte.gz
            extracted_name = file_name.split(".")[0]  # -> t10k-images-idx3-ubyte
            raw_dir = os.path.dirname(dataset_file_path)  # -> ../data/MNIST/raw
            full_file_path = os.path.join(raw_dir, extracted_name)
            print(f"Extracting {dataset_file_path} to {raw_dir}..")
            with gzip.open(dataset_file_path, "rb") as f_in:
                with open(full_file_path, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
            print(f"Dataset file downloaded : {full_file_path}\n")
            os.remove(dataset_file_path)

        # BUG FIX: previously download_datasets was only assigned inside the
        # "file already exists" branch, so a fresh bucket download left it
        # undefined and datasets.MNIST() below raised NameError. The bucket
        # branch always supplies the raw files, so torchvision must not
        # re-download in either case.
        download_datasets = False
    else:
        print("Using default MNIST mirror references to download datasets ...")
        print("Skipped usage of S3 storage bucket, because required environment variables aren't provided!\nRequired environment variables : AWS_DEFAULT_ENDPOINT, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_STORAGE_BUCKET, AWS_STORAGE_BUCKET_MNIST_DIR")
        download_datasets = True

    # Load (and optionally download) the dataset so the files are verified/in place.
    datasets.MNIST(
        dataset_path,
        train=False,
        download=download_datasets,
        transform=Compose([ToTensor()])
    )

if __name__ == "__main__":
    import argparse

    # Command-line entry point: accept an optional dataset location and
    # delegate the actual download work to main().
    parser = argparse.ArgumentParser(description="MNIST dataset download")
    parser.add_argument(
        "--dataset_path",
        type=str,
        default="../data",
        help="Path to MNIST datasets (default: ../data)",
    )
    args = parser.parse_args()

    main(dataset_path=args.dataset_path)
3 changes: 2 additions & 1 deletion tests/kfto/resources/requirements-rocm.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
torchvision==0.19.0
tensorboard==2.18.0
fsspec[http]==2024.6.1
numpy==2.0.2
numpy==2.0.2
minio==7.2.13
3 changes: 2 additions & 1 deletion tests/kfto/resources/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
torchvision==0.19.0
tensorboard==2.18.0
fsspec[http]==2024.6.1
numpy==2.0.2
numpy==2.0.2
minio==7.2.13
18 changes: 18 additions & 0 deletions tests/kfto/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,21 @@ func OpenShiftPrometheusGpuUtil(test Test, pod corev1.Pod, gpu Accelerator) func
return util
}
}

// compare reports whether two values of type T match according to some criterion.
type compare[T any] func(T, T) bool

// upsert returns items with item replacing the first element that satisfies
// predicate; when no element matches, item is appended instead. The input
// slice may be mutated in place.
func upsert[T any](items []T, item T, predicate compare[T]) []T {
	match := -1
	for i := range items {
		if predicate(items[i], item) {
			match = i
			break
		}
	}
	if match >= 0 {
		items[match] = item
		return items
	}
	return append(items, item)
}

// withEnvVarName returns a compare predicate that matches an existing env var
// whose Name equals the given name; the candidate value is ignored.
func withEnvVarName(name string) compare[corev1.EnvVar] {
	return func(existing, _ corev1.EnvVar) bool {
		return existing.Name == name
	}
}

0 comments on commit d42d0ed

Please sign in to comment.