Skip to content

Commit

Permalink
Add random strategy for group scheduling
Browse files Browse the repository at this point in the history
Signed-off-by: Colin Wilk <[email protected]>
  • Loading branch information
kliwniloc committed Aug 3, 2024
1 parent afbda90 commit c8c7893
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
17 changes: 11 additions & 6 deletions src/benchmarking_tool/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""

import datetime
import random as r
import time
import uuid
from itertools import product
Expand Down Expand Up @@ -69,7 +70,7 @@ class Coordinator:
Retrieves the filename used in benchmark tasks.
"""

def __schedule_every_2_hours(self) -> None:
def __schedule_every_2_hours(self, random: bool = False) -> None:
for t in [
"00",
"02",
Expand All @@ -84,17 +85,21 @@ def __schedule_every_2_hours(self) -> None:
"20",
"22",
]:
schedule.every().day.at(f"{t}:00").do(self.trigger_benchmark)
schedule.every().day.at(f"{t}:00").do(
lambda: self.trigger_benchmark(random)
)

def __init__(self) -> None:
pass

def trigger_benchmark(self) -> None:
def trigger_benchmark(self, random: bool = False) -> None:
trigger_time: datetime.datetime = datetime.datetime.now()
wave_id: str = str(uuid.uuid4())
l.info("Triggering Benchmark")
for worker_group, filename in product(self.worker_groups, self.filenames):
workers: set[Any] = worker.get_workers(worker_group)
workers: list[Any] = list(worker.get_workers(worker_group))
if random: # Pick one element if we use random strategy
workers = [r.choice(workers)]
for worker_instance in workers:
queue_name = worker_instance.decode()
task = (
Expand All @@ -110,8 +115,8 @@ def trigger_benchmark(self) -> None:
)
task.get() # Wait until the task is done

def run(self) -> NoReturn:
self.__schedule_every_2_hours()
def run(self, random: bool = False) -> NoReturn:
self.__schedule_every_2_hours(random)
l.info("Coordinator Started")
while True:
schedule.run_pending()
Expand Down
9 changes: 7 additions & 2 deletions src/benchmarking_tool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@
help="Trigger a Job immediately and then exit. Useful for testing",
action="store_true",
)
parser_coordinator.add_argument(
"--random",
help="Chooses a node from the group at random instead of running on all nodes",
action="store_true",
)

if __name__ == "__main__":
args = parser.parse_args()
Expand All @@ -179,7 +184,7 @@
Coordinator().set_worker_groups(args.groups).set_filenames(args.filenames)
)
if args.trigger:
coordinator.trigger_benchmark()
coordinator.trigger_benchmark(args.random)

else:
coordinator.run()
coordinator.run(args.random)

0 comments on commit c8c7893

Please sign in to comment.