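"""
Load generators for ZFS performance testing.

Each L{BaseLoad} subclass runs some kind of ZFS activity in a loop (filesystem
renames, snapshot creation and pruning, or incremental "zfs send"/"zfs recv"
replay) against a scratch filesystem in the pool, so that a benchmark can be
run against another filesystem while the load is in progress.
"""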
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue, succeed, maybeDeferred
from twisted.internet.error import ProcessDone
from twisted.internet.utils import getProcessOutput
from twisted.internet import reactor
from twisted.internet import task
from random import randrange
import os, time, random
from time import ctime
ZFS = b"/sbin/zfs"
LARGE_MODE = True
if LARGE_MODE:
    CHANGE_FILES_COUNT = 2 ** 20
else:
    CHANGE_FILES_COUNT = 2 ** 8
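# Rough sizing note: _create_changes writes CHANGE_FILES_COUNT files of about
# 4.5 KiB each (a ~72 byte pattern repeated 64 times), so LARGE_MODE means on
# the order of a million files and several GB of data per call.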
def _summarize(proto):
    if proto.endedReason.check(ProcessDone):
        print "\t(%s)ended successfully" % (proto.pid,)
    else:
        print "\t(%s)ended:\t%s" % (proto.pid, proto.endedReason.getErrorMessage())
    if proto.out:
        print "\toutput:\t", b"".join(proto.out)[:80]
    if proto.err:
        print "\terrput:\t", b"".join(proto.err)
class BaseLoad(object):
    tag = 'zfs-perf-test'

    def __init__(self, root, zpool):
        """
        @param root: A string giving the root path of the zpool to benchmark in.
        @param zpool: The zfs pool name to benchmark in.
        """
        self.root = root
        self.zpool = zpool
        self.filesystem = b'hcfs/%s-%d' % (self.tag, randrange(2 ** 16),)
        self.cooperativeTask = None
        self._done = None
        self._stopFlag = False
        self._cooperator = None

    def _startCooperativeTask(self):
        if self._cooperator is not None:
            raise Exception("Don't start me twice!")
        self._cooperator = task.cooperate(self._generator())
        self._done = self._cooperator.whenDone()
        self._done.addBoth(self._cleanup)

    def _cleanup(self, passthrough):
        return passthrough

    def _stopCooperativeTask(self):
        self._stopFlag = True
        return self._done

    def _generator(self):
        """
        Return a generator which loops until self._stopFlag is set, running
        one job on each iteration.
        """
        while not self._stopFlag:
            yield self._oneStep()
        self._cooperator = None
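    # Note: task.cooperate iterates the generator and, whenever a step yields
    # a Deferred, waits for that Deferred before resuming, so each _oneStep
    # completes before the next one starts; whenDone() fires once the
    # generator exits after _stopFlag has been set.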
    def _oneStep(self):
        # Override me!
        return 1

    def start(self, benchmarkFilesystem):
        """
        @param benchmarkFilesystem: The name of the filesystem being benchmarked.
        """
        # Runs in reactor thread.  Return a Deferred that fires when
        # load is started.  Load runs until stop is called.
        pass

    def stop(self):
        # Runs in reactor thread.  Return a Deferred that fires when
        # load is stopped.
        return maybeDeferred(self._stopCooperativeTask)
    @inlineCallbacks
    def _create_filesystem(self, filesystem):
        fqfn = b"%s/%s" % (self.zpool, filesystem)
        yield self._run(ZFS, b"create", fqfn)
        yield self._run(
            ZFS, b"set",
            b"mountpoint=%s/%s" % (self.root, filesystem),
            fqfn)
        yield self._run(
            ZFS, b"set",
            b"atime=off",
            fqfn)

    def _create_snapshot(self, filesystem, name):
        return self._run(
            ZFS, b"snapshot", b"%s/%s@%s" % (self.zpool, filesystem, name))

    def _create_changes(self, filesystem):
        pattern = (
            b"she slit the sheet the sheet she slit and on the slitted sheet "
            b"she sits.") * 64
        path = "%s/%s" % (self.root, filesystem)
        if not os.path.exists(path):
            os.mkdir(path)
        for i in range(CHANGE_FILES_COUNT):
            fObj = open(b"%s/%s/data.%d" % (self.root, filesystem, i), "w")
            fObj.write(pattern)
            fObj.close()
            pattern = pattern[1:] + pattern[0]
    @inlineCallbacks
    def _record_changes(self, filesystem, start, end):
        output_filename = b"%s_%s_%s" % (filesystem.replace('/', '_'), start, end)
        fObj = open(output_filename, "w")
        yield self._run(
            ZFS, b"send", b"-I",
            b"%s/%s@%s" % (self.zpool, filesystem, start),
            b"%s/%s@%s" % (self.zpool, filesystem, end),
            childFDs={0: 'w', 1: fObj.fileno(), 2: 'r'})
        fObj.close()
        returnValue(output_filename)
    def _destroy_snapshot(self, filesystem, name):
        return self._run(
            ZFS, b"destroy", b"%s/%s@%s" % (self.zpool, filesystem, name))

    def _destroy_filesystem(self, filesystem):
        return self._run(
            ZFS, b"destroy", b"-r", b"%s/%s" % (self.zpool, filesystem))
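    # Note on the childFDs mappings used by _record_changes and
    # _receive_snapshot: spawnProcess maps each child fd either to a pipe
    # ('r' means the parent reads the child's output, 'w' means the parent
    # writes to the child) or, when given an integer, to a duplicate of that
    # parent file descriptor, so passing fObj.fileno() wires the saved stream
    # file directly to the child's stdin or stdout.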
    def _receive_snapshot(self, filesystem, input_filename):
        class ReceiveProto(ProcessProtocol):
            command = [ZFS, b"recv", b"-F", b"-d", b"%(zpool)s"]

            @classmethod
            def run(cls, reactor):
                proto = cls()
                proto.finished = Deferred()
                proto.fObj = open(input_filename, 'r')
                command = [arg % dict(zpool=self.zpool, filesystem=filesystem)
                           for arg
                           in cls.command]
                reactor.spawnProcess(
                    proto, command[0], command,
                    childFDs={0: proto.fObj.fileno(), 1: 'r', 2: 'r'})
                proto.pid = proto.transport.pid
                print "command (%d):\t%s" % (proto.pid, command), '<', input_filename
                return proto

            def connectionMade(self):
                self.out = []
                self.err = []

            def outReceived(self, data):
                self.out.append(data)

            def errReceived(self, data):
                self.err.append(data)

            def kill(self):
                self.transport.signalProcess("KILL")

            def wait(self):
                return self.finished

            def processEnded(self, reason):
                self.endedReason = reason
                print ctime(), "Load ended!"
                self.fObj.close()
                _summarize(self)
                self.finished.callback(self)

        # Return the protocol itself so callers can either wait() for the
        # finished Deferred or kill() the running process.
        return ReceiveProto.run(reactor)
    def _run(self, *command, **kwargs):
        class Collector(ProcessProtocol):
            finished = None

            @classmethod
            def run(cls, reactor):
                proto = cls()
                proto.finished = Deferred()
                reactor.spawnProcess(proto, command[0], command, **kwargs)
                proto.pid = proto.transport.pid
                return proto

            def connectionMade(self):
                self.out = []
                self.err = []

            def outReceived(self, data):
                self.out.append(data)

            def errReceived(self, data):
                self.err.append(data)

            def processEnded(self, reason):
                self.endedReason = reason
                self.finished.callback(self)

        proto = Collector.run(reactor)
        print "command (%d):\t %s %s" % (proto.pid, command, kwargs)
        d = proto.finished
        d.addCallback(_summarize)
        return d
# rename - filesystem
class RenameFilesystemLoad(BaseLoad):
    """
    Loop forever, renaming a filesystem into hpool/hcfs-trash and then back
    out again.
    """
    tag = 'zfs-rename-test'

    @inlineCallbacks
    def start(self, benchmarkFilesystem):
        """
        Return a C{Deferred} which fires when the filesystem is created.
        """
        yield self._create_filesystem(self.filesystem)
        if not os.path.ismount("/hcfs-trash"):
            yield self._run(ZFS, "create", "hpool/hcfs-trash")
            yield self._run(ZFS, "set", "mountpoint=/hcfs-trash", "hpool/hcfs-trash")
        # Now kick off the thing which runs forever in a loop.
        self._startCooperativeTask()

    @inlineCallbacks
    def _oneStep(self):
        trash_timestamp = str(time.time())
        # Trash the filesystem (copied almost directly from safemounthandler
        # for maximum realism, apart from the added command-string splitting).
        args = {
            'zpool': self.zpool, 'timestamp': trash_timestamp, 'filesystem': self.filesystem,
            'trash-filesystem': self.filesystem.replace('hcfs/', 'hcfs-trash/'),
        }
        def cmd(s):
            return (s % args).split(" ")
        yield self._run(ZFS, *cmd("rename %(zpool)s/%(filesystem)s %(zpool)s/%(trash-filesystem)s-%(timestamp)s"))
        yield self._run(ZFS, *cmd("set mountpoint=/%(trash-filesystem)s-%(timestamp)s %(zpool)s/%(trash-filesystem)s-%(timestamp)s"))
        # Un-trash the filesystem
        yield self._run(ZFS, *cmd("set mountpoint=/%(filesystem)s %(zpool)s/%(trash-filesystem)s-%(timestamp)s"))
        yield self._run(ZFS, *cmd("rename %(zpool)s/%(trash-filesystem)s-%(timestamp)s %(zpool)s/%(filesystem)s"))
class PruneSnapshots(BaseLoad):
    tag = 'zfs-prune-test'
    random = random.Random(x=1)

    @inlineCallbacks
    def _newSnapshot(self):
        yield self._create_snapshot(self.filesystem, str(self.snapshotCounter))
        yield self._create_changes(self.filesystem)
        self.snapshots.append(self.snapshotCounter)
        self.snapshotCounter += 1

    @inlineCallbacks
    def start(self, benchmarkFilesystem):
        yield self._create_filesystem(self.filesystem)
        self.snapshots = []
        self.snapshotCounter = 0
        # If we were smartarses we could use self.snapshotCounter as the
        # assignment target here, but we prefer readable code ;-)
        for i in range(10):
            yield self._newSnapshot()
        # Now kick off the thing which runs forever in a loop.
        self._startCooperativeTask()

    @inlineCallbacks
    def _oneStep(self):
        yield self._newSnapshot()
        # Randomly pick a snapshot and take it out.
        target = self.random.choice(self.snapshots)
        yield self._destroy_snapshot(self.filesystem, target)
        self.snapshots.remove(target)
# recv - other filesystem - small case
class LotsOfTinySnapshots(BaseLoad):
    """
    Build up a series of small incremental snapshot streams, then replay them
    into the filesystem over and over again with "zfs recv".
    """
    COUNT = 50

    @inlineCallbacks
    def start(self, benchmarkFilesystem):
        # Get rid of any leftovers from previous runs
        yield self._destroy_filesystem(self.filesystem)
        yield self._create_filesystem(self.filesystem)
        self.snapshots = []
        # Generate a bunch of snapshots to later replay
        previous = b'0'
        yield self._create_snapshot(self.filesystem, previous)
        for i in range(1, self.COUNT):
            # Stuff some bytes into it to make the snapshot interesting
            self._create_changes(self.filesystem)
            # Take the snapshot
            yield self._create_snapshot(self.filesystem, bytes(i))
            # Dump it into a file for later replay
            snapshot = yield self._record_changes(self.filesystem, bytes(previous), bytes(i))
            previous = i
            self.snapshots.append(snapshot)
        # Delete all of the snapshots just taken
        yield self._reset_snapshots()
        # Start the process of replaying them
        self._startCooperativeTask()

    @inlineCallbacks
    def _reset_snapshots(self):
        """
        Delete all the snapshots we received and reload the list of snapshots
        that we can now receive again.
        """
        # Keep the first snapshot, since that's what the first saved
        # incremental stream is based on.
        for i in range(1, self.COUNT):
            yield self._destroy_snapshot(self.filesystem, bytes(i))
        self.snapshots_for_replay = self.snapshots[:]

    def _oneStep(self):
        if not self.snapshots_for_replay:
            return self._reset_snapshots()
        snapshot = self.snapshots_for_replay.pop(0)
        return self._receive_snapshot(self.filesystem, snapshot).wait()

    @inlineCallbacks
    def _cleanup(self, passthrough):
        # Delete the entire filesystem we were using
        yield self._destroy_filesystem(self.filesystem)
        # And delete all the saved incremental streams we created too
        for s in self.snapshots:
            os.remove(s)
        returnValue(passthrough)
# recv - not-exist
# snapshot - other filesystem
# snapshot - same filesystem
class SnapshotUsedFilesystemLoad(BaseLoad):
    _iteration = 0

    def _oneStep(self):
        self._iteration += 1
        return self._create_snapshot(self.benchmarkFilesystem, bytes(self._iteration))

    def start(self, benchmarkFilesystem):
        self.benchmarkFilesystem = benchmarkFilesystem
        self._startCooperativeTask()
        return succeed(None)
# recv - other filesystem - large case
class ReplayLargeLoad(BaseLoad):
    """
    Repeatedly replay a single large incremental snapshot stream into the
    filesystem with "zfs recv".
    """
    process = None

    @inlineCallbacks
    def _oneStep(self):
        # Run a "zfs recv".  If it finishes, destroy the received snapshot and
        # run the same "zfs recv" again.  Continue until poked from the outside
        # to stop.  Call this in the reactor thread.
        yield self._destroy_snapshot(self.filesystem, b'end')
        self.process = self._receive_snapshot(self.filesystem, self._snapshot)
        yield self.process.finished
        self.process = None

    @inlineCallbacks
    def start(self, benchmarkFilesystem):
        # Get rid of any leftovers from previous runs
        yield self._destroy_filesystem(self.filesystem)
        yield self._create_filesystem(self.filesystem)
        # Take a snapshot of that filesystem to later replay onto
        yield self._create_snapshot(self.filesystem, b'start')
        # Make some changes so we have a sizable change log to replay
        yield self._create_changes(self.filesystem)
        # Take the new snapshot
        yield self._create_snapshot(self.filesystem, b'end')
        # Record the changes into a file to replay from
        self._snapshot = yield self._record_changes(
            self.filesystem, b'start', b'end')
        # Unmount the filesystem before receiving into it.
        yield getProcessOutput(
            ZFS, [b"umount", b"%s/%s" % (self.zpool, self.filesystem)])
        # Replay the change log asynchronously
        print ctime(), 'Load started'
        self._startCooperativeTask()

    def _cleanup(self, passthrough):
        os.remove(self._snapshot)
        return passthrough

    def stop(self):
        # Stop whatever command is currently in progress and wait for it to
        # actually exit.
        print ctime(), "Killing load and waiting.."
        if self.process is not None:
            # Kill the currently running zfs recv
            self.process.kill()
        return BaseLoad.stop(self)
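

# Illustrative sketch, not part of the original harness: one way a driver
# script might exercise one of these loads.  The pool name "hpool", mount
# root "/hpool", the benchmark filesystem name, and the 60 second run time
# are assumptions made for this example only.
if __name__ == '__main__':
    @inlineCallbacks
    def _demo():
        load = ReplayLargeLoad(b"/hpool", b"hpool")
        # start() may return a Deferred or None depending on the load class,
        # so wrap it with maybeDeferred.
        yield maybeDeferred(load.start, b"hcfs/benchmark-filesystem")
        # Let the load run for a while, then shut it down cleanly.
        yield task.deferLater(reactor, 60, lambda: None)
        yield load.stop()

    def _main():
        d = _demo()
        d.addErrback(lambda reason: reason.printTraceback())
        d.addBoth(lambda ignored: reactor.stop())

    reactor.callWhenRunning(_main)
    reactor.run()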