From 3d913bdbc21a245201f992460f62eb6ed1d16236 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Mon, 15 Jul 2024 11:57:27 +0900 Subject: [PATCH 1/3] bump to 0.2.1.dev0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 1df108a..57f9a58 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "asyncgui-ext-synctools" -version = "0.2.0" +version = "0.2.1.dev0" description = "Inter-task sychronization and communication." authors = ["Nattōsai Mitō "] license = "MIT" From 66c1d4907519681c7b7369064dc7892e09a5bbc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Mon, 15 Jul 2024 11:55:30 +0900 Subject: [PATCH 2/3] add Queue --- src/asyncgui_ext/synctools/all.py | 4 +- src/asyncgui_ext/synctools/queue.py | 357 ++++++++++++++++++++++++++++ tests/test_queue.py | 350 +++++++++++++++++++++++++++ 3 files changed, 709 insertions(+), 2 deletions(-) create mode 100644 src/asyncgui_ext/synctools/queue.py create mode 100644 tests/test_queue.py diff --git a/src/asyncgui_ext/synctools/all.py b/src/asyncgui_ext/synctools/all.py index f964b1c..3659064 100644 --- a/src/asyncgui_ext/synctools/all.py +++ b/src/asyncgui_ext/synctools/all.py @@ -1,6 +1,6 @@ __all__ = ( - 'Event', 'Box', + 'Event', 'Box', 'Queue', ) from .event import Event from .box import Box - +from .queue import Queue diff --git a/src/asyncgui_ext/synctools/queue.py b/src/asyncgui_ext/synctools/queue.py new file mode 100644 index 0000000..afab13a --- /dev/null +++ b/src/asyncgui_ext/synctools/queue.py @@ -0,0 +1,357 @@ +''' +.. versionadded:: 0.2.1 + +.. code:: + + import asyncgui as ag + from asyncgui_ext.synctools.queue import Queue + + async def producer(q): + for c in "ABC": + await q.put(c) + print('produced', c) + + async def consumer(q): + async for c in q: + print('consumed', c) + + q = Queue(capacity=1) + ag.start(producer(q)) + ag.start(consumer(q)) + +.. code:: text + + produced A + produced B + consumed A + produced C + consumed B + consumed C + +癖 -- Quirk -- +----------------- + +.. code:: + + async def async_fn1(q, consumed): + await q.put('A') + await q.put('B') + item = await q.get() + consumed.append(item) + await q.put('C') + item = await q.get() + consumed.append(item) + + async def async_fn2(q, consumed): + item = await q.get() + consumed.append(item) + + consumed = [] + q = Queue(capacity=1) + ag.start(async_fn1(q, consumed)) + ag.start(async_fn2(q, consumed)) + print(consumed) + +.. code:: text + + ['B', 'C', 'A'] + +上記の出力を見てわかるように ``A``, ``B``, ``C`` の順でキューに入れたのに +``consumed`` には ``B``, ``C``, ``A`` の順で入っています。 +このような事が起こるのは ``asyncgui`` が自身ではメインループを持たない故にタイマー機能を提供できない事に起因します。 +なので外部のタイマー機能を利用する事でこの問題を解消する選択肢を用意する予定なのですが、それまではこういうものだと諦めてください。 +因みに ``Kivy`` を使っているのであれば ``Kivy`` のタイマー機能を用いる事でこの問題を解決済みの ``asynckivy-ext-queue`` +というモジュールが既にあるので氣になる人はそちらをご利用ください。 +''' + +__all__ = ( + 'QueueException', 'WouldBlock', 'Closed', + 'Queue', 'Order', 'QueueState', + ) +import typing as T +import enum +import heapq +from functools import partial +from collections import deque + +from asyncgui import AsyncEvent + + +class QueueState(enum.Enum): + ''' + Enum class that represents the state of the Queue. + ''' + + OPENED = enum.auto() + ''' + All operations are allowed. + + :meta hide-value: + ''' + + HALF_CLOSED = enum.auto() + ''' + Putting an item into the queue is not allowed. + + :meta hide-value: + ''' + + CLOSED = enum.auto() + ''' + Putting an item into the queue is not allowed. + Getting an item from the queue is not allowed. + + :meta hide-value: + ''' + + +class QueueException(Exception): + '''Base class of all the queue-related exceptions.''' + + +class WouldBlock(QueueException): + '''Raised by X_nowait functions if X would block.''' + + +class Closed(QueueException): + ''' + Occurs when: + + * one tries to get an item from a queue that is in the ``CLOSED`` state. + * one tries to get an item from an **empty** queue that is in the ``HALF_CLOSED`` state. + * one tries to put an item into a queue that is in the ``CLOSED`` or ``HALF_CLOSED`` state. + ''' + + +Item: T.TypeAlias = T.Any +Order = T.Literal['fifo', 'lifo', 'small-first'] + + +class Queue: + ''' + :param capacity: Cannot be zero. Unlimited if None. + ''' + def __init__(self, *, capacity: int | None=None, order: Order='fifo'): + if capacity is None: + pass + elif (not isinstance(capacity, int)) or capacity < 1: + raise ValueError(f"'capacity' must be either a positive integer or None. (was {capacity!r})") + self._init_container(capacity, order) + self._state = QueueState.OPENED + self._putters = deque[tuple[AsyncEvent, Item]]() + self._getters = deque[AsyncEvent]() + self._capacity = capacity + self._order = order + self._is_transferring = False + + def _init_container(self, capacity, order): + # If the capacity is 1, there is no point in reordering items. + # Therefore, for performance reasons, treat the order as 'lifo'. + if capacity == 1 or order == 'lifo': + c = [] + c_get = c.pop + c_put = c.append + elif order == 'fifo': + c = deque(maxlen=capacity) + c_get = c.popleft + c_put = c.append + elif order == 'small-first': + c = [] + c_get = partial(heapq.heappop, c) + c_put = partial(heapq.heappush, c) + else: + raise ValueError(f"'order' must be one of 'lifo', 'fifo' or 'small-first'. (was {order!r})") + self._c = c + self._c_get = c_get + self._c_put = c_put + + def __len__(self) -> int: + return len(self._c) + + size = property(__len__) + '''Number of items in the queue. This equals to ``len(queue)``. ''' + + @property + def capacity(self) -> int | None: + '''Number of items allowed in the queue. None if unbounded.''' + return self._capacity + + @property + def is_empty(self) -> bool: + return not self._c + + @property + def is_full(self) -> bool: + return len(self._c) == self._capacity + + @property + def order(self) -> Order: + return self._order + + async def get(self) -> T.Awaitable[Item]: + ''' + .. code-block:: + + item = await queue.get() + ''' + if self._state is QueueState.CLOSED: + raise Closed + if self._state is QueueState.HALF_CLOSED and self.is_empty: + raise Closed + + if self._is_transferring or self.is_empty: + event = AsyncEvent() + self._getters.append(event) + exc, item = (await event.wait())[0] + if exc is not None: + raise exc + return item + + item = self._c_get() + if self._putters: + self._transfer_items() + return item + + def get_nowait(self) -> Item: + ''' + .. code-block:: + + item = queue.get_nowait() + ''' + if self._state is QueueState.CLOSED: + raise Closed + if self.is_empty: + if self._state is QueueState.HALF_CLOSED: + raise Closed + raise WouldBlock + + item = self._c_get() + if (not self._is_transferring) and self._putters: + self._transfer_items() + return item + + async def put(self, item) -> T.Awaitable: + ''' + .. code-block:: + + await queue.put(item) + ''' + if self._state is not QueueState.OPENED: + raise Closed + + if self._is_transferring or self.is_full: + event = AsyncEvent() + self._putters.append((event, item, )) + exc = (await event.wait())[0][0] + if exc is not None: + raise exc + return + + self._c_put(item) + if self._getters: + self._transfer_items() + + def put_nowait(self, item): + ''' + .. code-block:: + + queue.put_nowait(item) + ''' + if self._state is not QueueState.OPENED: + raise Closed + if self.is_full: + raise WouldBlock + + self._c_put(item) + if (not self._is_transferring) and self._getters: + self._transfer_items() + + def half_close(self): + ''' + Partially closes the queue. + Putting an item into it is no longer allowed. + ''' + if self._state is not QueueState.OPENED: + return + self._state = QueueState.HALF_CLOSED + + Closed_ = Closed + for putter, __ in self._putters: + putter.fire(Closed_) + if not self.is_empty: + return + for getter in self._getters: + getter.fire(Closed_, None) + + def close(self): + ''' + Fully closes the queue. + Putting an item into it is no longer allowed. + Getting an item from it is no longer allowed. + All the items it holds will be discarded. + ''' + if self._state is QueueState.CLOSED: + return + self._state = QueueState.CLOSED + self._c.clear() + + Closed_ = Closed + for putter, __ in self._putters: + putter.fire(Closed_) + for getter in self._getters: + getter.fire(Closed_, None) + + async def __aiter__(self): + ''' + Repeats getting an item from the queue until it gets closed. + + .. code-block:: + + async for item in queue: + ... + + This is equivalent to: + + .. code-block:: + + try: + while True: + item = await queue.get() + ... + except Closed: + pass + ''' + try: + while True: + yield await self.get() + except Closed: + pass + + def _transfer_items(self): + assert not self._is_transferring + self._is_transferring = True + try: + # LOAD_FAST + c_put = self._c_put + c_get = self._c_get + putters = self._putters + getters = self._getters + next_putter = putters.popleft + next_getter = getters.popleft + + while True: + while (not self.is_full) and putters: + putter, item = next_putter() + if (cb := putter._callback) is not None: + c_put(item) + cb(None) + if (not getters) or self.is_empty: + break + while (not self.is_empty) and getters: + getter = next_getter() + if (cb := getter._callback) is not None: + cb(None, c_get()) + if (not putters) or self.is_full: + break + finally: + self._is_transferring = False diff --git a/tests/test_queue.py b/tests/test_queue.py new file mode 100644 index 0000000..0cda569 --- /dev/null +++ b/tests/test_queue.py @@ -0,0 +1,350 @@ +import itertools + +import pytest +import asyncgui as ag +from asyncgui_ext.synctools.queue import Queue, Closed, WouldBlock + +p = pytest.mark.parametrize +p_order = p('order', ('lifo', 'fifo', 'small-first')) +p_capacity = p('capacity', [1, 2, None, ]) +p_capacity2 = p('capacity', [1, 2, 3, 4, None, ]) +p_limited_capacity = p('capacity', [1, 2, ]) + + +@p('capacity', (-1, 0, 0.0, 1.0, -1.0, '1', )) +def test_invalid_capacity_value(capacity): + with pytest.raises(ValueError): + Queue(capacity=capacity) + + +@p_capacity +@p_order +def test_container_type(capacity, order): + q = Queue(capacity=capacity, order=order) + if capacity != 1 and order == 'fifo': + from collections import deque + assert isinstance(q._c, deque) + else: + assert isinstance(q._c, list) + + +@p_capacity +@p('nowait', [True, False, ]) +def test_put_an_item_into_a_closed_queue(capacity, nowait): + q = Queue(capacity=capacity) + q.close() + with pytest.raises(Closed): + q.put_nowait('Z') if nowait else ag.start(q.put('Z')) + + +@p_capacity +@p('nowait', [True, False, ]) +def test_put_an_item_into_a_half_closed_queue(capacity, nowait): + q = Queue(capacity=capacity) + q.half_close() + with pytest.raises(Closed): + q.put_nowait('Z') if nowait else ag.start(q.put('Z')) + + +@p_capacity +@p('nowait', [True, False, ]) +def test_get_an_item_from_a_closed_queue(capacity, nowait): + q = Queue(capacity=capacity) + q.close() + with pytest.raises(Closed): + q.get_nowait() if nowait else ag.start(q.get()) + + +@p_capacity +@p('nowait', [True, False, ]) +def test_get_an_item_from_a_half_closed_queue(capacity, nowait): + q = Queue(capacity=capacity) + q.half_close() + with pytest.raises(Closed): + q.get_nowait() if nowait else ag.start(q.get()) + + +@p_capacity2 +def test_put_and_get_in_the_same_task(capacity): + + async def async_fn(): + q = Queue(capacity=capacity) + await q.put('A') + return await q.get() + + task = ag.start(async_fn()) + assert task.result == 'A' + + +@p_capacity2 +def test_put_and_get(capacity): + q = Queue(capacity=capacity) + putter = ag.start(q.put('A')) + getter = ag.start(q.get()) + assert putter.finished + assert getter.result == 'A' + + +@p_capacity2 +def test_get_and_put(capacity): + q = Queue(capacity=capacity) + getter = ag.start(q.get()) + putter = ag.start(q.put('A')) + assert putter.finished + assert getter.result == 'A' + + +@p_capacity2 +@p('close', [True, False]) +def test_async_for(capacity, close): + + async def producer(q, items): + for i in items: + await q.put(i) + + async def consumer(q): + return ''.join([item async for item in q]) + + q = Queue(capacity=capacity) + c = ag.start(consumer(q)) + p = ag.start(producer(q, 'ABC')) + assert p.finished + assert not c.finished + q.close() if close else q.half_close() + assert c.result == 'ABC' + + +@p('close', [True, False, ]) +@p_capacity2 +def test_close_a_queue_while_it_holding_a_getter(close, capacity): + async def consumer(q): + with pytest.raises(Closed): + await q.get() + + q = Queue(capacity=capacity) + task = ag.start(consumer(q)) + assert not task.finished + q.close() if close else q.half_close() + assert task.finished + + +@p('close', [True, False, ]) +def test_close_a_queue_while_it_holding_a_putter(close): + async def producer(q): + with pytest.raises(Closed): + await q.put(None) + + q = Queue(capacity=1) + q.put_nowait(None) + task = ag.start(producer(q)) + assert not task.finished + q.close() if close else q.half_close() + assert task.finished + + +@p_order +def test_various_statistics(order): + q = Queue(capacity=2, order=order) + assert q.order == order + assert len(q) == 0 + assert q.capacity == 2 + assert q.size == 0 + assert q.is_empty + assert not q.is_full + ag.start(q.put(1)) + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + ag.start(q.put(2)) + assert q.size == 2 + assert not q.is_empty + assert q.is_full + ag.start(q.get()) + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + ag.start(q.get()) + assert q.size == 0 + assert q.is_empty + assert not q.is_full + + +@p_order +def test_various_statistics_nowait(order): + q = Queue(capacity=2, order=order) + assert q.order == order + assert len(q) == 0 + assert q.capacity == 2 + assert q.size == 0 + assert q.is_empty + assert not q.is_full + q.put_nowait(1) + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + q.put_nowait(2) + assert q.size == 2 + assert not q.is_empty + assert q.is_full + q.get_nowait() + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + q.get_nowait() + assert q.size == 0 + assert q.is_empty + assert not q.is_full + + +@p_capacity +def test_get_nowait_while_there_are_no_items(capacity): + q = Queue(capacity=capacity) + with pytest.raises(WouldBlock): + q.get_nowait() + + +@p('capacity', [1, 2, ]) +def test_put_nowait_while_there_are_no_getters_and_full_of_items(capacity): + q = Queue(capacity=capacity) + for i in range(capacity): + q._c_put(i) + assert q.is_full + with pytest.raises(WouldBlock): + q.put_nowait(99) + + +def test_putter_triggers_half_close(): + async def producer1(q): + await q.put('B') + q.half_close() + + async def producer2(q): + with pytest.raises(Closed): + await q.put('C') + + async def consumer1(q): + assert await q.get() == 'A' + + async def consumer2(q): + assert await q.get() == 'B' + + q = Queue(capacity=1) + q.put_nowait('A') + p1 = ag.start(producer1(q)) + p2 = ag.start(producer2(q)) + c1 = ag.start(consumer1(q)) + c2 = ag.start(consumer2(q)) + assert p1.finished + assert p2.finished + assert c1.finished + assert c2.finished + assert q.is_empty + + +def test_putter_triggers_close(): + async def producer1(q): + await q.put('B') + q.close() + + async def producer2(q): + with pytest.raises(Closed): + await q.put('C') + + async def consumer1(q): + assert await q.get() == 'A' + + async def consumer2(q): + with pytest.raises(Closed): + await q.get() + + q = Queue(capacity=1) + q.put_nowait('A') + p1 = ag.start(producer1(q)) + p2 = ag.start(producer2(q)) + c1 = ag.start(consumer1(q)) + c2 = ag.start(consumer2(q)) + assert p1.finished + assert p2.finished + assert c1.finished + assert c2.finished + + +@p_capacity2 +@p('close', [True, False, ]) +def test_getter_triggers_close(capacity, close): + async def producer1(q): + await q.put('A') + + async def producer2(q): + with pytest.raises(Closed): + await q.put('B') + + async def consumer1(q): + assert await q.get() == 'A' + q.close() if close else q.half_close() + + async def consumer2(q): + with pytest.raises(Closed): + await q.get() + + q = Queue(capacity=capacity) + c1 = ag.start(consumer1(q)) + c2 = ag.start(consumer2(q)) + p1 = ag.start(producer1(q)) + p2 = ag.start(producer2(q)) + assert p1.finished + assert p2.finished + assert c1.finished + assert c2.finished + + +@p('capacity', [1, None, ]) +@p("script", itertools.permutations("P2 P5 C2 C4 C1".split(), 5)) +def test_various_permutations(capacity, script): + consumed = [] + + async def producer(q, n_items): + for __ in range(n_items): + await q.put(str(n_items)) + + async def consumer(q, n_items): + for __ in range(n_items): + consumed.append(await q.get()) + + q = Queue(capacity=capacity) + tasks = [] + for action in script: + if action[0] == 'P': + tasks.append(ag.start(producer(q, int(action[1:])))) + elif action[0] == 'C': + tasks.append(ag.start(consumer(q, int(action[1:])))) + else: + pytest.fail(f"Unknown action: {action}") + assert q.is_empty + for t in tasks: + assert t.finished + consumed.sort() + assert ''.join(consumed) == '2255555' + + + +def test_quirk(): + '''これはテストでは無くどうしても取り除けなかった癖の確認。これが通らなくなったらドキュメントもそれに合わせて書き換えないといけない。''' + async def async_fn1(q, consumed): + await q.put('A') + await q.put('B') + item = await q.get() + consumed.append(item) + await q.put('C') + item = await q.get() + consumed.append(item) + + async def async_fn2(q, consumed): + item = await q.get() + consumed.append(item) + + consumed = [] + q = Queue(capacity=1) + ag.start(async_fn1(q, consumed)) + ag.start(async_fn2(q, consumed)) + assert consumed == ['B', 'C', 'A'] From 9c88089841b233ef036e1d250008cfd09e3d744f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Mon, 15 Jul 2024 11:55:56 +0900 Subject: [PATCH 3/3] edit docs --- sphinx/box.rst | 8 ++++++ sphinx/event.rst | 8 ++++++ sphinx/index.rst | 4 ++- sphinx/queue.rst | 8 ++++++ sphinx/reference.rst | 9 ------ src/asyncgui_ext/synctools/box.py | 42 +++++++++++++-------------- src/asyncgui_ext/synctools/event.py | 44 ++++++++++++++++------------- 7 files changed, 73 insertions(+), 50 deletions(-) create mode 100644 sphinx/box.rst create mode 100644 sphinx/event.rst create mode 100644 sphinx/queue.rst delete mode 100644 sphinx/reference.rst diff --git a/sphinx/box.rst b/sphinx/box.rst new file mode 100644 index 0000000..eb2f623 --- /dev/null +++ b/sphinx/box.rst @@ -0,0 +1,8 @@ +=== +Box +=== + +.. automodule:: asyncgui_ext.synctools.box + :members: + :undoc-members: + :exclude-members: diff --git a/sphinx/event.rst b/sphinx/event.rst new file mode 100644 index 0000000..9d1f9b1 --- /dev/null +++ b/sphinx/event.rst @@ -0,0 +1,8 @@ +===== +Event +===== + +.. automodule:: asyncgui_ext.synctools.event + :members: + :undoc-members: + :exclude-members: diff --git a/sphinx/index.rst b/sphinx/index.rst index 0b566b0..35d1329 100644 --- a/sphinx/index.rst +++ b/sphinx/index.rst @@ -7,4 +7,6 @@ Inter-task sychronization and communication. .. toctree:: :hidden: - reference + box + event + queue diff --git a/sphinx/queue.rst b/sphinx/queue.rst new file mode 100644 index 0000000..5295be5 --- /dev/null +++ b/sphinx/queue.rst @@ -0,0 +1,8 @@ +===== +Queue +===== + +.. automodule:: asyncgui_ext.synctools.queue + :members: + :undoc-members: + :exclude-members: diff --git a/sphinx/reference.rst b/sphinx/reference.rst deleted file mode 100644 index 9840ea1..0000000 --- a/sphinx/reference.rst +++ /dev/null @@ -1,9 +0,0 @@ -============= -API Reference -============= - - -.. autoclass:: asyncgui_ext.synctools.event.Event - :members: - :undoc-members: - :exclude-members: diff --git a/src/asyncgui_ext/synctools/box.py b/src/asyncgui_ext/synctools/box.py index a010dae..4841b1a 100644 --- a/src/asyncgui_ext/synctools/box.py +++ b/src/asyncgui_ext/synctools/box.py @@ -1,34 +1,34 @@ -__all__ = ('Box', ) -import types - +''' +.. code-block:: -class Box: - ''' - Similar to :class:`asyncgui.AsyncBox`, but this one can handle multiple tasks simultaneously. - This is the closest thing to :class:`asyncio.Event` in this library. - - .. code-block:: + import asyncgui as ag + from asyncgui_ext.synctools.box import Box - async def async_fn(b1, b2): - args, kwargs = await b1.get() + async def async_fn1(box): + for __ in range(10): + args, kwargs = await box.get() assert args == (1, ) assert kwargs == {'crow': 'raven', } - args, kwargs = await b2.get() + async def async_fn2(box): + for __ in range(10): + args, kwargs = await box.get() assert args == (2, ) assert kwargs == {'frog': 'toad', } - args, kwargs = await b1.get() - assert args == (1, ) - assert kwargs == {'crow': 'raven', } + box = Box() + box.put(1, crow='raven') + ag.start(async_fn1(box)) + box.update(2, frog='toad') + ag.start(async_fn2(box)) +''' + + +__all__ = ('Box', ) +import types - b1 = Box() - b2 = Box() - b1.put(1, crow='raven') - start(async_fn(b1, b2)) - b2.put(2, frog='toad') - ''' +class Box: __slots__ = ('_item', '_waiting_tasks', ) def __init__(self): diff --git a/src/asyncgui_ext/synctools/event.py b/src/asyncgui_ext/synctools/event.py index f56f07a..37ace1d 100644 --- a/src/asyncgui_ext/synctools/event.py +++ b/src/asyncgui_ext/synctools/event.py @@ -1,29 +1,35 @@ -__all__ = ('Event', ) -import types +''' +.. code-block:: + import asyncgui as ag + from asyncgui_ext.synctools.event import Event -class Event: - ''' - Similar to :class:`asyncgui.AsyncEvent`, but this one can handle multiple tasks simultaneously. + async def async_fn1(e): + args, kwargs = await e.wait() + assert args == (1, ) + assert kwargs == {'crow': 'raven', } - .. code-block:: + args, kwargs = await e.wait() + assert args == (2, ) + assert kwargs == {'toad': 'frog', } - async def async_fn(e): - args, kwargs = await e.wait() - assert args == (2, ) - assert kwargs == {'crow': 'raven', } + async def async_fn2(e): + args, kwargs = await e.wait() + assert args == (2, ) + assert kwargs == {'toad': 'frog', } - args, kwargs = await e.wait() - assert args == (3, ) - assert kwargs == {'toad': 'frog', } + e = Event() + ag.start(async_fn1(e)) + e.fire(1, crow='raven') + ag.start(async_fn2(e)) + e.fire(2, toad='frog') +''' + +__all__ = ('Event', ) +import types - e = Event() - e.fire(1, crocodile='alligator') - start(async_fn(e)) - e.fire(2, crow='raven') - e.fire(3, toad='frog') - ''' +class Event: __slots__ = ('_waiting_tasks', ) def __init__(self):