From 3e20a9d912fde0796d5ef27a06a1761fd3eb06cb Mon Sep 17 00:00:00 2001
From: Saikat Roychowdhury
Date: Mon, 30 Sep 2024 21:51:42 -0700
Subject: [PATCH] Add a variant of the ray data processing job with GCSFuse
 CSI driver (#2401)

This is a variant of the ray data processing job (`ray_data_image_resize.py`)
which leverages GKE GCSFuse CSI driver
---
 .../ray-data-image-resize-gcsfusecsi-job.yaml | 106 ++++++++++++++++++
 .../ray_data_image_resize_gcsfuse.py          |  73 ++++++++++++
 2 files changed, 179 insertions(+)
 create mode 100644 ray-operator/config/samples/ray-data-image-resize/ray-data-image-resize-gcsfusecsi-job.yaml
 create mode 100644 ray-operator/config/samples/ray-data-image-resize/ray_data_image_resize_gcsfuse.py

diff --git a/ray-operator/config/samples/ray-data-image-resize/ray-data-image-resize-gcsfusecsi-job.yaml b/ray-operator/config/samples/ray-data-image-resize/ray-data-image-resize-gcsfusecsi-job.yaml
new file mode 100644
index 0000000000..c32d75274a
--- /dev/null
+++ b/ray-operator/config/samples/ray-data-image-resize/ray-data-image-resize-gcsfusecsi-job.yaml
@@ -0,0 +1,106 @@
+apiVersion: ray.io/v1
+kind: RayJob
+metadata:
+  name: ray-data-image-resize-gcsfuse
+spec:
+  entrypoint: python ray-operator/config/samples/ray-data-image-resize/ray_data_image_resize_gcsfuse.py
+  runtimeEnvYAML: |
+    pip:
+      - torch
+      - torchvision
+      - numpy
+    working_dir: "https://github.com/ray-project/kuberay/archive/master.zip"
+    env_vars:
+      BUCKET_PREFIX: images
+  shutdownAfterJobFinishes: true
+  ttlSecondsAfterFinished: 30
+  rayClusterSpec:
+    headGroupSpec:
+      rayStartParams:
+        disable-usage-stats: 'true'
+      template:
+        metadata:
+          annotations:
+            gke-gcsfuse/cpu-limit: '0'
+            gke-gcsfuse/ephemeral-storage-limit: '0'
+            gke-gcsfuse/memory-limit: '0'
+            gke-gcsfuse/volumes: 'true' # enables GCSFuse sidecar injection for this pod
+        spec:
+          containers:
+          - image: rayproject/ray:2.9.3
+            name: ray-head
+            ports:
+            - containerPort: 6379
+              name: gcs-server
+            - containerPort: 8265
+              name: dashboard
+            - containerPort: 10001
+              name: client
+            resources:
+              requests:
+                cpu: '1'
+                memory: 4Gi
+            volumeMounts:
+            - mountPath: /tmp/ray
+              name: ray-logs
+            - name: dshm
+              mountPath: /dev/shm
+            - mountPath: /data
+              name: gcs-fuse-csi-ephemeral
+          volumes:
+          - emptyDir: {}
+            name: ray-logs
+          - name: dshm
+            emptyDir:
+              medium: Memory
+          - csi:
+              driver: gcsfuse.csi.storage.gke.io
+              volumeAttributes:
+                # Replace bucketName with the Google Cloud Storage bucket of your choice. For a non-public bucket, ensure access control is set up for the pod by following https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/cloud-storage-fuse-csi-driver#authentication
+                bucketName: ray-images
+                mountOptions: implicit-dirs,anonymous-access,uid=1000,gid=100,metadata-cache:ttl-secs:-1,metadata-cache:stat-cache-max-size-mb:-1,metadata-cache:type-cache-max-size-mb:-1
+                skipCSIBucketAccessCheck: 'true'
+            name: gcs-fuse-csi-ephemeral
+    rayVersion: 2.9.3
+    workerGroupSpecs:
+    - groupName: worker-group
+      maxReplicas: 3
+      minReplicas: 1
+      rayStartParams: {}
+      replicas: 3
+      template:
+        metadata:
+          annotations:
+            gke-gcsfuse/cpu-limit: '0'
+            gke-gcsfuse/ephemeral-storage-limit: '0'
+            gke-gcsfuse/memory-limit: '0'
+            gke-gcsfuse/volumes: 'true' # enables GCSFuse sidecar injection for this pod
+        spec:
+          containers:
+          - image: rayproject/ray:2.9.3
+            name: ray-worker
+            resources:
+              requests:
+                cpu: '1'
+                memory: 4Gi
+            volumeMounts:
+            - mountPath: /tmp/ray
+              name: ray-logs
+            - name: dshm
+              mountPath: /dev/shm
+            - mountPath: /data
+              name: gcs-fuse-csi-ephemeral
+          volumes:
+          - emptyDir: {}
+            name: ray-logs
+          - name: dshm
+            emptyDir:
+              medium: Memory
+          - csi:
+              driver: gcsfuse.csi.storage.gke.io
+              volumeAttributes:
+                # Replace bucketName with the Google Cloud Storage bucket of your choice. For a non-public bucket, ensure access control is set up for the pod by following https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/cloud-storage-fuse-csi-driver#authentication
+                bucketName: ray-images
+                mountOptions: implicit-dirs,anonymous-access,uid=1000,gid=100,metadata-cache:ttl-secs:-1,metadata-cache:stat-cache-max-size-mb:-1,metadata-cache:type-cache-max-size-mb:-1
+                skipCSIBucketAccessCheck: 'true'
+            name: gcs-fuse-csi-ephemeral
diff --git a/ray-operator/config/samples/ray-data-image-resize/ray_data_image_resize_gcsfuse.py b/ray-operator/config/samples/ray-data-image-resize/ray_data_image_resize_gcsfuse.py
new file mode 100644
index 0000000000..27804af399
--- /dev/null
+++ b/ray-operator/config/samples/ray-data-image-resize/ray_data_image_resize_gcsfuse.py
@@ -0,0 +1,73 @@
+from typing import Dict, List
+import numpy as np
+import ray
+import os
+
+import torch
+from torchvision import transforms
+from PIL import Image
+
+allowed_extensions = ('.png', '.jpg', '.jpeg', '.tif', '.tiff', '.bmp', '.gif')  # extensions treated as images (matched case-insensitively below)
+bucket_prefix = os.environ["BUCKET_PREFIX"]  # injected via runtimeEnvYAML env_vars in the RayJob manifest; raises KeyError if unset
+prefix = "/data/" + bucket_prefix  # /data is the GCSFuse CSI volume mountPath from the pod spec
+
+def find_image_files(directory: str) -> List[str]:  # recursively collect image file paths under directory
+    image_files = []
+    for root, dirs, files in os.walk(directory):
+        for file in files:
+            if file.lower().endswith(allowed_extensions):
+                # extension matched; record the file's full path
+                image_files.append(os.path.join(root, file))
+
+    return image_files
+
+class ReadImageFiles:
+    def __call__(self, text_batch: Dict[str, List]):  # Ray passes a batch dict; key 'item' holds the file paths
+        Image.MAX_IMAGE_PIXELS = None  # disables PIL's decompression-bomb guard -- assumes bucket contents are trusted
+        text = text_batch['item']
+        images = []
+        for t in text:
+            a = np.array(Image.open(t))  # decode the image into an ndarray
+            images.append(a)
+
+        return {'results': list(zip(text, images))}  # pair each path with its decoded pixels
+
+class TransformImages:
+    def __init__(self):
+        self.transform = transforms.Compose(  # ndarray -> float tensor resized to 256x256
+            [transforms.ToTensor(), transforms.Resize((256, 256)), transforms.ConvertImageDtype(torch.float)]
+        )
+    def __call__(self, image_batch: Dict[str, List]):  # batch dict produced by ReadImageFiles
+        images = image_batch['results']
+        images_transformed = []
+        # each entry is a (filepath str, image ndarray) tuple; only the pixels are transformed
+        for t in images:
+            images_transformed.append(self.transform(t[1]))
+
+        return {'results': images_transformed}
+
+def main() -> None:
+    """
+    This is a CPU-only job that reads images from a Google Cloud Storage bucket and resizes them.
+    The bucket is mounted as a volume to the underlying pod by the GKE GCSFuse CSI driver.
+    """
+    ray.init()
+    print("Enumerate files in prefix ", prefix)
+    image_files = find_image_files(prefix)
+    print("For prefix ", prefix, " number of image_files", len(image_files))
+    if len(image_files) == 0:
+        print ("no files to process")
+        return
+
+    dataset = ray.data.from_items(image_files)
+    dataset = dataset.flat_map(lambda row: [{'item': row['item']}])  # normalize rows to plain {'item': path} dicts
+    dataset = dataset.map_batches(ReadImageFiles, batch_size=16, concurrency=2)  # concurrency=2 -> two actor replicas per stage
+    dataset = dataset.map_batches(TransformImages, batch_size=16, concurrency=2)
+
+    dataset_iter = dataset.iter_batches(batch_size=None)  # batch_size=None: yield whatever block sizes the pipeline produces
+    for _ in dataset_iter:
+        pass  # drain the iterator to force the lazy pipeline to execute end to end
+    print("done")
+
+if __name__ == "__main__":
+    main()