-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathweb_server.py
187 lines (161 loc) · 6.08 KB
/
web_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import socket
import network
class WebServerRoute:
def __init__(self, route, method, func):
self.route = route
self.method = method
self.func = func
class WebServer:
MAX_SOCKET_RECEIVE = 1024
def __init__(self):
self.ap = network.WLAN(network.AP_IF)
self.server_socket = None
self.routes = {}
def start(self, port=80):
print('web server starting')
# an AP is required
if not self.ap.active():
raise Exception('Access Point not running')
# cleanup any previously connected sockets
self.stop()
addr = socket.getaddrinfo('0.0.0.0', port)[0][-1]
self._create_socket(addr)
print('Listening on: {}'.format(addr))
print('Connect to http://192.168.4.1')
client = None
while True:
try:
client, client_addr = self.server_socket.accept()
print('Client connection from', client_addr)
client.settimeout(5.0)
request = b""
try:
while "\r\n\r\n" not in request:
request += client.recv(self.MAX_SOCKET_RECEIVE)
except OSError:
pass
print("Request: {}".format(request))
if "HTTP" not in request: # skip invalid requests
print('Invalid request')
continue
# pass to the request handler
self._handle_request(client, request)
finally:
if client:
client.close()
def stop(self):
print('web server stopping')
if not self.server_socket:
return
self.server_socket.close()
self.server_socket = None
def _create_socket(self, addr):
self.server_socket = socket.socket()
self.server_socket.bind(addr)
self.server_socket.listen(1)
def route(self, path, method='GET'):
"""
Route decorator to add a route
"""
def wrapper(handler):
self.add_route(path, method, handler)
return handler
return wrapper
def add_route(self, path, method, handler):
"""
Adds a route by supplying a path, method and handler manually
"""
route_id = self._make_route_id(path, method)
if route_id in self.routes:
raise Exception("Route {} already exists.".format(route_id))
self.routes[route_id] = WebServerRoute(path, method, handler)
def get_route_handler(self, request_path, request_method):
"""
Gets the route handler if applicable
"""
route_id = self._make_route_id(request_path, request_method)
if route_id in self.routes.keys():
return self.routes.get(route_id)
return None
def _make_route_id(self, request_path, request_method):
"""
Creates a route identifier by joining the method to path
"""
return '{}:{}'.format(request_method, request_path)
def _handle_request(self, client, request):
"""
Request handler takes the inbound request and transforms
"""
# retrieve the request header from the request
request_header = self.get_request_header(request)
if not request_header:
print('Error with request header')
self._handle_not_found(client, request)
return
# split the header to get method/path
method, path, *_ = request_header.split()
handler = self.get_route_handler(path, method)
if handler is None:
self._handle_not_found(client, request)
return
# update response with associated handler
handler.func(client, request)
def _handle_not_found(self, client, url):
error_msg = "Route not found: {}".format(url)
print(error_msg)
self.send_response(client, error_msg, status_code=404)
def send_response(self, client, payload, content_type='text/html', status_code=200):
try:
content_length = len(payload)
self.send_header(client, content_type, status_code, content_length)
if content_length > 0:
client.sendall(payload)
except Exception as exc:
print('Error sending response {}'.format(exc))
client.close()
def send_header(self, client, content_type='text/html', status_code=200, content_length=None):
try:
client.sendall("HTTP/1.0 {} OK\r\n".format(status_code))
client.sendall("Content-Type: {}\r\n".format(content_type))
if content_length is not None:
client.sendall("Content-Length: {}\r\n".format(content_length))
client.sendall("\r\n")
except Exception as exc:
print('Error sending header {}'.format(exc))
def get_request_header(self, request):
"""
Retrieves the header from a request
"""
request_data = request.decode().strip().split('\r\n')
if not request_data:
return ''
return request_data[0]
def get_form_data(self, request):
res = {}
request_data = request.decode().strip().split('\r\n')
if not request_data:
return res
for item in request_data[-1].split('&'):
param = item.split('=', 1)
if len(param) > 0:
res[self._unescape_plus(param[0])] = self._unescape_plus(param[1]) if len(param) > 1 else ''
return res
def _unescape_plus(self, s):
return self._unescape(s.replace('+', ' '))
def _unescape(self, s):
"""
Unescapes any html encoded string
"""
if '&' not in s:
return s
r = str(s).split('%')
try:
b = r[0].encode()
for i in range(1, len(r)):
try:
b += bytes([int(r[i][:2], 16)]) + r[i][2:].encode()
except Exception:
b += b'%' + r[i].encode()
return b.decode('UTF-8')
except Exception:
return str(s)