From 9c86b51db21f53567f303bddecb0b34bd53880a9 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 3 Feb 2025 08:38:17 +0200 Subject: [PATCH] Added delayBetweenRequestsMs parameter to device configuration, die to #1664 --- .../connectors/modbus/constants.py | 2 + .../connectors/modbus/entities/master.py | 43 +++++++++++++++---- .../connectors/modbus/modbus_connector.py | 13 +++--- .../connectors/modbus/slave.py | 21 +++++++-- 4 files changed, 61 insertions(+), 18 deletions(-) diff --git a/thingsboard_gateway/connectors/modbus/constants.py b/thingsboard_gateway/connectors/modbus/constants.py index a6f6f710..00cfc171 100644 --- a/thingsboard_gateway/connectors/modbus/constants.py +++ b/thingsboard_gateway/connectors/modbus/constants.py @@ -42,6 +42,8 @@ CONNECT_ATTEMPT_TIME_MS_PARAMETER = "connectAttemptTimeMs" WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER = "waitAfterFailedAttemptsMs" +DELAY_BETWEEN_REQUESTS_MS_PARAMETER = "delayBetweenRequestsMs" + FUNCTION_CODE_PARAMETER = "functionCode" ADDRESS_PARAMETER = "address" diff --git a/thingsboard_gateway/connectors/modbus/entities/master.py b/thingsboard_gateway/connectors/modbus/entities/master.py index 21bacbbe..9259de96 100644 --- a/thingsboard_gateway/connectors/modbus/entities/master.py +++ b/thingsboard_gateway/connectors/modbus/entities/master.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from time import monotonic from asyncio import Lock from pymodbus.client import AsyncModbusTlsClient, AsyncModbusTcpClient, AsyncModbusUdpClient, AsyncModbusSerialClient @@ -48,6 +49,16 @@ def __init__(self, client_type, client): self.lock = Lock() self.client_type = client_type.lower() self.__client = client + self.__previous_request_time = 0 + + def get_time_to_pass_delay_between_requests(self, delay_ms) -> int: + if delay_ms == 0: + return 0 + next_possible_request_time = self.__previous_request_time + delay_ms + current_time = int(monotonic() * 1000) + if current_time >= next_possible_request_time: + return 0 + return next_possible_request_time - current_time def connected(self): return self.__client.connected @@ -62,35 +73,51 @@ async def close(self): @with_lock_for_serial async def read_coils(self, address, count, unit_id): - return await self.__client.read_coils(address=address, count=count, slave=unit_id) # noqa + result = await self.__client.read_coils(address=address, count=count, slave=unit_id) # noqa + self.__previous_request_time = int(monotonic() * 1000) + return result @with_lock_for_serial async def read_discrete_inputs(self, address, count, unit_id): - return await self.__client.read_discrete_inputs(address=address, count=count, slave=unit_id) # noqa + result = await self.__client.read_discrete_inputs(address=address, count=count, slave=unit_id) # noqa + self.__previous_request_time = int(monotonic() * 1000) + return result @with_lock_for_serial async def read_holding_registers(self, address, count, unit_id): - return await self.__client.read_holding_registers(address=address, count=count, slave=unit_id) # noqa + result = await self.__client.read_holding_registers(address=address, count=count, slave=unit_id) # noqa + self.__previous_request_time = int(monotonic() * 1000) + return result @with_lock_for_serial async def read_input_registers(self, address, count, unit_id): - return await self.__client.read_input_registers(address=address, count=count, slave=unit_id) # noqa + result = await self.__client.read_input_registers(address=address, count=count, slave=unit_id) # noqa + self.__previous_request_time = int(monotonic() * 1000) + return result @with_lock_for_serial async def write_coil(self, address, value, unit_id): - return await self.__client.write_coil(address=address, value=value, slave=unit_id) # noqa + result = await self.__client.write_coil(address=address, value=value, slave=unit_id) # noqa + self.__previous_request_time = int(monotonic() * 1000) + return result @with_lock_for_serial async def write_register(self, address, value, unit_id): - return await self.__client.write_register(address=address, value=value, slave=unit_id) # noqa + result = await self.__client.write_register(address=address, value=value, slave=unit_id) # noqa + self.__previous_request_time = int(monotonic() * 1000) + return result @with_lock_for_serial async def write_coils(self, address, values, unit_id): - return await self.__client.write_coils(address=address, values=values, slave=unit_id) # noqa + result = await self.__client.write_coils(address=address, values=values, slave=unit_id) # noqa + self.__previous_request_time = int(monotonic() * 1000) + return result @with_lock_for_serial async def write_registers(self, address, values, unit_id): - return await self.__client.write_registers(address=address, values=values, slave=unit_id) # noqa + result = await self.__client.write_registers(address=address, values=values, slave=unit_id) # noqa + self.__previous_request_time = int(monotonic() * 1000) + return result def get_available_functions(self): return { diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index adbbdaef..42296242 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -187,19 +187,20 @@ def __get_master(self, slave: Slave): Newly created master connection structure will look like: self._master_connections = { - '127.0.0.1:5021': AsyncClient object + '127.0.0.1:5021': AsyncClient object (TCP/UDP), + '/dev/ttyUSB0': AsyncClient object (Serial) } """ if not slave.host: - socket_str = slave.port + master_connection_name = slave.port else: - socket_str = slave.host + ':' + str(slave.port) - if socket_str not in self._master_connections: + master_connection_name = slave.host + ':' + str(slave.port) + if master_connection_name not in self._master_connections: master_connection = Master.configure_master(slave) master = Master(slave.type, master_connection) - self._master_connections[socket_str] = master + self._master_connections[master_connection_name] = master - return self._master_connections[socket_str] + return self._master_connections[master_connection_name] def __add_slave(self, slave_config): slave = Slave(self, self.__log, slave_config) diff --git a/thingsboard_gateway/connectors/modbus/slave.py b/thingsboard_gateway/connectors/modbus/slave.py index c87e7de0..2a3201e3 100644 --- a/thingsboard_gateway/connectors/modbus/slave.py +++ b/thingsboard_gateway/connectors/modbus/slave.py @@ -14,7 +14,7 @@ import asyncio from threading import Thread from time import sleep, monotonic -from typing import Tuple, Dict, Union +from typing import TYPE_CHECKING, Tuple, Dict, Union from pymodbus.constants import Defaults @@ -24,7 +24,8 @@ BYTESIZE_PARAMETER, CONNECT_ATTEMPT_COUNT_PARAMETER, CONNECT_ATTEMPT_TIME_MS_PARAMETER, HOST_PARAMETER, \ METHOD_PARAMETER, PARITY_PARAMETER, PORT_PARAMETER, REPACK_PARAMETER, RETRIES_PARAMETER, RETRY_ON_EMPTY_PARAMETER, \ RETRY_ON_INVALID_PARAMETER, RPC_SECTION, SERIAL_CONNECTION_TYPE_PARAMETER, STOPBITS_PARAMETER, STRICT_PARAMETER, \ - TIMEOUT_PARAMETER, UNIT_ID_PARAMETER, WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER, WORD_ORDER_PARAMETER + TIMEOUT_PARAMETER, UNIT_ID_PARAMETER, WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER, WORD_ORDER_PARAMETER, \ + DELAY_BETWEEN_REQUESTS_MS_PARAMETER from thingsboard_gateway.connectors.modbus.entities.bytes_uplink_converter_config import BytesUplinkConverterConfig from thingsboard_gateway.connectors.modbus.modbus_converter import ModbusConverter from thingsboard_gateway.gateway.constants import DEVICE_NAME_PARAMETER, DEVICE_TYPE_PARAMETER, TYPE_PARAMETER, \ @@ -32,9 +33,13 @@ from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader +if TYPE_CHECKING: + from thingsboard_gateway.connectors.modbus.modbus_connector import ModbusConnector + from thingsboard_gateway.connectors.modbus.entities.master import Master + class Slave(Thread): - def __init__(self, connector, logger, config): + def __init__(self, connector: 'ModbusConnector', logger, config): super().__init__() self.daemon = True self.stopped = False @@ -81,6 +86,8 @@ def __init__(self, connector, logger, config): self.connection_attempt = config.get(CONNECT_ATTEMPT_COUNT_PARAMETER, 2) \ if config.get(CONNECT_ATTEMPT_COUNT_PARAMETER, 2) >= 2 else 2 + self.__delay_between_requests_ms = config.get(DELAY_BETWEEN_REQUESTS_MS_PARAMETER, 0) + self.device_name = config[DEVICE_NAME_PARAMETER] self.device_type = config.get(DEVICE_TYPE_PARAMETER, 'default') @@ -96,7 +103,7 @@ def __init__(self, connector, logger, config): self.uplink_converter_config = BytesUplinkConverterConfig(**config) self.uplink_converter = self.__load_uplink_converter(config) - self.__master = None + self.__master: 'Master' = None self.available_functions = None self.start() @@ -212,6 +219,9 @@ async def __read(self, function_code, address, objects_count): result = None try: + time_to_sleep = self.__master.get_time_to_pass_delay_between_requests(self.__delay_between_requests_ms) + if time_to_sleep > 0: + await asyncio.sleep(time_to_sleep / 1000) result = await self.available_functions[function_code](address=address, count=objects_count, unit_id=self.unit_id) @@ -236,6 +246,9 @@ async def __write(self, function_code, address, value): result = None try: + time_to_sleep = self.__master.get_time_to_pass_delay_between_requests(self.__delay_between_requests_ms) + if time_to_sleep > 0: + await asyncio.sleep(time_to_sleep / 1000) if function_code in (5, 6): result = await self.available_functions[function_code](address=address, value=value, unit_id=self.unit_id)