diff --git a/mephisto/data_model/task_run.py b/mephisto/data_model/task_run.py index 476cc917b..01230d6a5 100644 --- a/mephisto/data_model/task_run.py +++ b/mephisto/data_model/task_run.py @@ -125,6 +125,26 @@ class TaskRunArgs: ) }, ) + unit_scheduling_strategy: str = field( + default="FIFO", + metadata={ + "help": ( + "Strategy that determines the scheduling strategy for units." + "Can be 'FIFO', 'LIFO' or 'Random'." + ) + } + ) + scheduler_prefer_assigned_assignments: bool = field( + default=True, + metadata={ + "help": ( + "Determines if units in assignments with at least one already" + "assigned unit are preferred over all other units. This is" + "usefull for concurrent tasks as this leads to shorter wait" + "times." + ) + } + ) post_install_script: str = field( default="", diff --git a/mephisto/operations/worker_pool.py b/mephisto/operations/worker_pool.py index 3b88e431d..cd81536b0 100644 --- a/mephisto/operations/worker_pool.py +++ b/mephisto/operations/worker_pool.py @@ -24,7 +24,7 @@ GOLD_UNIT_INDEX, ) from mephisto.operations.datatypes import LiveTaskRun, WorkerFailureReasons -from mephisto.operations.unit_scheduler import FIFOUnitScheduler +from mephisto.operations.unit_scheduler import FIFOUnitScheduler, LIFOUnitScheduler, RandomUnitScheduler from typing import Sequence, Dict, Union, Optional, List, Any, TYPE_CHECKING @@ -119,7 +119,18 @@ def register_run(self, live_run: "LiveTaskRun") -> None: self._live_run is None ), "Cannot associate more than one live run to a worker pool at a time" self._live_run = live_run - self.unit_scheduler = FIFOUnitScheduler(live_run.task_run, prefer_assigned_assignments=True) + scheduling_strategy = live_run.task_run.args.task.unit_scheduling_strategy + prefer_assigned_assignments = live_run.task_run.args.task.scheduler_prefer_assigned_assignments + + if(scheduling_strategy == "FIFO"): + self.unit_scheduler = FIFOUnitScheduler(live_run.task_run, prefer_assigned_assignments) + elif(scheduling_strategy == "LIFO"): + self.unit_scheduler = LIFOUnitScheduler(live_run.task_run, prefer_assigned_assignments) + elif(scheduling_strategy == "Random"): + self.unit_scheduler = RandomUnitScheduler(live_run.task_run, prefer_assigned_assignments) + else: + raise "Unknown scheduling strategy" + def get_live_run(self) -> "LiveTaskRun": """Get the associated live run for this worker pool, asserting it's set"""