Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zero time decision task upgrades and docs. #44

Merged
merged 1 commit into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions docs/source/user_guide/how_tos/decision_tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,118 @@ The difference between making and rehearsing the decision is covered in the tuto
is called during normal operations of UPSTAGE, and the latter is called during a
rehearsal of the task or network. It is up the user to ensure that no side-effects occur during the
rehearsal that would touch non-rehearsing state, actors, or other data.

There is one class variable that can be set on each subclass of the decision task, which is ``DO_NOT_HOLD``:

.. code-block:: python

class Thinker(UP.DecisionTask):
DO_NOT_HOLD = True # default is False
def make_decision(): ...

This feature lets the user turn off zero time holding on decision tasks, which causes decision
tasks to move right into the next task without allowing anything else to run. For examples and
reasoning, see the following section.

This feature is most applicable for avoiding race conditions in decision making where a
follow-on task can alter the simulation with its first yield such that other decisions
make would become incorrect [#f1]_. This would only occur for equally-timed decision processes,
and only for the first yield in a Task that follows the decision. For example, deciding
which ``Store`` to queue on may result in an Actor waiting when no wait was expected.

Zero Time Considerations
------------------------

Decision tasks are meant to not advance the clock, but they do cause a zero-time timeout to be
created. This is done to provide other events in the queue the chance to complete at the same
time step before the task network proceeds for the current actor.

Here is a short example of the default behavior:

.. code-block:: python

import upstage_des.api as UP

class Waiter(UP.Task):
def task(self, *, actor):
print(f"{self.env.now:.1f} >> {actor.name} in Waiter")
yield UP.Wait(1.0)

class Runner(UP.Task):
def task(self, *, actor):
print(f"{self.env.now:.1f} >> {actor.name} in Runner")
yield UP.Wait(2.0)

class Thinker(UP.DecisionTask):
def make_decision(self, *, actor):
print(f"{self.env.now:.1f} >> {actor.name} in Thinker")
if "one" in actor.name:
self.set_actor_task_queue(actor, ["Waiter"])
else:
self.set_actor_task_queue(actor, ["Runner"])

net = UP.TaskNetworkFactory(
name="Example Net",
task_classes={"Waiter": Waiter, "Runner":Runner, "Thinker":Thinker},
task_links={
"Waiter":UP.TaskLinks(default="Thinker", allowed=["Thinker"]),
"Thinker":UP.TaskLinks(default="", allowed=["Waiter", "Runner"]),
"Runner":UP.TaskLinks(default="Thinker", allowed=["Thinker"]),
},
)

with UP.EnvironmentContext() as env:
a = UP.Actor(name="Actor one", debug_log=True)
b = UP.Actor(name="Actor two", debug_log=True)

for actor in [a,b]:
n = net.make_network()
actor.add_task_network(n)
actor.start_network_loop(n.name, "Waiter")

env.run(until=2)

The result is:

.. code-block:: python

>>> 0.0 >> Actor one in Waiter
>>> 0.0 >> Actor two in Waiter
>>> 1.0 >> Actor one in Thinker
>>> 1.0 >> Actor two in Thinker
>>> 1.0 >> Actor one in Waiter
>>> 1.0 >> Actor two in Runner

Even though ``Actor one`` gets to the decision task first, the internal timeout
preserves ordering of the stops. This would happen even if there was no timeout,
because UPSTAGE yields on the decision task as a simpy process.

If we were to skip yielding on the process of a ``DecisionTask``, then this ordering
of output would result:

.. code-block:: python

...
# The only modification is to add DO_NOT_HOLD = True
class Thinker(UP.DecisionTask):
DO_NOT_HOLD = True
def make_decision(self, *, actor):
...

>>> 0.0 >> Actor one in Waiter
>>> 0.0 >> Actor two in Waiter
>>> 1.0 >> Actor one in Thinker
>>> 1.0 >> Actor one in Waiter
>>> 1.0 >> Actor two in Thinker
>>> 1.0 >> Actor two in Runner

Note that ``Actor one`` starts the ``Waiter`` task (and stops at the first yield inside)
before ``Actor two`` gets to its decision task.

Turning off the hold using ``DO_NOT_HOLD = True`` gives a guarantee to ``Actor two`` that
the simulation they see in ``Thinker`` is what they will encounter in the first yield in
``Runner``.


.. [#f1] Needing this feature may be a code smell, depending on the situation. Take care
to check that other ways of deciding and queueing might be better suited.
12 changes: 12 additions & 0 deletions src/upstage_des/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ def run(self, *, actor: "Actor") -> Generator[SimpyEvent | Process, Any, None]:
class DecisionTask(Task):
"""A task used for decision processes."""

DO_NOT_HOLD = False

def task(self, *, actor: Any) -> TASK_TYPE:
"""Define the process this task follows."""
raise SimulationError("No need to call `task` on a DecisionTask")
Expand Down Expand Up @@ -602,6 +604,16 @@ def run(self, *, actor: "Actor") -> Generator[SimpyEvent, None, None]:
assert isinstance(self.env, SimpyEnv)
yield self.env.timeout(0.0)

def run_skip(self, *, actor: "Actor") -> None:
"""Run the decision task with no clock reference.

Task networks will use this method if SKIP_WAIT is True.

Args:
actor (Actor): The actor making decisions
"""
self.make_decision(actor=actor)


class TerminalTask(Task):
"""A rehearsal-safe task that cannot exit, i.e., it is terminal.
Expand Down
12 changes: 9 additions & 3 deletions src/upstage_des/task_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from simpy import Process

from upstage_des.base import SimulationError
from upstage_des.task import Task, TerminalTask, process
from upstage_des.task import DecisionTask, Task, TerminalTask, process

REH_ACTOR = TypeVar("REH_ACTOR", bound="Actor")

Expand Down Expand Up @@ -124,9 +124,15 @@ def loop(
self._current_task_inst = task_instance
self._current_task_inst._set_network_name(self.name)
self._current_task_inst._set_network_ref(self)
self._current_task_proc = self._current_task_inst.run(actor=actor)

yield self._current_task_proc
if (
isinstance(self._current_task_inst, DecisionTask)
and self._current_task_inst.DO_NOT_HOLD
):
self._current_task_inst.run_skip(actor=actor)
else:
self._current_task_proc = self._current_task_inst.run(actor=actor)
yield self._current_task_proc

next_name = self._next_task_name(task_name, actor)
self._current_task_name = next_name
Expand Down
82 changes: 82 additions & 0 deletions src/upstage_des/test/test_task_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,3 +830,85 @@ def task(self, *, actor: Thing) -> TASK_GEN:
thing.add_task_network(factory.make_network())
new_thing = thing.rehearse_network("fact", ["ThingWait", "ThingWait"])
assert new_thing.env.now == new_thing.the_time, "Bad rehearsal env time"


def test_decision_task_hold() -> None:
# Test the conditions found in https://github.com/gtri/upstage/issues/35
# Looks at zero time holds vs pass-through decision tasks

# Test for new behavior first.
data = []

class Waiter(Task):
def task(self, *, actor: Actor) -> TASK_GEN:
data.append(f"{self.env.now:.1f} >> {actor.name} in Waiter")
yield Wait(1.0)

class Runner(Task):
def task(self, *, actor: Actor) -> TASK_GEN:
data.append(f"{self.env.now:.1f} >> {actor.name} in Runner")
yield Wait(2.0)

class Thinker(DecisionTask):
DO_NOT_HOLD = True

def make_decision(self, *, actor: Actor) -> None:
data.append(f"{self.env.now:.1f} >> {actor.name} in Thinker")
if "one" in actor.name:
self.set_actor_task_queue(actor, ["Waiter"])
else:
self.set_actor_task_queue(actor, ["Runner"])

net = TaskNetworkFactory(
name="Example Net",
task_classes={"Waiter": Waiter, "Runner": Runner, "Thinker": Thinker},
task_links={
"Waiter": TaskLinks(default="Thinker", allowed=["Thinker"]),
"Thinker": TaskLinks(default="", allowed=["Waiter", "Runner"]),
"Runner": TaskLinks(default="Thinker", allowed=["Thinker"]),
},
)
with EnvironmentContext() as env:
a = Actor(name="Actor one", debug_log=True)
b = Actor(name="Actor two", debug_log=True)

for actor in [a, b]:
n = net.make_network()
actor.add_task_network(n)
actor.start_network_loop(n.name, "Waiter")

env.run(until=2)

expected = [
"0.0 >> Actor one in Waiter",
"0.0 >> Actor two in Waiter",
"1.0 >> Actor one in Thinker",
"1.0 >> Actor one in Waiter",
"1.0 >> Actor two in Thinker",
"1.0 >> Actor two in Runner",
]
assert data == expected

# Reset data in place, test for default behavior
data[:] = []

Thinker.DO_NOT_HOLD = False
with EnvironmentContext() as env:
a = Actor(name="Actor one", debug_log=True)
b = Actor(name="Actor two", debug_log=True)

for actor in [a, b]:
n = net.make_network()
actor.add_task_network(n)
actor.start_network_loop(n.name, "Waiter")

env.run(until=2)
expected = [
"0.0 >> Actor one in Waiter",
"0.0 >> Actor two in Waiter",
"1.0 >> Actor one in Thinker",
"1.0 >> Actor two in Thinker",
"1.0 >> Actor one in Waiter",
"1.0 >> Actor two in Runner",
]
assert data == expected