Skip to content

Commit

Permalink
Merge branch 'master' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
basimons authored Nov 6, 2023
2 parents 5fa4e7f + 818415e commit 1277578
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.hivemq.mqtt.handler.InterceptorHandler;
import com.hivemq.mqtt.handler.auth.AuthHandler;
import com.hivemq.mqtt.handler.auth.AuthInProgressMessageHandler;
import com.hivemq.mqtt.handler.connack.MqttConnacker;
import com.hivemq.mqtt.handler.connect.ConnectHandler;
import com.hivemq.mqtt.handler.connect.ConnectionLimiterHandler;
import com.hivemq.mqtt.handler.connect.NoConnectIdleHandler;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class ChannelDependencies {
private final @NotNull PingRequestHandler pingRequestHandler;
private final @NotNull RestrictionsConfigurationService restrictionsConfigurationService;
private final @NotNull MqttConnectDecoder mqttConnectDecoder;
private final @NotNull MqttConnacker mqttConnacker;
private final @NotNull MQTTMessageEncoder mqttMessageEncoder;
private final @NotNull EventLog eventLog;
private final @NotNull SslParameterHandler sslParameterHandler;
Expand Down Expand Up @@ -99,6 +101,7 @@ public ChannelDependencies(
final @NotNull PingRequestHandler pingRequestHandler,
final @NotNull RestrictionsConfigurationService restrictionsConfigurationService,
final @NotNull MqttConnectDecoder mqttConnectDecoder,
final @NotNull MqttConnacker mqttConnacker,
final @NotNull EventLog eventLog,
final @NotNull SslParameterHandler sslParameterHandler,
final @NotNull MqttDecoders mqttDecoders,
Expand Down Expand Up @@ -129,6 +132,7 @@ public ChannelDependencies(
this.pingRequestHandler = pingRequestHandler;
this.restrictionsConfigurationService = restrictionsConfigurationService;
this.mqttConnectDecoder = mqttConnectDecoder;
this.mqttConnacker = mqttConnacker;
this.shutdownHooks = shutdownHooks;
this.mqttMessageEncoder = new MQTTMessageEncoder(encoderFactory, globalMQTTMessageCounter);
this.eventLog = eventLog;
Expand Down Expand Up @@ -216,6 +220,10 @@ public MqttConnectDecoder getMqttConnectDecoder() {
return mqttConnectDecoder;
}

public @NotNull MqttConnacker getMqttConnacker() {
return mqttConnacker;
}

@NotNull
public MQTTMessageEncoder getMqttMessageEncoder() {
return mqttMessageEncoder;
Expand Down
238 changes: 143 additions & 95 deletions src/main/java/com/hivemq/codec/decoder/MQTTMessageDecoder.java

Large diffs are not rendered by default.

19 changes: 14 additions & 5 deletions src/main/java/com/hivemq/codec/decoder/MqttConnectDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,9 @@ public MqttConnectDecoder(
mqtt31ConnectDecoder = new Mqtt31ConnectDecoder(mqttConnacker, clientIds, fullConfigurationService, hiveMQId);
}

public @Nullable CONNECT decode(
final @NotNull ClientConnectionContext clientConnectionContext,
final @NotNull ByteBuf buf,
final byte fixedHeader) {

public @Nullable ProtocolVersion decodeProtocolVersion(
final @NotNull ClientConnectionContext clientConnectionContext, final @NotNull ByteBuf buf) {
/*
* It is sufficient to look at the second byte of the variable header (Length LSB) This byte
* indicates how long the following protocol name is going to be. In case of the
Expand All @@ -87,7 +85,6 @@ public MqttConnectDecoder(

final ByteBuf lengthLSBBuf = buf.slice(buf.readerIndex() + 1, 1);
final int lengthLSB = lengthLSBBuf.readByte();

final ProtocolVersion protocolVersion;
switch (lengthLSB) {
case 4:
Expand Down Expand Up @@ -117,6 +114,18 @@ public MqttConnectDecoder(
clientConnectionContext.setProtocolVersion(protocolVersion);
clientConnectionContext.setConnectReceivedTimestamp(System.currentTimeMillis());

return protocolVersion;
}

public @Nullable CONNECT decode(
final @NotNull ClientConnectionContext clientConnectionContext,
final @NotNull ByteBuf buf,
final byte fixedHeader) {

final ProtocolVersion protocolVersion = decodeProtocolVersion(clientConnectionContext, buf);
if (protocolVersion == null) {
return null;
}
if (protocolVersion == ProtocolVersion.MQTTv5) {
return mqtt5ConnectDecoder.decode(clientConnectionContext, buf, fixedHeader);
} else if (protocolVersion == ProtocolVersion.MQTTv3_1_1) {
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/com/hivemq/mqtt/message/pool/FreePacketIdRanges.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.base.Preconditions;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.extension.sdk.api.annotations.ThreadSafe;
import com.hivemq.mqtt.message.pool.exception.MessageIdUnavailableException;
import com.hivemq.mqtt.message.pool.exception.NoMessageIdAvailableException;

/**
Expand All @@ -37,12 +39,11 @@
* Upon assignment, the interval's lower end is incremented (interval size reduces from below).
* When the ID is returned, it either joins one of the existing {@link Range} intervals in the list (if it is adjacent)
* or it forms a new {@link Range} that is added to the list.
* <p>
* This class is NOT thread-safe.
* <a
* href="https://github.com/hivemq/hivemq-mqtt-client/blob/master/src/main/java/com/hivemq/client/internal/util/Ranges.java">The
* original implementation in the HiveMQ Java Client.</a>
*/
@ThreadSafe
public class FreePacketIdRanges {

private static final int MIN_ALLOWED_MQTT_PACKET_ID = 1;
Expand All @@ -61,7 +62,7 @@ public FreePacketIdRanges() {
*
* @return a new ID if available in any of the ranges or {@link NoMessageIdAvailableException} if ran out of IDs.
*/
public int takeNextId() throws NoMessageIdAvailableException {
public synchronized int takeNextId() throws NoMessageIdAvailableException {
if (rootRange.start == rootRange.end) {
throw new NoMessageIdAvailableException();
}
Expand All @@ -79,7 +80,7 @@ public int takeNextId() throws NoMessageIdAvailableException {
*
* @param id an ID that the caller attempts to take.
*/
public void takeIfAvailable(final int id) {
public synchronized void takeSpecificId(final int id) throws MessageIdUnavailableException {
Preconditions.checkArgument(id >= MIN_ALLOWED_MQTT_PACKET_ID && id <= MAX_ALLOWED_MQTT_PACKET_ID,
"Attempting to take an ID %s that is outside the valid packet IDs range.",
id);
Expand All @@ -89,7 +90,7 @@ public void takeIfAvailable(final int id) {

while (current != null) {
if (id < current.start) {
return; // since the ranges are traversed in increasing order of IDs, the given id will not be found
throw new MessageIdUnavailableException(id); // since the ranges are traversed in increasing order of IDs, the given id will not be found
}

if (id < current.end) { // the id is int the current range of free ids
Expand All @@ -106,6 +107,12 @@ public void takeIfAvailable(final int id) {
}
}

// Updating the root range to preserve the invariant of root range being non-empty if there are ranges
// with free IDs available. We do so by ignoring empty intervals.
while ((rootRange.start == rootRange.end) && (rootRange.next != null)) {
rootRange = rootRange.next;
}

return; // id found and taken
}

Expand All @@ -120,7 +127,7 @@ public void takeIfAvailable(final int id) {
*
* @param id an ID that the caller attempts to return (to free).
*/
public void returnId(final int id) {
public synchronized void returnId(final int id) {
Preconditions.checkArgument(id >= MIN_ALLOWED_MQTT_PACKET_ID && id <= MAX_ALLOWED_MQTT_PACKET_ID,
"Attempting to return an ID %s that is outside the valid packet IDs range.",
id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2019-present HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.mqtt.message.pool.exception;

public class MessageIdUnavailableException extends Exception {

public MessageIdUnavailableException(final int unavailableMessageId) {
super(String.format("Desired message id %d is unavailable", unavailableMessageId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.dropping.MessageDroppedService;
import com.hivemq.mqtt.message.pool.FreePacketIdRanges;
import com.hivemq.mqtt.message.pool.exception.MessageIdUnavailableException;
import com.hivemq.mqtt.message.pool.exception.NoMessageIdAvailableException;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.publish.PUBLISHFactory;
Expand Down Expand Up @@ -222,7 +223,12 @@ public void onSuccess(final ImmutableList<MessageWithID> messages) {
for (int i = 0, messagesSize = messages.size(); i < messagesSize; i++) {
final MessageWithID message = messages.get(i);
final FreePacketIdRanges freePacketIdRanges = clientConnection.getFreePacketIdRanges();
freePacketIdRanges.takeIfAvailable(message.getPacketIdentifier());
try {
freePacketIdRanges.takeSpecificId(message.getPacketIdentifier());
} catch (final MessageIdUnavailableException e) {
log.warn("The desired packet ID was not available when polling inflight messages: {}",
e.getMessage());
}

if (message instanceof PUBLISH) {
final PUBLISH publish = (PUBLISH) message;
Expand Down Expand Up @@ -411,7 +417,8 @@ public void onFailure(final @NotNull Throwable t) {
}

private @NotNull ImmutableIntArray createMessageIds(
final @NotNull FreePacketIdRanges messageIDPool, final int pollMessageLimit) throws NoMessageIdAvailableException {
final @NotNull FreePacketIdRanges messageIDPool, final int pollMessageLimit)
throws NoMessageIdAvailableException {
final ImmutableIntArray.Builder builder = ImmutableIntArray.builder(pollMessageLimit);
for (int i = 0; i < pollMessageLimit; i++) {
final int nextId = messageIDPool.takeNextId();
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/hivemq/util/ReasonStrings.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class ReasonStrings {
"Not authorized to connect. Will topic contained wildcard characters (#/+). The broker does not allow this.";
public static final String CONNACK_NOT_AUTHORIZED_NO_AUTHENTICATOR =
"Not authorized to connect. No authenticator registered.";
public static final String CONNACK_PACKET_TOO_LARGE = "Sent CONNECT exceeded the maximum permissible size.";
public static final String CONNACK_NOT_AUTHORIZED_FAILED = "Not authorized to connect. Authentication failed.";

public static final String CONNACK_TOPIC_NAME_INVALID_WILL_LENGTH =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.hivemq.mqtt.handler.InterceptorHandler;
import com.hivemq.mqtt.handler.auth.AuthHandler;
import com.hivemq.mqtt.handler.auth.AuthInProgressMessageHandler;
import com.hivemq.mqtt.handler.connack.MqttConnacker;
import com.hivemq.mqtt.handler.connect.ConnectHandler;
import com.hivemq.mqtt.handler.connect.ConnectionLimiterHandler;
import com.hivemq.mqtt.handler.connect.NoConnectIdleHandler;
Expand Down Expand Up @@ -92,6 +93,9 @@ public class ChannelDependenciesTest {
@Mock
private @NotNull MqttConnectDecoder mqttConnectDecoder;

@Mock
private @NotNull MqttConnacker mqttConnacker;

@Mock
private @NotNull EncoderFactory encoderFactory;

Expand Down Expand Up @@ -160,6 +164,7 @@ public void setUp() throws Exception {
pingRequestHandler,
restrictionsConfigurationService,
mqttConnectDecoder,
mqttConnacker,
eventLog,
sslParameterHandler,
mqttDecoders,
Expand Down
Loading

0 comments on commit 1277578

Please sign in to comment.