Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom connection validation to ConnectionPoolSupport #3081 #3138

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.apache.commons.pool2.BasePooledObjectFactory;
Expand Down Expand Up @@ -60,6 +61,7 @@
* </pre>
*
* @author Mark Paluch
* @author dae won
* @since 4.3
*/
public abstract class ConnectionPoolSupport {
Expand All @@ -77,8 +79,8 @@ private ConnectionPoolSupport() {
* @return the connection pool.
*/
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config) {
return createGenericObjectPool(connectionSupplier, config, true);
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, Predicate<T> connectionValidator) {
return createGenericObjectPool(connectionSupplier, config, true, connectionValidator);
}

/**
Expand All @@ -94,14 +96,17 @@ private ConnectionPoolSupport() {
*/
@SuppressWarnings("unchecked")
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, boolean wrapConnections) {
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, boolean wrapConnections,
Predicate<T> connectionValidator) {

LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");
LettuceAssert.notNull(connectionValidator, "Connection validator must not be null");

AtomicReference<Origin<T>> poolRef = new AtomicReference<>();

GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {
GenericObjectPool<T> pool = new GenericObjectPool<T>(
new EnhancedRedisPooledObjectFactory<T>(connectionSupplier, connectionValidator), config) {

@Override
public T borrowObject() throws Exception {
Expand Down Expand Up @@ -249,4 +254,43 @@ public CompletableFuture<Void> returnObjectAsync(T o) throws Exception {

}

private static class EnhancedRedisPooledObjectFactory<T extends StatefulConnection<?, ?>>
extends BasePooledObjectFactory<T> {

private final Supplier<T> connectionSupplier;

private final Predicate<T> connectionValidator;

EnhancedRedisPooledObjectFactory(Supplier<T> connectionSupplier, Predicate<T> connectionValidator) {
this.connectionSupplier = connectionSupplier;
this.connectionValidator = connectionValidator;
}

@Override
public T create() throws Exception {
return connectionSupplier.get();
}

@Override
public PooledObject<T> wrap(T obj) {
return new DefaultPooledObject<>(obj);
}

@Override
public boolean validateObject(PooledObject<T> p) {
T connection = p.getObject();
return connection.isOpen() && connectionValidator.test(connection);
}

@Override
public void destroyObject(PooledObject<T> p) throws Exception {
try {
p.getObject().close();
} catch (Exception e) {
e.printStackTrace();
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,13 @@ void verifierShouldCatchTooFewParametersDeclarations() {
void shouldWorkWithPooledConnection() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(client::connect, new GenericObjectPoolConfig<>());
.createGenericObjectPool(client::connect, new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ static void afterClass() {
void genericPoolShouldWorkWithWrappedConnections() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

borrowAndReturn(pool);
borrowAndClose(pool);
Expand All @@ -91,7 +97,13 @@ void genericPoolShouldCloseConnectionsAboveMaxIdleSize() throws Exception {
poolConfig.setMaxIdle(2);

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), poolConfig);
.createGenericObjectPool(() -> client.connect(), poolConfig, connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

borrowAndReturn(pool);
borrowAndClose(pool);
Expand Down Expand Up @@ -120,7 +132,13 @@ void genericPoolShouldCloseConnectionsAboveMaxIdleSize() throws Exception {
void genericPoolShouldWorkWithPlainConnections() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false);
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false, connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

borrowAndReturn(pool);

Expand Down Expand Up @@ -151,7 +169,13 @@ void softReferencePoolShouldWorkWithPlainConnections() throws Exception {
void genericPoolUsingWrappingShouldPropagateExceptionsCorrectly() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
Expand All @@ -172,7 +196,13 @@ void genericPoolUsingWrappingShouldPropagateExceptionsCorrectly() throws Excepti
void wrappedConnectionShouldUseWrappers() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
Expand All @@ -197,7 +227,13 @@ void wrappedMasterSlaveConnectionShouldUseWrappers() throws Exception {

GenericObjectPool<StatefulRedisMasterReplicaConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> MasterReplica.connect(client, new StringCodec(), RedisURI.create(host, port)),
new GenericObjectPoolConfig<>());
new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisMasterReplicaConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
Expand All @@ -223,7 +259,13 @@ void wrappedClusterConnectionShouldUseWrappers() throws Exception {
RedisURI.create(TestSettings.host(), 7379));

GenericObjectPool<StatefulRedisClusterConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(redisClusterClient::connect, new GenericObjectPoolConfig<>());
.createGenericObjectPool(redisClusterClient::connect, new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisClusterConnection<String, String> connection = pool.borrowObject();
RedisAdvancedClusterCommands<String, String> sync = connection.sync();
Expand All @@ -250,7 +292,13 @@ void wrappedClusterConnectionShouldUseWrappers() throws Exception {
void plainConnectionShouldNotUseWrappers() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false);
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false, connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
Expand Down Expand Up @@ -295,7 +343,13 @@ void softRefPoolShouldWorkWithWrappedConnections() throws Exception {
void wrappedObjectClosedAfterReturn() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), true);
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), true, connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
Expand All @@ -317,7 +371,13 @@ void wrappedObjectClosedAfterReturn() throws Exception {
void tryWithResourcesReturnsConnectionToPool() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisConnection<String, String> usedConnection = null;
try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
Expand Down