-
Notifications
You must be signed in to change notification settings - Fork 343
thread
loboris edited this page Jan 3, 2018
·
13 revisions
Python functions can be run in thread.
It means the function execution runs in ESP32 task separate from the main MicroPython thread and apears as running in background.
Maximum number of threads that can be started is defined via menuconfig. As each thread needs some memory (for stack), creating large amount of threads can couse stack overflow.
Thread function is usualy defined as infinite loop.
Threads can be created, paused, resumed and stopped.
Threads can comunicate with each other or the main thread (repl) using notifications and messages.
Method | Notes |
---|---|
_thread.start_new_thread("th_name", th_func, args[, kwargs]) | Start a new thread and return its identifier. The thread executes the function with the argument list args (which must be a tuple). The optional kwargs argument specifies a dictionary of keyword arguments. |
_thread.stack_size([size]) | Return the thread stack size (in bytes) used when creating new threads. The optional size argument specifies the stack size to be used for subsequently created threads. The maximum stack size used by the thread can be checked with _thread.list()
|
_thread.allowsuspend(allow) | The thread can be suspended if allow is True. When created, thread does not allow suspension until explicitly set by this function. The method must be called from the thread function. |
_thread.suspend(th_id) | Suspend the execution of the thread function. |
_thread.resume(th_id) | Resume the execution of the thread function previously suspended with _thread.suspend(). |
_thread.kill(th_id) | Terminate the thread, free all allocated memory. |
_thread.getThreadName(th_id) | Get the name of the thread whose id is th_id. |
_thread.getSelfName() | Get the name of the thread executing this method. |
_thread.getReplID() | Get the thread id of the main (repl) thread. Can be uset to send notifications/messages to the main thread. |
_thread.getnotification() | Check if any notification was sent to the thread executing this method. Returns integer >0 if there was pending notification or 0 if not |
_thread.getmsg() | Check if any message was sent to the thread executing this method. Returns 3-items tuple containing message type, sender thread id and message itself (integer or string). |
_thread.notify(th_id, value) | Send notification to the thread with id th_id. value is integer >0. |
_thread.sendmsg(th_id, msg) | Send message to the thread with id th_id. msg can be integer or string. |
_thread.replAcceptMsg([flag]) | Return True if the main thread (repl) is allowed to accept messages. If executed from the main thread, optional flag (True |
_thread.list([print]) | Print the status of all created threads. If the optional print argument is set to False, returns the tuple with created threads information. Thread info tuple has th_id, type, name, suspended state, stack size and max stack used items |
import machine, _thread, time
import micropython, gc
import bme280
# Setup the LED pins
bled = machine.Pin(4, mode=machine.Pin.OUT)
#rled = machine.Pin(0, mode=machine.Pin.OUT)
#gled = machine.Pin(2, mode=machine.Pin.OUT)
bled.value(0)
#gled.value(0)
#rled.value(0)
# Setup I2C to be used with BME280 sensor
i2c=machine.I2C(scl=machine.Pin(26),sda=machine.Pin(25),speed=400000)
# Initialize BME280
bme=bme280.BME280(i2c=i2c)
# Define LED thread function
#---------------------------
def rgbled(n=200, led=bled):
notif_exit = 4718
notif_replay = 2
notif_count = 3
x = 0
_thread.allowsuspend(True)
while True:
led.value(1)
time.sleep_ms(n)
led.value(0)
x = x + 1
t = 10
while t > 0:
notif = _thread.getnotification()
if notif == notif_exit:
_thread.sendmsg(_thread.getReplID(), "[%s] Exiting" % (_thread.getSelfName()))
return
elif notif == notif_replay:
_thread.sendmsg(_thread.getReplID(), "[%s] I've been notified" % (_thread.getSelfName()))
elif notif == notif_count:
_thread.sendmsg(_thread.getReplID(), "[%s] Run counter = %u" % (_thread.getSelfName(), x))
elif notif == 777:
_thread.sendmsg(_thread.getReplID(), "[%s] Forced EXCEPTION" % (_thread.getSelfName()))
time.sleep_ms(1000)
zz = 234 / 0
elif notif != 0:
_thread.sendmsg(_thread.getReplID(), "[%s] Got unknown notification: %u" % (_thread.getSelfName(), notif))
typ, sender, msg = _thread.getmsg()
if msg:
_thread.sendmsg(_thread.getReplID(), "[%s] Message from '%s'\n'%s'" % (_thread.getSelfName(), _thread.getThreadName(sender), msg))
time.sleep_ms(100)
t = t - 1
gc.collect()
# For LED thread we don't need more than 3K stack
_ = _thread.stack_size(3*1024)
# Start LED thread
#rth=_thread.start_new_thread("R_Led", rgbled, (100, rled))
time.sleep_ms(500)
#gth=_thread.start_new_thread("G_Led", rgbled, (250, gled))
bth=_thread.start_new_thread("B_Led", rgbled, (100, bled))
# Function to generate BME280 values string
#---------------
def bmevalues():
t, p, h = bme.read_compensated_data()
p = p // 256
pi = p // 100
pd = p - pi * 100
hi = h // 1024
hd = h * 100 // 1024 - hi * 100
#return "[{}] T={0:1g}C ".format(time.strftime("%H:%M:%S",time.localtime()), round(t / 100,1)) + "P={}.{:02d}hPa ".format(pi, pd) + "H={}.{:01d}%".format(hi, hd)
return "[{}] T={}C ".format(time.strftime("%H:%M:%S",time.localtime()), t / 100) + "P={}.{:02d}hPa ".format(pi, pd) + "H={}.{:02d}%".format(hi, hd)
# Define BME280 thread function
#-----------------------
def bmerun(interval=60):
_thread.allowsuspend(True)
sendmsg = True
send_time = time.time() + interval
while True:
while time.time() < send_time:
notif = _thread.getnotification()
if notif == 10002:
_thread.sendmsg(_thread.getReplID(), bmevalues())
elif notif == 10004:
sendmsg = False
elif notif == 10006:
sendmsg = True
elif (notif <= 3600) and (notif >= 10):
interval = notif
send_time = time.time() + interval
_thread.sendmsg(_thread.getReplID(), "Interval set to {} seconds".format(interval))
time.sleep_ms(100)
send_time = send_time + interval
if sendmsg:
_thread.sendmsg(_thread.getReplID(), bmevalues())
# 3K is enough for BME280 thread
_ = _thread.stack_size(3*1024)
# start the BME280 thread
bmeth=_thread.start_new_thread("BME280", bmerun, (60,))
# === In the 3rd thread we will run Neopixels rainbow demo ===
num_np = 144
np=machine.Neopixel(machine.Pin(21), num_np)
def rainbow(pos=1, bri=0.02):
dHue = 360*3/num_np
for i in range(1, num_np):
hue = (dHue * (pos+i)) % 360;
np.setHSB(i, hue, 1.0, bri, 1, False)
np.show()
# DEfine Neopixels thread function
#-----------------
def thrainbow_py():
pos = 0
bri = 0.02
while True:
for i in range(0, num_np):
dHue = 360.0/num_np * (pos+i);
hue = dHue % 360;
np.setHSB(i, hue, 1.0, bri, 1, False)
np.show()
notif = _thread.getnotification()
if (notif > 0) and (notif <= 100):
bri = notif / 100.0
elif notif == 1000:
_thread.sendmsg(_thread.getReplID(), "[%s] Run counter = %u" % (_thread.getSelfName(), pos))
pos = pos + 1
import math
#---------------
def thrainbow():
pos = 0
np.brightness(25)
while True:
np.rainbow(pos, 3)
notif = _thread.getnotification()
if (notif > 0) and (notif <= 100):
np.brightness(math.trunc(notif * 2.55))
elif notif == 1000:
_thread.sendmsg(_thread.getReplID(), "[%s] Run counter = %u" % (_thread.getSelfName(), pos))
pos = pos + 1
# Start the Neopixels thread
npth=_thread.start_new_thread("Neopixel", thrainbow, ())
utime.sleep(1)
machine.heap_info()
_thread.list()
# Set neopixel brightnes (%)
#_thread.notify(npth, 20)
# Get counter value from Neopixel thread
#_thread.notify(npth, 1000)