forked from ChunxuTang/AI-Reference-Architecture
-
Notifications
You must be signed in to change notification settings - Fork 0
/
benchmark-data-loading.py
287 lines (261 loc) · 9.48 KB
/
benchmark-data-loading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""
A script to benchmark data loading of the ImageNet dataset in the PyTorch
data loader. Note that it still requires some CPU data processing to convert
the images to PyTorch tensors.
Example usage:
POSIX API
- python3 benchmark-data-loading.py -e 5 -b 128 -w 16
- python3 benchmark-data-loading.py -e 5 -b 128 -w 16 -a posix -p /mnt/alluxio/fuse/imagenet-mini/val
ALLUXIO REST API
- python3 benchmark-data-loading.py -e 5 -b 128 -w 16 -a alluxio -p s3://ref-arch/imagenet-mini/val --etcd localhost
- To configure the ETCD username/password, add -o alluxio.etcd.username=alluxio,alluxio.etcd.password=alluxio
- python3 benchmark-data-loading.py -e 5 -b 128 -w 16 -a alluxio -p s3://ref-arch/imagenet-mini/val --alluxioworkers host1,host2
- Configure a different page size, add -o alluxio.worker.page.store.page.size=20MB
ALLUXIO S3 API
- python3 benchmark-data-loading.py -e 5 -b 128 -w 16 -a alluxios3 -p s3://ref-arch/imagenet-mini/val -d s3://ref-arch/ --alluxioworkers localhost
S3 API
- python3 benchmark-data-loading.py -e 5 -b 128 -w 16 -a s3 -p s3://ref-arch/imagenet-mini/val
"""
import argparse
import logging
import time
import warnings
from enum import Enum
from logging.config import fileConfig
import boto3
import torch
import torchvision.transforms as transforms
from alluxio import AlluxioFileSystem
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from datasets.alluxio import AlluxioDataset
from datasets.alluxios3 import AlluxioS3
from datasets.alluxios3 import AlluxioS3Dataset
from datasets.s3 import S3ImageDataset
log_conf_path = "./conf/logging.conf"
fileConfig(log_conf_path, disable_existing_loggers=True)
_logger = logging.getLogger("BenchmarkDataLoading")
# Explicitly disable the PIL.TiffImagePlugin logger as it also uses
# the StreamHandler which will overrun the console output.
logging.getLogger("PIL.TiffImagePlugin").disabled = True
warnings.filterwarnings("ignore", category=UserWarning)
class APIType(Enum):
POSIX = "posix"
ALLUXIO = "alluxio"
ALLUXIOS3 = "alluxios3"
S3 = "s3"
def get_args():
parser = argparse.ArgumentParser(
description="Benchmark PyTorch Data Loading on ImageNet Dataset"
)
parser.add_argument(
"-n", "--name", help="Experiment name", default="data loading"
)
parser.add_argument(
"-e", "--epoch", help="Number of epochs", default=5, type=int
)
parser.add_argument(
"-b", "--batch", help="Batch size", default=64, type=int
)
parser.add_argument(
"-w", "--worker", help="Number of workers", default=4, type=int
)
parser.add_argument(
"-a",
"--api",
help="The API to use. default is posix",
choices=[e.value for e in APIType],
default=APIType.POSIX.value,
)
parser.add_argument(
"-p",
"--path",
help="Local POSIX PATH if API type is POSIX, full ufs path if "
"ALLUXIO/ALLUXIOS3 API (e.g.s3://ref-arch/imagenet-mini/val)",
default="./data/imagenet-mini/val",
)
parser.add_argument(
"-d",
"--doraroot",
help="AlluxioS3 API require Dora root ufs address to do path transformation",
default="s3://ref-arch/",
)
parser.add_argument(
"--etcd",
help="Alluxio API require ETCD hostname or Alluxio worker adddresses --alluxioworkers host1,host2,host3",
default="localhost",
)
parser.add_argument(
"-aw",
"--alluxioworkers",
help="Alluxio S3 API require worker hostnames in format of host1,host2,host3",
default="localhost",
)
parser.add_argument(
"-o",
"--options",
help="Additional Alluxio property key value pars in format of key1=value1,key2=value2",
default="",
)
return parser.parse_args()
def parse_options(options_str):
options_dict = {}
if options_str:
key_value_pairs = options_str.split(",")
for pair in key_value_pairs:
key, value = pair.split("=")
options_dict[key.strip()] = value.strip()
return options_dict
class BenchmarkRunner:
def __init__(
self,
name,
num_epochs,
batch_size,
num_workers,
api,
path,
dora_root,
etcd_host,
alluxio_workers,
options,
):
self.name = name
self.num_epochs = num_epochs
self.batch_size = batch_size
self.num_workers = num_workers
self.api = api
self.path = path
self.dora_root = dora_root
self.etcd_host = etcd_host
self.alluxio_workers = alluxio_workers
self.options = options
def benchmark_data_loading(self):
self._check_device()
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
transform = transforms.Compose(
[
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean, std),
]
)
dataset = None
if self.api == APIType.ALLUXIO.value:
alluxio_file_system = None
if self.etcd_host is None:
_logger.debug(
f"Using alluxio dataset with ETCD host {self.etcd_host} and ufs path {self.path} "
)
alluxio_file_system = AlluxioFileSystem(
etcd_host=self.etcd_host,
options=self.options,
concurrency=self.num_workers,
logger=_logger,
)
else:
_logger.debug(
f"Using alluxio dataset with worker address {self.alluxio_workers} and ufs path {self.path} "
)
alluxio_file_system = AlluxioFileSystem(
worker_hosts=self.alluxio_workers,
options=self.options,
concurrency=self.num_workers,
logger=_logger,
)
dataset = AlluxioDataset(
alluxio_file_system=alluxio_file_system,
dataset_path=self.path,
transform=transform,
logger=_logger,
)
elif self.api == APIType.POSIX.value:
_logger.debug(
f"Using POSIX API ImageFolder dataset with path {self.path}"
)
dataset = ImageFolder(root=self.path, transform=transform)
elif self.api == APIType.ALLUXIOS3.value:
_logger.debug("Using alluxio S3 API dataset")
alluxio_s3 = AlluxioS3(
self.alluxio_workers,
self.dora_root,
_logger,
)
dataset = AlluxioS3Dataset(
alluxio_s3=alluxio_s3,
dataset_path=self.path,
transform=transform,
logger=_logger,
)
else:
s3 = boto3.client("s3")
# For MINIO,please change to something similar to
# s3 = boto3.client('s3', endpoint_url='http://10.0.6.242:9000', aws_access_key_id='minioadmin', aws_secret_access_key='minioadmin', verify=False)
bucket_name, prefix = self._parse_s3_path(self.path)
dataset = S3ImageDataset(s3, bucket_name, prefix, transform)
loader = DataLoader(
dataset,
batch_size=self.batch_size,
shuffle=False,
num_workers=self.num_workers,
)
start_time = time.perf_counter()
for epoch in range(self.num_epochs):
epoch_start = time.perf_counter()
for _, _ in loader:
pass
epoch_end = time.perf_counter()
_logger.debug(
f"Epoch {epoch}: {epoch_end - epoch_start:0.4f} seconds"
)
end_time = time.perf_counter()
_logger.debug(f"Data loading in {end_time - start_time:0.4f} seconds")
self._summarize(end_time - start_time)
def _check_device(self):
try:
device = (
"cuda"
if torch.cuda.is_available()
else "mps"
if torch.backends.mps.is_available()
else "cpu"
)
_logger.debug(f"Using {device}")
except AttributeError:
device = "cpu"
_logger.warning(
"Failed to access 'torch.backends.mps'. Defaulting to 'cpu'."
)
def _summarize(self, elapsed_time):
_logger.info(f"[Summary] experiment: {self.name} | path: {self.path}")
_logger.info(
f"num_epochs: {self.num_epochs} | batch_size: {self.batch_size} | "
f"num_workers: {self.num_workers} | time: {elapsed_time:0.4f}"
)
def _parse_s3_path(self, s3_path):
assert s3_path.startswith(
"s3://"
), "The provided path is not a valid S3 path."
path_without_scheme = s3_path[5:]
parts = path_without_scheme.split("/", 1)
bucket_name = parts[0]
prefix = parts[1] if len(parts) > 1 else ""
return bucket_name, prefix
if __name__ == "__main__":
args = get_args()
options_dict = parse_options(args.options)
benchmark_runner = BenchmarkRunner(
name=args.name,
num_epochs=args.epoch,
batch_size=args.batch,
num_workers=args.worker,
api=args.api,
path=args.path,
dora_root=args.doraroot,
etcd_host=args.etcd,
alluxio_workers=args.alluxioworkers,
options=options_dict,
)
benchmark_runner.benchmark_data_loading()