Skip to content

Commit

Permalink
Add independent storage cleanup based on scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc-André Weber committed Aug 28, 2023
1 parent 3210dbb commit 4b672a7
Show file tree
Hide file tree
Showing 36 changed files with 2,254 additions and 1,202 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@
<mockito.version>1.10.19</mockito.version>
<rest-assured.version>4.3.0</rest-assured.version>
<awaitility.version>1.6.5</awaitility.version>
<jedis.version>2.9.0</jedis.version>
<jedis.version>3.7.0</jedis.version>
<project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
<sonatypeOssDistMgmtSnapshotsUrl>
https://oss.sonatype.org/content/repositories/snapshots/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.RedisOptions;
import org.swisspush.reststorage.redis.RedisProvider;
import org.swisspush.reststorage.util.ModuleConfiguration;

import java.util.concurrent.atomic.AtomicReference;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ public class RedisRestStorageRunner {

public static void main(String[] args) {
ModuleConfiguration modConfig = new ModuleConfiguration()
.storageType(ModuleConfiguration.StorageType.redis);
.storageType(ModuleConfiguration.StorageType.redis)
.resourceCleanupIntervalSec(10);

Vertx.vertx().deployVerticle(new RestStorageMod(), new DeploymentOptions().setConfig(modConfig.asJsonObject()), event ->
LoggerFactory.getLogger(RedisRestStorageRunner.class).info("rest-storage started"));
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/swisspush/reststorage/RestStorageMod.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.vertx.core.http.HttpServerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.reststorage.redis.RedisProvider;
import org.swisspush.reststorage.redis.RedisStorage;
import org.swisspush.reststorage.util.ModuleConfiguration;

public class RestStorageMod extends AbstractVerticle {
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/org/swisspush/reststorage/lock/Lock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.swisspush.reststorage.lock;

import io.vertx.core.Future;

/**
* Cluster wide locks allow you to obtain exclusive locks across the cluster.
* This is useful when you want to do something or access a resource on only one node of a cluster at any one time.
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public interface Lock {
/**
* Try to acquire a lock.
* The <code>token</code> parameter value must be unique across all clients and all lock requests. The <code>lockExpiryMs</code>
* parameter defines the expiry of the lock.
* When not manually released, the lock will be released automatically when expired.
*
* @param lock The name of the lock to acquire
* @param token A unique token to define the owner of the lock
* @param lockExpiryMs The lock expiry in milliseconds
* @return Returns a Future holding a Boolean value whether the lock could be successfully acquired or not
*/
Future<Boolean> acquireLock(String lock, String token, long lockExpiryMs);

/**
* Try to release a lock.
* The <code>token</code> parameter value is used to verify that only the owner of the lock can release it.
* The <code>token</code> parameter value also prevents the original owner of an already expired lock to release a lock
* which has been acquired by another client.
*
* @param lock The name of the lock to release
* @param token A unique token to verify if the owner of the lock tries to release the lock
* @return Returns a Promise holding a Boolean value whether the lock could be successfully released or not
*/
Future<Boolean> releaseLock(String lock, String token);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.swisspush.reststorage.lock.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.reststorage.redis.RedisProvider;
import org.swisspush.reststorage.lock.Lock;
import org.swisspush.reststorage.lock.lua.LockLuaScripts;
import org.swisspush.reststorage.lock.lua.LuaScriptState;
import org.swisspush.reststorage.lock.lua.ReleaseLockRedisCommand;
import org.swisspush.reststorage.redis.RedisUtils;
import org.swisspush.reststorage.util.FailedAsyncResult;

import java.util.Collections;
import java.util.List;

/**
* Implementation of the {@link Lock} interface based on a redis database.
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class RedisBasedLock implements Lock {

private Logger log = LoggerFactory.getLogger(RedisBasedLock.class);

public static final String STORAGE_PREFIX = "rest-storage-lock:";

private LuaScriptState releaseLockLuaScriptState;
private RedisProvider redisProvider;

public RedisBasedLock(RedisProvider redisProvider) {
this.redisProvider = redisProvider;
this.releaseLockLuaScriptState = new LuaScriptState(LockLuaScripts.LOCK_RELEASE, redisProvider, false);
}

private void redisSetWithOptions(String key, String value, boolean nx, long px, Handler<AsyncResult<Response>> handler) {
JsonArray options = new JsonArray();
options.add("PX").add(px);
if (nx) {
options.add("NX");
}
redisProvider.redis().onSuccess(redisAPI -> redisAPI.send(Command.SET, RedisUtils.toPayload(key, value, options).toArray(new String[0]))
.onComplete(handler)).onFailure(throwable -> handler.handle(new FailedAsyncResult<>(throwable)));
}

@Override
public Future<Boolean> acquireLock(String lock, String token, long lockExpiryMs) {
Promise<Boolean> promise = Promise.promise();
redisSetWithOptions(buildLockKey(lock), token, true, lockExpiryMs, event -> {
if (event.succeeded()) {
if (event.result() != null) {
promise.complete("OK".equalsIgnoreCase(event.result().toString()));
} else {
promise.complete(false);
}
} else {
promise.fail(event.cause().getMessage());
}
});
return promise.future();
}

@Override
public Future<Boolean> releaseLock(String lock, String token) {
Promise<Boolean> promise = Promise.promise();
List<String> keys = Collections.singletonList(buildLockKey(lock));
List<String> arguments = Collections.singletonList(token);
ReleaseLockRedisCommand cmd = new ReleaseLockRedisCommand(releaseLockLuaScriptState,
keys, arguments, redisProvider, log, promise);
cmd.exec(0);
return promise.future();
}

private String buildLockKey(String lock) {
return STORAGE_PREFIX + lock;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.swisspush.reststorage.lock.lua;


/**
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public enum LockLuaScripts implements LuaScript {

LOCK_RELEASE("lock_release.lua");

private String file;

LockLuaScripts(String file) {
this.file = file;
}

@Override
public String getFilename() {
return file;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.swisspush.reststorage.lock.lua;

/**
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public interface LuaScript {
String getFilename();
}
140 changes: 140 additions & 0 deletions src/main/java/org/swisspush/reststorage/lock/lua/LuaScriptState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package org.swisspush.reststorage.lock.lua;

import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.reststorage.redis.RedisProvider;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;

/**
* Created by webermarca on 01.07.2016.
*/
public class LuaScriptState {
private LuaScript luaScriptType;
/**
* the script itself
*/
private String script;
/**
* if the script logs to the redis log
*/
private boolean logoutput = false;
/**
* the sha, over which the script can be accessed in redis
*/
private String sha;

private RedisProvider redisProvider;

private Logger log = LoggerFactory.getLogger(LuaScriptState.class);

public LuaScriptState(LuaScript luaScriptType, RedisProvider redisProvider, boolean logoutput) {
this.luaScriptType = luaScriptType;
this.redisProvider = redisProvider;
this.logoutput = logoutput;
this.composeLuaScript(luaScriptType);
this.loadLuaScript(new RedisCommandDoNothing(), 0);
}

/**
* Reads the script from the classpath and removes logging output if logoutput is false.
* The script is stored in the class member script.
*
* @param luaScriptType
*/
private void composeLuaScript(LuaScript luaScriptType) {
log.info("read the lua script for script type: {} with logoutput: {}", luaScriptType, logoutput);
this.script = readLuaScriptFromClasspath(luaScriptType);
this.sha = DigestUtils.sha1Hex(this.script);
}

private String readLuaScriptFromClasspath(LuaScript luaScriptType) {
BufferedReader in = new BufferedReader(new InputStreamReader(this.getClass().getClassLoader().getResourceAsStream(luaScriptType.getFilename())));
StringBuilder sb;
try {
sb = new StringBuilder();
String line;
while ((line = in.readLine()) != null) {
if (!logoutput && line.contains("redis.log(redis.LOG_NOTICE,")) {
continue;
}
sb.append(line).append("\n");
}

} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
in.close();
} catch (IOException e) {
// Ignore
}
}
return sb.toString();
}

/**
* Load the get script into redis and store the sha in the class member sha.
*
* @param redisCommand the redis command that should be executed, after the script is loaded.
* @param executionCounter a counter to control recursion depth
*/
public void loadLuaScript(final RedisCommand redisCommand, int executionCounter) {
final int executionCounterIncr = ++executionCounter;
// check first if the lua script already exists in the store
redisProvider.redis().onSuccess(redisAPI -> redisAPI.script(Arrays.asList("exists", sha), resultArray -> {
if (resultArray.failed()) {
log.error("Error checking whether lua script exists", resultArray.cause());
return;
}
Long exists = resultArray.result().get(0).toLong();
// if script already
if (Long.valueOf(1).equals(exists)) {
log.debug("RedisStorage script already exists in redis cache: {}", luaScriptType);
redisCommand.exec(executionCounterIncr);
} else {
log.info("load lua script for script type: {} logutput: {}", luaScriptType, logoutput);
redisAPI.script(Arrays.asList("load",script), stringAsyncResult -> {
String newSha = stringAsyncResult.result().toString();
log.info("got sha from redis for lua script: {}: {}", luaScriptType, newSha);
if (!newSha.equals(sha)) {
log.warn("the sha calculated by myself: {} doesn't match with the sha from redis: {}. " +
"We use the sha from redis", sha, newSha);
}
sha = newSha;
log.info("execute redis command for script type: {} with new sha: {}", luaScriptType, sha);
redisCommand.exec(executionCounterIncr);
});
}
})).onFailure(throwable -> log.error("Redis: Error checking whether lua script exists", throwable));
}

public String getScript() {
return script;
}

public void setScript(String script) {
this.script = script;
}

public boolean getLogoutput() {
return logoutput;
}

public void setLogoutput(boolean logoutput) {
this.logoutput = logoutput;
}

public String getSha() {
return sha;
}

public void setSha(String sha) {
this.sha = sha;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.swisspush.reststorage.lock.lua;

/**
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public interface RedisCommand {
void exec(int executionCounter);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.swisspush.reststorage.lock.lua;

/**
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class RedisCommandDoNothing implements RedisCommand{

@Override
public void exec(int executionCounter) {
// do nothing here
}
}
Loading

0 comments on commit 4b672a7

Please sign in to comment.