-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathmessage.py
executable file
·69 lines (55 loc) · 1.93 KB
/
message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# message.py
# https://github.com/peterhinch/micropython-async/blob/a87bda1b716090da27fd288cc8b19b20525ea20c/v3/primitives/message.py
# Now uses ThreadSafeFlag for efficiency
# Copyright (c) 2018-2021 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# Usage:
# from primitives.message import Message
try:
import uasyncio as asyncio
except ImportError:
import asyncio
# A coro waiting on a message issues await message
# A coro or hard/soft ISR raising the message issues.set(payload)
# .clear() should be issued by at least one waiting task and before
# next event.
# class Message(asyncio.ThreadSafeFlag):
class Message(object):
def __init__(self, _=0):
# Arg: poll interval. Compatibility with old code.
self._evt = asyncio.Event()
self._data = None # Message
self._state = False # Ensure only one task waits on ThreadSafeFlag
self._is_set = False # For .is_set()
# super().__init__()
def clear(self):
# At least one task must call clear when scheduled
self._state = False
self._is_set = False
def __iter__(self):
yield from self.wait()
return self._data
async def wait(self):
if self._state:
# A task waits on ThreadSafeFlag
await self._evt.wait() # Wait on event
else:
# First task to wait
self._state = True
# Ensure other tasks see updated ._state before they wait
await asyncio.sleep_ms(0)
await super().wait() # Wait on ThreadSafeFlag
self._evt.set()
self._evt.clear()
return self._data
def set(self, data=None):
# Can be called from a hard ISR
self._data = data
self._is_set = True
# super().set()
def is_set(self):
return self._is_set
def value(self):
return self._data