-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
188 lines (167 loc) · 6.58 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import machine, neopixel, utime, os, time, sys
import ujson
from machine import Pin
from pico_lte.utils.status import Status
from pico_lte.core import PicoLTE
from pico_lte.common import debug
from printer import printMessage
# Uncomment this for verbose logging
debug.set_level(0)
# Constants
USER_LED = Pin(22, mode=Pin.OUT)
NEOPIXEL_NUM_LEDS = 8
NEOPIXEL_PIN = machine.Pin(15)
NEOPIXEL_RUN_TIME_SECONDS = 2
MAIN_LOOP_DELAY_SECONDS = 7200 # 120 minute loop delay
REPONSE_MAX_LENGTH_THRESHOLD = 1000
# Variables
picoLTE = PicoLTE()
neopixel = neopixel.NeoPixel(NEOPIXEL_PIN, NEOPIXEL_NUM_LEDS)
light_up_neopixel = False
readDelay = 15
run_count = 0
request_failures = 0
# Sets all NeoPixel LEDs off
def set_neopixel_off():
for i in range(NEOPIXEL_NUM_LEDS):
neopixel[i] = (0, 0, 0)
neopixel.write()
def neopixel_color_cycle(wait):
for i in range(NEOPIXEL_NUM_LEDS):
neopixel[i] = (255, 0, 0) # Set LED color to red
neopixel.write()
utime.sleep_ms(wait)
for i in range(NEOPIXEL_NUM_LEDS):
neopixel[i] = (0, 255, 0) # Set LED color to green
neopixel.write()
utime.sleep_ms(wait)
for i in range(NEOPIXEL_NUM_LEDS):
neopixel[i] = (0, 0, 255) # Set LED color to blue
neopixel.write()
utime.sleep_ms(wait)
# Turns LEDs off, checks local data file, logs ambient temperature, and registers LTE network
def run_initial_setup():
# Turn off all LEDs
set_neopixel_off();
USER_LED.off()
# Initialize file system as needed
print("run_initial_setup: Reading root directory... ****")
print(os.listdir())
initial_file = open("data.txt", "a")
data = initial_file.read()
print("run_initial_setup: Data file content:")
if not data:
print("run_initial_setup: NO DATA FOUND")
else:
print(data)
initial_file.close()
# Log ambient temperature for fun
print("run_initial_setup: Reading ambient temperature... ****")
sensor_temp = machine.ADC(4)
conversion_factor = 3.3 / (65535)
reading = sensor_temp.read_u16() * conversion_factor
temp = 27 - (reading -0.706)/(0.001721)
print(temp)
# Get LTE system ready
print("run_initial_setup: Registering LTE network... ****")
picoLTE.network.register_network()
picoLTE.http.set_context_id()
picoLTE.network.get_pdp_ready()
picoLTE.http.set_server_url()
# Check if the response message matches the one in the data file. If not, we should print it
def check_if_should_print(message_to_check):
print("check_if_should_print(" + message_to_check + ") ****")
file = open("data.txt", "r")
file_message = file.read()
if message_to_check != file_message:
print("check_if_should_print: File data is '" + file_message + "', which doesn't match ****")
file.close()
file = open("data.txt", "w")
file.write(message_to_check)
file.close()
return True
else:
print("check_if_should_print: File data is '" + file_message + "' which matches latest message ****")
file.close()
return False
# Parse the response for the message, check if it matches the data file, and print if it doesn't
def parse_and_try_print(message):
try:
print("parse_and_try_print: Checking response message of: '" + message + "' ****")
sanitized_message = message.replace("\"", "")
print("parse_and_try_print: Sanitized message is -> " + sanitized_message)
should_print = check_if_should_print(sanitized_message)
if should_print:
# Light up the NeoPixel for a few cycles
neopixel_cycle_count = 0
while neopixel_cycle_count < (NEOPIXEL_RUN_TIME_SECONDS * 10):
neopixel_color_cycle(100)
neopixel_cycle_count += 1
set_neopixel_off();
# Print the message
print("parse_and_try_print: Printing message... ****")
printMessage(sanitized_message)
except:
print("parse_and_try_print: Exception occurred while parsing response! ****")
set_neopixel_off()
class WebResponse(object):
def __init__(self, json_str):
data = ujson.loads(json_str)
for key, value in data.items():
setattr(self, key, value)
# Run the initial setup and then keep sending requests (main loop)
try:
print("main_loop: Initial setup starting... ****")
run_initial_setup()
print("main_loop: Initial setup complete... ****")
while True:
USER_LED.on()
# Reset the HTTP context before making a new request
picoLTE.http.set_context_id()
picoLTE.network.get_pdp_ready()
picoLTE.http.set_server_url()
print("main_loop: Sending web request... ****")
result = picoLTE.http.get()
time.sleep(readDelay)
# After the delay, read response. This gives the Sixfab LTE time to process the request
result = picoLTE.http.read_response()
debug.info(result)
# Check if the response was successful
if result["status"] == Status.SUCCESS:
json_result = ujson.dumps(result)
if len(json_result) > REPONSE_MAX_LENGTH_THRESHOLD or "<HTML>" in json_result:
# This seems excessively long, is this a 404 page?
print("main_loop: Got a response longer than max characters! Not going to print!")
readDelay = 30
request_failures += 1
else:
# Looks good, reset delay to the default
print("main_loop: Got what looks like a good response")
readDelay = 15
request_failures = 0
webby = WebResponse(json_result)
parse_and_try_print(webby.response[1])
else:
# No dice, extend delay before reading response. Try again next time
readDelay = 30
request_failures += 1
# Get ready for the next loop
print("main_loop: Main loop functions done...")
USER_LED.off()
run_count += 1
if run_count > 24:
print("main_loop: Reached run count threshold, resetting device...")
machine.reset()
elif request_failures > 2:
print("main_loop: Too many request failures, resetting device...")
machine.reset()
# Wait this many seconds before running the loop again
time.sleep(MAIN_LOOP_DELAY_SECONDS)
except Exception as e:
sys.print_exception(e)
print("main_loop: Exception occurred! Cleaning up... ****")
light_up_neopixel = False
set_neopixel_off()
USER_LED.off()
print("\n**** Program Exiting ****")
machine.reset()