Skip to content

Commit

Permalink
Zero time decision task upgrades and docs.
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesArruda committed Mar 5, 2025
1 parent 848b1e2 commit e080631
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 3 deletions.
115 changes: 115 additions & 0 deletions docs/source/user_guide/how_tos/decision_tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,118 @@ The difference between making and rehearsing the decision is covered in the tuto
is called during normal operations of UPSTAGE, and the latter is called during a
rehearsal of the task or network. It is up the user to ensure that no side-effects occur during the
rehearsal that would touch non-rehearsing state, actors, or other data.

There is one class variable that can be set on each subclass of the decision task, which is ``DO_NOT_HOLD``:

.. code-block:: python
class Thinker(UP.DecisionTask):
DO_NOT_HOLD = True # default is False
def make_decision(): ...
This feature lets the user turn off zero time holding on decision tasks, which causes decision
tasks to move right into the next task without allowing anything else to run. For examples and
reasoning, see the following section.

This feature is most applicable for avoiding race conditions in decision making where a
follow-on task can alter the simulation with its first yield such that other decisions
make would become incorrect [#f1]_. This would only occur for equally-timed decision processes,
and only for the first yield in a Task that follows the decision. For example, deciding
which ``Store`` to queue on may result in an Actor waiting when no wait was expected.

Zero Time Considerations
------------------------

Decision tasks are meant to not advance the clock, but they do cause a zero-time timeout to be
created. This is done to provide other events in the queue the chance to complete at the same
time step before the task network proceeds for the current actor.

Here is a short example of the default behavior:

.. code-block:: python
import upstage_des.api as UP
class Waiter(UP.Task):
def task(self, *, actor):
print(f"{self.env.now:.1f} >> {actor.name} in Waiter")
yield UP.Wait(1.0)
class Runner(UP.Task):
def task(self, *, actor):
print(f"{self.env.now:.1f} >> {actor.name} in Runner")
yield UP.Wait(2.0)
class Thinker(UP.DecisionTask):
def make_decision(self, *, actor):
print(f"{self.env.now:.1f} >> {actor.name} in Thinker")
if "one" in actor.name:
self.set_actor_task_queue(actor, ["Waiter"])
else:
self.set_actor_task_queue(actor, ["Runner"])
net = UP.TaskNetworkFactory(
name="Example Net",
task_classes={"Waiter": Waiter, "Runner":Runner, "Thinker":Thinker},
task_links={
"Waiter":UP.TaskLinks(default="Thinker", allowed=["Thinker"]),
"Thinker":UP.TaskLinks(default="", allowed=["Waiter", "Runner"]),
"Runner":UP.TaskLinks(default="Thinker", allowed=["Thinker"]),
},
)
with UP.EnvironmentContext() as env:
a = UP.Actor(name="Actor one", debug_log=True)
b = UP.Actor(name="Actor two", debug_log=True)
for actor in [a,b]:
n = net.make_network()
actor.add_task_network(n)
actor.start_network_loop(n.name, "Waiter")
env.run(until=2)
The result is:

.. code-block:: python
>>> 0.0 >> Actor one in Waiter
>>> 0.0 >> Actor two in Waiter
>>> 1.0 >> Actor one in Thinker
>>> 1.0 >> Actor two in Thinker
>>> 1.0 >> Actor one in Waiter
>>> 1.0 >> Actor two in Runner
Even though ``Actor one`` gets to the decision task first, the internal timeout
preserves ordering of the stops. This would happen even if there was no timeout,
because UPSTAGE yields on the decision task as a simpy process.

If we were to skip yielding on the process of a ``DecisionTask``, then this ordering
of output would result:

.. code-block:: python
...
# The only modification is to add DO_NOT_HOLD = True
class Thinker(UP.DecisionTask):
DO_NOT_HOLD = True
def make_decision(self, *, actor):
...
>>> 0.0 >> Actor one in Waiter
>>> 0.0 >> Actor two in Waiter
>>> 1.0 >> Actor one in Thinker
>>> 1.0 >> Actor one in Waiter
>>> 1.0 >> Actor two in Thinker
>>> 1.0 >> Actor two in Runner
Note that ``Actor one`` starts the ``Waiter`` task (and stops at the first yield inside)
before ``Actor two`` gets to its decision task.

Turning off the hold using ``DO_NOT_HOLD = True`` gives a guarantee to ``Actor two`` that
the simulation they see in ``Thinker`` is what they will encounter in the first yield in
``Runner``.


.. [#f1] Needing this feature may be a code smell, depending on the situation. Take care
to check that other ways of deciding and queueing might be better suited.
12 changes: 12 additions & 0 deletions src/upstage_des/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ def run(self, *, actor: "Actor") -> Generator[SimpyEvent | Process, Any, None]:
class DecisionTask(Task):
"""A task used for decision processes."""

DO_NOT_HOLD = False

def task(self, *, actor: Any) -> TASK_TYPE:
"""Define the process this task follows."""
raise SimulationError("No need to call `task` on a DecisionTask")
Expand Down Expand Up @@ -602,6 +604,16 @@ def run(self, *, actor: "Actor") -> Generator[SimpyEvent, None, None]:
assert isinstance(self.env, SimpyEnv)
yield self.env.timeout(0.0)

def run_skip(self, *, actor: "Actor") -> None:
"""Run the decision task with no clock reference.
Task networks will use this method if SKIP_WAIT is True.
Args:
actor (Actor): The actor making decisions
"""
self.make_decision(actor=actor)


class TerminalTask(Task):
"""A rehearsal-safe task that cannot exit, i.e., it is terminal.
Expand Down
12 changes: 9 additions & 3 deletions src/upstage_des/task_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from simpy import Process

from upstage_des.base import SimulationError
from upstage_des.task import Task, TerminalTask, process
from upstage_des.task import DecisionTask, Task, TerminalTask, process

REH_ACTOR = TypeVar("REH_ACTOR", bound="Actor")

Expand Down Expand Up @@ -124,9 +124,15 @@ def loop(
self._current_task_inst = task_instance
self._current_task_inst._set_network_name(self.name)
self._current_task_inst._set_network_ref(self)
self._current_task_proc = self._current_task_inst.run(actor=actor)

yield self._current_task_proc
if (
isinstance(self._current_task_inst, DecisionTask)
and self._current_task_inst.DO_NOT_HOLD
):
self._current_task_inst.run_skip(actor=actor)
else:
self._current_task_proc = self._current_task_inst.run(actor=actor)
yield self._current_task_proc

next_name = self._next_task_name(task_name, actor)
self._current_task_name = next_name
Expand Down
82 changes: 82 additions & 0 deletions src/upstage_des/test/test_task_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,3 +830,85 @@ def task(self, *, actor: Thing) -> TASK_GEN:
thing.add_task_network(factory.make_network())
new_thing = thing.rehearse_network("fact", ["ThingWait", "ThingWait"])
assert new_thing.env.now == new_thing.the_time, "Bad rehearsal env time"


def test_decision_task_hold() -> None:
# Test the conditions found in https://github.com/gtri/upstage/issues/35
# Looks at zero time holds vs pass-through decision tasks

# Test for new behavior first.
data = []

class Waiter(Task):
def task(self, *, actor: Actor) -> TASK_GEN:
data.append(f"{self.env.now:.1f} >> {actor.name} in Waiter")
yield Wait(1.0)

class Runner(Task):
def task(self, *, actor: Actor) -> TASK_GEN:
data.append(f"{self.env.now:.1f} >> {actor.name} in Runner")
yield Wait(2.0)

class Thinker(DecisionTask):
DO_NOT_HOLD = True

def make_decision(self, *, actor: Actor) -> None:
data.append(f"{self.env.now:.1f} >> {actor.name} in Thinker")
if "one" in actor.name:
self.set_actor_task_queue(actor, ["Waiter"])
else:
self.set_actor_task_queue(actor, ["Runner"])

net = TaskNetworkFactory(
name="Example Net",
task_classes={"Waiter": Waiter, "Runner": Runner, "Thinker": Thinker},
task_links={
"Waiter": TaskLinks(default="Thinker", allowed=["Thinker"]),
"Thinker": TaskLinks(default="", allowed=["Waiter", "Runner"]),
"Runner": TaskLinks(default="Thinker", allowed=["Thinker"]),
},
)
with EnvironmentContext() as env:
a = Actor(name="Actor one", debug_log=True)
b = Actor(name="Actor two", debug_log=True)

for actor in [a, b]:
n = net.make_network()
actor.add_task_network(n)
actor.start_network_loop(n.name, "Waiter")

env.run(until=2)

expected = [
"0.0 >> Actor one in Waiter",
"0.0 >> Actor two in Waiter",
"1.0 >> Actor one in Thinker",
"1.0 >> Actor one in Waiter",
"1.0 >> Actor two in Thinker",
"1.0 >> Actor two in Runner",
]
assert data == expected

# Reset data in place, test for default behavior
data[:] = []

Thinker.DO_NOT_HOLD = False
with EnvironmentContext() as env:
a = Actor(name="Actor one", debug_log=True)
b = Actor(name="Actor two", debug_log=True)

for actor in [a, b]:
n = net.make_network()
actor.add_task_network(n)
actor.start_network_loop(n.name, "Waiter")

env.run(until=2)
expected = [
"0.0 >> Actor one in Waiter",
"0.0 >> Actor two in Waiter",
"1.0 >> Actor one in Thinker",
"1.0 >> Actor two in Thinker",
"1.0 >> Actor one in Waiter",
"1.0 >> Actor two in Runner",
]
assert data == expected

0 comments on commit e080631

Please sign in to comment.