Skip to content

Commit

Permalink
Added delayBetweenRequestsMs parameter to device configuration, die to
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Feb 3, 2025
1 parent 8489c54 commit 9c86b51
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 18 deletions.
2 changes: 2 additions & 0 deletions thingsboard_gateway/connectors/modbus/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
CONNECT_ATTEMPT_TIME_MS_PARAMETER = "connectAttemptTimeMs"
WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER = "waitAfterFailedAttemptsMs"

DELAY_BETWEEN_REQUESTS_MS_PARAMETER = "delayBetweenRequestsMs"

FUNCTION_CODE_PARAMETER = "functionCode"

ADDRESS_PARAMETER = "address"
Expand Down
43 changes: 35 additions & 8 deletions thingsboard_gateway/connectors/modbus/entities/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from time import monotonic
from asyncio import Lock

from pymodbus.client import AsyncModbusTlsClient, AsyncModbusTcpClient, AsyncModbusUdpClient, AsyncModbusSerialClient
Expand Down Expand Up @@ -48,6 +49,16 @@ def __init__(self, client_type, client):
self.lock = Lock()
self.client_type = client_type.lower()
self.__client = client
self.__previous_request_time = 0

def get_time_to_pass_delay_between_requests(self, delay_ms) -> int:
if delay_ms == 0:
return 0
next_possible_request_time = self.__previous_request_time + delay_ms
current_time = int(monotonic() * 1000)
if current_time >= next_possible_request_time:
return 0
return next_possible_request_time - current_time

def connected(self):
return self.__client.connected
Expand All @@ -62,35 +73,51 @@ async def close(self):

@with_lock_for_serial
async def read_coils(self, address, count, unit_id):
return await self.__client.read_coils(address=address, count=count, slave=unit_id) # noqa
result = await self.__client.read_coils(address=address, count=count, slave=unit_id) # noqa
self.__previous_request_time = int(monotonic() * 1000)
return result

@with_lock_for_serial
async def read_discrete_inputs(self, address, count, unit_id):
return await self.__client.read_discrete_inputs(address=address, count=count, slave=unit_id) # noqa
result = await self.__client.read_discrete_inputs(address=address, count=count, slave=unit_id) # noqa
self.__previous_request_time = int(monotonic() * 1000)
return result

@with_lock_for_serial
async def read_holding_registers(self, address, count, unit_id):
return await self.__client.read_holding_registers(address=address, count=count, slave=unit_id) # noqa
result = await self.__client.read_holding_registers(address=address, count=count, slave=unit_id) # noqa
self.__previous_request_time = int(monotonic() * 1000)
return result

@with_lock_for_serial
async def read_input_registers(self, address, count, unit_id):
return await self.__client.read_input_registers(address=address, count=count, slave=unit_id) # noqa
result = await self.__client.read_input_registers(address=address, count=count, slave=unit_id) # noqa
self.__previous_request_time = int(monotonic() * 1000)
return result

@with_lock_for_serial
async def write_coil(self, address, value, unit_id):
return await self.__client.write_coil(address=address, value=value, slave=unit_id) # noqa
result = await self.__client.write_coil(address=address, value=value, slave=unit_id) # noqa
self.__previous_request_time = int(monotonic() * 1000)
return result

@with_lock_for_serial
async def write_register(self, address, value, unit_id):
return await self.__client.write_register(address=address, value=value, slave=unit_id) # noqa
result = await self.__client.write_register(address=address, value=value, slave=unit_id) # noqa
self.__previous_request_time = int(monotonic() * 1000)
return result

@with_lock_for_serial
async def write_coils(self, address, values, unit_id):
return await self.__client.write_coils(address=address, values=values, slave=unit_id) # noqa
result = await self.__client.write_coils(address=address, values=values, slave=unit_id) # noqa
self.__previous_request_time = int(monotonic() * 1000)
return result

@with_lock_for_serial
async def write_registers(self, address, values, unit_id):
return await self.__client.write_registers(address=address, values=values, slave=unit_id) # noqa
result = await self.__client.write_registers(address=address, values=values, slave=unit_id) # noqa
self.__previous_request_time = int(monotonic() * 1000)
return result

def get_available_functions(self):
return {
Expand Down
13 changes: 7 additions & 6 deletions thingsboard_gateway/connectors/modbus/modbus_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,20 @@ def __get_master(self, slave: Slave):
Newly created master connection structure will look like:
self._master_connections = {
'127.0.0.1:5021': AsyncClient object
'127.0.0.1:5021': AsyncClient object (TCP/UDP),
'/dev/ttyUSB0': AsyncClient object (Serial)
}
"""
if not slave.host:
socket_str = slave.port
master_connection_name = slave.port
else:
socket_str = slave.host + ':' + str(slave.port)
if socket_str not in self._master_connections:
master_connection_name = slave.host + ':' + str(slave.port)
if master_connection_name not in self._master_connections:
master_connection = Master.configure_master(slave)
master = Master(slave.type, master_connection)
self._master_connections[socket_str] = master
self._master_connections[master_connection_name] = master

return self._master_connections[socket_str]
return self._master_connections[master_connection_name]

def __add_slave(self, slave_config):
slave = Slave(self, self.__log, slave_config)
Expand Down
21 changes: 17 additions & 4 deletions thingsboard_gateway/connectors/modbus/slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import asyncio
from threading import Thread
from time import sleep, monotonic
from typing import Tuple, Dict, Union
from typing import TYPE_CHECKING, Tuple, Dict, Union

from pymodbus.constants import Defaults

Expand All @@ -24,17 +24,22 @@
BYTESIZE_PARAMETER, CONNECT_ATTEMPT_COUNT_PARAMETER, CONNECT_ATTEMPT_TIME_MS_PARAMETER, HOST_PARAMETER, \
METHOD_PARAMETER, PARITY_PARAMETER, PORT_PARAMETER, REPACK_PARAMETER, RETRIES_PARAMETER, RETRY_ON_EMPTY_PARAMETER, \
RETRY_ON_INVALID_PARAMETER, RPC_SECTION, SERIAL_CONNECTION_TYPE_PARAMETER, STOPBITS_PARAMETER, STRICT_PARAMETER, \
TIMEOUT_PARAMETER, UNIT_ID_PARAMETER, WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER, WORD_ORDER_PARAMETER
TIMEOUT_PARAMETER, UNIT_ID_PARAMETER, WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER, WORD_ORDER_PARAMETER, \
DELAY_BETWEEN_REQUESTS_MS_PARAMETER
from thingsboard_gateway.connectors.modbus.entities.bytes_uplink_converter_config import BytesUplinkConverterConfig
from thingsboard_gateway.connectors.modbus.modbus_converter import ModbusConverter
from thingsboard_gateway.gateway.constants import DEVICE_NAME_PARAMETER, DEVICE_TYPE_PARAMETER, TYPE_PARAMETER, \
UPLINK_PREFIX, CONVERTER_PARAMETER, DOWNLINK_PREFIX
from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader

if TYPE_CHECKING:
from thingsboard_gateway.connectors.modbus.modbus_connector import ModbusConnector
from thingsboard_gateway.connectors.modbus.entities.master import Master


class Slave(Thread):
def __init__(self, connector, logger, config):
def __init__(self, connector: 'ModbusConnector', logger, config):
super().__init__()
self.daemon = True
self.stopped = False
Expand Down Expand Up @@ -81,6 +86,8 @@ def __init__(self, connector, logger, config):
self.connection_attempt = config.get(CONNECT_ATTEMPT_COUNT_PARAMETER, 2) \
if config.get(CONNECT_ATTEMPT_COUNT_PARAMETER, 2) >= 2 else 2

self.__delay_between_requests_ms = config.get(DELAY_BETWEEN_REQUESTS_MS_PARAMETER, 0)

self.device_name = config[DEVICE_NAME_PARAMETER]
self.device_type = config.get(DEVICE_TYPE_PARAMETER, 'default')

Expand All @@ -96,7 +103,7 @@ def __init__(self, connector, logger, config):
self.uplink_converter_config = BytesUplinkConverterConfig(**config)
self.uplink_converter = self.__load_uplink_converter(config)

self.__master = None
self.__master: 'Master' = None
self.available_functions = None

self.start()
Expand Down Expand Up @@ -212,6 +219,9 @@ async def __read(self, function_code, address, objects_count):
result = None

try:
time_to_sleep = self.__master.get_time_to_pass_delay_between_requests(self.__delay_between_requests_ms)
if time_to_sleep > 0:
await asyncio.sleep(time_to_sleep / 1000)
result = await self.available_functions[function_code](address=address,
count=objects_count,
unit_id=self.unit_id)
Expand All @@ -236,6 +246,9 @@ async def __write(self, function_code, address, value):
result = None

try:
time_to_sleep = self.__master.get_time_to_pass_delay_between_requests(self.__delay_between_requests_ms)
if time_to_sleep > 0:
await asyncio.sleep(time_to_sleep / 1000)
if function_code in (5, 6):
result = await self.available_functions[function_code](address=address, value=value,
unit_id=self.unit_id)
Expand Down

0 comments on commit 9c86b51

Please sign in to comment.