Skip to content

Commit

Permalink
[SDCISA-13221] K8s: vertx-rest-storage (#139)
Browse files Browse the repository at this point in the history
- made redis client aware of redis cluster
  • Loading branch information
thwint authored Sep 18, 2023
1 parent ec617a4 commit 4783761
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 7 deletions.
21 changes: 14 additions & 7 deletions src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisClientType;
import io.vertx.redis.client.RedisOptions;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -42,7 +43,7 @@ public DefaultRedisProvider(Vertx vertx, ModuleConfiguration configuration) {

@Override
public Future<RedisAPI> redis() {
if(redisAPI != null) {
if (redisAPI != null) {
return Future.succeededFuture(redisAPI);
} else {
return setupRedisClient();
Expand All @@ -53,15 +54,15 @@ private boolean reconnectEnabled() {
return configuration.getRedisReconnectAttempts() != 0;
}

private Future<RedisAPI> setupRedisClient(){
private Future<RedisAPI> setupRedisClient() {
Promise<RedisAPI> currentPromise = Promise.promise();
Promise<RedisAPI> masterPromise = connectPromiseRef.accumulateAndGet(
currentPromise, (oldVal, newVal) -> (oldVal != null) ? oldVal : newVal);
if( currentPromise == masterPromise ){
if (currentPromise == masterPromise) {
// Our promise is THE promise. So WE have to resolve it.
connectToRedis().onComplete(event -> {
connectPromiseRef.getAndSet(null);
if(event.failed()) {
if (event.failed()) {
currentPromise.fail(event.cause());
} else {
redisAPI = event.result();
Expand Down Expand Up @@ -89,13 +90,19 @@ private Future<RedisAPI> connectToRedis() {
}

if (connecting.compareAndSet(false, true)) {
redis = Redis.createClient(vertx, new RedisOptions()
.setConnectionString(createConnectString())
RedisOptions redisOptions = new RedisOptions()
.setPassword((redisAuth == null ? "" : redisAuth))
.setMaxPoolSize(redisMaxPoolSize)
.setMaxPoolWaiting(redisMaxPoolWaitingSize)
.setPoolRecycleTimeout(redisPoolRecycleTimeoutMs)
.setMaxWaitingHandlers(redisMaxPipelineWaitingSize));
.setMaxWaitingHandlers(redisMaxPipelineWaitingSize);
if (configuration.isRedisClustered()) {
redisOptions.setType(RedisClientType.CLUSTER);
redisOptions.addConnectionString(createConnectString());
} else {
redisOptions.setConnectionString(createConnectString());
}
redis = Redis.createClient(vertx, redisOptions);

redis.connect().onSuccess(conn -> {
log.info("Successfully connected to redis");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public enum StorageType {
private String redisHost = "localhost";
private int redisPort = 6379;
private boolean redisEnableTls;
private boolean redisClustered;
/**
* @deprecated Instance authentication is considered as legacy. With Redis from 6.x on the ACL authentication method should be used.
*/
Expand Down Expand Up @@ -129,6 +130,11 @@ public ModuleConfiguration redisEnableTls(boolean redisEnableTls) {
return this;
}

public ModuleConfiguration redisClustered(boolean redisClustered) {
this.redisClustered = redisClustered;
return this;
}

public ModuleConfiguration redisReconnectAttempts(int redisReconnectAttempts) {
this.redisReconnectAttempts = redisReconnectAttempts;
return this;
Expand Down Expand Up @@ -308,6 +314,10 @@ public boolean isRedisEnableTls() {
return redisEnableTls;
}

public boolean isRedisClustered() {
return redisClustered;
}

public String getRedisAuth() {
return redisAuth;
}
Expand Down

0 comments on commit 4783761

Please sign in to comment.