-
Notifications
You must be signed in to change notification settings - Fork 373
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a variant of the ray data processing job with GCSFuse CSI driver (#…
…2401) This is a variant of the Ray Data processing job (`ray_data_image_resize.py`) that leverages the GKE GCSFuse CSI driver.
- Loading branch information
1 parent
d8ffec4
commit 3e20a9d
Showing
2 changed files
with
179 additions
and
0 deletions.
There are no files selected for viewing
106 changes: 106 additions & 0 deletions
106
ray-operator/config/samples/ray-data-image-resize/ray-data-image-resize-gcsfusecsi-job.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
apiVersion: ray.io/v1 | ||
kind: RayJob | ||
metadata: | ||
name: ray-data-image-resize-gcsfuse | ||
spec: | ||
entrypoint: python ray-operator/config/samples/ray-data-image-resize/ray_data_image_resize_gcsfuse.py | ||
runtimeEnvYAML: | | ||
pip: | ||
- torch | ||
- torchvision | ||
- numpy | ||
working_dir: "https://github.com/ray-project/kuberay/archive/master.zip" | ||
env_vars: | ||
BUCKET_PREFIX: images | ||
shutdownAfterJobFinishes: true | ||
ttlSecondsAfterFinished: 30 | ||
rayClusterSpec: | ||
headGroupSpec: | ||
rayStartParams: | ||
disable-usage-stats: 'true' | ||
template: | ||
metadata: | ||
annotations: | ||
gke-gcsfuse/cpu-limit: '0' | ||
gke-gcsfuse/ephemeral-storage-limit: '0' | ||
gke-gcsfuse/memory-limit: '0' | ||
gke-gcsfuse/volumes: 'true' | ||
spec: | ||
containers: | ||
- image: rayproject/ray:2.9.3 | ||
name: ray-head | ||
ports: | ||
- containerPort: 6379 | ||
name: gcs-server | ||
- containerPort: 8265 | ||
name: dashboard | ||
- containerPort: 10001 | ||
name: client | ||
resources: | ||
requests: | ||
cpu: '1' | ||
memory: 4Gi | ||
volumeMounts: | ||
- mountPath: /tmp/ray | ||
name: ray-logs | ||
- name: dshm | ||
mountPath: /dev/shm | ||
- mountPath: /data | ||
name: gcs-fuse-csi-ephemeral | ||
volumes: | ||
- emptyDir: {} | ||
name: ray-logs | ||
- name: dshm | ||
emptyDir: | ||
medium: Memory | ||
- csi: | ||
driver: gcsfuse.csi.storage.gke.io | ||
volumeAttributes: | ||
# Replace bucketName with the Google Cloud Storage bucket of your choice. For a non-public bucket, remove the anonymous-access mount option and ensure access control is set up for the pod by following https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/cloud-storage-fuse-csi-driver#authentication | ||
bucketName: ray-images | ||
mountOptions: implicit-dirs,anonymous-access,uid=1000,gid=100,metadata-cache:ttl-secs:-1,metadata-cache:stat-cache-max-size-mb:-1,metadata-cache:type-cache-max-size-mb:-1 | ||
skipCSIBucketAccessCheck: 'true' | ||
name: gcs-fuse-csi-ephemeral | ||
rayVersion: 2.9.3 | ||
workerGroupSpecs: | ||
- groupName: worker-group | ||
maxReplicas: 3 | ||
minReplicas: 1 | ||
rayStartParams: {} | ||
replicas: 3 | ||
template: | ||
metadata: | ||
annotations: | ||
gke-gcsfuse/cpu-limit: '0' | ||
gke-gcsfuse/ephemeral-storage-limit: '0' | ||
gke-gcsfuse/memory-limit: '0' | ||
gke-gcsfuse/volumes: 'true' | ||
spec: | ||
containers: | ||
- image: rayproject/ray:2.9.3 | ||
name: ray-worker | ||
resources: | ||
requests: | ||
cpu: '1' | ||
memory: 4Gi | ||
volumeMounts: | ||
- mountPath: /tmp/ray | ||
name: ray-logs | ||
- name: dshm | ||
mountPath: /dev/shm | ||
- mountPath: /data | ||
name: gcs-fuse-csi-ephemeral | ||
volumes: | ||
- emptyDir: {} | ||
name: ray-logs | ||
- name: dshm | ||
emptyDir: | ||
medium: Memory | ||
- csi: | ||
driver: gcsfuse.csi.storage.gke.io | ||
volumeAttributes: | ||
# Replace bucketName with the Google Cloud Storage bucket of your choice. For a non-public bucket, remove the anonymous-access mount option and ensure access control is set up for the pod by following https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/cloud-storage-fuse-csi-driver#authentication | ||
bucketName: ray-images | ||
mountOptions: implicit-dirs,anonymous-access,uid=1000,gid=100,metadata-cache:ttl-secs:-1,metadata-cache:stat-cache-max-size-mb:-1,metadata-cache:type-cache-max-size-mb:-1 | ||
skipCSIBucketAccessCheck: 'true' | ||
name: gcs-fuse-csi-ephemeral |
73 changes: 73 additions & 0 deletions
73
ray-operator/config/samples/ray-data-image-resize/ray_data_image_resize_gcsfuse.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
from typing import Dict, List | ||
import numpy as np | ||
import ray | ||
import os | ||
|
||
import torch | ||
from torchvision import transforms | ||
from PIL import Image | ||
|
||
# File-name suffixes that identify a file as an image.
allowed_extensions = (
    '.png',
    '.jpg',
    '.jpeg',
    '.tif',
    '.tiff',
    '.bmp',
    '.gif',
)
# Required environment variable; a missing value raises KeyError at import time.
bucket_prefix = os.environ["BUCKET_PREFIX"]
# Directory that is scanned for images: the bucket prefix under /data.
prefix = f"/data/{bucket_prefix}"
|
||
def find_image_files(directory):
    """Recursively collect the paths of image files below ``directory``.

    A file is treated as an image when its lower-cased name ends with one of
    the suffixes in the module-level ``allowed_extensions`` tuple.

    Args:
        directory: Root directory to walk.

    Returns:
        A list of paths, each formed by joining the walked directory with the
        file name, in the order ``os.walk`` yields them.
    """
    return [
        os.path.join(parent, name)
        for parent, _subdirs, names in os.walk(directory)
        for name in names
        if name.lower().endswith(allowed_extensions)
    ]
|
||
class ReadImageFiles:
    """Callable that loads a batch of image files into NumPy arrays."""

    def __call__(self, text_batch: Dict[str, List[str]]):
        """Decode every image file named in the batch.

        Args:
            text_batch: Batch mapping; the ``'item'`` entry holds the image
                file paths to read.

        Returns:
            A dict whose ``'results'`` entry is a list of
            ``(filepath, image ndarray)`` tuples, one per input path and in
            the same order.
        """
        # Turn off Pillow's pixel-count limit so very large images still load.
        Image.MAX_IMAGE_PIXELS = None
        paths = text_batch['item']
        images = []
        for path in paths:
            # Image.open() is lazy and holds the file handle open; np.array()
            # forces the decode, and the with-block then closes the handle
            # deterministically instead of leaving it to garbage collection.
            with Image.open(path) as img:
                images.append(np.array(img))

        return {'results': list(zip(paths, images))}
|
||
class TransformImages:
    """Callable that resizes a batch of decoded images to 256x256 tensors."""

    def __init__(self):
        """Build the transform pipeline once so every call reuses it."""
        steps = [
            transforms.ToTensor(),
            transforms.Resize((256, 256)),
            transforms.ConvertImageDtype(torch.float),
        ]
        self.transform = transforms.Compose(steps)

    def __call__(self, image_batch: Dict[str, List]):
        """Apply the transform pipeline to every image in the batch.

        Args:
            image_batch: Batch mapping; the ``'results'`` entry holds
                ``(filepath str, image ndarray)`` pairs.

        Returns:
            A dict whose ``'results'`` entry is the list of transformed
            images, in input order. The file paths are dropped.
        """
        # Index 1 of each pair is the image array; index 0 (the path) is unused.
        resized = [self.transform(entry[1]) for entry in image_batch['results']]
        return {'results': resized}
|
||
def main():
    """Read images from the mounted bucket directory and resize them.

    This is a CPU-only job that reads images from a Google Cloud Storage
    bucket and resizes them. The bucket is mounted as a volume to the
    underlying pod by the GKE GCSFuse CSI driver.
    """
    ray.init()
    print("Enumerate files in prefix ", prefix)
    image_files = find_image_files(prefix)
    print("For prefix ", prefix, " number of image_files", len(image_files))
    if not image_files:
        print ("no files to process")
        return

    # Pipeline: file paths -> decoded arrays -> resized tensors.
    dataset = (
        ray.data.from_items(image_files)
        .flat_map(lambda row: [{'item': row['item']}])
        .map_batches(ReadImageFiles, batch_size=16, concurrency=2)
        .map_batches(TransformImages, batch_size=16, concurrency=2)
    )

    # Walk through every output batch and discard it; the job only exercises
    # the read/resize pipeline and does not store the results.
    for _ in dataset.iter_batches(batch_size=None):
        pass
    print("done")


if __name__ == "__main__":
    main()