-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathauart_hd_013156.py
89 lines (71 loc) · 2.88 KB
/
auart_hd_013156.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# auart_hd.py
# Author: Peter Hinch
# Copyright Peter Hinch 2018-2020 Released under the MIT license
# Demo of running a half-duplex protocol to a device. The device never sends
# unsolicited messages. An example is a communications device which responds
# to AT commands.
# The master sends a message to the device, which may respond with one or more
# lines of data. The master assumes that the device has sent all its data when
# a timeout has elapsed.
# In this test a physical device is emulated by the Device class
# To test link X1-X4 and X2-X3
from machine import UART
import uasyncio as asyncio
from primitives.delay_ms import Delay_ms
#crc16 constants valid for modbus-RTU
PRESET = 0xFFFF
POLYNOMIAL = 0xA001 # bit reverse of 0x8005
class PZEM():
def __init__(self, uart, address=0x01, timeout=1000):
self.uart = uart
self.uart.init(baudrate=9600,bits=8,parity=None,stop=1)
self.timeout = timeout
self.swriter = asyncio.StreamWriter(self.uart, {})
self.sreader = asyncio.StreamReader(self.uart)
self.delay = Delay_ms()
self.response = ""
#build data request with address of the device at cronstruction time to avoid crc calculation later.
self.request = bytes('','utf-8') + bytes([address]) + bytes('\x04\x00\x00\x04','utf-8')
self.request += crc16(request)
asyncio.create_task(self._send())
asyncio.create_task(self._recv())
async def _send(self):
while True:
await pzem.send_command(self.request)
await asyncio.sleep(1) #delay between request to be addded as a parameter in constructor
async def _recv(self):
while True:
res = await self.sreader.read(40) # the device does not send ed of line, so I read some data. TODO: testing -1
self.delay.trigger(self.timeout) # Got something, retrigger timer
async def send_command(self, command):
self.response = "" # Discard any pending messages
if command is None:
print('Timeout test.')
else:
await self.swriter.awrite("{}\r\n".format(command))
print('Command sent:', command)
self.delay.trigger(self.timeout) # Re-initialise timer
while self.delay.running():
await asyncio.sleep(1) # Wait for 4s after last msg received
return self.response
def crc16(data):
crc = PRESET
for c in data:
crc = crc ^ c
for j in range(8):
if crc & 0x01:
crc = (crc >> 1) ^ POLYNOMIAL
else:
crc = crc >> 1
return struct.pack('<H',crc)
async def main():
uart=UART(1)
pzem = PZEM(uart,address=0x01)
if __name__=="__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Interrupted')
finally:
asyncio.new_event_loop()
print('as_demos.auart_hd.test() to run again.')