-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcode.py
executable file
·106 lines (86 loc) · 2.88 KB
/
code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import binascii
import board
import digitalio
import microcontroller
import os
import socketpool
import supervisor
import ssl
import time
import watchdog
import wifi
import adafruit_logging as logging
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# Hex-encoded unique ID of this chip. Used below for the default hostname and
# default debug topic, and by main() as the MQTT client ID.
esp_uid = binascii.hexlify(microcontroller.cpu.uid).decode("ascii")

# --- Configuration, read through os.getenv() with fallbacks ----------------
CIRCUITPY_WIFI_SSID = os.getenv("CIRCUITPY_WIFI_SSID")
CIRCUITPY_WIFI_PASSWORD = os.getenv("CIRCUITPY_WIFI_PASSWORD")
# Must be an f-string: a plain string literal would make the default hostname
# the literal text "esp-{esp_uid}" instead of embedding the chip ID.
WIFI_HOSTNAME = os.getenv("WIFI_HOSTNAME", f"esp-{esp_uid}")
MQTT_HOST = os.getenv("MQTT_HOST", "mqtt")
MQTT_PORT = int(os.getenv("MQTT_PORT", 1883))
# TLS is enabled only when the setting is exactly the string "true" or the
# integer 1; anything else (including unset) disables it.
MQTT_TLS = os.getenv("MQTT_TLS", "") in ("true", 1)
MQTT_USERNAME = os.getenv("MQTT_USERNAME")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD")
# Topic the relay listens on; there is no default, so it is None when unset.
MQTT_TOPIC = os.getenv("MQTT_TOPIC")
MQTT_DEBUG_TOPIC = os.getenv("MQTT_DEBUG_TOPIC", f"debug/esp-{esp_uid}")
WATCHDOG_TIMEOUT = int(os.getenv("WATCHDOG_TIMEOUT", 180))

# Request a reload both when the program finishes and when it raises.
# NOTE(review): passing None as the file name presumably keeps the default
# code file -- confirm against the supervisor documentation.
supervisor.set_next_code_file(
    None,
    reload_on_success=True,
    reload_on_error=True,
)

# --- Hardware / network objects shared by the handlers and main() ----------
wifi.radio.hostname = WIFI_HOSTNAME
pool = socketpool.SocketPool(wifi.radio)

# The relay is driven from pin D1, configured as a digital output.
relay = digitalio.DigitalInOut(board.D1)
relay.direction = digitalio.Direction.OUTPUT
def handle_connect(client, userdata, flags, rc):
    """MQTT on-connect callback: subscribe and announce ourselves.

    Logs the broker address, subscribes to MQTT_TOPIC and publishes a
    "connected" message on MQTT_DEBUG_TOPIC that carries the supervisor's
    run reason and the previous traceback.

    Args:
        client: MQTT client that just connected; used to subscribe/publish.
        userdata: Unused.
        flags: Unused.
        rc: Unused.
    """
    print(f"Connected to MQTT {MQTT_HOST}:{MQTT_PORT}")
    client.subscribe(MQTT_TOPIC)

    # Include why this run started and what the last run died of, so a
    # restart can be diagnosed from the debug topic alone.
    run_reason = supervisor.runtime.run_reason
    previous_traceback = supervisor.get_previous_traceback()
    client.publish(
        MQTT_DEBUG_TOPIC,
        f"connected, {run_reason} {previous_traceback}",
    )
def handle_disconnect(client, userdata, rc):
    """MQTT on-disconnect callback: log the disconnect to the console.

    Args:
        client: Unused.
        userdata: Unused.
        rc: Unused.
    """
    print("Disconnected from MQTT")
def handle_message(client, topic, message):
    """MQTT on-message callback: drive the relay from the payload.

    A payload of exactly "1" requests the relay on; any other payload
    requests it off. Nothing is written, published or printed when the relay
    is already in the requested state.

    Args:
        client: MQTT client; used to publish the new state to the debug topic.
        topic: Unused.
        message: Payload of the received message.
    """
    turn_on = message == "1"

    # Already in the requested state: stay silent.
    if bool(relay.value) == turn_on:
        return

    relay.value = turn_on
    state = "on" if turn_on else "off"
    client.publish(MQTT_DEBUG_TOPIC, state)
    print(state)
def main():
    """Join Wi-Fi, connect to the MQTT broker and service it forever.

    Connects to the configured SSID, builds the MQTT client (with an SSL
    context only when MQTT_TLS is set), installs the callbacks and a last-will
    message on the debug topic, then loops: feed the watchdog, poll MQTT.

    This function never returns; it contains no error handling, so any
    exception propagates to the caller.
    """
    print(f"Connecting to SSID {CIRCUITPY_WIFI_SSID}...")
    wifi.radio.connect(CIRCUITPY_WIFI_SSID, CIRCUITPY_WIFI_PASSWORD)
    print(f"Connected to SSID {CIRCUITPY_WIFI_SSID}, IP {wifi.radio.ipv4_address}")

    # An SSL context is only created when TLS is requested.
    tls_context = ssl.create_default_context() if MQTT_TLS else None

    client = MQTT.MQTT(
        broker=MQTT_HOST,
        port=MQTT_PORT,
        username=MQTT_USERNAME,
        password=MQTT_PASSWORD,
        socket_pool=pool,
        is_ssl=MQTT_TLS,
        ssl_context=tls_context,
        socket_timeout=0.1,
        client_id=esp_uid,
    )
    client.enable_logger(logging)

    client.on_connect = handle_connect
    client.on_disconnect = handle_disconnect
    client.on_message = handle_message

    # Last will: registered before connecting so it is part of the connection.
    client.will_set(MQTT_DEBUG_TOPIC, "disconnected")
    client.connect()

    # Feed the module-level watchdog once per MQTT poll.
    while True:
        wdt.feed()
        client.loop()
# Script entry: configure the watchdog, then hand control to main().
# `wdt` is a module-level name on purpose: main() feeds it on every pass of
# its loop.
wdt = microcontroller.watchdog
# Timeout comes from WATCHDOG_TIMEOUT (default 180).
# NOTE(review): presumably seconds -- confirm against the watchdog docs.
wdt.timeout = WATCHDOG_TIMEOUT
# RESET mode: presumably resets the board when the watchdog is not fed in
# time -- confirm against the watchdog docs.
wdt.mode = watchdog.WatchDogMode.RESET
# Does not return under normal operation (main() loops forever).
main()