Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Authentication Changes: IAuthenticator.checkValid should return `Comp…
Browse files Browse the repository at this point in the history
…letableFuture<Boolean>`
Dinesh Narayanan committed Oct 11, 2019
1 parent 6ee0137 commit 5d23808
Showing 17 changed files with 263 additions and 123 deletions.
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ public final class BrokerConstants {

public static final String INTERCEPT_HANDLER_PROPERTY_NAME = "intercept.handler";
public static final String BROKER_INTERCEPTOR_THREAD_POOL_SIZE = "intercept.thread_pool.size";
public static final String AUTH_THREAD_POOL_SIZE = "auth.intercept.thread_pool.size";
public static final String PERSISTENT_STORE_PROPERTY_NAME = "persistent_store";
public static final String AUTOSAVE_INTERVAL_PROPERTY_NAME = "autosave_interval";
public static final String PASSWORD_FILE_PROPERTY_NAME = "password_file";
86 changes: 53 additions & 33 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
@@ -15,8 +15,8 @@
*/
package io.moquette.broker;

import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -28,15 +28,18 @@

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;

final class MQTTConnection {

@@ -158,27 +161,36 @@ void processConnect(MqttConnectMessage msg) {
username, channel);
}

if (!login(msg, clientId)) {
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}

try {
LOG.trace("Binding MQTTConnection (channel: {}) to session", channel);
sessionRegistry.bindToSession(this, msg, clientId);

initializeKeepAliveTimeout(channel, msg, clientId);
setupInflightResender(channel);

NettyUtils.clientID(channel, clientId);
LOG.trace("CONNACK sent, channel: {}", channel);
postOffice.dispatchConnection(msg);
LOG.trace("dispatch connection: {}", msg.toString());
} catch (SessionCorruptedException scex) {
LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", clientId, channel);
abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
final String newClientId = clientId;
CompletableFuture<Boolean> future = login(msg, newClientId);
future.whenComplete((status, t) -> {
if (t == null) {
if (status) {
try {
LOG.trace("Binding MQTTConnection (channel: {}) to session", channel);
sessionRegistry.bindToSession(this, msg, newClientId);

initializeKeepAliveTimeout(channel, msg, newClientId);
setupInflightResender(channel);

NettyUtils.clientID(channel, newClientId);
LOG.trace("CONNACK sent, channel: {}", channel);
postOffice.dispatchConnection(msg);
LOG.trace("dispatch connection: {}", msg.toString());
} catch (SessionCorruptedException scex) {
LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", newClientId, channel);
abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
} else {
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
channel.close().addListener(CLOSE_ON_FAILURE);
}
} else {
LOG.warn("MQTT connection for client ID {} cannot be created, channel: {}. Error message {}", newClientId, channel, t.getMessage());
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
channel.close().addListener(CLOSE_ON_FAILURE);
}
});
}

private void setupInflightResender(Channel channel) {
@@ -222,27 +234,35 @@ private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean ses
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
}

private boolean login(MqttConnectMessage msg, final String clientId) {
private CompletableFuture<Boolean> login(MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().password().getBytes(StandardCharsets.UTF_8);
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
return false;
return CompletableFuture.completedFuture(false);
}
final String login = msg.payload().userName();
if (!authenticator.checkValid(clientId, login, pwd)) {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
return false;
}
NettyUtils.userName(channel, login);

return authenticator.checkValid(clientId, login, pwd).handleAsync((status, t) -> {
if (t == null) {
if (status) NettyUtils.userName(channel, login);
else
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);

return status;
} else {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}. Error message: {}", clientId, login, t.getMessage());
return false;
}
});
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
return false;
return CompletableFuture.completedFuture(false);
}
return true;
return CompletableFuture.completedFuture(true);
}

void handleConnectionLost() {
6 changes: 4 additions & 2 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
@@ -310,7 +310,9 @@ void dispatchConnectionLost(String clientId,String userName){
}

void flushInFlight(MQTTConnection mqttConnection) {
Session targetSession = sessionRegistry.retrieve(mqttConnection.getClientId());
targetSession.flushAllQueuedMessages();
if(mqttConnection.getClientId() != null) {
Session targetSession = sessionRegistry.retrieve(mqttConnection.getClientId());
targetSession.flushAllQueuedMessages();
}
}
}
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
@@ -232,7 +232,7 @@ private IAuthenticator initializeAuthenticator(IAuthenticator authenticator, ICo
if (passwdPath.isEmpty()) {
authenticator = new AcceptAllAuthenticator();
} else {
authenticator = new ResourceAuthenticator(resourceLoader, passwdPath);
authenticator = new ResourceAuthenticator(resourceLoader, passwdPath, props);
}
LOG.info("An {} authenticator instance will be used", authenticator.getClass().getName());
}
20 changes: 20 additions & 0 deletions broker/src/main/java/io/moquette/broker/Utils.java
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Utility static methods, like Map get with default value, or elvis operator.
@@ -46,6 +48,24 @@ public static byte[] readBytesAndRewind(ByteBuf payload) {
return payloadContent;
}

public static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(30, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

private Utils() {
}
}
Original file line number Diff line number Diff line change
@@ -16,10 +16,12 @@

package io.moquette.broker.security;

import java.util.concurrent.CompletableFuture;

public class AcceptAllAuthenticator implements IAuthenticator {

@Override
public boolean checkValid(String clientId, String username, byte[] password) {
return true;
public CompletableFuture<Boolean> checkValid(String clientId, String username, byte[] password) {
return CompletableFuture.completedFuture(true);
}
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

import com.zaxxer.hikari.HikariDataSource;
import io.moquette.BrokerConstants;
import io.moquette.broker.Utils;
import io.moquette.broker.config.IConfig;
import org.apache.commons.codec.binary.Hex;
import org.slf4j.Logger;
@@ -29,6 +30,9 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Load user credentials from a SQL database. sql driver must be provided at runtime
@@ -38,14 +42,16 @@ public class DBAuthenticator implements IAuthenticator {
private static final Logger LOG = LoggerFactory.getLogger(DBAuthenticator.class);

private final MessageDigest messageDigest;
private final ExecutorService executor;
private HikariDataSource dataSource;
private String sqlQuery;

public DBAuthenticator(IConfig conf) {
this(conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_DRIVER, ""),
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_URL, ""),
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_QUERY, ""),
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_DIGEST, ""));
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_DIGEST, ""),
Integer.parseInt(conf.getProperty(BrokerConstants.AUTH_THREAD_POOL_SIZE, "1")));
}

/**
@@ -59,11 +65,14 @@ public DBAuthenticator(IConfig conf) {
* : sql query like : "SELECT PASSWORD FROM USER WHERE LOGIN=?"
* @param digestMethod
* : password encoding algorithm : "MD5", "SHA-1", "SHA-256"
* @param authExecutorPoolSize
* : auth executor pool size. Defaults to 1.
*/
public DBAuthenticator(String driver, String jdbcUrl, String sqlQuery, String digestMethod) {
public DBAuthenticator(String driver, String jdbcUrl, String sqlQuery, String digestMethod, int authExecutorPoolSize) {
this.sqlQuery = sqlQuery;
this.dataSource = new HikariDataSource();
this.dataSource.setJdbcUrl(jdbcUrl);
this.executor = Executors.newFixedThreadPool(authExecutorPoolSize);

try {
this.messageDigest = MessageDigest.getInstance(digestMethod);
@@ -74,46 +83,52 @@ public DBAuthenticator(String driver, String jdbcUrl, String sqlQuery, String di
}

@Override
public synchronized boolean checkValid(String clientId, String username, byte[] password) {
// Check Username / Password in DB using sqlQuery
if (username == null || password == null) {
LOG.info("username or password was null");
return false;
}

ResultSet resultSet = null;
PreparedStatement preparedStatement = null;
Connection conn = null;
try {
conn = this.dataSource.getConnection();

preparedStatement = conn.prepareStatement(this.sqlQuery);
preparedStatement.setString(1, username);
resultSet = preparedStatement.executeQuery();
if (resultSet.next()) {
final String foundPwq = resultSet.getString(1);
messageDigest.update(password);
byte[] digest = messageDigest.digest();
String encodedPasswd = new String(Hex.encodeHex(digest));
return foundPwq.equals(encodedPasswd);
public synchronized CompletableFuture<Boolean> checkValid(String clientId, String username, byte[] password) {
return CompletableFuture.supplyAsync(() -> {
// Check Username / Password in DB using sqlQuery
if (username == null || password == null) {
LOG.info("username or password was null");
return false;
}
} catch (SQLException sqlex) {
LOG.error("Error quering DB for username: {}", username, sqlex);
} finally {

ResultSet resultSet = null;
PreparedStatement preparedStatement = null;
Connection conn = null;
try {
if (resultSet != null) {
resultSet.close();
}
if (preparedStatement != null) {
preparedStatement.close();
conn = this.dataSource.getConnection();

preparedStatement = conn.prepareStatement(this.sqlQuery);
preparedStatement.setString(1, username);
resultSet = preparedStatement.executeQuery();
if (resultSet.next()) {
final String foundPwq = resultSet.getString(1);
messageDigest.update(password);
byte[] digest = messageDigest.digest();
String encodedPasswd = new String(Hex.encodeHex(digest));
return foundPwq.equals(encodedPasswd);
}
if (conn != null) {
conn.close();
} catch (SQLException sqlex) {
LOG.error("Error quering DB for username: {}", username, sqlex);
} finally {
try {
if (resultSet != null) {
resultSet.close();
}
if (preparedStatement != null) {
preparedStatement.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
LOG.error("Error releasing connection to the datasource", username, e);
}
} catch (SQLException e) {
LOG.error("Error releasing connection to the datasource", username, e);
}
}
return false;
return false;
}, executor);
}

public void cleanup() {
Utils.shutdownAndAwaitTermination(executor);
}
}
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
package io.moquette.broker.security;

import io.moquette.broker.config.FileResourceLoader;
import io.moquette.broker.config.IConfig;

/**
* Load user credentials from a text file. Each line of the file is formatted as
@@ -35,7 +36,7 @@
*/
public class FileAuthenticator extends ResourceAuthenticator {

public FileAuthenticator(String parent, String filePath) {
super(new FileResourceLoader(parent), filePath);
public FileAuthenticator(String parent, String filePath, IConfig config) {
super(new FileResourceLoader(parent), filePath, config);
}
}
Original file line number Diff line number Diff line change
@@ -16,10 +16,12 @@

package io.moquette.broker.security;

import java.util.concurrent.CompletableFuture;

/**
* username and password checker
*/
public interface IAuthenticator {

boolean checkValid(String clientId, String username, byte[] password);
CompletableFuture<Boolean> checkValid(String clientId, String username, byte[] password);
}
Loading

0 comments on commit 5d23808

Please sign in to comment.