About the performance of the RetainedMessages #2091

zhaowgit opened this issue Oct 21, 2024 · 0 comments
Labels
feature-request New feature or request

zhaowgit commented Oct 21, 2024

Describe the feature request

I am currently using MqttServer.InjectApplicationMessage to push retained messages. Once the number of topics reaches tens of thousands, performance drops significantly: pushing tens of thousands of messages takes several minutes. The more retained messages there are, the slower each push becomes.

Which project is your feature request related to?

  • Server

Describe the solution you'd like

I found the cause of the performance degradation in the source code of MqttRetainedMessagesManager. For each retained message pushed, all retained messages are copied from the Dictionary<string, MqttApplicationMessage> into a new List, which is passed as an argument to RetainedMessageChangedEvent. This copy is expensive, and it runs even if nothing subscribes to RetainedMessageChangedEvent. I tried modifying this part of the code to skip the copy. After that, the time to push tens of thousands of messages dropped from several minutes to about 100 ms. I suggest skipping the copy when there are no subscribers to RetainedMessageChangedEvent, to improve the performance of retained messages.
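
To illustrate why copying the whole store on every update becomes expensive, here is a minimal, self-contained sketch. It does not use MQTTnet: `RetainedCopyDemo`, `Run`, the `byte[]` payloads, and the `topic/N` names are hypothetical stand-ins for the retained message store, chosen only to show how the amount of copied data grows.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical stand-in for a retained message store; this is NOT MQTTnet code.
public static class RetainedCopyDemo
{
    // Stores `count` distinct topics. When `copyOnEveryUpdate` is true, all
    // values are copied into a new List after each update, mirroring the
    // behavior described above. Returns the total number of elements copied.
    public static long Run(int count, bool copyOnEveryUpdate)
    {
        var messages = new Dictionary<string, byte[]>();
        long copied = 0;

        for (var i = 0; i < count; i++)
        {
            messages["topic/" + i] = new byte[] { 1 };

            if (copyOnEveryUpdate)
            {
                // Same pattern as `new List<MqttApplicationMessage>(_messages.Values)`.
                var snapshot = new List<byte[]>(messages.Values);
                copied += snapshot.Count;
            }
        }

        return copied;
    }

    public static void Main()
    {
        const int count = 20000;

        var stopwatch = Stopwatch.StartNew();
        var copiedWithSnapshot = Run(count, true);
        Console.WriteLine("With copy:    {0} elements copied, {1} ms", copiedWithSnapshot, stopwatch.ElapsedMilliseconds);

        stopwatch.Restart();
        var copiedWithoutSnapshot = Run(count, false);
        Console.WriteLine("Without copy: {0} elements copied, {1} ms", copiedWithoutSnapshot, stopwatch.ElapsedMilliseconds);
    }
}
```

With n distinct topics, the copying variant copies 1 + 2 + ... + n = n(n+1)/2 elements in total, so the work grows quadratically with the number of retained messages, while the variant without the copy does a constant amount of work per update. The absolute timings depend on the machine and runtime and are not meant to reproduce the numbers reported in this issue.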

Describe alternatives you've considered

Additional context

Example

        public async Task UpdateMessage(string clientId, MqttApplicationMessage applicationMessage)
        {
            if (applicationMessage == null)
            {
                throw new ArgumentNullException(nameof(applicationMessage));
            }
            try
            {
                List<MqttApplicationMessage> messagesForSave = null;
                var saveIsRequired = false;
                // Added: check once whether anything subscribes to RetainedMessageChangedEvent.
                var hasHandlers = _eventContainer.RetainedMessageChangedEvent.HasHandlers;
                
                lock (_messages)
                {
                    var payload = applicationMessage.Payload;
                    var hasPayload = payload.Length > 0;

                    if (!hasPayload)
                    {
                        saveIsRequired = _messages.Remove(applicationMessage.Topic);
                        _logger.Verbose("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic);
                    }
                    else
                    {
                        if (!_messages.TryGetValue(applicationMessage.Topic, out var existingMessage))
                        {
                            _messages[applicationMessage.Topic] = applicationMessage;
                            saveIsRequired = true;
                        }
                        else
                        {
                            if (existingMessage.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel ||
                                !MqttMemoryHelper.SequenceEqual(existingMessage.Payload, payload))
                            {
                                _messages[applicationMessage.Topic] = applicationMessage;
                                saveIsRequired = true;
                            }
                        }

                        _logger.Verbose("Client '{0}' set retained message for topic '{1}'.", clientId, applicationMessage.Topic);
                    }
                    
                    // Changed: added "&& hasHandlers" so the copy is skipped when there are no subscribers.
                    if (saveIsRequired && hasHandlers)
                    {
                        messagesForSave = new List<MqttApplicationMessage>(_messages.Values);
                    }
                }

                // Changed: added "&& hasHandlers" so the event is only raised when there are subscribers.
                if (saveIsRequired && hasHandlers)
                {
                    using (await _storageAccessLock.EnterAsync().ConfigureAwait(false))
                    {
                        var eventArgs = new RetainedMessageChangedEventArgs(clientId, applicationMessage, messagesForSave);
                        await _eventContainer.RetainedMessageChangedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
                    }
                }
            }
            catch (Exception exception)
            {
                _logger.Error(exception, "Unhandled exception while handling retained messages.");
            }
        }
zhaowgit added the feature-request label Oct 21, 2024
zhaowgit changed the title from "关于驻留信息的性能" (Chinese for "About the performance of retained messages") to "关于驻留信息的性能 About the performance of the RetainedMessages" Oct 21, 2024
zhaowgit changed the title from "关于驻留信息的性能 About the performance of the RetainedMessages" to "About the performance of the RetainedMessages" Oct 22, 2024
zhaowgit added a commit to zhaowgit/MQTTnet that referenced this issue Oct 23, 2024
…t subscribing to RetainedMessageChangedEvent.