Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit scheduling strategies #1066

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions mephisto/data_model/task_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,26 @@ class TaskRunArgs:
)
},
)
# Name of the strategy used to pick the next unit for a worker.
# The help text is built from adjacent string literals, so each fragment
# must carry its own trailing space to keep the sentences separated.
unit_scheduling_strategy: str = field(
    default="FIFO",
    metadata={
        "help": (
            "Strategy that determines the scheduling strategy for units. "
            "Can be 'FIFO', 'LIFO' or 'Random'."
        )
    },
)
# Whether units belonging to assignments that already have an assigned unit
# are offered before all other units.
# The help text is built from adjacent string literals, so each fragment
# must carry its own trailing space to keep the words separated.
scheduler_prefer_assigned_assignments: bool = field(
    default=True,
    metadata={
        "help": (
            "Determines if units in assignments with at least one already "
            "assigned unit are preferred over all other units. This is "
            "useful for concurrent tasks as this leads to shorter wait "
            "times."
        )
    },
)

post_install_script: str = field(
default="",
Expand Down
101 changes: 101 additions & 0 deletions mephisto/operations/unit_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from abc import ABC, abstractmethod
from mephisto.data_model.task_run import TaskRun
from random import shuffle
from mephisto.data_model.constants.assignment_state import AssignmentState


class UnitScheduler(ABC):
    """
    Base class for strategies that choose which available unit to reserve.

    Subclasses implement '_reserve_unit' with their ordering policy; callers
    use 'reserve_unit', which optionally layers a preference for assignments
    that already have an assigned unit on top of that policy.
    """

    def __init__(self, task_run: "TaskRun", prefer_assigned_assignments: bool):
        """
        Store the task run that units are reserved in, and whether units of
        assignments with an already assigned unit should be tried first.
        """
        self.task_run = task_run
        self.prefer_assigned_assignments = prefer_assigned_assignments

    def reserve_unit(self, available_units):
        """
        This method wraps the internal strategy.

        If 'prefer_assigned_assignments' is False, this method behaves exactly
        like the internal strategy.

        If 'prefer_assigned_assignments' is True, the scheduler prefers
        assignments with assigned units over unassigned assignments. This is
        useful when dealing with concurrent assignments.

        Returns the reserved unit, or 'None' if no unit could be reserved.
        """
        if not self.prefer_assigned_assignments:
            # Delegate to the strategy itself; calling 'reserve_unit' here
            # would recurse forever.
            return self._reserve_unit(available_units)

        def has_assigned_unit(assignment):
            """
            Returns if a given assignment has assigned units. Note that this
            is different from 'assignment.get_status() == assigned' as the
            get_status() would return launched if the assignment has any
            unit that is launched but not assigned.
            """
            # The status of every unit is queried (no short-circuiting), as
            # in the original implementation.
            statuses = {unit.get_status() for unit in assignment.get_units()}
            return AssignmentState.ASSIGNED in statuses

        # A set keeps the per-unit membership test below cheap.
        ids_of_assigned_assignments = {
            assignment.db_id
            for assignment in self.task_run.get_assignments()
            if has_assigned_unit(assignment)
        }

        # Split the available units in one pass, preserving their order, so
        # the strategy sees each group in the order the caller supplied.
        units_in_assigned_assignments = []
        other_units = []
        for unit in available_units:
            if unit.assignment_id in ids_of_assigned_assignments:
                units_in_assigned_assignments.append(unit)
            else:
                other_units.append(unit)

        reserved_unit = self._reserve_unit(units_in_assigned_assignments)
        if reserved_unit is not None:
            return reserved_unit
        # Nothing reservable among the preferred units; fall back to the rest.
        return self._reserve_unit(other_units)

    @abstractmethod
    def _reserve_unit(self, available_units):
        """
        Implementations of this method should choose one of 'available_units'
        according to their scheduling strategy, reserve it in the task run and return it.
        If there are no available_units left or none of them can successfully be reserved
        this method returns 'None'.
        """


class FIFOUnitScheduler(UnitScheduler):
    """Scheduler that tries the available units from the front of the list."""

    def _reserve_unit(self, available_units):
        """
        Pop units off the front of 'available_units' until the task run
        reserves one, and return it. Returns 'None' when the list runs out.
        The passed list is consumed in place.
        """
        while len(available_units) > 0:
            candidate = available_units.pop(0)
            outcome = self.task_run.reserve_unit(candidate)
            if outcome is not None:
                return outcome
        return None


class LIFOUnitScheduler(UnitScheduler):
    """Scheduler that tries the available units from the back of the list."""

    def _reserve_unit(self, available_units):
        """
        Pop units off the end of 'available_units' until the task run
        reserves one, and return it. Returns 'None' when the list runs out.
        The passed list is consumed in place.
        """
        while len(available_units) > 0:
            candidate = available_units.pop()
            outcome = self.task_run.reserve_unit(candidate)
            if outcome is not None:
                return outcome
        return None


class RandomUnitScheduler(UnitScheduler):
    """Scheduler that tries the available units in a random order."""

    def _reserve_unit(self, available_units):
        """
        Shuffle 'available_units' in place, then pop units off the end until
        the task run reserves one, and return it. Returns 'None' when the
        list runs out.
        """
        # The shuffle happens even for an empty list, before any reservation.
        shuffle(available_units)
        while len(available_units) > 0:
            candidate = available_units.pop()
            outcome = self.task_run.reserve_unit(candidate)
            if outcome is not None:
                return outcome
        return None
40 changes: 32 additions & 8 deletions mephisto/operations/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
GOLD_UNIT_INDEX,
)
from mephisto.operations.datatypes import LiveTaskRun, WorkerFailureReasons
from mephisto.operations.unit_scheduler import (
FIFOUnitScheduler,
LIFOUnitScheduler,
RandomUnitScheduler,
)

from typing import Sequence, Dict, Union, Optional, List, Any, TYPE_CHECKING

Expand Down Expand Up @@ -118,6 +123,21 @@ def register_run(self, live_run: "LiveTaskRun") -> None:
self._live_run is None
), "Cannot associate more than one live run to a worker pool at a time"
self._live_run = live_run
# Build the unit scheduler selected by the task arguments of this run.
scheduling_strategy = live_run.task_run.args.task.unit_scheduling_strategy
prefer_assigned_assignments = (
    live_run.task_run.args.task.scheduler_prefer_assigned_assignments
)

# Maps each supported strategy name to the scheduler class implementing it.
scheduler_classes = {
    "FIFO": FIFOUnitScheduler,
    "LIFO": LIFOUnitScheduler,
    "Random": RandomUnitScheduler,
}
if scheduling_strategy not in scheduler_classes:
    # Raising a plain string is itself a TypeError in Python 3, so raise a
    # real exception that names the offending value and the valid options.
    raise ValueError(
        f"Unknown scheduling strategy {scheduling_strategy!r}, "
        f"expected one of {sorted(scheduler_classes)}"
    )
self.unit_scheduler = scheduler_classes[scheduling_strategy](
    live_run.task_run, prefer_assigned_assignments
)

def get_live_run(self) -> "LiveTaskRun":
"""Get the associated live run for this worker pool, asserting it's set"""
Expand Down Expand Up @@ -191,10 +211,14 @@ async def _assign_unit_to_agent(

logger.debug(f"Worker {worker.db_id} is being assigned one of {len(units)} units.")

reserved_unit = None
while len(units) > 0 and reserved_unit is None:
unit = units.pop(0)
reserved_unit = task_run.reserve_unit(unit)
## replace this block of code
# reserved_unit = None
# while len(units) > 0 and reserved_unit is None:
# unit = units.pop(0)
# reserved_unit = task_run.reserve_unit(unit)
## block end
reserved_unit = self.unit_scheduler.reserve_unit(units)

if reserved_unit is None:
AGENT_DETAILS_COUNT.labels(response="no_available_units").inc()
live_run.client_io.enqueue_agent_details(
Expand All @@ -211,7 +235,7 @@ async def _assign_unit_to_agent(
crowd_provider.AgentClass.new_from_provider_data,
self.db,
worker,
unit,
reserved_unit,
crowd_data,
),
)
Expand Down Expand Up @@ -246,14 +270,14 @@ async def _assign_unit_to_agent(
self.agents[agent.get_agent_id()] = agent

# Launch individual tasks
if unit.unit_index < 0 or not live_run.task_runner.is_concurrent:
if reserved_unit.unit_index < 0 or not live_run.task_runner.is_concurrent:
# Run the unit
live_run.task_runner.execute_unit(
unit,
reserved_unit,
agent,
)
else:
assignment = await loop.run_in_executor(None, unit.get_assignment)
assignment = await loop.run_in_executor(None, reserved_unit.get_assignment)

# Set status to waiting
agent.update_status(AgentState.STATUS_WAITING)
Expand Down
Loading