Skip to content

Commit

Permalink
Add a variant of the ray data processing job with GCSFuse CSI driver
Browse files Browse the repository at this point in the history
This is a variant of the ray data processing job (`ray_data_image_resize.py`) which leverages GKE GCSFuse CSI driver
  • Loading branch information
saikat-royc committed Oct 1, 2024
1 parent d8ffec4 commit 5b4b18f
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
apiVersion: ray.io/v1
kind: RayJob
metadata:
name: ray-data-image-resize-gcsfuse
spec:
entrypoint: python ray-operator/config/samples/ray-data-image-resize/ray_data_image_resize_gcsfuse.py
runtimeEnvYAML: |
pip:
- torch
- torchvision
- numpy
working_dir: "https://github.com/ray-project/kuberay/archive/master.zip"
env_vars:
BUCKET_PREFIX: images
shutdownAfterJobFinishes: true
ttlSecondsAfterFinished: 30
rayClusterSpec:
headGroupSpec:
rayStartParams:
disable-usage-stats: 'true'
template:
metadata:
annotations:
gke-gcsfuse/cpu-limit: '0'
gke-gcsfuse/ephemeral-storage-limit: '0'
gke-gcsfuse/memory-limit: '0'
gke-gcsfuse/volumes: 'true'
spec:
containers:
- image: rayproject/ray:2.9.3
name: ray-head
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265
name: dashboard
- containerPort: 10001
name: client
resources:
requests:
cpu: '1'
memory: 4Gi
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
- name: dshm
mountPath: /dev/shm
- mountPath: /data
name: gcs-fuse-csi-ephemeral
volumes:
- emptyDir: {}
name: ray-logs
- name: dshm
emptyDir:
medium: Memory
- csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
# replace the bucketName to Google Cloud bucket of your choice. For non-public bucket, ensure access control is setup for the pod by following https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/cloud-storage-fuse-csi-driver#authentication
bucketName: ray-images
mountOptions: implicit-dirs,anonymous-access,uid=1000,gid=100,metadata-cache:ttl-secs:-1,metadata-cache:stat-cache-max-size-mb:-1,metadata-cache:type-cache-max-size-mb:-1
skipCSIBucketAccessCheck: 'true'
name: gcs-fuse-csi-ephemeral
rayVersion: 2.9.3
workerGroupSpecs:
- groupName: worker-group
maxReplicas: 3
minReplicas: 1
rayStartParams: {}
replicas: 3
template:
metadata:
annotations:
gke-gcsfuse/cpu-limit: '0'
gke-gcsfuse/ephemeral-storage-limit: '0'
gke-gcsfuse/memory-limit: '0'
gke-gcsfuse/volumes: 'true'
spec:
containers:
- image: rayproject/ray:2.9.3
name: ray-worker
resources:
requests:
cpu: '1'
memory: 4Gi
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
- name: dshm
mountPath: /dev/shm
- mountPath: /data
name: gcs-fuse-csi-ephemeral
volumes:
- emptyDir: {}
name: ray-logs
- name: dshm
emptyDir:
medium: Memory
- csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
# replace the bucketName to Google Cloud bucket of your choice. For non-public bucket, ensure access control is setup for the pod by following https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/cloud-storage-fuse-csi-driver#authentication
bucketName: ray-images
mountOptions: implicit-dirs,anonymous-access,uid=1000,gid=100,metadata-cache:ttl-secs:-1,metadata-cache:stat-cache-max-size-mb:-1,metadata-cache:type-cache-max-size-mb:-1
skipCSIBucketAccessCheck: 'true'
name: gcs-fuse-csi-ephemeral
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from typing import Dict, List
import numpy as np
import ray
import os

import torch
from torchvision import transforms
from PIL import Image

allowed_extensions = ('.png', '.jpg', '.jpeg', '.tif', '.tiff', '.bmp', '.gif')
bucket_prefix = os.environ["BUCKET_PREFIX"]
prefix = "/data/" + bucket_prefix

def find_image_files(directory):
image_files = []
for root, dirs, files in os.walk(directory):
for file in files:
if file.lower().endswith(allowed_extensions):
# print ("found file ", file)
image_files.append(os.path.join(root, file))

return image_files

class ReadImageFiles:
def __call__(self, text_batch: List[str]):
Image.MAX_IMAGE_PIXELS = None
text = text_batch['item']
images = []
for t in text:
a = np.array(Image.open(t))
images.append(a)

return {'results': list(zip(text, images))}

class TransformImages:
def __init__(self):
self.transform = transforms.Compose(
[transforms.ToTensor(), transforms.Resize((256, 256)), transforms.ConvertImageDtype(torch.float)]
)
def __call__(self, image_batch: Dict[str, List]):
images = image_batch['results']
images_transformed = []
# input is a tuple of (filepath str, image ndarray)
for t in images:
images_transformed.append(self.transform(t[1]))

return {'results': images_transformed}

def main():
"""
This is a CPU-only job that reads images from a Google Cloud Storage bucket and resizes them.
The bucket is mounted as a volume to the underlying pod by the GKE GCSFuse CSI driver.
"""
ray.init()
print("Enumerate files in prefix ", prefix)
image_files = find_image_files(prefix)
print("For prefix ", prefix, " number of image_files", len(image_files))
if len(image_files) == 0:
print ("no files to process")
return

dataset = ray.data.from_items(image_files)
dataset = dataset.flat_map(lambda row: [{'item': row['item']}])
dataset = dataset.map_batches(ReadImageFiles, batch_size=16, concurrency=2)
dataset = dataset.map_batches(TransformImages, batch_size=16, concurrency=2)

dataset_iter = dataset.iter_batches(batch_size=None)
for _ in dataset_iter:
pass
print("done")

if __name__ == "__main__":
main()

0 comments on commit 5b4b18f

Please sign in to comment.