forked from seomoz/qless-py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathforgetful-bench.py
executable file
·203 lines (178 loc) · 5.27 KB
/
forgetful-bench.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#! /usr/bin/env python
import argparse
import logging
import random
import threading
import time
import reqless
# Command-line configuration for the benchmark run.
parser = argparse.ArgumentParser(description="Run forgetful workers on contrived jobs.")
parser.add_argument("--forgetfulness", dest="forgetfulness", type=float, default=0.1,
                    help="What portion of jobs should be randomly dropped by workers")
parser.add_argument("--host", dest="host", default="localhost",
                    help="The host to use when connecting to the remote data structure server")
parser.add_argument("--port", dest="port", type=int, default=6379,
                    help="The port to use when connecting to the remote data structure server")
parser.add_argument("--stages", dest="stages", type=int, default=1,
                    help="How many times to requeue jobs")
parser.add_argument("--jobs", dest="numJobs", type=int, default=1000,
                    help="How many jobs to schedule for the test")
parser.add_argument("--workers", dest="numWorkers", type=int, default=10,
                    help="How many workers should do the work")
parser.add_argument("--retries", dest="retries", type=int, default=5,
                    help="How many retries to give each job")
parser.add_argument("--quiet", dest="verbose", action="store_false", default=True,
                    help="Reduce all the output")
parser.add_argument("--no-flush", dest="flush", action="store_false", default=True,
                    help="Don't flush the remote data structure server after running")
args = parser.parse_args()
# Benchmark logger; the thread name in each record attributes output to a worker.
logger = logging.getLogger("reqless-bench")
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter("[%(asctime)s] %(threadName)s => %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG if args.verbose else logging.WARN)

# The reqless client used by the main thread (workers build their own).
client = reqless.client(host=args.host, port=args.port)
class ForgetfulWorker(threading.Thread):
    """Worker thread that drains the 'testing' queue, randomly dropping a
    configurable fraction of jobs (``args.forgetfulness``) so those jobs
    time out and exercise the retry machinery."""

    def __init__(self, *a, **kw):
        super().__init__(*a, **kw)
        # This is to fake out thread-level workers: each thread gets its own
        # client whose worker name is suffixed with the thread name, so the
        # server sees distinct workers.
        tmp = reqless.client(host=args.host, port=args.port)
        # self.name replaces self.getName(), which is deprecated since
        # Python 3.10 and returns the same value.
        tmp.worker += "-" + self.name
        self.q = tmp.queue("testing")

    def run(self):
        # Keep going until the queue reports empty.
        while len(self.q):
            job = self.q.pop()
            if not job:
                # Queue is non-empty but nothing was poppable (presumably
                # jobs are held by other workers — TODO confirm against
                # reqless queue semantics). Back off briefly.
                time.sleep(0.1)
                logger.debug("No jobs available. Sleeping.")
                continue
            # Randomly drop a job? This simulates a crashed/forgetful worker.
            random_value = random.random()  # noqa: DUO102
            if random_value < args.forgetfulness:
                logger.debug("Randomly dropping job!")
                continue
            logger.debug("Completing job!")
            # Each job carries a countdown of stages; requeue until exhausted.
            job["stages"] -= 1
            if job["stages"] > 0:
                # More stages remain: complete back into the same queue.
                job.complete("testing")
            else:
                job.complete()
# Refuse to run against a server that already holds data — the benchmark
# may flush the database when it finishes (see --no-flush).
if len(client.database.keys("*")):
    print("Must begin with empty data structure server")
    exit(1)

# Short heartbeat — presumably so dropped jobs' locks expire quickly and
# they get retried; verify against reqless heartbeat semantics.
client.config.set("heartbeat", 1)

# This is how much CPU the data structure server had used /before/ the run,
# so we can report only the delta at the end.
cpu_before = (
    client.database.info()["used_cpu_user"] + client.database.info()["used_cpu_sys"]
)

# Wall-clock time spent enqueueing all the jobs (negated start; add end below).
put_time = -time.time()
# Alright, let's make a bunch of jobs
testing = client.queue("testing")
jids = [
    testing.put(
        reqless.Job,
        {"test": "benchmark", "count": c, "stages": args.stages},
        retries=args.retries,
    )
    for c in range(args.numJobs)
]
put_time += time.time()

# Wall-clock time spent by the workers draining the queue.
work_time = -time.time()
# And now let's make some workers to deal with 'em!
workers = [ForgetfulWorker() for i in range(args.numWorkers)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
work_time += time.time()
def histo(histo_list):
    """Print each non-empty histogram bucket as an indented
    ``index, fraction-of-total, count`` row.

    Fix: the original compacted the list (dropping empty buckets) before
    printing, which shifted the printed index of every bucket that followed
    an empty one — the index no longer identified the real bucket. Here we
    keep each bucket's true index and simply skip the empty ones.
    """
    count = sum(histo_list)
    for index, bucket in enumerate(histo_list):
        # Skip empty buckets; count > 0 is guaranteed whenever any bucket
        # is non-empty, so the division below cannot divide by zero.
        if bucket:
            print("\t\t%2i, %10.9f, %i" % (index, float(bucket) / count, bucket))
# Report queue statistics and overall timings for the run.
stats = client.queue("testing").stats()
for title, key, histogram_label in (
    ("Wait", "wait", "Wait Time Histogram:"),
    ("Run", "run", "Completion Time Histogram:"),
):
    section = stats[key]
    print("%s:" % title)
    print("\tCount: %i" % section["count"])
    print("\tMean : %fs" % section["mean"])
    print("\tSDev : %f" % section["std"])
    print("\t" + histogram_label)
    histo(section["histogram"])

print("=" * 50)
print("Put jobs : %fs" % put_time)
print("Do jobs : %fs" % work_time)

# Server-side resource usage; CPU is reported as the delta from before the run.
info = client.database.info()
print("Data Structure Server Mem: %s" % info["used_memory_human"])
print("Data Structure Server Lua: %s" % info["used_memory_lua"])
print(
    "Data Structure Server CPU: %fs"
    % (info["used_cpu_user"] + info["used_cpu_sys"] - cpu_before)
)

# Leave the server clean unless --no-flush was given.
if args.flush:
    print("Flushing")
    client.database.flushdb()