-
Notifications
You must be signed in to change notification settings - Fork 77
/
Copy pathreadall.py
executable file
·90 lines (78 loc) · 2.84 KB
/
readall.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python3
# -*- coding: iso-8859-15 -*-
# Read info with ModbusClient
import time
# import the client implementation
from pymodbus.client import ModbusSerialClient as ModbusClient
#from test.testdata import ModbusMockClient as ModbusClient
from pymodbus.mei_message import *
from pyepsolartracer.registers import registers,coils
import serial.rs485
# Logging setup: grab the root logger, let basicConfig() install its default
# handler on it, and allow records of level INFO and above through.
import logging

log = logging.getLogger()
logging.basicConfig()
log.setLevel(logging.INFO)
# Create the serial Modbus RTU client on /dev/ttyUSB0 (115200 baud, 8 data
# bits, 1 stop bit, 1 s timeout) and open the port.
client = ModbusClient(
    method='rtu',
    port='/dev/ttyUSB0',
    baudrate=115200,
    stopbits=1,
    bytesize=8,
    timeout=1,
)
if not client.connect():
    # Execution continues exactly as before; the failure is only made visible
    # so that later read errors are easier to interpret.
    log.error("could not open the serial connection on /dev/ttyUSB0")

# Best effort: switch the underlying serial port to RS485 mode (RTS high while
# receiving, low while transmitting).  A failure here is not fatal, but it is
# reported instead of being swallowed, and KeyboardInterrupt / SystemExit are
# no longer caught.
try:
    client.socket.rs485_mode = serial.rs485.RS485Settings(
        rts_level_for_rx=True,
        rts_level_for_tx=False,
    )
except Exception as exc:
    log.info("RS485 mode not enabled, continuing without it: %s", exc)
# Send a ReadDeviceInformationRequest built with unit=1 and print the reply.
# Replies that expose an `information` attribute have just that attribute
# printed; any other reply object is printed whole.
# NOTE(review): the register reads further down pass `slave=`, while this
# request is built with `unit=` -- confirm both match the installed pymodbus.
request = ReadDeviceInformationRequest(unit=1)
response = client.execute(request)
payload = response.information if hasattr(response, 'information') else response
print('ReadDeviceInformationRequest', repr(payload))
unit = 1

# Read multiple registers at once: group the register definitions into runs
# ("chunks") of consecutive addresses so each run can be fetched with a single
# read request.
chunks = []
chunk = []
# Kept for compatibility with the original script; the first register always
# opens the first run, so this initial value is never what decides a split.
last = 0x3000 - 1
for reg in registers:
    # A gap in the address sequence closes the current run.  Only non-empty
    # runs are stored, so a register list that does not start at 0x3000 no
    # longer produces a leading empty chunk.
    if chunk and reg.address != last + 1:
        chunks.append(chunk)
        chunk = []
    chunk.append(reg)
    last = reg.address
# Inside the loop a run is only stored once the *next* gap is seen, so the
# final run has to be stored here; previously it was silently dropped.
if chunk:
    chunks.append(chunk)
# Fetch every run of consecutive registers with one request and print the raw
# register values on a single line per run.
for chunk in chunks:
    if not chunk:
        # An empty run has no start address to read from.
        continue
    reg = chunk[0]
    tail = chunk[-1]
    # Number of registers from the run's first address through the end of its
    # last entry.  Using the last entry's size (instead of len(chunk)) also
    # covers a trailing multiword register.
    count = tail.address + tail.size - reg.address
    # The kind of the first register decides which read function is used for
    # the whole run; the printed label now names the call actually made.
    if reg.is_input_register():
        label = "read_input_registers:"
        rr = client.read_input_registers(address=reg.address, count=count, slave=unit)
    else:
        label = "read_holding_registers:"
        rr = client.read_holding_registers(address=reg.address, count=count, slave=unit)
    print(label, ("0x%04x" % reg.address), count, end=' : ')
    if hasattr(rr, "getRegister"):
        for i in range(count):
            print(rr.getRegister(i), end=', ')
    else:
        # No register accessor on the reply (e.g. an error object): show the
        # reply itself rather than failing with AttributeError.
        print(rr, end='')
    print('')
# Read every register definition individually, print the first raw register of
# the reply and then the value produced by the definition's own decode().
for reg in registers:
    print()
    print(reg, reg.is_input_register(), reg.is_holding_register())
    if reg.is_input_register():
        label = "read_input_registers:"
        rr = client.read_input_registers(address=reg.address, count=reg.size, slave=unit)
    else:
        label = "read_holding_registers:"
        rr = client.read_holding_registers(address=reg.address, count=reg.size, slave=unit)
    # Same guard style as the coil loop: a reply without a register accessor
    # (e.g. an error object) is printed as-is instead of raising
    # AttributeError.
    if hasattr(rr, "getRegister"):
        print(label, rr.getRegister(0))
    else:
        print(label, str(rr))
    # NOTE(review): decode() is handed the reply unconditionally, as in the
    # coil loop; presumably it copes with error replies -- confirm.
    value = reg.decode(rr)
    print(rr, value)
# Read every coil / discrete input definition, print the raw bits of the reply
# (or the reply itself when it has no `bits`) and then the decoded value.
for reg in coils:
    print()
    print(reg, reg.is_coil(), reg.is_discrete_input())
    if reg.is_coil():
        label = "read_coils:"
        rr = client.read_coils(address=reg.address, count=reg.size, slave=unit)
    elif reg.is_discrete_input():
        label = "read_discrete_inputs:"
        rr = client.read_discrete_inputs(address=reg.address, count=reg.size, slave=unit)
    else:
        # Nothing was read for this entry.  Decoding here would reuse the
        # reply left over from a previous iteration, so skip it instead.
        print("skipped: neither a coil nor a discrete input")
        continue
    if hasattr(rr, "bits"):
        print(label, str(rr.bits))
    else:
        print(label, str(rr))
    value = reg.decode(rr)
    print(rr, value)