Skip to content

Commit

Permalink
Removed processing for sendDataOnlyOnChange as deprecated module, use…
Browse files Browse the repository at this point in the history
… reportStrategy ON_CHANGE instead
  • Loading branch information
imbeacon committed Jan 20, 2025
1 parent ee62df8 commit 2192c49
Showing 1 changed file with 7 additions and 27 deletions.
34 changes: 7 additions & 27 deletions thingsboard_gateway/connectors/mqtt/mqtt_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,9 @@ def __init__(self, gateway, config, connector_type):

# Extract main sections from configuration ---------------------------------------------------------------------
self.__broker = config.get('broker')
self.__send_data_only_on_change = self.__broker.get(SEND_ON_CHANGE_PARAMETER,
self.config.get(SEND_ON_CHANGE_PARAMETER,
DEFAULT_SEND_ON_CHANGE_VALUE))
self.__send_data_only_on_change_ttl = self.__broker.get(SEND_ON_CHANGE_TTL_PARAMETER,
self.config.get(SEND_ON_CHANGE_TTL_PARAMETER,
DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE)) # noqa

# for sendDataOnlyOnChange param
self.__topic_content = {}
if not self.__broker:
self.__log.error('Broker configuration is missing!')
return

self.__mapping = []
self.__server_side_rpc = []
Expand Down Expand Up @@ -259,27 +253,21 @@ def __init__(self, gateway, config, connector_type):
self.__stop_event = Event()
self.daemon = True

self.__msg_queue = Queue(config['broker'].get('maxMessageQueue', 1000000000))
self.__msg_queue = Queue(self.__broker.get('maxMessageQueue', 1000000000))
self.__workers_thread_pool = []
self.__max_msg_number_for_worker = config['broker'].get('maxMessageNumberPerWorker', 10)
self.__max_number_of_workers = config['broker'].get('maxNumberOfWorkers', 100)
self.__max_msg_number_for_worker = self.__broker.get('maxMessageNumberPerWorker', 10)
self.__max_number_of_workers = self.__broker.get('maxNumberOfWorkers', 100)

self._on_message_queue = Queue(config['broker'].get('maxProcessingMessageQueue', 1000000000))
self._on_message_queue = Queue(self.__broker.get('maxProcessingMessageQueue', 1000000000))
self._on_message_thread = Thread(name='On Message', target=self._process_on_message, daemon=True)
self._on_message_thread.start()

def is_filtering_enable(self, device_name):
return self.__send_data_only_on_change

def get_config(self):
return self.config

def get_type(self):
return self._connector_type

def get_ttl_for_duplicates(self, device_name):
return self.__send_data_only_on_change_ttl

@staticmethod
def __add_ts_to_test_message(msg, ts_name):
msg = simplejson.loads(msg.decode('utf-8').replace("'", '"'))
Expand Down Expand Up @@ -638,14 +626,6 @@ def _process_on_message(self):
available_converters = self.__mapping_sub_topics[topic]
for converter in available_converters:
try:
# check if data is equal
if converter.config.get('sendDataOnlyOnChange', False) and self.__topic_content.get(
message.topic) == content:
request_handled = True
continue

self.__topic_content[message.topic] = content

request_handled = self.put_data_to_convert(converter, message, content)
except Exception as e:
self.__log.exception(e)
Expand Down

0 comments on commit 2192c49

Please sign in to comment.