forked from vkuhlen/bob-webserver-demo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
webserver.py
121 lines (96 loc) · 3.9 KB
/
webserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import machine
import time
import binascii
from microWebSrv import MicroWebSrv
import sensors
from config import Config
_config = Config()
# To not run into trouble with CORS while developing
_headers = {'Access-Control-Allow-Origin': '*'}
@MicroWebSrv.route('/restart')
def restart(httpClient, httpResponse):
machine.reset()
##############################################################################
# Routes for sensor readings #################################################
##############################################################################
@MicroWebSrv.route('/api/sensors/<sensor>')
def measure_ds1820(httpClient, httpResponse, routeArgs):
sensor = routeArgs['sensor']
if hasattr(sensors, sensor):
# Read sensor DS1820
if sensor == 'ds1820':
ds = sensors.ds1820
for rom in ds.roms:
ds.start_conversion(rom=rom)
time.sleep_ms(750)
data = {}
for rom in ds.roms:
data[binascii.hexlify(rom).decode('utf-8')] = ds.read_temp_async(rom=rom)
# Read sensor HX711
elif sensor == 'hx711':
hx = sensors.hx711
data = {'weight': hx.read_average(times=5)}
# Read sensor BME280
elif sensor == 'bme280':
bme = sensors.bme280
data = {}
(data['t'],
data['p'],
data['h']) = bme.read_compensated_data()
return httpResponse.WriteResponseJSONOk(obj=data, headers=_headers)
else:
return httpResponse.WriteResponseJSONError(404)
##############################################################################
# Routes for working with the config #########################################
##############################################################################
@MicroWebSrv.route('/api/config')
def get_config(httpClient, httpResponse):
return httpResponse.WriteResponseJSONOk(obj=_config.data, headers=_headers)
@MicroWebSrv.route('/api/config/<section>/<subsection>', 'GET')
def get_config_subsection(httpClient, httpResponse, routeArgs):
section = routeArgs['section']
subsection = routeArgs['subsection']
data = _config.data.get(section, {}).get(subsection, None)
if data is None:
return httpResponse.WriteResponseJSONError(404)
else:
return httpResponse.WriteResponseJSONOk(
obj=data,
headers=_headers)
@MicroWebSrv.route('/api/config/<section>/<subsection>', 'POST')
def post_config_subsection(httpClient, httpResponse, routeArgs):
section = routeArgs['section']
subsection = routeArgs['subsection']
form_data = httpClient.ReadRequestContentAsJSON()
if not section in _config.data.keys():
_config.data[section] = {}
_config.data[section][subsection] = form_data
_config.write()
return httpResponse.WriteResponseJSONOk(
obj={'status': 'saved'},
headers=_headers)
@MicroWebSrv.route('/api/config/<section>/<subsection>', 'DELETE')
def delete_config_subsection(httpClient, httpResponse, routeArgs):
section = routeArgs['section']
subsection = routeArgs['subsection']
if section in _config.data.keys():
data = _config.data[section].pop(subsection, None)
if data is None:
return httpResponse.WriteResponseJSONError(404)
else:
_config.write()
return httpResponse.WriteResponseJSONOk(
obj={'status': 'deleted'},
headers=_headers)
@MicroWebSrv.route('/api/config/<section>/<subsection>', 'OPTIONS')
def options_config(httpClient, httpResponse, routeArgs):
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, DELETE, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type'}
return httpResponse.WriteResponseOk(
headers = headers,
contentType = "text/plain",
contentCharset = "UTF-8",
content="")
mws = MicroWebSrv()