forked from sfera-labs/exo-sense-py-modbus
-
Notifications
You must be signed in to change notification settings - Fork 50
/
Copy pathtcp_client_example.py
145 lines (122 loc) · 4.07 KB
/
tcp_client_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Main script
Do your stuff here, this file is similar to the loop() function on Arduino
Create a Modbus TCP client (slave) which can be requested for data or set with
specific values by a host device.
The TCP port and IP address can be choosen freely. The register definitions of
the client can be defined by the user.
"""
# system packages
import time
# import modbus client classes
from umodbus.tcp import ModbusTCP
IS_DOCKER_MICROPYTHON = False
try:
import network
except ImportError:
IS_DOCKER_MICROPYTHON = True
import json
# ===============================================
if IS_DOCKER_MICROPYTHON is False:
# connect to a network
station = network.WLAN(network.STA_IF)
if station.active() and station.isconnected():
station.disconnect()
time.sleep(1)
station.active(False)
time.sleep(1)
station.active(True)
# station.connect('SSID', 'PASSWORD')
station.connect('TP-LINK_FBFC3C', 'C1FBFC3C')
time.sleep(1)
while True:
print('Waiting for WiFi connection...')
if station.isconnected():
print('Connected to WiFi.')
print(station.ifconfig())
break
time.sleep(2)
# ===============================================
# TCP Slave setup
tcp_port = 502 # port to listen to
if IS_DOCKER_MICROPYTHON:
local_ip = '172.24.0.2' # static Docker IP address
else:
# set IP address of the MicroPython device explicitly
# local_ip = '192.168.4.1' # IP address
# or get it from the system after a connection to the network has been made
local_ip = station.ifconfig()[0]
# ModbusTCP can get TCP requests from a host device to provide/set data
client = ModbusTCP()
is_bound = False
# check whether client has been bound to an IP and port
is_bound = client.get_bound_status()
if not is_bound:
client.bind(local_ip=local_ip, local_port=tcp_port)
# commond slave register setup, to be used with the Master example above
register_definitions = {
"COILS": {
"RESET_REGISTER_DATA_COIL": {
"register": 42,
"len": 1,
"val": 0
},
"EXAMPLE_COIL": {
"register": 123,
"len": 1,
"val": 1
}
},
"HREGS": {
"EXAMPLE_HREG": {
"register": 93,
"len": 1,
"val": 19
}
},
"ISTS": {
"EXAMPLE_ISTS": {
"register": 67,
"len": 1,
"val": 0
}
},
"IREGS": {
"EXAMPLE_IREG": {
"register": 10,
"len": 2,
"val": 60001
}
}
}
# alternatively the register definitions can also be loaded from a JSON file
# this is always done if Docker is used for testing purpose in order to keep
# the client registers in sync with the test registers
if IS_DOCKER_MICROPYTHON:
with open('registers/example.json', 'r') as file:
register_definitions = json.load(file)
print('Setting up registers ...')
# use the defined values of each register type provided by register_definitions
client.setup_registers(registers=register_definitions)
# alternatively use dummy default values (True for bool regs, 999 otherwise)
# client.setup_registers(registers=register_definitions, use_default_vals=True)
print('Register setup done')
print('Serving as TCP client on {}:{}'.format(local_ip, tcp_port))
reset_data_register = \
register_definitions['COILS']['RESET_REGISTER_DATA_COIL']['register']
while True:
try:
result = client.process()
if reset_data_register in client.coils:
if client.get_coil(address=reset_data_register):
print('Resetting register data to default values ...')
client.setup_registers(registers=register_definitions)
print('Default values restored')
except KeyboardInterrupt:
print('KeyboardInterrupt, stopping TCP client...')
break
except Exception as e:
print('Exception during execution: {}'.format(e))
print("Finished providing/accepting data as client")