Skip to content

Commit

Permalink
Add Ray data bench and reduce rust realm data copy (#42)
Browse files Browse the repository at this point in the history
add ray_single_node_image_loader_microbenchmark.py from https://gist.github.com/stephanie-wang/88500403e701537568383ef2e181768c in blog https://www.anyscale.com/blog/fast-flexible-scalable-data-loading-for-ml-training-with-ray-data
reduce data copies in rust make multi http calls
  • Loading branch information
lucyge2022 authored May 9, 2024
1 parent c52b28f commit 8c377b3
Show file tree
Hide file tree
Showing 9 changed files with 998 additions and 224 deletions.
12 changes: 12 additions & 0 deletions bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ def init_main_parser():
required=False,
help="The host address(es) for etcd",
)
parser.add_argument(
"--use-alluxiocommon",
action="store_true",
default=False,
help="Whether to use AlluxioCommon native extensions.",
)
parser.add_argument(
"--page-size",
type=str,
default=False,
help="Size in KB or MB",
)
return parser


Expand Down
12 changes: 8 additions & 4 deletions benchmark/AbstractBench.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,14 @@ def get_protocol(self, full_path: str) -> str:

def init(self):
# protocol = self.get_protocol(self.args.path)
alluxio_options = {
# "alluxio.common.extension.enable": "True",
"alluxio.worker.page.store.page.size": "1MB"
}
alluxio_options = {}
if self.args.use_alluxiocommon:
alluxio_options["alluxio.common.extension.enable"] = "True"
if self.args.page_size:
alluxio_options[
"alluxio.worker.page.store.page.size"
] = self.args.page_size
print(f"options for AlluxioFileSystem:{alluxio_options}")
self.alluxio_fs = AlluxioFileSystem(
etcd_hosts=self.args.etcd_hosts,
worker_hosts=self.args.worker_hosts,
Expand Down
59 changes: 57 additions & 2 deletions benchmark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ It's a light-weight benchmark that requires minimum dependency and mimic actual

Try out:
```commandline
python bench.py --etcd_hosts=localhost --numjobs=1 --runtime=10 --testsuite=FSSPEC --path=s3://ai-ref-arch/small-dataset --op=cat_file --bs=262144
python bench.py --etcd_hosts=localhost --numjobs=1 --runtime=10 --testsuite=FSSPEC --path=s3://ai-ref-arch/small-dataset --op=cat_file --bs=256KB
```

Actual bench:
Expand All @@ -20,7 +20,7 @@ python bench.py --etcd_hosts=localhost --numjobs=2 --runtime=20 --testsuite=FSSP
Substitute `path` to `s3://ai-ref-arch/10G-xgboost-data/` for data `op` `cat_file` `open_seq_read` `open_random_read`.
Tune your preferred buffer size.
```commandline
python bench.py --etcd_hosts=localhost --numjobs=2 --runtime=20 --testsuite=FSSPEC --path=s3://ai-ref-arch/10G-xgboost-data/ --op=open_seq_read --bs=262144
python bench.py --etcd_hosts=localhost --numjobs=2 --runtime=20 --testsuite=FSSPEC --path=s3://ai-ref-arch/10G-xgboost-data/ --op=open_seq_read --bs=256KB
```

## Alluxio FSSpec Traffic Pattern Benchmark
Expand All @@ -42,3 +42,58 @@ python bench.py --etcd_hosts=localhost --numjobs=2 --runtime=20 --testsuite=FSSP

Note that Alluxio FSSpec benchmarks with directory traverse logics. If the target directories are too big, then it may take a long time to traverse the dataset.
Recommend file/dir number > numjobs so that each process has enough file/dir to repeatedly running operations on top.


## Ray single node image loader micronbenchmark

#### Prerequisite
A running Ray cluster (single node is fine)
e.g. a single ray docker container (current recommend Ray 2.9.3 version as latest version seems unstable(2024/05/09))

docker run -it --rm rayproject/ray:2.9.3 /bin/bash
[Note] on top of this there's a list of dependency install:
pip install torch --no-cache-dir
pip install torchvision
pip install tensorflow
pip install mosaicml-streaming
pip install pydantic==1.10.12
pip install protobuf==3.20.2
pip install xgboost
pip install xgboost_ray
pip install s3fs
sudo apt-get update
sudo apt-get install -y awscli dstat vim screen
sudo apt-get update
sudo apt-get install -y awscli dstat vim screen iproute2
pip install s3fs
sudo apt-get install iproute2

[Internal] Use this ECR image for latest stable image with all dependencies installed:
`533267169037.dkr.ecr.us-east-1.amazonaws.com/alluxioray-lucy:lateststable`

Try out:
1) Start ray cluster (adjust relevant flags number accordingly)


nohup ray start --head --memory=$((16 * 2**30)) --object-store-memory=$((4 * 2**30)) --dashboard-host=0.0.0.0 --metrics-export-port=8080 --block --num-cpus=14 2>&1 > /home/ray/ray.out &

make sure it is up by checking with `ray status`

2) Start the microbenchmark

- With Ray data + s3 parquet dataset:


python3 benchmark/ray_single_node_image_loader_microbenchmark.py --parquet-data-root s3://ai-ref-arch/imagenet-mini-parquet/train/75e5c08ce5913cf8f513a/

- With Ray data + s3 parquet dataset on alluxio:


1) on alluxio home, do load first:
bin/alluxio job load --path s3://ai-ref-arch/imagenet-mini-parquet/train/75e5c08ce5913cf8f513a/ --submit
2) run the microbench
python3 benchmark/ray_single_node_image_loader_microbenchmark.py \
--parquet-data-root s3://ai-ref-arch/imagenet-mini-parquet/train/75e5c08ce5913cf8f513a/ \
--use-alluxio --alluxio-worker-hosts <hostname> \
--alluxio-page-size <e.g. 32MB> \
[--use-alluxiocommon (optional, to use RUST extension)]
15 changes: 9 additions & 6 deletions benchmark/bench/AlluxioFSSpecBench.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import random
from enum import Enum

import humanfriendly

from benchmark.AbstractBench import AbstractAlluxioFSSpecTraverseBench
from benchmark.AbstractBench import AbstractArgumentParser
from benchmark.AbstractBench import Metrics
Expand All @@ -25,9 +27,9 @@ def __init__(self, main_parser):
)
self.parser.add_argument(
"--bs",
type=int,
type=str,
default=256 * 1024,
help="Buffer size for read operations.",
help="Buffer size for read operations, in KB or MB.",
)

def parse_args(self, args=None, namespace=None):
Expand All @@ -41,6 +43,7 @@ def __init__(self, process_id, num_process, args, **kwargs):
super().__init__(process_id, num_process, args, **kwargs)

def execute(self):
self.buffer_size = humanfriendly.parse_size(self.args.bs, binary=True)
if self.args.op == Op.ls.value:
self.bench_ls(self.next_dir())
elif self.args.op == Op.info.value:
Expand All @@ -67,7 +70,7 @@ def bench_info(self, file_path, size):
def bench_cat_file(self, file_path, file_size):
file_read = 0
while file_read < file_size:
read_bytes = min(self.args.bs, file_size - file_read)
read_bytes = min(self.buffer_size, file_size - file_read)
self.alluxio_fs.cat_file(file_path, 0, read_bytes)
file_read += read_bytes
self.metrics.update(Metrics.TOTAL_OPS, 1)
Expand All @@ -76,7 +79,7 @@ def bench_cat_file(self, file_path, file_size):
def bench_open_seq_read(self, file_path, file_size):
with self.alluxio_fs.open(file_path, "rb") as f:
while True:
data = f.read(self.args.bs)
data = f.read(self.buffer_size)
if not data:
break
self.metrics.update(Metrics.TOTAL_OPS, 1)
Expand All @@ -86,10 +89,10 @@ def bench_open_random_read(self, file_path, file_size):
bytes_read = 0
total_ops = 0
with self.alluxio_fs.open(file_path, "rb") as f:
bytes_to_read = min(file_size, self.args.bs)
bytes_to_read = min(file_size, self.buffer_size)
while bytes_read < bytes_to_read:
offset = random.nextInt(file_size)
read_bytes = min(self.args.bs, file_size - offset)
read_bytes = min(self.buffer_size, file_size - offset)
f.seek(offset)
data = f.read(read_bytes)
bytes_read += len(data)
Expand Down
11 changes: 6 additions & 5 deletions benchmark/bench/RayBench.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,22 @@ def __init__(self, main_parser):
)

def parse_args(self, args=None, namespace=None):
args = self.parser.parse_args(args, namespace)
return args
parsed_args = self.parser.parse_args(args, namespace)
return parsed_args


class RayBench(AbstractBench):
def __init__(self, process_id, num_process, args, **kwargs):
super().__init__(process_id, num_process, args, **kwargs)
self.args = args

def init(self):
self.validate_args()
self.alluxio_fs = AlluxioFileSystem(
etcd_hosts=self.args.etcd_hosts,
worker_hosts=self.args.worker_hosts,
)

def init(self):
self.validate_args()

def execute(self):
if self.args.op == Op.read_parquet.name:
print(f"Executing AlluxioRESTBench! Op:{self.args.op}")
Expand Down
Loading

0 comments on commit 8c377b3

Please sign in to comment.