diff --git a/README.md b/README.md index 0f7db98..a219fef 100644 --- a/README.md +++ b/README.md @@ -188,30 +188,31 @@ The data compression feature is not compatible with all vertx-rest-storage featu The following configuration values are available: -| Property | Type | Default value | Description | -|:----------------------------------------|:-------|:-------------------------|:--------------------------------------------------------------------------------------------------------------------------------------| -| root | common | . | The prefix for the directory or redis key | -| storageType | common | filesystem | The storage implementation to use. Choose between filesystem or redis | -| port | common | 8989 | The port the mod listens to when HTTP API is enabled. | -| httpRequestHandlerEnabled | common | true | When set to _false_, the storage is accessible throught the event bus only. | -| httpRequestHandlerAuthenticationEnabled | common | false | Enable / disable authentication for the HTTP API | -| httpRequestHandlerUsername | common | | The username for the HTTP API authentication | -| httpRequestHandlerPassword | common | | The password for the HTTP API authentication | -| prefix | common | / | The part of the URL path before this handler (aka "context path" in JEE terminology) | -| storageAddress | common | resource-storage | The eventbus address the mod listens to. | -| editorConfig | common | | Additional configuration values for the editor | -| confirmCollectionDelete | common | false | When set to _true_, an additional _recursive=true_ url parameter has to be set to delete collections | -| redisHost | redis | localhost | The host where redis is running on | -| redisPort | redis | 6379 | The port where redis is running on | -| expirablePrefix | redis | rest-storage:expirable | The prefix for expirable data redis keys | -| resourcesPrefix | redis | rest-storage:resources | The prefix for resources redis keys | -| collectionsPrefix | redis | rest-storage:collections | The prefix for collections redis keys | -| deltaResourcesPrefix | redis | delta:resources | The prefix for delta resources redis keys | -| deltaEtagsPrefix | redis | delta:etags | The prefix for delta etags redis keys | -| lockPrefix | redis | rest-storage:locks | The prefix for lock redis keys | -| resourceCleanupAmount | redis | 100000 | The maximum amount of resources to clean in a single cleanup run | -| rejectStorageWriteOnLowMemory | redis | false | When set to _true_, PUT requests with the x-importance-level header can be rejected when memory gets low | -| freeMemoryCheckIntervalMs | redis | 60000 | The interval in milliseconds to calculate the actual memory usage | +| Property | Type | Default value | Description | +|:----------------------------------------|:-------|:-------------------------|:------------------------------------------------------------------------------------------------------------------------------| +| root | common | . | The prefix for the directory or redis key | +| storageType | common | filesystem | The storage implementation to use. Choose between filesystem or redis | +| port | common | 8989 | The port the mod listens to when HTTP API is enabled. | +| httpRequestHandlerEnabled | common | true | When set to _false_, the storage is accessible throught the event bus only. | +| httpRequestHandlerAuthenticationEnabled | common | false | Enable / disable authentication for the HTTP API | +| httpRequestHandlerUsername | common | | The username for the HTTP API authentication | +| httpRequestHandlerPassword | common | | The password for the HTTP API authentication | +| prefix | common | / | The part of the URL path before this handler (aka "context path" in JEE terminology) | +| storageAddress | common | resource-storage | The eventbus address the mod listens to. | +| editorConfig | common | | Additional configuration values for the editor | +| confirmCollectionDelete | common | false | When set to _true_, an additional _recursive=true_ url parameter has to be set to delete collections | +| redisHost | redis | localhost | The host where redis is running on | +| redisPort | redis | 6379 | The port where redis is running on | +| expirablePrefix | redis | rest-storage:expirable | The prefix for expirable data redis keys | +| resourcesPrefix | redis | rest-storage:resources | The prefix for resources redis keys | +| collectionsPrefix | redis | rest-storage:collections | The prefix for collections redis keys | +| deltaResourcesPrefix | redis | delta:resources | The prefix for delta resources redis keys | +| deltaEtagsPrefix | redis | delta:etags | The prefix for delta etags redis keys | +| lockPrefix | redis | rest-storage:locks | The prefix for lock redis keys | +| resourceCleanupAmount | redis | 100000 | The maximum amount of resources to clean in a single cleanup run | +| resourceCleanupIntervalSec | redis | | The interval (in seconds) how often to peform the storage cleanup. When set to _null_ no periodic storage cleanup is peformed | +| rejectStorageWriteOnLowMemory | redis | false | When set to _true_, PUT requests with the x-importance-level header can be rejected when memory gets low | +| freeMemoryCheckIntervalMs | redis | 60000 | The interval in milliseconds to calculate the actual memory usage | ### Configuration util diff --git a/pom.xml b/pom.xml index c913215..5f7dedf 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush rest-storage - 3.0.15-SNAPSHOT + 3.0.16-SNAPSHOT rest-storage Persistence for REST resources in the filesystem or a redis database @@ -415,7 +415,7 @@ 1.10.19 4.3.0 1.6.5 - 2.9.0 + 3.7.0 UTF8 https://oss.sonatype.org/content/repositories/snapshots/ diff --git a/src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java b/src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java index fda415d..1167a27 100644 --- a/src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java +++ b/src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java @@ -6,6 +6,8 @@ import io.vertx.redis.client.Redis; import io.vertx.redis.client.RedisAPI; import io.vertx.redis.client.RedisOptions; +import org.apache.commons.lang.StringUtils; +import org.swisspush.reststorage.redis.RedisProvider; import org.swisspush.reststorage.util.ModuleConfiguration; import java.util.concurrent.atomic.AtomicReference; @@ -19,7 +21,7 @@ public class DefaultRedisProvider implements RedisProvider { private final Vertx vertx; - private ModuleConfiguration configuration; + private final ModuleConfiguration configuration; private RedisAPI redisAPI; @@ -62,9 +64,8 @@ private Future setupRedisClient(){ private Future connectToRedis() { Promise promise = Promise.promise(); - String protocol = configuration.isRedisEnableTls() ? "rediss://" : "redis://"; Redis.createClient(vertx, new RedisOptions() - .setConnectionString(protocol + configuration.getRedisHost() + ":" + configuration.getRedisPort()) + .setConnectionString(createConnectString()) .setPassword((configuration.getRedisAuth() == null ? "" : configuration.getRedisAuth())) .setMaxPoolSize(configuration.getMaxRedisConnectionPoolSize()) .setMaxPoolWaiting(configuration.getMaxQueueWaiting()) @@ -79,4 +80,16 @@ private Future connectToRedis() { return promise.future(); } + + private String createConnectString() { + StringBuilder connectionStringBuilder = new StringBuilder(); + connectionStringBuilder.append(configuration.isRedisEnableTls() ? "rediss://" : "redis://"); + String redisUser = configuration.getRedisUser(); + String redisPassword = configuration.getRedisPassword(); + if (StringUtils.isNotEmpty(redisUser) && StringUtils.isNotEmpty(redisPassword)) { + connectionStringBuilder.append(configuration.getRedisUser()).append(":").append(redisPassword).append("@"); + } + connectionStringBuilder.append(configuration.getRedisHost()).append(":").append(configuration.getRedisPort()); + return connectionStringBuilder.toString(); + } } diff --git a/src/main/java/org/swisspush/reststorage/FileSystemStorage.java b/src/main/java/org/swisspush/reststorage/FileSystemStorage.java index 22e9d2d..7e213c6 100644 --- a/src/main/java/org/swisspush/reststorage/FileSystemStorage.java +++ b/src/main/java/org/swisspush/reststorage/FileSystemStorage.java @@ -27,7 +27,7 @@ public class FileSystemStorage implements Storage { private final int rootLen; private final FileSystemDirLister fileSystemDirLister; - private Logger log = LoggerFactory.getLogger(FileSystemStorage.class); + private final Logger log = LoggerFactory.getLogger(FileSystemStorage.class); public FileSystemStorage(Vertx vertx, String root) { this.vertx = vertx; diff --git a/src/main/java/org/swisspush/reststorage/MimeTypeResolver.java b/src/main/java/org/swisspush/reststorage/MimeTypeResolver.java index 1f02414..9f208f2 100644 --- a/src/main/java/org/swisspush/reststorage/MimeTypeResolver.java +++ b/src/main/java/org/swisspush/reststorage/MimeTypeResolver.java @@ -10,7 +10,7 @@ public class MimeTypeResolver { private Map mimeTypes = new HashMap<>(); - private String defaultMimeType; + private final String defaultMimeType; public MimeTypeResolver(String defaultMimeType) { this.defaultMimeType = defaultMimeType; diff --git a/src/main/java/org/swisspush/reststorage/ModuleConfigurationAuthentication.java b/src/main/java/org/swisspush/reststorage/ModuleConfigurationAuthentication.java index a7dd47b..fcc7849 100644 --- a/src/main/java/org/swisspush/reststorage/ModuleConfigurationAuthentication.java +++ b/src/main/java/org/swisspush/reststorage/ModuleConfigurationAuthentication.java @@ -34,7 +34,7 @@ private User(String name, String password) { } } - private User user; + private final User user; public ModuleConfigurationAuthentication(ModuleConfiguration configuration) { Objects.requireNonNull(configuration); diff --git a/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java b/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java index d294dcc..7304a33 100644 --- a/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java +++ b/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java @@ -14,7 +14,8 @@ public class RedisRestStorageRunner { public static void main(String[] args) { ModuleConfiguration modConfig = new ModuleConfiguration() - .storageType(ModuleConfiguration.StorageType.redis); + .storageType(ModuleConfiguration.StorageType.redis) + .resourceCleanupIntervalSec(10); Vertx.vertx().deployVerticle(new RestStorageMod(), new DeploymentOptions().setConfig(modConfig.asJsonObject()), event -> LoggerFactory.getLogger(RedisRestStorageRunner.class).info("rest-storage started")); diff --git a/src/main/java/org/swisspush/reststorage/RestStorageMod.java b/src/main/java/org/swisspush/reststorage/RestStorageMod.java index dda25eb..ae3a6bc 100644 --- a/src/main/java/org/swisspush/reststorage/RestStorageMod.java +++ b/src/main/java/org/swisspush/reststorage/RestStorageMod.java @@ -5,11 +5,13 @@ import io.vertx.core.http.HttpServerRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.swisspush.reststorage.redis.RedisProvider; +import org.swisspush.reststorage.redis.RedisStorage; import org.swisspush.reststorage.util.ModuleConfiguration; public class RestStorageMod extends AbstractVerticle { - private Logger log = LoggerFactory.getLogger(RestStorageMod.class); + private final Logger log = LoggerFactory.getLogger(RestStorageMod.class); private RedisProvider redisProvider; diff --git a/src/main/java/org/swisspush/reststorage/lock/Lock.java b/src/main/java/org/swisspush/reststorage/lock/Lock.java new file mode 100644 index 0000000..aef427b --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/lock/Lock.java @@ -0,0 +1,36 @@ +package org.swisspush.reststorage.lock; + +import io.vertx.core.Future; + +/** + * Cluster wide locks allow you to obtain exclusive locks across the cluster. + * This is useful when you want to do something or access a resource on only one node of a cluster at any one time. + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public interface Lock { + /** + * Try to acquire a lock. + * The token parameter value must be unique across all clients and all lock requests. The lockExpiryMs + * parameter defines the expiry of the lock. + * When not manually released, the lock will be released automatically when expired. + * + * @param lock The name of the lock to acquire + * @param token A unique token to define the owner of the lock + * @param lockExpiryMs The lock expiry in milliseconds + * @return Returns a Future holding a Boolean value whether the lock could be successfully acquired or not + */ + Future acquireLock(String lock, String token, long lockExpiryMs); + + /** + * Try to release a lock. + * The token parameter value is used to verify that only the owner of the lock can release it. + * The token parameter value also prevents the original owner of an already expired lock to release a lock + * which has been acquired by another client. + * + * @param lock The name of the lock to release + * @param token A unique token to verify if the owner of the lock tries to release the lock + * @return Returns a Promise holding a Boolean value whether the lock could be successfully released or not + */ + Future releaseLock(String lock, String token); +} diff --git a/src/main/java/org/swisspush/reststorage/lock/impl/RedisBasedLock.java b/src/main/java/org/swisspush/reststorage/lock/impl/RedisBasedLock.java new file mode 100644 index 0000000..e8609cf --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/lock/impl/RedisBasedLock.java @@ -0,0 +1,83 @@ +package org.swisspush.reststorage.lock.impl; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.json.JsonArray; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.reststorage.redis.RedisProvider; +import org.swisspush.reststorage.lock.Lock; +import org.swisspush.reststorage.lock.lua.LockLuaScripts; +import org.swisspush.reststorage.lock.lua.LuaScriptState; +import org.swisspush.reststorage.lock.lua.ReleaseLockRedisCommand; +import org.swisspush.reststorage.redis.RedisUtils; +import org.swisspush.reststorage.util.FailedAsyncResult; + +import java.util.Collections; +import java.util.List; + +/** + * Implementation of the {@link Lock} interface based on a redis database. + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public class RedisBasedLock implements Lock { + + private final Logger log = LoggerFactory.getLogger(RedisBasedLock.class); + + public static final String STORAGE_PREFIX = "rest-storage-lock:"; + + private final LuaScriptState releaseLockLuaScriptState; + private final RedisProvider redisProvider; + + public RedisBasedLock(RedisProvider redisProvider) { + this.redisProvider = redisProvider; + this.releaseLockLuaScriptState = new LuaScriptState(LockLuaScripts.LOCK_RELEASE, redisProvider, false); + } + + private void redisSetWithOptions(String key, String value, boolean nx, long px, Handler> handler) { + JsonArray options = new JsonArray(); + options.add("PX").add(px); + if (nx) { + options.add("NX"); + } + redisProvider.redis().onSuccess(redisAPI -> redisAPI.send(Command.SET, RedisUtils.toPayload(key, value, options).toArray(new String[0])) + .onComplete(handler)).onFailure(throwable -> handler.handle(new FailedAsyncResult<>(throwable))); + } + + @Override + public Future acquireLock(String lock, String token, long lockExpiryMs) { + Promise promise = Promise.promise(); + redisSetWithOptions(buildLockKey(lock), token, true, lockExpiryMs, event -> { + if (event.succeeded()) { + if (event.result() != null) { + promise.complete("OK".equalsIgnoreCase(event.result().toString())); + } else { + promise.complete(false); + } + } else { + promise.fail(event.cause().getMessage()); + } + }); + return promise.future(); + } + + @Override + public Future releaseLock(String lock, String token) { + Promise promise = Promise.promise(); + List keys = Collections.singletonList(buildLockKey(lock)); + List arguments = Collections.singletonList(token); + ReleaseLockRedisCommand cmd = new ReleaseLockRedisCommand(releaseLockLuaScriptState, + keys, arguments, redisProvider, log, promise); + cmd.exec(0); + return promise.future(); + } + + private String buildLockKey(String lock) { + return STORAGE_PREFIX + lock; + } +} diff --git a/src/main/java/org/swisspush/reststorage/lock/lua/LockLuaScripts.java b/src/main/java/org/swisspush/reststorage/lock/lua/LockLuaScripts.java new file mode 100644 index 0000000..eb0d776 --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/lock/lua/LockLuaScripts.java @@ -0,0 +1,21 @@ +package org.swisspush.reststorage.lock.lua; + + +/** + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public enum LockLuaScripts implements LuaScript { + + LOCK_RELEASE("lock_release.lua"); + + private final String file; + + LockLuaScripts(String file) { + this.file = file; + } + + @Override + public String getFilename() { + return file; + } +} diff --git a/src/main/java/org/swisspush/reststorage/lock/lua/LuaScript.java b/src/main/java/org/swisspush/reststorage/lock/lua/LuaScript.java new file mode 100644 index 0000000..acf5b8e --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/lock/lua/LuaScript.java @@ -0,0 +1,8 @@ +package org.swisspush.reststorage.lock.lua; + +/** + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public interface LuaScript { + String getFilename(); +} diff --git a/src/main/java/org/swisspush/reststorage/lock/lua/LuaScriptState.java b/src/main/java/org/swisspush/reststorage/lock/lua/LuaScriptState.java new file mode 100644 index 0000000..b80da46 --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/lock/lua/LuaScriptState.java @@ -0,0 +1,140 @@ +package org.swisspush.reststorage.lock.lua; + +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.reststorage.redis.RedisProvider; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; + +/** + * Created by webermarca on 01.07.2016. + */ +public class LuaScriptState { + private final LuaScript luaScriptType; + /** + * the script itself + */ + private String script; + /** + * if the script logs to the redis log + */ + private boolean logoutput = false; + /** + * the sha, over which the script can be accessed in redis + */ + private String sha; + + private final RedisProvider redisProvider; + + private final Logger log = LoggerFactory.getLogger(LuaScriptState.class); + + public LuaScriptState(LuaScript luaScriptType, RedisProvider redisProvider, boolean logoutput) { + this.luaScriptType = luaScriptType; + this.redisProvider = redisProvider; + this.logoutput = logoutput; + this.composeLuaScript(luaScriptType); + this.loadLuaScript(new RedisCommandDoNothing(), 0); + } + + /** + * Reads the script from the classpath and removes logging output if logoutput is false. + * The script is stored in the class member script. + * + * @param luaScriptType + */ + private void composeLuaScript(LuaScript luaScriptType) { + log.info("read the lua script for script type: {} with logoutput: {}", luaScriptType, logoutput); + this.script = readLuaScriptFromClasspath(luaScriptType); + this.sha = DigestUtils.sha1Hex(this.script); + } + + private String readLuaScriptFromClasspath(LuaScript luaScriptType) { + BufferedReader in = new BufferedReader(new InputStreamReader(this.getClass().getClassLoader().getResourceAsStream(luaScriptType.getFilename()))); + StringBuilder sb; + try { + sb = new StringBuilder(); + String line; + while ((line = in.readLine()) != null) { + if (!logoutput && line.contains("redis.log(redis.LOG_NOTICE,")) { + continue; + } + sb.append(line).append("\n"); + } + + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + in.close(); + } catch (IOException e) { + // Ignore + } + } + return sb.toString(); + } + + /** + * Load the get script into redis and store the sha in the class member sha. + * + * @param redisCommand the redis command that should be executed, after the script is loaded. + * @param executionCounter a counter to control recursion depth + */ + public void loadLuaScript(final RedisCommand redisCommand, int executionCounter) { + final int executionCounterIncr = ++executionCounter; + // check first if the lua script already exists in the store + redisProvider.redis().onSuccess(redisAPI -> redisAPI.script(Arrays.asList("exists", sha), resultArray -> { + if (resultArray.failed()) { + log.error("Error checking whether lua script exists", resultArray.cause()); + return; + } + Long exists = resultArray.result().get(0).toLong(); + // if script already + if (Long.valueOf(1).equals(exists)) { + log.debug("RedisStorage script already exists in redis cache: {}", luaScriptType); + redisCommand.exec(executionCounterIncr); + } else { + log.info("load lua script for script type: {} logutput: {}", luaScriptType, logoutput); + redisAPI.script(Arrays.asList("load",script), stringAsyncResult -> { + String newSha = stringAsyncResult.result().toString(); + log.info("got sha from redis for lua script: {}: {}", luaScriptType, newSha); + if (!newSha.equals(sha)) { + log.warn("the sha calculated by myself: {} doesn't match with the sha from redis: {}. " + + "We use the sha from redis", sha, newSha); + } + sha = newSha; + log.info("execute redis command for script type: {} with new sha: {}", luaScriptType, sha); + redisCommand.exec(executionCounterIncr); + }); + } + })).onFailure(throwable -> log.error("Redis: Error checking whether lua script exists", throwable)); + } + + public String getScript() { + return script; + } + + public void setScript(String script) { + this.script = script; + } + + public boolean getLogoutput() { + return logoutput; + } + + public void setLogoutput(boolean logoutput) { + this.logoutput = logoutput; + } + + public String getSha() { + return sha; + } + + public void setSha(String sha) { + this.sha = sha; + } + +} diff --git a/src/main/java/org/swisspush/reststorage/lock/lua/RedisCommand.java b/src/main/java/org/swisspush/reststorage/lock/lua/RedisCommand.java new file mode 100644 index 0000000..67d14dc --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/lock/lua/RedisCommand.java @@ -0,0 +1,8 @@ +package org.swisspush.reststorage.lock.lua; + +/** + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public interface RedisCommand { + void exec(int executionCounter); +} diff --git a/src/main/java/org/swisspush/reststorage/lock/lua/RedisCommandDoNothing.java b/src/main/java/org/swisspush/reststorage/lock/lua/RedisCommandDoNothing.java new file mode 100644 index 0000000..1f5da71 --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/lock/lua/RedisCommandDoNothing.java @@ -0,0 +1,12 @@ +package org.swisspush.reststorage.lock.lua; + +/** + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public class RedisCommandDoNothing implements RedisCommand{ + + @Override + public void exec(int executionCounter) { + // do nothing here + } +} diff --git a/src/main/java/org/swisspush/reststorage/lock/lua/ReleaseLockRedisCommand.java b/src/main/java/org/swisspush/reststorage/lock/lua/ReleaseLockRedisCommand.java new file mode 100644 index 0000000..d939ebc --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/lock/lua/ReleaseLockRedisCommand.java @@ -0,0 +1,61 @@ +package org.swisspush.reststorage.lock.lua; + +import io.vertx.core.Promise; +import org.slf4j.Logger; +import org.swisspush.reststorage.redis.RedisProvider; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public class ReleaseLockRedisCommand implements RedisCommand { + + private final LuaScriptState luaScriptState; + private final List keys; + private final List arguments; + private final Promise promise; + private final RedisProvider redisProvider; + private final Logger log; + + public ReleaseLockRedisCommand(LuaScriptState luaScriptState, List keys, List arguments, + RedisProvider redisProvider, Logger log, final Promise promise) { + this.luaScriptState = luaScriptState; + this.keys = keys; + this.arguments = arguments; + this.redisProvider = redisProvider; + this.log = log; + this.promise = promise; + } + + @Override + public void exec(int executionCounter) { + List args = new ArrayList<>(); + args.add(luaScriptState.getSha()); + args.add(String.valueOf(keys.size())); + args.addAll(keys); + args.addAll(arguments); + + redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { + if (event.succeeded()) { + Long unlocked = event.result().toLong(); + promise.complete(unlocked > 0); + } else { + String message = event.cause().getMessage(); + if (message != null && message.startsWith("NOSCRIPT")) { + log.warn("ReleaseLockRedisCommand script couldn't be found, reload it"); + log.warn("amount the script got loaded: " + executionCounter); + if (executionCounter > 10) { + promise.fail("amount the script got loaded is higher than 10, we abort"); + } else { + luaScriptState.loadLuaScript(new ReleaseLockRedisCommand(luaScriptState, keys, + arguments, redisProvider, log, promise), executionCounter); + } + } else { + promise.fail("ReleaseLockRedisCommand request failed with message: " + message); + } + } + })).onFailure(throwable -> promise.fail("Redis: ReleaseLockRedisCommand request failed with error: " + throwable.getMessage())); + } +} diff --git a/src/main/java/org/swisspush/reststorage/RedisProvider.java b/src/main/java/org/swisspush/reststorage/redis/RedisProvider.java similarity index 85% rename from src/main/java/org/swisspush/reststorage/RedisProvider.java rename to src/main/java/org/swisspush/reststorage/redis/RedisProvider.java index df6240f..2c9ea6b 100644 --- a/src/main/java/org/swisspush/reststorage/RedisProvider.java +++ b/src/main/java/org/swisspush/reststorage/redis/RedisProvider.java @@ -1,4 +1,4 @@ -package org.swisspush.reststorage; +package org.swisspush.reststorage.redis; import io.vertx.core.Future; import io.vertx.redis.client.RedisAPI; diff --git a/src/main/java/org/swisspush/reststorage/RedisStorage.java b/src/main/java/org/swisspush/reststorage/redis/RedisStorage.java similarity index 93% rename from src/main/java/org/swisspush/reststorage/RedisStorage.java rename to src/main/java/org/swisspush/reststorage/redis/RedisStorage.java index dec2c3d..99009c6 100644 --- a/src/main/java/org/swisspush/reststorage/RedisStorage.java +++ b/src/main/java/org/swisspush/reststorage/redis/RedisStorage.java @@ -1,1194 +1,1247 @@ -package org.swisspush.reststorage; - -import io.vertx.core.*; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.json.DecodeException; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; -import io.vertx.core.streams.ReadStream; -import io.vertx.core.streams.WriteStream; -import io.vertx.redis.client.Response; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.text.StrSubstitutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.swisspush.reststorage.util.GZIPUtil; -import org.swisspush.reststorage.util.LockMode; -import org.swisspush.reststorage.util.ModuleConfiguration; -import org.swisspush.reststorage.util.ResourceNameUtil; - -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.text.DecimalFormat; -import java.util.*; -import java.util.stream.Stream; - -public class RedisStorage implements Storage { - - private Logger log = LoggerFactory.getLogger(RedisStorage.class); - - // set to very high value = Wed Nov 16 5138 09:46:39 - private static final String MAX_EXPIRE_IN_MILLIS = "99999999999999"; - private final String EMPTY = ""; - private static final float MAX_PERCENTAGE = 100.0f; - private static final float MIN_PERCENTAGE = 0.0f; - private static final int CLEANUP_BULK_SIZE = 200; - - private String redisResourcesPrefix; - private String redisCollectionsPrefix; - private String redisDeltaResourcesPrefix; - private String redisDeltaEtagsPrefix; - private String expirableSet; - private long cleanupResourcesAmount; - private String redisLockPrefix; - private Vertx vertx; - - private RedisProvider redisProvider; - private Map luaScripts = new HashMap<>(); - private DecimalFormat decimalFormat; - - private Optional currentMemoryUsageOptional = Optional.empty(); - - public RedisStorage(Vertx vertx, ModuleConfiguration config, RedisProvider redisProvider) { - this.expirableSet = config.getExpirablePrefix(); - this.redisResourcesPrefix = config.getResourcesPrefix(); - this.redisCollectionsPrefix = config.getCollectionsPrefix(); - this.redisDeltaResourcesPrefix = config.getDeltaResourcesPrefix(); - this.redisDeltaEtagsPrefix = config.getDeltaEtagsPrefix(); - this.cleanupResourcesAmount = config.getResourceCleanupAmount(); - this.redisLockPrefix = config.getLockPrefix(); - - this.vertx = vertx; - this.redisProvider = redisProvider; - this.decimalFormat = new DecimalFormat(); - this.decimalFormat.setMaximumFractionDigits(1); - - // load all the lua scripts - LuaScriptState luaGetScriptState = new LuaScriptState(LuaScript.GET, false); - luaGetScriptState.loadLuaScript(new RedisCommandDoNothing(), 0); - luaScripts.put(LuaScript.GET, luaGetScriptState); - - LuaScriptState luaStorageExpandScriptState = new LuaScriptState(LuaScript.STORAGE_EXPAND, false); - luaStorageExpandScriptState.loadLuaScript(new RedisCommandDoNothing(), 0); - luaScripts.put(LuaScript.STORAGE_EXPAND, luaStorageExpandScriptState); - - LuaScriptState luaPutScriptState = new LuaScriptState(LuaScript.PUT, false); - luaPutScriptState.loadLuaScript(new RedisCommandDoNothing(), 0); - luaScripts.put(LuaScript.PUT, luaPutScriptState); - - LuaScriptState luaDeleteScriptState = new LuaScriptState(LuaScript.DELETE, false); - luaDeleteScriptState.loadLuaScript(new RedisCommandDoNothing(), 0); - luaScripts.put(LuaScript.DELETE, luaDeleteScriptState); - - LuaScriptState luaCleanupScriptState = new LuaScriptState(LuaScript.CLEANUP, false); - luaCleanupScriptState.loadLuaScript(new RedisCommandDoNothing(), 0); - luaScripts.put(LuaScript.CLEANUP, luaCleanupScriptState); - - if (config.isRejectStorageWriteOnLowMemory()) { - calculateCurrentMemoryUsage().onComplete(optionalAsyncResult -> currentMemoryUsageOptional = optionalAsyncResult.result()); - startPeriodicMemoryUsageUpdate(config.getFreeMemoryCheckIntervalMs()); - } - } - - private void startPeriodicMemoryUsageUpdate(long intervalMs) { - vertx.setPeriodic(intervalMs, updateMemoryUsage -> calculateCurrentMemoryUsage().onComplete(optionalAsyncResult -> currentMemoryUsageOptional = optionalAsyncResult.result())); - } - - public Future> calculateCurrentMemoryUsage() { - Promise> promise = Promise.promise(); - - redisProvider.redis().onSuccess(redisAPI -> redisAPI.info(Collections.singletonList("memory"), memoryInfo -> { - if (memoryInfo.failed()) { - log.error("Unable to get memory information from redis", memoryInfo.cause()); - promise.complete(Optional.empty()); - return; - } - - long totalSystemMemory; - try { - Optional totalSystemMemoryOpt = memoryInfo.result().toString() - .lines() - .filter(source -> source.startsWith("total_system_memory:")) - .findAny(); - if (totalSystemMemoryOpt.isEmpty()) { - log.warn("No 'total_system_memory' section received from redis. Unable to calculate the current memory usage"); - promise.complete(Optional.empty()); - return; - } - totalSystemMemory = Long.parseLong(totalSystemMemoryOpt.get().split(":")[1]); - if (totalSystemMemory == 0L) { - log.warn("'total_system_memory' value 0 received from redis. Unable to calculate the current memory usage"); - promise.complete(Optional.empty()); - return; - } - - } catch (NumberFormatException ex) { - logPropertyWarning("total_system_memory", ex); - promise.complete(Optional.empty()); - return; - } - - long usedMemory; - try { - Optional usedMemoryOpt = memoryInfo.result().toString() - .lines() - .filter(source -> source.startsWith("used_memory:")) - .findAny(); - if (usedMemoryOpt.isEmpty()) { - log.warn("No 'used_memory' section received from redis. Unable to calculate the current memory usage"); - promise.complete(Optional.empty()); - return; - } - usedMemory = Long.parseLong(usedMemoryOpt.get().split(":")[1]); - } catch (NumberFormatException ex) { - logPropertyWarning("used_memory", ex); - promise.complete(Optional.empty()); - return; - } - - float currentMemoryUsagePercentage = ((float) usedMemory / totalSystemMemory) * 100; - if (currentMemoryUsagePercentage > MAX_PERCENTAGE) { - currentMemoryUsagePercentage = MAX_PERCENTAGE; - } else if (currentMemoryUsagePercentage < MIN_PERCENTAGE) { - currentMemoryUsagePercentage = MIN_PERCENTAGE; - } - log.info("Current memory usage is {}%", decimalFormat.format(currentMemoryUsagePercentage)); - promise.complete(Optional.of(currentMemoryUsagePercentage)); - })) - .onFailure(event -> { - log.error("Unable to get memory information from redis", event); - promise.complete(Optional.empty()); - }); - return promise.future(); - } - - private void logPropertyWarning(String property, Exception ex) { - log.warn("No or invalid '{}' value received from redis. Unable to calculate the current memory usage. " + - "Exception: {}", property, ex.toString()); - } - - private enum LuaScript { - GET("get.lua"), STORAGE_EXPAND("storageExpand.lua"), PUT("put.lua"), DELETE("del.lua"), CLEANUP("cleanup.lua"); - - private String file; - - LuaScript(String file) { - this.file = file; - } - - public String getFile() { - return file; - } - } - - /** - * Holds the state of a lua script. - */ - private class LuaScriptState { - - private LuaScript luaScriptType; - /** - * the script itself - */ - private String script; - /** - * if the script logs to the redis log - */ - private boolean logoutput = false; - /** - * the sha, over which the script can be accessed in redis - */ - private String sha; - - private LuaScriptState(LuaScript luaScriptType, boolean logoutput) { - this.luaScriptType = luaScriptType; - this.logoutput = logoutput; - this.composeLuaScript(luaScriptType); - this.loadLuaScript(new RedisCommandDoNothing(), 0); - } - - /** - * Reads the script from the classpath and removes logging output if logoutput is false. - * The script is stored in the class member script. - * - * @param luaScriptType - */ - private void composeLuaScript(LuaScript luaScriptType) { - log.info("read the lua script for script type: {} with logoutput: {}", luaScriptType, logoutput); - - // It is not possible to evalsha or eval inside lua scripts, - // so we wrap the cleanupscript around the deletescript manually to avoid code duplication. - // we have to comment the return, so that the cleanup script doesn't terminate - if (LuaScript.CLEANUP.equals(luaScriptType)) { - Map values = new HashMap<>(); - values.put("delscript", readLuaScriptFromClasspath(LuaScript.DELETE).replaceAll("return", "--return")); - StrSubstitutor sub = new StrSubstitutor(values, "--%(", ")"); - this.script = sub.replace(readLuaScriptFromClasspath(LuaScript.CLEANUP)); - } else { - this.script = readLuaScriptFromClasspath(luaScriptType); - } - this.sha = DigestUtils.sha1Hex(this.script); - } - - private String readLuaScriptFromClasspath(LuaScript luaScriptType) { - BufferedReader in = new BufferedReader(new InputStreamReader(this.getClass().getClassLoader().getResourceAsStream(luaScriptType.getFile()))); - StringBuilder sb; - try { - sb = new StringBuilder(); - String line; - while ((line = in.readLine()) != null) { - if (!logoutput && line.contains("redis.log(redis.LOG_NOTICE,")) { - continue; - } - sb.append(line).append("\n"); - } - - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - try { - in.close(); - } catch (IOException e) { - // Ignore - } - } - return sb.toString(); - } - - /** - * Rereads the lua script, eg. if the loglevel changed. - */ - public void recomposeLuaScript() { - this.composeLuaScript(luaScriptType); - } - - /** - * Load the get script into redis and store the sha in the class member sha. - * - * @param redisCommand the redis command that should be executed, after the script is loaded. - * @param executionCounter a counter to control recursion depth - */ - public void loadLuaScript(final RedisCommand redisCommand, int executionCounter) { - final int executionCounterIncr = ++executionCounter; - - redisProvider.redis().onSuccess(redisAPI -> { - // check first if the lua script already exists in the store - redisAPI.script(Arrays.asList("exists", sha), resultArray -> { - if (resultArray.failed()) { - log.error("Error checking whether lua script exists", resultArray.cause()); - return; - } - Long exists = resultArray.result().get(0).toLong(); - // if script already - if (Long.valueOf(1).equals(exists)) { - log.debug("RedisStorage script already exists in redis cache: " + luaScriptType); - redisCommand.exec(executionCounterIncr); - } else { - log.info("load lua script for script type: {} logoutput: {}", luaScriptType, logoutput); - redisAPI.script(Arrays.asList("load", script), stringAsyncResult -> { - if (stringAsyncResult.failed()) { - log.error("Loading of lua script {} failed", luaScriptType); - return; - } - String newSha = stringAsyncResult.result().toString(); - log.info("got sha from redis for lua script: {}: {}", luaScriptType, newSha); - if (!newSha.equals(sha)) { - log.warn("the sha calculated by myself: {} doesn't match with the sha from redis: {}. " + - "We use the sha from redis", sha, newSha); - } - sha = newSha; - log.info("execute redis command for script type: {} with new sha: {}", luaScriptType, sha); - redisCommand.exec(executionCounterIncr); - }); - } - }); - }) - .onFailure(event -> log.error("Error checking whether lua script exists", event)); - } - - public String getScript() { - return script; - } - - public void setScript(String script) { - this.script = script; - } - - public boolean getLogoutput() { - return logoutput; - } - - public void setLogoutput(boolean logoutput) { - this.logoutput = logoutput; - } - - public String getSha() { - return sha; - } - - public void setSha(String sha) { - this.sha = sha; - } - } - - /** - * The interface for a redis command. - */ - private interface RedisCommand { - void exec(int executionCounter); - } - - /** - * A dummy that can be passed if no RedisCommand should be executed. - */ - private static class RedisCommandDoNothing implements RedisCommand { - - @Override - public void exec(int executionCounter) { - // do nothing here - } - } - - /** - * If the loglevel is trace and the logoutput in luaScriptState is false, then reload the script with logoutput and execute the RedisCommand. - * If the loglevel is not trace and the logoutput in luaScriptState is true, then reload the script without logoutput and execute the RedisCommand. - * If the loglevel is matching the luaScriptState, just execute the RedisCommand. - * - * @param luaScript the type of lua script - * @param redisCommand the redis command to execute - */ - private void reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript luaScript, RedisCommand redisCommand, int executionCounter) { - boolean logoutput = log.isTraceEnabled(); - LuaScriptState luaScriptState = luaScripts.get(luaScript); - // if the loglevel didn't change, execute the command and return - if (logoutput == luaScriptState.getLogoutput()) { - redisCommand.exec(executionCounter); - return; - // if the loglevel changed, set the new loglevel into the luaScriptState, recompose the script and provide the redisCommand as parameter to execute - } else if (logoutput && !luaScriptState.getLogoutput()) { - luaScriptState.setLogoutput(true); - luaScriptState.recomposeLuaScript(); - - } else if (!logoutput && luaScriptState.getLogoutput()) { - luaScriptState.setLogoutput(false); - luaScriptState.recomposeLuaScript(); - } - luaScriptState.loadLuaScript(redisCommand, executionCounter); - } - - public class ByteArrayReadStream implements ReadStream { - - ByteArrayInputStream content; - int size; - boolean paused; - int position; - Handler endHandler; - Handler handler; - - public ByteArrayReadStream(byte[] byteArray) { - size = byteArray.length; - content = new ByteArrayInputStream(byteArray); - } - - private void doRead() { - vertx.runOnContext(v -> { - if (!paused) { - if (position < size) { - int toRead = 8192; - if (position + toRead > size) { - toRead = size - position; - } - byte[] bytes = new byte[toRead]; - content.read(bytes, 0, toRead); - handler.handle(Buffer.buffer(bytes)); - position += toRead; - doRead(); - } else { - endHandler.handle(null); - } - } - }); - } - - public ByteArrayReadStream resume() { - paused = false; - doRead(); - return this; - } - - @Override - public ReadStream fetch(long amount) { - return null; - } - - @Override - public ByteArrayReadStream pause() { - paused = true; - return this; - } - - @Override - public ByteArrayReadStream exceptionHandler(Handler handler) { - return this; - } - - @Override - public ReadStream handler(Handler handler) { - this.handler = handler; - doRead(); - return this; - } - - @Override - public ByteArrayReadStream endHandler(Handler endHandler) { - this.endHandler = endHandler; - return this; - } - } - - @Override - public Optional getCurrentMemoryUsage() { - return currentMemoryUsageOptional; - } - - @Override - public void get(String path, String etag, int offset, int limit, final Handler handler) { - final String key = encodePath(path); - List keys = Collections.singletonList(key); - List arguments = Arrays.asList( - redisResourcesPrefix, - redisCollectionsPrefix, - expirableSet, - String.valueOf(System.currentTimeMillis()), - MAX_EXPIRE_IN_MILLIS, - String.valueOf(offset), - String.valueOf(limit), - etag - ); - reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript.GET, new Get(keys, arguments, handler), 0); - } - - /** - * The Get Command Execution. - * If the get script cannot be found under the sha in luaScriptState, reload the script. - * To avoid infinite recursion, we limit the recursion. - */ - private class Get implements RedisCommand { - - private List keys; - private List arguments; - private Handler handler; - - public Get(List keys, List arguments, final Handler handler) { - this.keys = keys; - this.arguments = arguments; - this.handler = handler; - } - - public void exec(final int executionCounter) { - List args = toPayload(luaScripts.get(LuaScript.GET).getSha(), keys.size(), keys, arguments); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { - if (event.succeeded()) { - Response values = event.result(); - if (log.isTraceEnabled()) { - log.trace("RedisStorage get result: {}", values); - } - if ("notModified".equals(values.toString())) { - notModified(handler); - } else if ("notFound".equals(values.toString())) { - notFound(handler); - } else { - handleJsonArrayValues(values, handler, "0".equals(arguments.get(5)) && - "-1".equals(arguments.get(6))); - } - } else { - String message = event.cause().getMessage(); - if (message != null && message.startsWith("NOSCRIPT")) { - log.warn("get script couldn't be found, reload it"); - log.warn("amount the script got loaded: {}", executionCounter); - if (executionCounter > 10) { - log.error("amount the script got loaded is higher than 10, we abort"); - } else { - luaScripts.get(LuaScript.GET).loadLuaScript(new Get(keys, arguments, handler), executionCounter); - } - } else { - log.error("GET request failed with message: {}", message); - } - } - })) - .onFailure(event -> log.error("Redis: GET request failed", event)); - } - } - - @Override - public void storageExpand(String path, String etag, List subResources, Handler handler) { - final String key = encodePath(path); - List keys = Collections.singletonList(key); - List arguments = Arrays.asList( - redisResourcesPrefix, - redisCollectionsPrefix, - expirableSet, - String.valueOf(System.currentTimeMillis()), - MAX_EXPIRE_IN_MILLIS, - StringUtils.join(subResources, ";"), - String.valueOf(subResources.size()) - ); - reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript.STORAGE_EXPAND, new StorageExpand(keys, arguments, handler, etag), 0); - } - - /** - * The StorageExpand Command Execution. - * If the get script cannot be found under the sha in luaScriptState, reload the script. - * To avoid infinite recursion, we limit the recursion. - */ - private class StorageExpand implements RedisCommand { - - private List keys; - private List arguments; - private Handler handler; - private String etag; - - public StorageExpand(List keys, List arguments, final Handler handler, String etag) { - this.keys = keys; - this.arguments = arguments; - this.handler = handler; - this.etag = etag; - } - - public void exec(final int executionCounter) { - List args = toPayload(luaScripts.get(LuaScript.STORAGE_EXPAND).getSha(), keys.size(), keys, arguments); - - redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { - if (event.succeeded()) { - String value = event.result().toString(); - if (log.isTraceEnabled()) { - log.trace("RedisStorage get result: {}", value); - } - if ("compressionNotSupported".equalsIgnoreCase(value)) { - error(handler, "Collections having compressed resources are not supported in storage expand"); - return; - } - if ("notFound".equalsIgnoreCase(value)) { - notFound(handler); - return; - } - JsonObject expandResult = new JsonObject(); - - JsonArray resultArr = new JsonArray(value); - - for (Object resultEntry : resultArr) { - JsonArray entries = (JsonArray) resultEntry; - String subResourceName = ResourceNameUtil.resetReplacedColonsAndSemiColons(entries.getString(0)); - String subResourceValue = entries.getString(1); - if (subResourceValue.startsWith("[") && subResourceValue.endsWith("]")) { - expandResult.put(subResourceName, extractSortedJsonArray(subResourceValue)); - } else { - try { - expandResult.put(subResourceName, new JsonObject(subResourceValue)); - } catch (DecodeException ex) { - invalid(handler, "Error decoding invalid json resource '" + subResourceName + "'"); - return; - } - } - } - - byte[] finalExpandedContent = decodeBinary(expandResult.encode()); - String calcDigest = DigestUtils.sha1Hex(finalExpandedContent); - - if (calcDigest.equals(etag)) { - notModified(handler); - } else { - DocumentResource r = new DocumentResource(); - r.readStream = new ByteArrayReadStream(finalExpandedContent); - r.length = finalExpandedContent.length; - r.etag = calcDigest; - r.closeHandler = event1 -> { - // nothing to close - }; - handler.handle(r); - } - } else { - String message = event.cause().getMessage(); - if (message != null && message.startsWith("NOSCRIPT")) { - log.warn("storageExpand script couldn't be found, reload it"); - log.warn("amount the script got loaded: {}", executionCounter); - if (executionCounter > 10) { - log.error("amount the script got loaded is higher than 10, we abort"); - } else { - luaScripts.get(LuaScript.STORAGE_EXPAND).loadLuaScript( - new StorageExpand(keys, arguments, handler, etag), executionCounter); - } - } else { - log.error("StorageExpand request failed with message: {}", message); - } - } - })) - .onFailure(event -> log.error("Redis: StorageExpand request failed", event)); - } - } - - private JsonArray extractSortedJsonArray(String arrayString) { - String arrayContent = arrayString.replaceAll("\\[", EMPTY).replaceAll("\\]", EMPTY) - .replaceAll("\"", EMPTY).replaceAll("\\\\", EMPTY); - String[] splitted = StringUtils.split(arrayContent, ","); - List resources = new ArrayList<>(); - List collections = new ArrayList<>(); - for (String split : splitted) { - if (split.endsWith("/")) { - collections.add(split); - } else { - resources.add(split); - } - } - Collections.sort(collections); - collections.addAll(resources); - return new JsonArray(new ArrayList(collections)); - } - - private void handleJsonArrayValues(Response values, Handler handler, boolean allowEmptyReturn) { - String type = values.get(0).toString(); - if ("TYPE_RESOURCE".equals(type)) { - String valueStr = values.get(1).toString(); - DocumentResource r = new DocumentResource(); - byte[] content = decodeBinary(valueStr); - if (values.get(3) != null) { - // data is compressed - GZIPUtil.decompressResource(vertx, log, content, decompressedResult -> { - if (decompressedResult.succeeded()) { - r.readStream = new ByteArrayReadStream(decompressedResult.result()); - r.length = decompressedResult.result().length; - r.etag = values.get(2).toString(); - r.closeHandler = event -> { - // nothing to close - }; - handler.handle(r); - } else { - error(handler, "Error during decompression of resource: " + decompressedResult.cause().getMessage()); - } - }); - } else { - r.readStream = new ByteArrayReadStream(content); - r.length = content.length; - Response etagRsp = values.get(2); - r.etag = etagRsp == null ? null : etagRsp.toString(); - r.closeHandler = event -> { - // nothing to close - }; - handler.handle(r); - } - } else if ("TYPE_COLLECTION".equals(type)) { - CollectionResource r = new CollectionResource(); - Set items = new HashSet<>(); - for (Response value : values) { - String member = value.toString(); - if (!"TYPE_COLLECTION".equals(member)) { - if (member.endsWith(":")) { - member = member.replaceAll(":$", ""); - CollectionResource c = new CollectionResource(); - c.name = member; - items.add(c); - } else { - DocumentResource d = new DocumentResource(); - d.name = member; - items.add(d); - } - } - } - if (allowEmptyReturn && items.size() == 0) { - notFound(handler); - } else { - r.items = new ArrayList<>(items); - Collections.sort(r.items); - handler.handle(r); - } - } else { - notFound(handler); - } - } - - static class ByteArrayWriteStream implements WriteStream { - - private ByteArrayOutputStream bos = new ByteArrayOutputStream(); - - public byte[] getBytes() { - return bos.toByteArray(); - } - - @Override - public ByteArrayWriteStream setWriteQueueMaxSize(int maxSize) { - return this; - } - - @Override - public boolean writeQueueFull() { - return false; - } - - @Override - public ByteArrayWriteStream drainHandler(Handler handler) { - return this; - } - - @Override - public ByteArrayWriteStream exceptionHandler(Handler handler) { - return this; - } - - @Override - public Future write(Buffer data) { - try { - bos.write(data.getBytes()); - return Future.succeededFuture(); - } catch (IOException e) { - return Future.failedFuture(e.getMessage()); - } - - } - - @Override - public void write(Buffer data, Handler> handler) { - write(data).onComplete(handler); - } - - @Override - public Future end() { - try { - bos.close(); - return Future.succeededFuture(); - } catch (IOException e) { - return Future.failedFuture(e.getMessage()); - } - } - - @Override - public void end(Handler> handler) { - end().onComplete(handler); - } - } - - private String initEtagValue(String providedEtag) { - if (!isEmpty(providedEtag)) { - return providedEtag; - } - return UUID.randomUUID().toString(); - } - - - @Override - public void put(String path, final String etag, final boolean merge, final long expire, final String lockOwner, - final LockMode lockMode, final long lockExpire, final Handler handler) { - put(path, etag, merge, expire, lockOwner, lockMode, lockExpire, false, handler); - } - - @Override - public void put(String path, final String etag, final boolean merge, final long expire, final Handler handler) { - put(path, etag, merge, expire, "", LockMode.SILENT, 0, handler); - } - - @Override - public void put(String path, String etag, boolean merge, long expire, String lockOwner, LockMode lockMode, - long lockExpire, boolean storeCompressed, Handler handler) { - final String key = encodePath(path); - final DocumentResource d = new DocumentResource(); - final ByteArrayWriteStream stream = new ByteArrayWriteStream(); - - final String etagValue = initEtagValue(etag); - d.writeStream = stream; - d.closeHandler = event -> { - String expireInMillis = MAX_EXPIRE_IN_MILLIS; - if (expire > -1) { - expireInMillis = String.valueOf(System.currentTimeMillis() + (expire * 1000)); - } - - if (Long.parseLong(expireInMillis) > Long.parseLong(MAX_EXPIRE_IN_MILLIS)) { - // #76 reset to the defined max value - expireInMillis = MAX_EXPIRE_IN_MILLIS; - } - - String lockExpireInMillis = String.valueOf(System.currentTimeMillis() + (lockExpire * 1000)); - - List keys = Collections.singletonList(key); - - if (storeCompressed) { - String finalExpireInMillis = expireInMillis; - GZIPUtil.compressResource(vertx, log, stream.getBytes(), compressResourceResult -> { - if (compressResourceResult.succeeded()) { - List arg = Arrays.asList( - redisResourcesPrefix, - redisCollectionsPrefix, - expirableSet, - merge ? "true" : "false", - finalExpireInMillis, - MAX_EXPIRE_IN_MILLIS, - encodeBinary(compressResourceResult.result()), - etagValue, - redisLockPrefix, - lockOwner, - lockMode.text(), - lockExpireInMillis, - storeCompressed ? "1" : "0" - ); - reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript.PUT, new Put(d, keys, arg, handler), 0); - } else { - error(handler, "Error during compression of resource"); - } - }); - } else { - List arguments = Arrays.asList( - redisResourcesPrefix, - redisCollectionsPrefix, - expirableSet, - merge ? "true" : "false", - expireInMillis, - MAX_EXPIRE_IN_MILLIS, - encodeBinary(stream.getBytes()), - etagValue, - redisLockPrefix, - lockOwner, - lockMode.text(), - lockExpireInMillis, - storeCompressed ? "1" : "0" - ); - reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript.PUT, new Put(d, keys, arguments, handler), 0); - } - }; - handler.handle(d); - } - - /** - * The Put Command Execution. - * If the get script cannot be found under the sha in luaScriptState, reload the script. - * To avoid infinite recursion, we limit the recursion. - */ - private class Put implements RedisCommand { - - private DocumentResource d; - private List keys; - private List arguments; - private Handler handler; - - public Put(DocumentResource d, List keys, List arguments, Handler handler) { - this.d = d; - this.keys = keys; - this.arguments = arguments; - this.handler = handler; - } - - public void exec(final int executionCounter) { - List args = toPayload(luaScripts.get(LuaScript.PUT).getSha(), keys.size(), keys, arguments); - - redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { - if (event.succeeded()) { - String result = event.result().toString(); - if (log.isTraceEnabled()) { - log.trace("RedisStorage successful put. Result: {}", result); - } - if (result != null && result.startsWith("existingCollection")) { - CollectionResource c = new CollectionResource(); - handler.handle(c); - } else if (result != null && result.startsWith("existingResource")) { - DocumentResource d = new DocumentResource(); - d.exists = false; - handler.handle(d); - } else if ("notModified".equals(result)) { - notModified(handler); - } else if (LockMode.REJECT.text().equals(result)) { - rejected(handler); - } else { - d.endHandler.handle(null); - } - } else { - String message = event.cause().getMessage(); - if (message != null && message.startsWith("NOSCRIPT")) { - log.warn("put script couldn't be found, reload it"); - log.warn("amount the script got loaded: {}", executionCounter); - if (executionCounter > 10) { - log.error("amount the script got loaded is higher than 10, we abort"); - } else { - luaScripts.get(LuaScript.PUT).loadLuaScript(new Put(d, keys, arguments, handler), executionCounter); - } - } else if (message != null && d.errorHandler != null) { - log.error("PUT request failed with message: {}", message); - d.errorHandler.handle(event.cause()); - } - } - })) - .onFailure(event -> log.error("Redis: PUT request failed", event)); - } - } - - @Override - public void delete(String path, String lockOwner, LockMode lockMode, long lockExpire, boolean confirmCollectionDelete, - boolean deleteRecursive, final Handler handler) { - final String key = encodePath(path); - List keys = Collections.singletonList(key); - - String lockExpireInMillis = String.valueOf(System.currentTimeMillis() + (lockExpire * 1000)); - - List arguments = Arrays.asList( - redisResourcesPrefix, - redisCollectionsPrefix, - redisDeltaResourcesPrefix, - redisDeltaEtagsPrefix, - expirableSet, - String.valueOf(System.currentTimeMillis()), - MAX_EXPIRE_IN_MILLIS, - confirmCollectionDelete ? "true" : "false", - deleteRecursive ? "true" : "false", - redisLockPrefix, - lockOwner, - lockMode.text(), - lockExpireInMillis - ); - reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript.DELETE, new Delete(keys, arguments, handler), 0); - } - - /** - * The Delete Command Execution. - * If the get script cannot be found under the sha in luaScriptState, reload the script. - * To avoid infinite recursion, we limit the recursion. - */ - private class Delete implements RedisCommand { - - private List keys; - private List arguments; - private Handler handler; - - public Delete(List keys, List arguments, final Handler handler) { - this.keys = keys; - this.arguments = arguments; - this.handler = handler; - } - - public void exec(final int executionCounter) { - List args = toPayload(luaScripts.get(LuaScript.DELETE).getSha(), keys.size(), keys, arguments); - - redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { - if (event.cause() != null && event.cause().getMessage().startsWith("NOSCRIPT")) { - log.warn("delete script couldn't be found, reload it"); - log.warn("amount the script got loaded: {}", executionCounter); - if (executionCounter > 10) { - log.error("amount the script got loaded is higher than 10, we abort"); - } else { - luaScripts.get(LuaScript.DELETE).loadLuaScript(new Delete(keys, arguments, handler), executionCounter); - } - return; - } - - String result = null; - if (event.result() != null) { - result = event.result().toString(); - } - if (log.isTraceEnabled()) { - log.trace("RedisStorage delete result: {}", result); - } - if ("notEmpty".equals(result)) { - notEmpty(handler); - return; - } - if ("notFound".equals(result)) { - notFound(handler); - return; - } else if (LockMode.REJECT.text().equals(result)) { - rejected(handler); - return; - } - Resource r = new Resource(); - handler.handle(r); - })) - .onFailure(event -> log.error("Redis: DELETE request failed", event)); - } - } - - /** - * Cleans up the outdated resources recursive. - * If the script which is refered over the luaScriptState.sha, the execution is aborted and the script is reloaded. - * - * @param handler the handler to execute - * @param cleanedLastRun how many resources were cleaned in the last run - * @param maxdel max resources to clean - * @param bulkSize how many resources should be cleaned in one run - */ - public void cleanupRecursive(final Handler handler, final long cleanedLastRun, final long maxdel, - final int bulkSize) { - List arguments = Arrays.asList( - redisResourcesPrefix, - redisCollectionsPrefix, - redisDeltaResourcesPrefix, - redisDeltaEtagsPrefix, - expirableSet, - "0", - MAX_EXPIRE_IN_MILLIS, - "false", - "true", - String.valueOf(System.currentTimeMillis()), - String.valueOf(bulkSize) - ); - List args = toPayload(luaScripts.get(LuaScript.CLEANUP).getSha(), 0, Collections.emptyList(), arguments); - - redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { - if (log.isTraceEnabled()) { - log.trace("RedisStorage cleanup resources succeeded: {}", event.succeeded()); - } - - if (event.failed() && event.cause() != null && event.cause().getMessage().startsWith("NOSCRIPT")) { - log.warn("the cleanup script is not loaded. Load it and exit. The Cleanup will success the next time"); - luaScripts.get(LuaScript.CLEANUP).loadLuaScript(new RedisCommandDoNothing(), 0); - return; - } - - long cleanedThisRun = 0; - if (event.succeeded() && event.result().toLong() != null) { - cleanedThisRun = event.result().toLong(); - } - if (log.isTraceEnabled()) { - log.trace("RedisStorage cleanup resources cleanded this run: {}", cleanedThisRun); - } - final long cleaned = cleanedLastRun + cleanedThisRun; - if (cleanedThisRun != 0 && cleaned < maxdel) { - if (log.isTraceEnabled()) { - log.trace("RedisStorage cleanup resources call recursive next bulk"); - } - cleanupRecursive(handler, cleaned, maxdel, bulkSize); - } else { - redisAPI.zcount(expirableSet, "0", String.valueOf(System.currentTimeMillis()), longAsyncResult -> { - Long result = longAsyncResult.result().toLong(); - if (log.isTraceEnabled()) { - log.trace("RedisStorage cleanup resources zcount on expirable set: {}", result); - } - int resToCleanLeft = 0; - if (result != null && result.intValue() >= 0) { - resToCleanLeft = result.intValue(); - } - JsonObject retObj = new JsonObject(); - retObj.put("cleanedResources", cleaned); - retObj.put("expiredResourcesLeft", resToCleanLeft); - DocumentResource r = new DocumentResource(); - byte[] content = decodeBinary(retObj.toString()); - r.readStream = new ByteArrayReadStream(content); - r.length = content.length; - r.closeHandler = event1 -> { - // nothing to close - }; - handler.handle(r); - }); - } - })) - .onFailure(event -> log.error("Redis: cleanupRecursive failed", event)); - } - - private String encodePath(String path) { - if (path.equals("/")) { - path = ""; - } - return ResourceNameUtil.replaceColonsAndSemiColons(path).replaceAll("/", ":"); - } - - private String encodeBinary(byte[] bytes) { - return new String(bytes, StandardCharsets.ISO_8859_1); - } - - private byte[] decodeBinary(String s) { - return s.getBytes(StandardCharsets.ISO_8859_1); - } - - private void notFound(Handler handler) { - Resource r = new Resource(); - r.exists = false; - handler.handle(r); - } - - private void notEmpty(Handler handler) { - Resource r = new Resource(); - r.error = true; - r.errorMessage = "directory not empty. Use recursive=true parameter to delete"; - handler.handle(r); - } - - private void notModified(Handler handler) { - Resource r = new Resource(); - r.modified = false; - handler.handle(r); - } - - private void rejected(Handler handler) { - Resource r = new Resource(); - r.rejected = true; - handler.handle(r); - } - - private void invalid(Handler handler, String invalidMessage) { - Resource r = new Resource(); - r.invalid = true; - r.invalidMessage = invalidMessage; - handler.handle(r); - } - - private void error(Handler handler, String errorMessage) { - Resource r = new Resource(); - r.error = true; - r.errorMessage = errorMessage; - handler.handle(r); - } - - @Override - public void cleanup(Handler handler, String cleanupResourcesAmountStr) { - long cleanupResourcesAmountUsed = cleanupResourcesAmount; - if (log.isTraceEnabled()) { - log.trace("RedisStorage cleanup resources, cleanupResourcesAmount: {}", cleanupResourcesAmountUsed); - } - try { - cleanupResourcesAmountUsed = Long.parseLong(cleanupResourcesAmountStr); - } catch (Exception e) { - // do nothing - } - cleanupRecursive(handler, 0, cleanupResourcesAmountUsed, CLEANUP_BULK_SIZE); - } - - private boolean isEmpty(CharSequence cs) { - return cs == null || cs.length() == 0; - } - - /** - * from https://github.com/vert-x3/vertx-redis-client/blob/3.9/src/main/java/io/vertx/redis/impl/RedisClientImpl.java#L94 - * - * @param parameters - * @return - */ - private static List toPayload(Object... parameters) { - List result = new ArrayList<>(parameters.length); - - for (Object param : parameters) { - // unwrap - if (param instanceof JsonArray) { - param = ((JsonArray) param).getList(); - } - // unwrap - if (param instanceof JsonObject) { - param = ((JsonObject) param).getMap(); - } - - if (param instanceof Collection) { - ((Collection) param).stream().filter(Objects::nonNull).forEach(o -> result.add(o.toString())); - } else if (param instanceof Map) { - for (Map.Entry pair : ((Map) param).entrySet()) { - result.add(pair.getKey().toString()); - result.add(pair.getValue().toString()); - } - } else if (param instanceof Stream) { - ((Stream) param).forEach(e -> { - if (e instanceof Object[]) { - Collections.addAll(result, (String[]) e); - } else { - result.add(e.toString()); - } - }); - } else if (param != null) { - result.add(param.toString()); - } - } - return result; - } - -} +package org.swisspush.reststorage.redis; + +import io.vertx.core.*; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; +import io.vertx.redis.client.Response; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.text.StrSubstitutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.reststorage.CollectionResource; +import org.swisspush.reststorage.DocumentResource; +import org.swisspush.reststorage.Resource; +import org.swisspush.reststorage.Storage; +import org.swisspush.reststorage.lock.Lock; +import org.swisspush.reststorage.lock.impl.RedisBasedLock; +import org.swisspush.reststorage.util.GZIPUtil; +import org.swisspush.reststorage.util.LockMode; +import org.swisspush.reststorage.util.ModuleConfiguration; +import org.swisspush.reststorage.util.ResourceNameUtil; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.*; +import java.util.stream.Stream; + +public class RedisStorage implements Storage { + + private final Logger log = LoggerFactory.getLogger(RedisStorage.class); + + // set to very high value = Wed Nov 16 5138 09:46:39 + private static final String MAX_EXPIRE_IN_MILLIS = "99999999999999"; + private final String EMPTY = ""; + private static final float MAX_PERCENTAGE = 100.0f; + private static final float MIN_PERCENTAGE = 0.0f; + private static final int CLEANUP_BULK_SIZE = 200; + + public static final String STORAGE_CLEANUP_TASK_LOCK = "storageCleanupTask"; + + private final String redisResourcesPrefix; + private final String redisCollectionsPrefix; + private final String redisDeltaResourcesPrefix; + private final String redisDeltaEtagsPrefix; + private final String expirableSet; + private final Integer resourceCleanupIntervalSec; + private final long cleanupResourcesAmount; + private final String redisLockPrefix; + private final Vertx vertx; + + private final Lock lock; + + private final RedisProvider redisProvider; + private final Map luaScripts = new HashMap<>(); + private final DecimalFormat decimalFormat; + + private Optional currentMemoryUsageOptional = Optional.empty(); + + private final String ID; + + public RedisStorage(Vertx vertx, ModuleConfiguration config, RedisProvider redisProvider) { + this.expirableSet = config.getExpirablePrefix(); + this.redisResourcesPrefix = config.getResourcesPrefix(); + this.redisCollectionsPrefix = config.getCollectionsPrefix(); + this.redisDeltaResourcesPrefix = config.getDeltaResourcesPrefix(); + this.redisDeltaEtagsPrefix = config.getDeltaEtagsPrefix(); + this.resourceCleanupIntervalSec = config.getResourceCleanupIntervalSec(); + this.cleanupResourcesAmount = config.getResourceCleanupAmount(); + this.redisLockPrefix = config.getLockPrefix(); + + this.vertx = vertx; + this.redisProvider = redisProvider; + this.decimalFormat = new DecimalFormat(); + this.decimalFormat.setMaximumFractionDigits(1); + + this.ID = UUID.randomUUID().toString(); + this.lock = new RedisBasedLock(redisProvider); + + // load all the lua scripts + LuaScriptState luaGetScriptState = new LuaScriptState(LuaScript.GET, false); + luaGetScriptState.loadLuaScript(new RedisCommandDoNothing(), 0); + luaScripts.put(LuaScript.GET, luaGetScriptState); + + LuaScriptState luaStorageExpandScriptState = new LuaScriptState(LuaScript.STORAGE_EXPAND, false); + luaStorageExpandScriptState.loadLuaScript(new RedisCommandDoNothing(), 0); + luaScripts.put(LuaScript.STORAGE_EXPAND, luaStorageExpandScriptState); + + LuaScriptState luaPutScriptState = new LuaScriptState(LuaScript.PUT, false); + luaPutScriptState.loadLuaScript(new RedisCommandDoNothing(), 0); + luaScripts.put(LuaScript.PUT, luaPutScriptState); + + LuaScriptState luaDeleteScriptState = new LuaScriptState(LuaScript.DELETE, false); + luaDeleteScriptState.loadLuaScript(new RedisCommandDoNothing(), 0); + luaScripts.put(LuaScript.DELETE, luaDeleteScriptState); + + LuaScriptState luaCleanupScriptState = new LuaScriptState(LuaScript.CLEANUP, false); + luaCleanupScriptState.loadLuaScript(new RedisCommandDoNothing(), 0); + luaScripts.put(LuaScript.CLEANUP, luaCleanupScriptState); + + if (config.isRejectStorageWriteOnLowMemory()) { + calculateCurrentMemoryUsage().onComplete(optionalAsyncResult -> currentMemoryUsageOptional = optionalAsyncResult.result()); + startPeriodicMemoryUsageUpdate(config.getFreeMemoryCheckIntervalMs()); + } + + if (resourceCleanupIntervalSec != null) { + startPeriodicStorageCleanup(resourceCleanupIntervalSec * 1000L); + } + } + + private void startPeriodicStorageCleanup(long intervalMs) { + vertx.setPeriodic(intervalMs, event -> lock.acquireLock(STORAGE_CLEANUP_TASK_LOCK, + token(STORAGE_CLEANUP_TASK_LOCK), lockExpiry(resourceCleanupIntervalSec)) + .onComplete(lockEvent -> { + if (lockEvent.succeeded()) { + if (lockEvent.result()) { + cleanup(cleanupEvent -> cleanupEvent.readStream.handler(this::logCleanupResult) + .endHandler(nothing -> cleanupEvent.closeHandler.handle(null)), + String.valueOf(cleanupResourcesAmount)); + } + } else { + log.error("Could not acquire lock '{}'. Message: {}", STORAGE_CLEANUP_TASK_LOCK, lockEvent.cause().getMessage()); + } + })); + } + + private long lockExpiry(long taskInterval) { + taskInterval = taskInterval * 1000; + if (taskInterval <= 1) { + return 1; + } + return taskInterval / 2; + } + + private String token(String appendix) { + return ID + "_" + System.currentTimeMillis() + "_" + appendix; + } + + private void startPeriodicMemoryUsageUpdate(long intervalMs) { + vertx.setPeriodic(intervalMs, updateMemoryUsage -> calculateCurrentMemoryUsage().onComplete(optionalAsyncResult -> currentMemoryUsageOptional = optionalAsyncResult.result())); + } + + public Future> calculateCurrentMemoryUsage() { + Promise> promise = Promise.promise(); + + redisProvider.redis().onSuccess(redisAPI -> redisAPI.info(Collections.singletonList("memory"), memoryInfo -> { + if (memoryInfo.failed()) { + log.error("Unable to get memory information from redis", memoryInfo.cause()); + promise.complete(Optional.empty()); + return; + } + + long totalSystemMemory; + try { + Optional totalSystemMemoryOpt = memoryInfo.result().toString() + .lines() + .filter(source -> source.startsWith("total_system_memory:")) + .findAny(); + if (totalSystemMemoryOpt.isEmpty()) { + log.warn("No 'total_system_memory' section received from redis. Unable to calculate the current memory usage"); + promise.complete(Optional.empty()); + return; + } + totalSystemMemory = Long.parseLong(totalSystemMemoryOpt.get().split(":")[1]); + if (totalSystemMemory == 0L) { + log.warn("'total_system_memory' value 0 received from redis. Unable to calculate the current memory usage"); + promise.complete(Optional.empty()); + return; + } + + } catch (NumberFormatException ex) { + logPropertyWarning("total_system_memory", ex); + promise.complete(Optional.empty()); + return; + } + + long usedMemory; + try { + Optional usedMemoryOpt = memoryInfo.result().toString() + .lines() + .filter(source -> source.startsWith("used_memory:")) + .findAny(); + if (usedMemoryOpt.isEmpty()) { + log.warn("No 'used_memory' section received from redis. Unable to calculate the current memory usage"); + promise.complete(Optional.empty()); + return; + } + usedMemory = Long.parseLong(usedMemoryOpt.get().split(":")[1]); + } catch (NumberFormatException ex) { + logPropertyWarning("used_memory", ex); + promise.complete(Optional.empty()); + return; + } + + float currentMemoryUsagePercentage = ((float) usedMemory / totalSystemMemory) * 100; + if (currentMemoryUsagePercentage > MAX_PERCENTAGE) { + currentMemoryUsagePercentage = MAX_PERCENTAGE; + } else if (currentMemoryUsagePercentage < MIN_PERCENTAGE) { + currentMemoryUsagePercentage = MIN_PERCENTAGE; + } + log.info("Current memory usage is {}%", decimalFormat.format(currentMemoryUsagePercentage)); + promise.complete(Optional.of(currentMemoryUsagePercentage)); + })) + .onFailure(event -> { + log.error("Unable to get memory information from redis", event); + promise.complete(Optional.empty()); + }); + return promise.future(); + } + + private void logPropertyWarning(String property, Exception ex) { + log.warn("No or invalid '{}' value received from redis. Unable to calculate the current memory usage. " + + "Exception: {}", property, ex.toString()); + } + + private void logCleanupResult(Object resultEvent) { + log.debug(resultEvent.toString()); + } + + private enum LuaScript { + GET("get.lua"), STORAGE_EXPAND("storageExpand.lua"), PUT("put.lua"), DELETE("del.lua"), CLEANUP("cleanup.lua"); + + private final String file; + + LuaScript(String file) { + this.file = file; + } + + public String getFile() { + return file; + } + } + + /** + * Holds the state of a lua script. + */ + private class LuaScriptState { + + private final LuaScript luaScriptType; + /** + * the script itself + */ + private String script; + /** + * if the script logs to the redis log + */ + private boolean logoutput = false; + /** + * the sha, over which the script can be accessed in redis + */ + private String sha; + + private LuaScriptState(LuaScript luaScriptType, boolean logoutput) { + this.luaScriptType = luaScriptType; + this.logoutput = logoutput; + this.composeLuaScript(luaScriptType); + this.loadLuaScript(new RedisCommandDoNothing(), 0); + } + + /** + * Reads the script from the classpath and removes logging output if logoutput is false. + * The script is stored in the class member script. + * + * @param luaScriptType + */ + private void composeLuaScript(LuaScript luaScriptType) { + log.info("read the lua script for script type: {} with logoutput: {}", luaScriptType, logoutput); + + // It is not possible to evalsha or eval inside lua scripts, + // so we wrap the cleanupscript around the deletescript manually to avoid code duplication. + // we have to comment the return, so that the cleanup script doesn't terminate + if (LuaScript.CLEANUP.equals(luaScriptType)) { + Map values = new HashMap<>(); + values.put("delscript", readLuaScriptFromClasspath(LuaScript.DELETE).replaceAll("return", "--return")); + StrSubstitutor sub = new StrSubstitutor(values, "--%(", ")"); + this.script = sub.replace(readLuaScriptFromClasspath(LuaScript.CLEANUP)); + } else { + this.script = readLuaScriptFromClasspath(luaScriptType); + } + this.sha = DigestUtils.sha1Hex(this.script); + } + + private String readLuaScriptFromClasspath(LuaScript luaScriptType) { + BufferedReader in = new BufferedReader(new InputStreamReader(this.getClass().getClassLoader().getResourceAsStream(luaScriptType.getFile()))); + StringBuilder sb; + try { + sb = new StringBuilder(); + String line; + while ((line = in.readLine()) != null) { + if (!logoutput && line.contains("redis.log(redis.LOG_NOTICE,")) { + continue; + } + sb.append(line).append("\n"); + } + + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + in.close(); + } catch (IOException e) { + // Ignore + } + } + return sb.toString(); + } + + /** + * Rereads the lua script, eg. if the loglevel changed. + */ + public void recomposeLuaScript() { + this.composeLuaScript(luaScriptType); + } + + /** + * Load the get script into redis and store the sha in the class member sha. + * + * @param redisCommand the redis command that should be executed, after the script is loaded. + * @param executionCounter a counter to control recursion depth + */ + public void loadLuaScript(final RedisCommand redisCommand, int executionCounter) { + final int executionCounterIncr = ++executionCounter; + + redisProvider.redis().onSuccess(redisAPI -> { + // check first if the lua script already exists in the store + redisAPI.script(Arrays.asList("exists", sha), resultArray -> { + if (resultArray.failed()) { + log.error("Error checking whether lua script exists", resultArray.cause()); + return; + } + Long exists = resultArray.result().get(0).toLong(); + // if script already + if (Long.valueOf(1).equals(exists)) { + log.debug("RedisStorage script already exists in redis cache: " + luaScriptType); + redisCommand.exec(executionCounterIncr); + } else { + log.info("load lua script for script type: {} logoutput: {}", luaScriptType, logoutput); + redisAPI.script(Arrays.asList("load", script), stringAsyncResult -> { + if (stringAsyncResult.failed()) { + log.error("Loading of lua script {} failed", luaScriptType); + return; + } + String newSha = stringAsyncResult.result().toString(); + log.info("got sha from redis for lua script: {}: {}", luaScriptType, newSha); + if (!newSha.equals(sha)) { + log.warn("the sha calculated by myself: {} doesn't match with the sha from redis: {}. " + + "We use the sha from redis", sha, newSha); + } + sha = newSha; + log.info("execute redis command for script type: {} with new sha: {}", luaScriptType, sha); + redisCommand.exec(executionCounterIncr); + }); + } + }); + }) + .onFailure(event -> log.error("Error checking whether lua script exists", event)); + } + + public String getScript() { + return script; + } + + public void setScript(String script) { + this.script = script; + } + + public boolean getLogoutput() { + return logoutput; + } + + public void setLogoutput(boolean logoutput) { + this.logoutput = logoutput; + } + + public String getSha() { + return sha; + } + + public void setSha(String sha) { + this.sha = sha; + } + } + + /** + * The interface for a redis command. + */ + private interface RedisCommand { + void exec(int executionCounter); + } + + /** + * A dummy that can be passed if no RedisCommand should be executed. + */ + private static class RedisCommandDoNothing implements RedisCommand { + + @Override + public void exec(int executionCounter) { + // do nothing here + } + } + + /** + * If the loglevel is trace and the logoutput in luaScriptState is false, then reload the script with logoutput and execute the RedisCommand. + * If the loglevel is not trace and the logoutput in luaScriptState is true, then reload the script without logoutput and execute the RedisCommand. + * If the loglevel is matching the luaScriptState, just execute the RedisCommand. + * + * @param luaScript the type of lua script + * @param redisCommand the redis command to execute + */ + private void reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript luaScript, RedisCommand redisCommand, int executionCounter) { + boolean logoutput = log.isTraceEnabled(); + LuaScriptState luaScriptState = luaScripts.get(luaScript); + // if the loglevel didn't change, execute the command and return + if (logoutput == luaScriptState.getLogoutput()) { + redisCommand.exec(executionCounter); + return; + // if the loglevel changed, set the new loglevel into the luaScriptState, recompose the script and provide the redisCommand as parameter to execute + } else if (logoutput && !luaScriptState.getLogoutput()) { + luaScriptState.setLogoutput(true); + luaScriptState.recomposeLuaScript(); + + } else if (!logoutput && luaScriptState.getLogoutput()) { + luaScriptState.setLogoutput(false); + luaScriptState.recomposeLuaScript(); + } + luaScriptState.loadLuaScript(redisCommand, executionCounter); + } + + public class ByteArrayReadStream implements ReadStream { + + final ByteArrayInputStream content; + int size; + boolean paused; + int position; + Handler endHandler; + Handler handler; + + public ByteArrayReadStream(byte[] byteArray) { + size = byteArray.length; + content = new ByteArrayInputStream(byteArray); + } + + private void doRead() { + vertx.runOnContext(v -> { + if (!paused) { + if (position < size) { + int toRead = 8192; + if (position + toRead > size) { + toRead = size - position; + } + byte[] bytes = new byte[toRead]; + content.read(bytes, 0, toRead); + handler.handle(Buffer.buffer(bytes)); + position += toRead; + doRead(); + } else { + endHandler.handle(null); + } + } + }); + } + + public ByteArrayReadStream resume() { + paused = false; + doRead(); + return this; + } + + @Override + public ReadStream fetch(long amount) { + return null; + } + + @Override + public ByteArrayReadStream pause() { + paused = true; + return this; + } + + @Override + public ByteArrayReadStream exceptionHandler(Handler handler) { + return this; + } + + @Override + public ReadStream handler(Handler handler) { + this.handler = handler; + doRead(); + return this; + } + + @Override + public ByteArrayReadStream endHandler(Handler endHandler) { + this.endHandler = endHandler; + return this; + } + } + + @Override + public Optional getCurrentMemoryUsage() { + return currentMemoryUsageOptional; + } + + @Override + public void get(String path, String etag, int offset, int limit, final Handler handler) { + final String key = encodePath(path); + List keys = Collections.singletonList(key); + List arguments = Arrays.asList( + redisResourcesPrefix, + redisCollectionsPrefix, + expirableSet, + String.valueOf(System.currentTimeMillis()), + MAX_EXPIRE_IN_MILLIS, + String.valueOf(offset), + String.valueOf(limit), + etag + ); + reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript.GET, new Get(keys, arguments, handler), 0); + } + + /** + * The Get Command Execution. + * If the get script cannot be found under the sha in luaScriptState, reload the script. + * To avoid infinite recursion, we limit the recursion. + */ + private class Get implements RedisCommand { + + private final List keys; + private final List arguments; + private final Handler handler; + + public Get(List keys, List arguments, final Handler handler) { + this.keys = keys; + this.arguments = arguments; + this.handler = handler; + } + + public void exec(final int executionCounter) { + List args = toPayload(luaScripts.get(LuaScript.GET).getSha(), keys.size(), keys, arguments); + redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { + if (event.succeeded()) { + Response values = event.result(); + if (log.isTraceEnabled()) { + log.trace("RedisStorage get result: {}", values); + } + if ("notModified".equals(values.toString())) { + notModified(handler); + } else if ("notFound".equals(values.toString())) { + notFound(handler); + } else { + handleJsonArrayValues(values, handler, "0".equals(arguments.get(5)) && + "-1".equals(arguments.get(6))); + } + } else { + String message = event.cause().getMessage(); + if (message != null && message.startsWith("NOSCRIPT")) { + log.warn("get script couldn't be found, reload it"); + log.warn("amount the script got loaded: {}", executionCounter); + if (executionCounter > 10) { + log.error("amount the script got loaded is higher than 10, we abort"); + } else { + luaScripts.get(LuaScript.GET).loadLuaScript(new Get(keys, arguments, handler), executionCounter); + } + } else { + log.error("GET request failed with message: {}", message); + } + } + })) + .onFailure(event -> log.error("Redis: GET request failed", event)); + } + } + + @Override + public void storageExpand(String path, String etag, List subResources, Handler handler) { + final String key = encodePath(path); + List keys = Collections.singletonList(key); + List arguments = Arrays.asList( + redisResourcesPrefix, + redisCollectionsPrefix, + expirableSet, + String.valueOf(System.currentTimeMillis()), + MAX_EXPIRE_IN_MILLIS, + StringUtils.join(subResources, ";"), + String.valueOf(subResources.size()) + ); + reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript.STORAGE_EXPAND, new StorageExpand(keys, arguments, handler, etag), 0); + } + + /** + * The StorageExpand Command Execution. + * If the get script cannot be found under the sha in luaScriptState, reload the script. + * To avoid infinite recursion, we limit the recursion. + */ + private class StorageExpand implements RedisCommand { + + private final List keys; + private final List arguments; + private final Handler handler; + private final String etag; + + public StorageExpand(List keys, List arguments, final Handler handler, String etag) { + this.keys = keys; + this.arguments = arguments; + this.handler = handler; + this.etag = etag; + } + + public void exec(final int executionCounter) { + List args = toPayload(luaScripts.get(LuaScript.STORAGE_EXPAND).getSha(), keys.size(), keys, arguments); + + redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { + if (event.succeeded()) { + String value = event.result().toString(); + if (log.isTraceEnabled()) { + log.trace("RedisStorage get result: {}", value); + } + if ("compressionNotSupported".equalsIgnoreCase(value)) { + error(handler, "Collections having compressed resources are not supported in storage expand"); + return; + } + if ("notFound".equalsIgnoreCase(value)) { + notFound(handler); + return; + } + JsonObject expandResult = new JsonObject(); + + JsonArray resultArr = new JsonArray(value); + + for (Object resultEntry : resultArr) { + JsonArray entries = (JsonArray) resultEntry; + String subResourceName = ResourceNameUtil.resetReplacedColonsAndSemiColons(entries.getString(0)); + String subResourceValue = entries.getString(1); + if (subResourceValue.startsWith("[") && subResourceValue.endsWith("]")) { + expandResult.put(subResourceName, extractSortedJsonArray(subResourceValue)); + } else { + try { + expandResult.put(subResourceName, new JsonObject(subResourceValue)); + } catch (DecodeException ex) { + invalid(handler, "Error decoding invalid json resource '" + subResourceName + "'"); + return; + } + } + } + + byte[] finalExpandedContent = decodeBinary(expandResult.encode()); + String calcDigest = DigestUtils.sha1Hex(finalExpandedContent); + + if (calcDigest.equals(etag)) { + notModified(handler); + } else { + DocumentResource r = new DocumentResource(); + r.readStream = new ByteArrayReadStream(finalExpandedContent); + r.length = finalExpandedContent.length; + r.etag = calcDigest; + r.closeHandler = event1 -> { + // nothing to close + }; + handler.handle(r); + } + } else { + String message = event.cause().getMessage(); + if (message != null && message.startsWith("NOSCRIPT")) { + log.warn("storageExpand script couldn't be found, reload it"); + log.warn("amount the script got loaded: {}", executionCounter); + if (executionCounter > 10) { + log.error("amount the script got loaded is higher than 10, we abort"); + } else { + luaScripts.get(LuaScript.STORAGE_EXPAND).loadLuaScript( + new StorageExpand(keys, arguments, handler, etag), executionCounter); + } + } else { + log.error("StorageExpand request failed with message: {}", message); + } + } + })) + .onFailure(event -> log.error("Redis: StorageExpand request failed", event)); + } + } + + private JsonArray extractSortedJsonArray(String arrayString) { + String arrayContent = arrayString.replaceAll("\\[", EMPTY).replaceAll("\\]", EMPTY) + .replaceAll("\"", EMPTY).replaceAll("\\\\", EMPTY); + String[] splitted = StringUtils.split(arrayContent, ","); + List resources = new ArrayList<>(); + List collections = new ArrayList<>(); + for (String split : splitted) { + if (split.endsWith("/")) { + collections.add(split); + } else { + resources.add(split); + } + } + Collections.sort(collections); + collections.addAll(resources); + return new JsonArray(new ArrayList(collections)); + } + + private void handleJsonArrayValues(Response values, Handler handler, boolean allowEmptyReturn) { + String type = values.get(0).toString(); + if ("TYPE_RESOURCE".equals(type)) { + String valueStr = values.get(1).toString(); + DocumentResource r = new DocumentResource(); + byte[] content = decodeBinary(valueStr); + if (values.get(3) != null) { + // data is compressed + GZIPUtil.decompressResource(vertx, log, content, decompressedResult -> { + if (decompressedResult.succeeded()) { + r.readStream = new ByteArrayReadStream(decompressedResult.result()); + r.length = decompressedResult.result().length; + r.etag = values.get(2).toString(); + r.closeHandler = event -> { + // nothing to close + }; + handler.handle(r); + } else { + error(handler, "Error during decompression of resource: " + decompressedResult.cause().getMessage()); + } + }); + } else { + r.readStream = new ByteArrayReadStream(content); + r.length = content.length; + Response etagRsp = values.get(2); + r.etag = etagRsp == null ? null : etagRsp.toString(); + r.closeHandler = event -> { + // nothing to close + }; + handler.handle(r); + } + } else if ("TYPE_COLLECTION".equals(type)) { + CollectionResource r = new CollectionResource(); + Set items = new HashSet<>(); + for (Response value : values) { + String member = value.toString(); + if (!"TYPE_COLLECTION".equals(member)) { + if (member.endsWith(":")) { + member = member.replaceAll(":$", ""); + CollectionResource c = new CollectionResource(); + c.name = member; + items.add(c); + } else { + DocumentResource d = new DocumentResource(); + d.name = member; + items.add(d); + } + } + } + if (allowEmptyReturn && items.size() == 0) { + notFound(handler); + } else { + r.items = new ArrayList<>(items); + Collections.sort(r.items); + handler.handle(r); + } + } else { + notFound(handler); + } + } + + static class ByteArrayWriteStream implements WriteStream { + + private ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + public byte[] getBytes() { + return bos.toByteArray(); + } + + @Override + public ByteArrayWriteStream setWriteQueueMaxSize(int maxSize) { + return this; + } + + @Override + public boolean writeQueueFull() { + return false; + } + + @Override + public ByteArrayWriteStream drainHandler(Handler handler) { + return this; + } + + @Override + public ByteArrayWriteStream exceptionHandler(Handler handler) { + return this; + } + + @Override + public Future write(Buffer data) { + try { + bos.write(data.getBytes()); + return Future.succeededFuture(); + } catch (IOException e) { + return Future.failedFuture(e.getMessage()); + } + + } + + @Override + public void write(Buffer data, Handler> handler) { + write(data).onComplete(handler); + } + + @Override + public Future end() { + try { + bos.close(); + return Future.succeededFuture(); + } catch (IOException e) { + return Future.failedFuture(e.getMessage()); + } + } + + @Override + public void end(Handler> handler) { + end().onComplete(handler); + } + } + + private String initEtagValue(String providedEtag) { + if (!isEmpty(providedEtag)) { + return providedEtag; + } + return UUID.randomUUID().toString(); + } + + + @Override + public void put(String path, final String etag, final boolean merge, final long expire, final String lockOwner, + final LockMode lockMode, final long lockExpire, final Handler handler) { + put(path, etag, merge, expire, lockOwner, lockMode, lockExpire, false, handler); + } + + @Override + public void put(String path, final String etag, final boolean merge, final long expire, final Handler handler) { + put(path, etag, merge, expire, "", LockMode.SILENT, 0, handler); + } + + @Override + public void put(String path, String etag, boolean merge, long expire, String lockOwner, LockMode lockMode, + long lockExpire, boolean storeCompressed, Handler handler) { + final String key = encodePath(path); + final DocumentResource d = new DocumentResource(); + final ByteArrayWriteStream stream = new ByteArrayWriteStream(); + + final String etagValue = initEtagValue(etag); + d.writeStream = stream; + d.closeHandler = event -> { + String expireInMillis = MAX_EXPIRE_IN_MILLIS; + if (expire > -1) { + expireInMillis = String.valueOf(System.currentTimeMillis() + (expire * 1000)); + } + + if (Long.parseLong(expireInMillis) > Long.parseLong(MAX_EXPIRE_IN_MILLIS)) { + // #76 reset to the defined max value + expireInMillis = MAX_EXPIRE_IN_MILLIS; + } + + String lockExpireInMillis = String.valueOf(System.currentTimeMillis() + (lockExpire * 1000)); + + List keys = Collections.singletonList(key); + + if (storeCompressed) { + String finalExpireInMillis = expireInMillis; + GZIPUtil.compressResource(vertx, log, stream.getBytes(), compressResourceResult -> { + if (compressResourceResult.succeeded()) { + List arg = Arrays.asList( + redisResourcesPrefix, + redisCollectionsPrefix, + expirableSet, + merge ? "true" : "false", + finalExpireInMillis, + MAX_EXPIRE_IN_MILLIS, + encodeBinary(compressResourceResult.result()), + etagValue, + redisLockPrefix, + lockOwner, + lockMode.text(), + lockExpireInMillis, + storeCompressed ? "1" : "0" + ); + reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript.PUT, new Put(d, keys, arg, handler), 0); + } else { + error(handler, "Error during compression of resource"); + } + }); + } else { + List arguments = Arrays.asList( + redisResourcesPrefix, + redisCollectionsPrefix, + expirableSet, + merge ? "true" : "false", + expireInMillis, + MAX_EXPIRE_IN_MILLIS, + encodeBinary(stream.getBytes()), + etagValue, + redisLockPrefix, + lockOwner, + lockMode.text(), + lockExpireInMillis, + storeCompressed ? "1" : "0" + ); + reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript.PUT, new Put(d, keys, arguments, handler), 0); + } + }; + handler.handle(d); + } + + /** + * The Put Command Execution. + * If the get script cannot be found under the sha in luaScriptState, reload the script. + * To avoid infinite recursion, we limit the recursion. + */ + private class Put implements RedisCommand { + + private final DocumentResource d; + private final List keys; + private final List arguments; + private final Handler handler; + + public Put(DocumentResource d, List keys, List arguments, Handler handler) { + this.d = d; + this.keys = keys; + this.arguments = arguments; + this.handler = handler; + } + + public void exec(final int executionCounter) { + List args = toPayload(luaScripts.get(LuaScript.PUT).getSha(), keys.size(), keys, arguments); + + redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { + if (event.succeeded()) { + String result = event.result().toString(); + if (log.isTraceEnabled()) { + log.trace("RedisStorage successful put. Result: {}", result); + } + if (result != null && result.startsWith("existingCollection")) { + CollectionResource c = new CollectionResource(); + handler.handle(c); + } else if (result != null && result.startsWith("existingResource")) { + DocumentResource d = new DocumentResource(); + d.exists = false; + handler.handle(d); + } else if ("notModified".equals(result)) { + notModified(handler); + } else if (LockMode.REJECT.text().equals(result)) { + rejected(handler); + } else { + d.endHandler.handle(null); + } + } else { + String message = event.cause().getMessage(); + if (message != null && message.startsWith("NOSCRIPT")) { + log.warn("put script couldn't be found, reload it"); + log.warn("amount the script got loaded: {}", executionCounter); + if (executionCounter > 10) { + log.error("amount the script got loaded is higher than 10, we abort"); + } else { + luaScripts.get(LuaScript.PUT).loadLuaScript(new Put(d, keys, arguments, handler), executionCounter); + } + } else if (message != null && d.errorHandler != null) { + log.error("PUT request failed with message: {}", message); + d.errorHandler.handle(event.cause()); + } + } + })) + .onFailure(event -> log.error("Redis: PUT request failed", event)); + } + } + + @Override + public void delete(String path, String lockOwner, LockMode lockMode, long lockExpire, boolean confirmCollectionDelete, + boolean deleteRecursive, final Handler handler) { + final String key = encodePath(path); + List keys = Collections.singletonList(key); + + String lockExpireInMillis = String.valueOf(System.currentTimeMillis() + (lockExpire * 1000)); + + List arguments = Arrays.asList( + redisResourcesPrefix, + redisCollectionsPrefix, + redisDeltaResourcesPrefix, + redisDeltaEtagsPrefix, + expirableSet, + String.valueOf(System.currentTimeMillis()), + MAX_EXPIRE_IN_MILLIS, + confirmCollectionDelete ? "true" : "false", + deleteRecursive ? "true" : "false", + redisLockPrefix, + lockOwner, + lockMode.text(), + lockExpireInMillis + ); + reloadScriptIfLoglevelChangedAndExecuteRedisCommand(LuaScript.DELETE, new Delete(keys, arguments, handler), 0); + } + + /** + * The Delete Command Execution. + * If the get script cannot be found under the sha in luaScriptState, reload the script. + * To avoid infinite recursion, we limit the recursion. + */ + private class Delete implements RedisCommand { + + private final List keys; + private final List arguments; + private final Handler handler; + + public Delete(List keys, List arguments, final Handler handler) { + this.keys = keys; + this.arguments = arguments; + this.handler = handler; + } + + public void exec(final int executionCounter) { + List args = toPayload(luaScripts.get(LuaScript.DELETE).getSha(), keys.size(), keys, arguments); + + redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { + if (event.cause() != null && event.cause().getMessage().startsWith("NOSCRIPT")) { + log.warn("delete script couldn't be found, reload it"); + log.warn("amount the script got loaded: {}", executionCounter); + if (executionCounter > 10) { + log.error("amount the script got loaded is higher than 10, we abort"); + } else { + luaScripts.get(LuaScript.DELETE).loadLuaScript(new Delete(keys, arguments, handler), executionCounter); + } + return; + } + + String result = null; + if (event.result() != null) { + result = event.result().toString(); + } + if (log.isTraceEnabled()) { + log.trace("RedisStorage delete result: {}", result); + } + if ("notEmpty".equals(result)) { + notEmpty(handler); + return; + } + if ("notFound".equals(result)) { + notFound(handler); + return; + } else if (LockMode.REJECT.text().equals(result)) { + rejected(handler); + return; + } + Resource r = new Resource(); + handler.handle(r); + })) + .onFailure(event -> log.error("Redis: DELETE request failed", event)); + } + } + + /** + * Cleans up the outdated resources recursive. + * If the script which is referred over the luaScriptState.sha, the execution is aborted and the script is reloaded. + * + * @param handler the handler to execute + * @param cleanedLastRun how many resources were cleaned in the last run + * @param maxdel max resources to clean + * @param bulkSize how many resources should be cleaned in one run + */ + public void cleanupRecursive(final Handler handler, final long cleanedLastRun, final long maxdel, + final int bulkSize) { + List arguments = Arrays.asList( + redisResourcesPrefix, + redisCollectionsPrefix, + redisDeltaResourcesPrefix, + redisDeltaEtagsPrefix, + expirableSet, + "0", + MAX_EXPIRE_IN_MILLIS, + "false", + "true", + String.valueOf(System.currentTimeMillis()), + String.valueOf(bulkSize) + ); + List args = toPayload(luaScripts.get(LuaScript.CLEANUP).getSha(), 0, Collections.emptyList(), arguments); + + redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> { + if (log.isTraceEnabled()) { + log.trace("RedisStorage cleanup resources succeeded: {}", event.succeeded()); + } + + if (event.failed() && event.cause() != null && event.cause().getMessage().startsWith("NOSCRIPT")) { + log.warn("the cleanup script is not loaded. Load it and exit. The Cleanup will success the next time"); + luaScripts.get(LuaScript.CLEANUP).loadLuaScript(new RedisCommandDoNothing(), 0); + return; + } + + long cleanedThisRun = 0; + if (event.succeeded() && event.result().toLong() != null) { + cleanedThisRun = event.result().toLong(); + } + if (log.isTraceEnabled()) { + log.trace("RedisStorage cleanup resources cleanded this run: {}", cleanedThisRun); + } + final long cleaned = cleanedLastRun + cleanedThisRun; + if (cleanedThisRun != 0 && cleaned < maxdel) { + if (log.isTraceEnabled()) { + log.trace("RedisStorage cleanup resources call recursive next bulk"); + } + cleanupRecursive(handler, cleaned, maxdel, bulkSize); + } else { + redisAPI.zcount(expirableSet, "0", String.valueOf(System.currentTimeMillis()), longAsyncResult -> { + Long result = longAsyncResult.result().toLong(); + if (log.isTraceEnabled()) { + log.trace("RedisStorage cleanup resources zcount on expirable set: {}", result); + } + int resToCleanLeft = 0; + if (result != null && result.intValue() >= 0) { + resToCleanLeft = result.intValue(); + } + JsonObject retObj = new JsonObject(); + retObj.put("cleanedResources", cleaned); + retObj.put("expiredResourcesLeft", resToCleanLeft); + DocumentResource r = new DocumentResource(); + byte[] content = decodeBinary(retObj.toString()); + r.readStream = new ByteArrayReadStream(content); + r.length = content.length; + r.closeHandler = event1 -> { + // nothing to close + }; + handler.handle(r); + }); + } + })) + .onFailure(event -> log.error("Redis: cleanupRecursive failed", event)); + } + + private String encodePath(String path) { + if (path.equals("/")) { + path = ""; + } + return ResourceNameUtil.replaceColonsAndSemiColons(path).replaceAll("/", ":"); + } + + private String encodeBinary(byte[] bytes) { + return new String(bytes, StandardCharsets.ISO_8859_1); + } + + private byte[] decodeBinary(String s) { + return s.getBytes(StandardCharsets.ISO_8859_1); + } + + private void notFound(Handler handler) { + Resource r = new Resource(); + r.exists = false; + handler.handle(r); + } + + private void notEmpty(Handler handler) { + Resource r = new Resource(); + r.error = true; + r.errorMessage = "directory not empty. Use recursive=true parameter to delete"; + handler.handle(r); + } + + private void notModified(Handler handler) { + Resource r = new Resource(); + r.modified = false; + handler.handle(r); + } + + private void rejected(Handler handler) { + Resource r = new Resource(); + r.rejected = true; + handler.handle(r); + } + + private void invalid(Handler handler, String invalidMessage) { + Resource r = new Resource(); + r.invalid = true; + r.invalidMessage = invalidMessage; + handler.handle(r); + } + + private void error(Handler handler, String errorMessage) { + Resource r = new Resource(); + r.error = true; + r.errorMessage = errorMessage; + handler.handle(r); + } + + @Override + public void cleanup(Handler handler, String cleanupResourcesAmountStr) { + long cleanupResourcesAmountUsed = cleanupResourcesAmount; + if (log.isTraceEnabled()) { + log.trace("RedisStorage cleanup resources, cleanupResourcesAmount: {}", cleanupResourcesAmountUsed); + } + try { + cleanupResourcesAmountUsed = Long.parseLong(cleanupResourcesAmountStr); + } catch (Exception e) { + // do nothing + } + cleanupRecursive(handler, 0, cleanupResourcesAmountUsed, CLEANUP_BULK_SIZE); + } + + private boolean isEmpty(CharSequence cs) { + return cs == null || cs.length() == 0; + } + + /** + * from https://github.com/vert-x3/vertx-redis-client/blob/3.9/src/main/java/io/vertx/redis/impl/RedisClientImpl.java#L94 + * + * @param parameters + * @return + */ + private static List toPayload(Object... parameters) { + List result = new ArrayList<>(parameters.length); + + for (Object param : parameters) { + // unwrap + if (param instanceof JsonArray) { + param = ((JsonArray) param).getList(); + } + // unwrap + if (param instanceof JsonObject) { + param = ((JsonObject) param).getMap(); + } + + if (param instanceof Collection) { + ((Collection) param).stream().filter(Objects::nonNull).forEach(o -> result.add(o.toString())); + } else if (param instanceof Map) { + for (Map.Entry pair : ((Map) param).entrySet()) { + result.add(pair.getKey().toString()); + result.add(pair.getValue().toString()); + } + } else if (param instanceof Stream) { + ((Stream) param).forEach(e -> { + if (e instanceof Object[]) { + Collections.addAll(result, (String[]) e); + } else { + result.add(e.toString()); + } + }); + } else if (param != null) { + result.add(param.toString()); + } + } + return result; + } + +} diff --git a/src/main/java/org/swisspush/reststorage/redis/RedisUtils.java b/src/main/java/org/swisspush/reststorage/redis/RedisUtils.java new file mode 100644 index 0000000..7c4ffd7 --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/redis/RedisUtils.java @@ -0,0 +1,59 @@ +package org.swisspush.reststorage.redis; + +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; + +import java.util.*; +import java.util.stream.Stream; + +/** + * Useful utilities for Redis + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public final class RedisUtils { + + private RedisUtils() {} + + /** + * from https://github.com/vert-x3/vertx-redis-client/blob/3.9/src/main/java/io/vertx/redis/impl/RedisClientImpl.java#L94 + * + * @param parameters + * @return + */ + public static List toPayload(Object... parameters) { + List result = new ArrayList<>(parameters.length); + + for (Object param : parameters) { + // unwrap + if (param instanceof JsonArray) { + param = ((JsonArray) param).getList(); + } + // unwrap + if (param instanceof JsonObject) { + param = ((JsonObject) param).getMap(); + } + + if (param instanceof Collection) { + ((Collection) param).stream().filter(Objects::nonNull).forEach(o -> result.add(o.toString())); + } else if (param instanceof Map) { + for (Map.Entry pair : ((Map) param).entrySet()) { + result.add(pair.getKey().toString()); + result.add(pair.getValue().toString()); + } + } else if (param instanceof Stream) { + ((Stream) param).forEach(e -> { + if (e instanceof Object[]) { + Collections.addAll(result, (String[]) e); + } else { + result.add(e.toString()); + } + }); + } else if (param != null) { + result.add(param.toString()); + } + } + return result; + } + +} diff --git a/src/main/java/org/swisspush/reststorage/util/FailedAsyncResult.java b/src/main/java/org/swisspush/reststorage/util/FailedAsyncResult.java new file mode 100644 index 0000000..d3c36b9 --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/util/FailedAsyncResult.java @@ -0,0 +1,31 @@ +package org.swisspush.reststorage.util; + +import io.vertx.core.AsyncResult; + +public class FailedAsyncResult implements AsyncResult { + private final Throwable cause; + + public FailedAsyncResult(Throwable cause) { + this.cause = cause; + } + + @Override + public T result() { + return null; + } + + @Override + public Throwable cause() { + return cause; + } + + @Override + public boolean succeeded() { + return false; + } + + @Override + public boolean failed() { + return true; + } +} diff --git a/src/main/java/org/swisspush/reststorage/util/LockMode.java b/src/main/java/org/swisspush/reststorage/util/LockMode.java index 07fa466..dcf5c1f 100644 --- a/src/main/java/org/swisspush/reststorage/util/LockMode.java +++ b/src/main/java/org/swisspush/reststorage/util/LockMode.java @@ -11,7 +11,7 @@ public enum LockMode { SILENT("silent"), REJECT("reject"); - private String lockMode; + private final String lockMode; LockMode(String lockMode) { this.lockMode = lockMode; diff --git a/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java b/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java index 85d7de0..98f190a 100644 --- a/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java +++ b/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java @@ -32,12 +32,19 @@ public enum StorageType { private String redisHost = "localhost"; private int redisPort = 6379; private boolean redisEnableTls; + /** + * @deprecated Instance authentication is considered as legacy. With Redis from 6.x on the ACL authentication method should be used. + */ + @Deprecated(since = "3.0.17") private String redisAuth = null; + private String redisPassword = null; + private String redisUser = null; private String expirablePrefix = "rest-storage:expirable"; private String resourcesPrefix = "rest-storage:resources"; private String collectionsPrefix = "rest-storage:collections"; private String deltaResourcesPrefix = "delta:resources"; private String deltaEtagsPrefix = "delta:etags"; + private Integer resourceCleanupIntervalSec = null; private long resourceCleanupAmount = 100_000L; private String lockPrefix = "rest-storage:locks"; private boolean confirmCollectionDelete = false; @@ -119,11 +126,22 @@ public ModuleConfiguration redisEnableTls(boolean redisEnableTls) { return this; } + @Deprecated(since = "3.0.17") public ModuleConfiguration redisAuth(String redisAuth) { this.redisAuth = redisAuth; return this; } + public ModuleConfiguration redisPassword(String redisPassword) { + this.redisPassword = redisPassword; + return this; + } + + public ModuleConfiguration redisUser(String redisUser) { + this.redisUser = redisUser; + return this; + } + public ModuleConfiguration expirablePrefix(String expirablePrefix) { this.expirablePrefix = expirablePrefix; return this; @@ -154,6 +172,16 @@ public ModuleConfiguration resourceCleanupAmount(long resourceCleanupAmount) { return this; } + public ModuleConfiguration resourceCleanupIntervalSec(Integer resourceCleanupIntervalSec) { + if(resourceCleanupIntervalSec == null || resourceCleanupIntervalSec < 1){ + log.warn("Resource cleanup interval value is either null or negative. Interval cleanup will not be activated"); + this.resourceCleanupIntervalSec = null; + }else { + this.resourceCleanupIntervalSec = resourceCleanupIntervalSec; + } + return this; + } + public ModuleConfiguration lockPrefix(String lockPrefix) { this.lockPrefix = lockPrefix; return this; @@ -250,6 +278,14 @@ public String getRedisAuth() { return redisAuth; } + public String getRedisPassword() { + return redisPassword; + } + + public String getRedisUser() { + return redisUser; + } + public String getExpirablePrefix() { return expirablePrefix; } @@ -270,6 +306,10 @@ public String getDeltaEtagsPrefix() { return deltaEtagsPrefix; } + public Integer getResourceCleanupIntervalSec() { + return resourceCleanupIntervalSec; + } + public long getResourceCleanupAmount() { return resourceCleanupAmount; } @@ -311,8 +351,7 @@ public JsonObject asJsonObject() { } public static ModuleConfiguration fromJsonObject(JsonObject json) { - ModuleConfiguration mc = json.mapTo(ModuleConfiguration.class); - return mc; + return json.mapTo(ModuleConfiguration.class); } @Override diff --git a/src/main/resources/cleanup.lua b/src/main/resources/cleanup.lua index c2e69df..104f766 100644 --- a/src/main/resources/cleanup.lua +++ b/src/main/resources/cleanup.lua @@ -12,7 +12,7 @@ local bulksize = tonumber(ARGV[11]) -- Important: The ARGV-Array is used again in the included del.lua script -- (see this funny comment with the percent sign below and Java-Method --- org.swisspush.reststorage.RedisStorage.LuaScriptState.composeLuaScript) +-- org.swisspush.reststorage.redis.RedisStorage.LuaScriptState.composeLuaScript) -- we need to initialize all parameters for del.lua here - otherwise we can have side effects -- See open issue https://github.com/swisspush/vertx-rest-storage/issues/83 ARGV[10] = '' diff --git a/src/main/resources/lock_release.lua b/src/main/resources/lock_release.lua new file mode 100644 index 0000000..d4968d1 --- /dev/null +++ b/src/main/resources/lock_release.lua @@ -0,0 +1,8 @@ +local lockKey = KEYS[1] +local token = ARGV[1] + +if redis.call("get", lockKey) == token then + return redis.call("del", lockKey) +else + return 0 +end \ No newline at end of file diff --git a/src/test/java/org/swisspush/reststorage/CleanupIntegrationTest.java b/src/test/java/org/swisspush/reststorage/CleanupIntegrationTest.java index 962adad..2a193de 100644 --- a/src/test/java/org/swisspush/reststorage/CleanupIntegrationTest.java +++ b/src/test/java/org/swisspush/reststorage/CleanupIntegrationTest.java @@ -6,6 +6,7 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Test; import org.junit.runner.RunWith; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import java.util.concurrent.TimeUnit; diff --git a/src/test/java/org/swisspush/reststorage/CrudIntegrationTest.java b/src/test/java/org/swisspush/reststorage/CrudIntegrationTest.java index 5216fb5..b082fed 100644 --- a/src/test/java/org/swisspush/reststorage/CrudIntegrationTest.java +++ b/src/test/java/org/swisspush/reststorage/CrudIntegrationTest.java @@ -8,6 +8,7 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Test; import org.junit.runner.RunWith; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import java.util.List; diff --git a/src/test/java/org/swisspush/reststorage/EtagIntegrationTest.java b/src/test/java/org/swisspush/reststorage/EtagIntegrationTest.java index 06eeeaa..a98c3cc 100644 --- a/src/test/java/org/swisspush/reststorage/EtagIntegrationTest.java +++ b/src/test/java/org/swisspush/reststorage/EtagIntegrationTest.java @@ -5,6 +5,7 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Test; import org.junit.runner.RunWith; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import static io.restassured.RestAssured.*; import static org.hamcrest.CoreMatchers.equalTo; diff --git a/src/test/java/org/swisspush/reststorage/ExpirationIntegrationTest.java b/src/test/java/org/swisspush/reststorage/ExpirationIntegrationTest.java index 5778bff..4fd68fb 100644 --- a/src/test/java/org/swisspush/reststorage/ExpirationIntegrationTest.java +++ b/src/test/java/org/swisspush/reststorage/ExpirationIntegrationTest.java @@ -5,6 +5,7 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Test; import org.junit.runner.RunWith; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import java.util.concurrent.TimeUnit; diff --git a/src/test/java/org/swisspush/reststorage/LockIntegrationTest.java b/src/test/java/org/swisspush/reststorage/LockIntegrationTest.java index 90d9bd6..1513958 100644 --- a/src/test/java/org/swisspush/reststorage/LockIntegrationTest.java +++ b/src/test/java/org/swisspush/reststorage/LockIntegrationTest.java @@ -5,6 +5,7 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Test; import org.junit.runner.RunWith; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import org.swisspush.reststorage.util.LockMode; import org.swisspush.reststorage.util.StatusCode; diff --git a/src/test/java/org/swisspush/reststorage/OffsetIntegrationTest.java b/src/test/java/org/swisspush/reststorage/OffsetIntegrationTest.java index b85e0a4..048bef3 100644 --- a/src/test/java/org/swisspush/reststorage/OffsetIntegrationTest.java +++ b/src/test/java/org/swisspush/reststorage/OffsetIntegrationTest.java @@ -5,6 +5,7 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Test; import org.junit.runner.RunWith; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import static io.restassured.RestAssured.given; import static io.restassured.RestAssured.with; diff --git a/src/test/java/org/swisspush/reststorage/PathLevelIntegrationTest.java b/src/test/java/org/swisspush/reststorage/PathLevelIntegrationTest.java index bbeb30d..a78902d 100644 --- a/src/test/java/org/swisspush/reststorage/PathLevelIntegrationTest.java +++ b/src/test/java/org/swisspush/reststorage/PathLevelIntegrationTest.java @@ -6,6 +6,7 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Test; import org.junit.runner.RunWith; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import static io.restassured.RestAssured.*; diff --git a/src/test/java/org/swisspush/reststorage/RedirectIntegrationTest.java b/src/test/java/org/swisspush/reststorage/RedirectIntegrationTest.java index b9ece21..401cdc9 100644 --- a/src/test/java/org/swisspush/reststorage/RedirectIntegrationTest.java +++ b/src/test/java/org/swisspush/reststorage/RedirectIntegrationTest.java @@ -7,6 +7,7 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Test; import org.junit.runner.RunWith; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import static io.restassured.RestAssured.get; import static io.restassured.RestAssured.with; diff --git a/src/test/java/org/swisspush/reststorage/Return200onDeleteNonExistingTest.java b/src/test/java/org/swisspush/reststorage/Return200onDeleteNonExistingTest.java index ca59be6..a8fc0e0 100644 --- a/src/test/java/org/swisspush/reststorage/Return200onDeleteNonExistingTest.java +++ b/src/test/java/org/swisspush/reststorage/Return200onDeleteNonExistingTest.java @@ -1,6 +1,7 @@ package org.swisspush.reststorage; import org.junit.Test; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import org.swisspush.reststorage.util.ModuleConfiguration; import static io.restassured.RestAssured.when; diff --git a/src/test/java/org/swisspush/reststorage/Return404onDeleteNonExistingTest.java b/src/test/java/org/swisspush/reststorage/Return404onDeleteNonExistingTest.java index eb1a18e..efef8ef 100644 --- a/src/test/java/org/swisspush/reststorage/Return404onDeleteNonExistingTest.java +++ b/src/test/java/org/swisspush/reststorage/Return404onDeleteNonExistingTest.java @@ -1,6 +1,7 @@ package org.swisspush.reststorage; import org.junit.Test; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import static io.restassured.RestAssured.when; diff --git a/src/test/java/org/swisspush/reststorage/StorageExpandIntegrationTest.java b/src/test/java/org/swisspush/reststorage/StorageExpandIntegrationTest.java index 263df81..6c117ff 100644 --- a/src/test/java/org/swisspush/reststorage/StorageExpandIntegrationTest.java +++ b/src/test/java/org/swisspush/reststorage/StorageExpandIntegrationTest.java @@ -17,6 +17,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.swisspush.reststorage.redis.RedisStorageIntegrationTestCase; import static io.restassured.RestAssured.*; import static org.hamcrest.Matchers.*; diff --git a/src/test/java/org/swisspush/reststorage/lock/impl/RedisBasedLockTest.java b/src/test/java/org/swisspush/reststorage/lock/impl/RedisBasedLockTest.java new file mode 100644 index 0000000..be839d9 --- /dev/null +++ b/src/test/java/org/swisspush/reststorage/lock/impl/RedisBasedLockTest.java @@ -0,0 +1,250 @@ +package org.swisspush.reststorage.lock.impl; + +import com.jayway.awaitility.Duration; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.Timeout; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.redis.client.RedisAPI; +import io.vertx.redis.client.RedisOptions; +import io.vertx.redis.client.impl.RedisClient; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.exceptions.JedisConnectionException; + +import static com.jayway.awaitility.Awaitility.await; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * Tests for the {@link RedisBasedLock} class + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +@RunWith(VertxUnitRunner.class) +public class RedisBasedLockTest { + + private static Vertx vertx; + private Jedis jedis; + private static RedisBasedLock redisBasedLock; + + String lock_1 = "lock_1"; + String token_1 = "token_1"; + + @org.junit.Rule + public Timeout rule = Timeout.seconds(5); + + @BeforeClass + public static void setupLock(){ + vertx = Vertx.vertx(); + RedisAPI redisAPI = RedisAPI.api(new RedisClient(vertx, new RedisOptions())); + redisBasedLock = new RedisBasedLock(() -> Future.succeededFuture(redisAPI)); + } + + @Before + public void setUp(){ + jedis = new Jedis("localhost"); + try { + jedis.flushAll(); + } catch (JedisConnectionException e){ + org.junit.Assume.assumeNoException("Ignoring this test because no running redis is available. This is the case during release", e); + } + } + + @After + public void tearDown(){ + if(jedis != null){ + jedis.close(); + } + } + + private String lockKey(String lock){ + return RedisBasedLock.STORAGE_PREFIX + lock; + } + + @Test + public void testAcquireLock(TestContext context){ + Async async = context.async(); + context.assertFalse(jedis.exists(lockKey(lock_1))); + redisBasedLock.acquireLock(lock_1, token_1, 500).onComplete(event -> { + context.assertTrue(event.succeeded()); + context.assertTrue(event.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals(token_1, jedis.get(lockKey(lock_1))); + async.complete(); + }); + } + + @Test + public void testAcquireMultipleLocks(TestContext context){ + Async async = context.async(); + context.assertFalse(jedis.exists(lockKey(lock_1))); + redisBasedLock.acquireLock(lock_1, token_1, 500).onComplete(event -> { + context.assertTrue(event.succeeded()); + context.assertTrue(event.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals(token_1, jedis.get(lockKey(lock_1))); + redisBasedLock.acquireLock("lock_2", "token_X", 300).onComplete(event1 -> { + context.assertTrue(event1.succeeded()); + context.assertTrue(event1.result()); + context.assertTrue(jedis.exists(lockKey("lock_2"))); + context.assertEquals("token_X", jedis.get(lockKey("lock_2"))); + redisBasedLock.acquireLock("lock_3", "token_Z", 300).onComplete(event2 -> { + context.assertTrue(event2.succeeded()); + context.assertTrue(event2.result()); + context.assertTrue(jedis.exists(lockKey("lock_3"))); + context.assertEquals("token_Z", jedis.get(lockKey("lock_3"))); + async.complete(); + }); + }); + }); + } + + @Test + public void testAcquireLockAgain(TestContext context){ + Async async = context.async(); + context.assertFalse(jedis.exists(lockKey(lock_1))); + redisBasedLock.acquireLock(lock_1, token_1, 300).onComplete(event -> { + context.assertTrue(event.succeeded()); + context.assertTrue(event.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals(token_1, jedis.get(lockKey(lock_1))); + redisBasedLock.acquireLock(lock_1, "token_X", 350).onComplete(event1 -> { + context.assertTrue(event1.succeeded()); + context.assertFalse(event1.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals(token_1, jedis.get(lockKey(lock_1))); + async.complete(); + }); + }); + } + + @Test + public void testAcquireLockAfterExpired(TestContext context){ + Async async = context.async(); + context.assertFalse(jedis.exists(lockKey(lock_1))); + redisBasedLock.acquireLock(lock_1, token_1, 300).onComplete(event -> { + context.assertTrue(event.succeeded()); + context.assertTrue(event.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals(token_1, jedis.get(lockKey(lock_1))); + waitMaxUntilExpired(lockKey(lock_1), 350); + redisBasedLock.acquireLock(lock_1, "token_X", 500).onComplete(event1 -> { + context.assertTrue(event1.succeeded()); + context.assertTrue(event1.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals("token_X", jedis.get(lockKey(lock_1))); + async.complete(); + }); + }); + } + + @Test + public void testReleaseNonExistingLock(TestContext context){ + Async async = context.async(); + context.assertFalse(jedis.exists(lockKey(lock_1))); + redisBasedLock.releaseLock(lock_1, token_1).onComplete(event -> { + context.assertTrue(event.succeeded()); + context.assertFalse(event.result()); + context.assertFalse(jedis.exists(lockKey(lock_1))); + async.complete(); + }); + } + + @Test + public void testReleaseExpiredLock(TestContext context){ + Async async = context.async(); + context.assertFalse(jedis.exists(lockKey(lock_1))); + redisBasedLock.acquireLock(lock_1, token_1, 500).onComplete(event -> { + context.assertTrue(event.succeeded()); + context.assertTrue(event.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals(token_1, jedis.get(lockKey(lock_1))); + waitMaxUntilExpired(lockKey(lock_1), 600); + redisBasedLock.releaseLock(lock_1, token_1).onComplete(event1 -> { + context.assertTrue(event1.succeeded()); + context.assertFalse(event1.result()); + context.assertFalse(jedis.exists(lockKey(lock_1))); + async.complete(); + }); + }); + } + + @Test + public void testReleaseExistingLockWithCorrectToken(TestContext context){ + Async async = context.async(); + context.assertFalse(jedis.exists(lockKey(lock_1))); + redisBasedLock.acquireLock(lock_1, token_1, 500).onComplete(event -> { + context.assertTrue(event.succeeded()); + context.assertTrue(event.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals(token_1, jedis.get(lockKey(lock_1))); + redisBasedLock.releaseLock(lock_1, token_1).onComplete(event1 -> { + context.assertTrue(event1.succeeded()); + context.assertTrue(event1.result()); + context.assertFalse(jedis.exists(lockKey(lock_1))); + async.complete(); + }); + }); + } + + @Test + public void testReleaseExistingLockWithWrongToken(TestContext context){ + Async async = context.async(); + context.assertFalse(jedis.exists(lockKey(lock_1))); + redisBasedLock.acquireLock(lock_1, token_1, 500).onComplete(event -> { + context.assertTrue(event.succeeded()); + context.assertTrue(event.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals(token_1, jedis.get(lockKey(lock_1))); + redisBasedLock.releaseLock(lock_1, "token_X").onComplete(event1 -> { + context.assertTrue(event1.succeeded()); + context.assertFalse(event1.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals(token_1, jedis.get(lockKey(lock_1))); + async.complete(); + }); + }); + } + + @Test + public void testReleaseLockRespectingOwnership(TestContext context){ + Async async = context.async(); + context.assertFalse(jedis.exists(lockKey(lock_1))); + redisBasedLock.acquireLock(lock_1, token_1, 500).onComplete(event -> { + context.assertTrue(event.succeeded()); + context.assertTrue(event.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals(token_1, jedis.get(lockKey(lock_1))); + waitMaxUntilExpired(lockKey(lock_1), 600); + context.assertFalse(jedis.exists(lockKey(lock_1))); + redisBasedLock.acquireLock(lock_1, "token_X", 350).onComplete(event1 -> { + context.assertTrue(event1.succeeded()); + context.assertTrue(event1.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals("token_X", jedis.get(lockKey(lock_1))); + redisBasedLock.releaseLock(lock_1, token_1).onComplete(event2 -> { + context.assertTrue(event2.succeeded()); + context.assertFalse(event2.result()); + context.assertTrue(jedis.exists(lockKey(lock_1))); + context.assertEquals("token_X", jedis.get(lockKey(lock_1))); + redisBasedLock.releaseLock(lock_1, "token_X").onComplete(event3 -> { + context.assertTrue(event3.succeeded()); + context.assertTrue(event3.result()); + context.assertFalse(jedis.exists(lockKey(lock_1))); + async.complete(); + }); + }); + }); + }); + } + + private void waitMaxUntilExpired(String key, long expireMs){ + await().pollInterval(50, MILLISECONDS).atMost(new Duration(expireMs, MILLISECONDS)).until(() -> !jedis.exists(key)); + } +} diff --git a/src/test/java/org/swisspush/reststorage/lua/ReleaseLockLuaScriptTests.java b/src/test/java/org/swisspush/reststorage/lua/ReleaseLockLuaScriptTests.java new file mode 100644 index 0000000..d5ac075 --- /dev/null +++ b/src/test/java/org/swisspush/reststorage/lua/ReleaseLockLuaScriptTests.java @@ -0,0 +1,206 @@ +package org.swisspush.reststorage.lua; + +import com.jayway.awaitility.Duration; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.swisspush.reststorage.lock.lua.LockLuaScripts; +import redis.clients.jedis.params.SetParams; + +import java.util.Collections; +import java.util.List; + +import static com.jayway.awaitility.Awaitility.await; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.swisspush.reststorage.lock.impl.RedisBasedLock.STORAGE_PREFIX; + +/** + * Tests for the {@link LockLuaScripts#LOCK_RELEASE} lua script. + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +@RunWith(VertxUnitRunner.class) +public class ReleaseLockLuaScriptTests extends AbstractLuaScriptTest { + + @Test + public void testReleaseNotExistingLocks(){ + + String lock1 = buildLockKey("lock_1"); + String lock2 = buildLockKey("lock_2"); + String lock3 = buildLockKey("lock_3"); + + assertThat(jedis.exists(lock1), is(false)); + assertThat(jedis.exists(lock2), is(false)); + assertThat(jedis.exists(lock3), is(false)); + + assertThat(0L, equalTo(evalScriptReleaseLock(lock1, "token_1"))); + assertThat(0L, equalTo(evalScriptReleaseLock(lock2, "token_2"))); + assertThat(0L, equalTo(evalScriptReleaseLock(lock3, "token_3"))); + + assertThat(jedis.exists(lock1), is(false)); + assertThat(jedis.exists(lock2), is(false)); + assertThat(jedis.exists(lock3), is(false)); + } + + @Test + public void testReleaseLocksWithCorrectToken(){ + + String lock1 = buildLockKey("lock_1"); + String lock2 = buildLockKey("lock_2"); + String lock3 = buildLockKey("lock_3"); + + assertThat(jedis.exists(lock1), is(false)); + assertThat(jedis.exists(lock2), is(false)); + assertThat(jedis.exists(lock3), is(false)); + + acquireLock(lock1, "token_1", 200); + acquireLock(lock2, "token_2", 500); + acquireLock(lock3, "token_3", 1000); + + assertThat(jedis.exists(lock1), is(true)); + assertThat(jedis.exists(lock2), is(true)); + assertThat(jedis.exists(lock3), is(true)); + + assertThat(1L, equalTo(evalScriptReleaseLock(lock1, "token_1"))); + assertThat(1L, equalTo(evalScriptReleaseLock(lock2, "token_2"))); + assertThat(1L, equalTo(evalScriptReleaseLock(lock3, "token_3"))); + + assertThat(jedis.exists(lock1), is(false)); + assertThat(jedis.exists(lock2), is(false)); + assertThat(jedis.exists(lock3), is(false)); + } + + @Test + public void testReleaseLocksWithWrongToken(){ + + String lock1 = buildLockKey("lock_1"); + String lock2 = buildLockKey("lock_2"); + String lock3 = buildLockKey("lock_3"); + + assertThat(jedis.exists(lock1), is(false)); + assertThat(jedis.exists(lock2), is(false)); + assertThat(jedis.exists(lock3), is(false)); + + acquireLock(lock1, "token_1", 200); + acquireLock(lock2, "token_2", 500); + acquireLock(lock3, "token_3", 1000); + + assertThat(jedis.exists(lock1), is(true)); + assertThat(jedis.exists(lock2), is(true)); + assertThat(jedis.exists(lock3), is(true)); + + assertThat(0L, equalTo(evalScriptReleaseLock(lock1, "token_X"))); + assertThat(0L, equalTo(evalScriptReleaseLock(lock2, "token_Y"))); + assertThat(0L, equalTo(evalScriptReleaseLock(lock3, "token_Z"))); + + assertThat(jedis.exists(lock1), is(true)); + assertThat(jedis.exists(lock2), is(true)); + assertThat(jedis.exists(lock3), is(true)); + } + + @Test + public void testReleaseExpiredLocks(){ + + String lock1 = buildLockKey("lock_1"); + String lock2 = buildLockKey("lock_2"); + String lock3 = buildLockKey("lock_3"); + + assertThat(jedis.exists(lock1), is(false)); + assertThat(jedis.exists(lock2), is(false)); + assertThat(jedis.exists(lock3), is(false)); + + acquireLock(lock1, "token_1", 200); + acquireLock(lock2, "token_2", 500); + acquireLock(lock3, "token_3", 1000); + + assertThat(jedis.exists(lock1), is(true)); + assertThat(jedis.exists(lock2), is(true)); + assertThat(jedis.exists(lock3), is(true)); + + waitMaxUntilExpired(lock1, 250); + waitMaxUntilExpired(lock2, 550); + waitMaxUntilExpired(lock3, 1100); + + assertThat(0L, equalTo(evalScriptReleaseLock(lock1, "token_1"))); + assertThat(0L, equalTo(evalScriptReleaseLock(lock2, "token_2"))); + assertThat(0L, equalTo(evalScriptReleaseLock(lock3, "token_3"))); + + assertThat(jedis.exists(lock1), is(false)); + assertThat(jedis.exists(lock2), is(false)); + assertThat(jedis.exists(lock3), is(false)); + } + + @Test + public void testReleaseLockRespectingOwnership(){ + + String lock1 = buildLockKey("lock_4"); + String lock2 = buildLockKey("lock_5"); + String lock3 = buildLockKey("lock_6"); + + assertThat(jedis.exists(lock1), is(false)); + assertThat(jedis.exists(lock2), is(false)); + assertThat(jedis.exists(lock3), is(false)); + + acquireLock(lock1, "token_4", 200); + acquireLock(lock2, "token_5", 500); + acquireLock(lock3, "token_6", 1000); + + assertThat(jedis.exists(lock1), is(true)); + assertThat(jedis.exists(lock2), is(true)); + assertThat(jedis.exists(lock3), is(true)); + + waitMaxUntilExpired(lock1, 250); + waitMaxUntilExpired(lock2, 550); + waitMaxUntilExpired(lock3, 1100); + + assertThat(jedis.exists(lock1), is(false)); + assertThat(jedis.exists(lock2), is(false)); + assertThat(jedis.exists(lock3), is(false)); + + assertThat("OK", equalTo(acquireLock(lock1, "token_X", 200))); + assertThat("OK", equalTo(acquireLock(lock2, "token_Y", 500))); + assertThat("OK", equalTo(acquireLock(lock3, "token_Z", 1000))); + + assertThat(jedis.exists(lock1), is(true)); + assertThat(jedis.exists(lock2), is(true)); + assertThat(jedis.exists(lock3), is(true)); + + assertThat(0L, equalTo(evalScriptReleaseLock(lock1, "token_4"))); + assertThat(0L, equalTo(evalScriptReleaseLock(lock2, "token_5"))); + assertThat(0L, equalTo(evalScriptReleaseLock(lock3, "token_6"))); + + assertThat(jedis.exists(lock1), is(true)); + assertThat(jedis.exists(lock2), is(true)); + assertThat(jedis.exists(lock3), is(true)); + + assertThat(1L, equalTo(evalScriptReleaseLock(lock1, "token_X"))); + assertThat(1L, equalTo(evalScriptReleaseLock(lock2, "token_Y"))); + assertThat(1L, equalTo(evalScriptReleaseLock(lock3, "token_Z"))); + + assertThat(jedis.exists(lock1), is(false)); + assertThat(jedis.exists(lock2), is(false)); + assertThat(jedis.exists(lock3), is(false)); + } + + private String buildLockKey(String lock){ + return STORAGE_PREFIX + lock; + } + + private String acquireLock(String lock, String token, long expireMs){ + return jedis.set(lock, token, SetParams.setParams().nx().px(expireMs)); + } + + private Object evalScriptReleaseLock(String lock, String token){ + String script = readScript(LockLuaScripts.LOCK_RELEASE.getFilename()); + List keys = Collections.singletonList(lock); + List arguments = Collections.singletonList(token); + return jedis.eval(script, keys, arguments); + } + + private void waitMaxUntilExpired(String key, long expireMs){ + await().pollInterval(50, MILLISECONDS).atMost(new Duration(expireMs, MILLISECONDS)).until(() -> !jedis.exists(key)); + } +} diff --git a/src/test/java/org/swisspush/reststorage/RedisStorageIntegrationTestCase.java b/src/test/java/org/swisspush/reststorage/redis/RedisStorageIntegrationTestCase.java similarity index 91% rename from src/test/java/org/swisspush/reststorage/RedisStorageIntegrationTestCase.java rename to src/test/java/org/swisspush/reststorage/redis/RedisStorageIntegrationTestCase.java index d7237f8..e9831e1 100644 --- a/src/test/java/org/swisspush/reststorage/RedisStorageIntegrationTestCase.java +++ b/src/test/java/org/swisspush/reststorage/redis/RedisStorageIntegrationTestCase.java @@ -1,4 +1,4 @@ -package org.swisspush.reststorage; +package org.swisspush.reststorage.redis; import io.restassured.RestAssured; import io.restassured.parsing.Parser; @@ -9,6 +9,9 @@ import org.junit.After; import org.junit.Before; import org.junit.runner.RunWith; +import org.swisspush.reststorage.ConfigurableTestCase; +import org.swisspush.reststorage.JedisFactory; +import org.swisspush.reststorage.RestStorageMod; import org.swisspush.reststorage.util.ModuleConfiguration; import redis.clients.jedis.Jedis; diff --git a/src/test/java/org/swisspush/reststorage/RedisStorageTest.java b/src/test/java/org/swisspush/reststorage/redis/RedisStorageTest.java similarity index 99% rename from src/test/java/org/swisspush/reststorage/RedisStorageTest.java rename to src/test/java/org/swisspush/reststorage/redis/RedisStorageTest.java index 6718bef..152e429 100644 --- a/src/test/java/org/swisspush/reststorage/RedisStorageTest.java +++ b/src/test/java/org/swisspush/reststorage/redis/RedisStorageTest.java @@ -1,4 +1,4 @@ -package org.swisspush.reststorage; +package org.swisspush.reststorage.redis; import io.vertx.core.AsyncResult; import io.vertx.core.Future; diff --git a/src/test/java/org/swisspush/reststorage/ResourceCompressionIntegrationTest.java b/src/test/java/org/swisspush/reststorage/redis/ResourceCompressionIntegrationTest.java similarity index 99% rename from src/test/java/org/swisspush/reststorage/ResourceCompressionIntegrationTest.java rename to src/test/java/org/swisspush/reststorage/redis/ResourceCompressionIntegrationTest.java index 16412d6..4884e24 100644 --- a/src/test/java/org/swisspush/reststorage/ResourceCompressionIntegrationTest.java +++ b/src/test/java/org/swisspush/reststorage/redis/ResourceCompressionIntegrationTest.java @@ -1,5 +1,5 @@ -package org.swisspush.reststorage; +package org.swisspush.reststorage.redis; import io.restassured.specification.RequestSpecification; import io.vertx.ext.unit.Async; diff --git a/src/test/java/org/swisspush/reststorage/util/ModuleConfigurationTest.java b/src/test/java/org/swisspush/reststorage/util/ModuleConfigurationTest.java index cc2ab00..8d7ca14 100644 --- a/src/test/java/org/swisspush/reststorage/util/ModuleConfigurationTest.java +++ b/src/test/java/org/swisspush/reststorage/util/ModuleConfigurationTest.java @@ -40,12 +40,16 @@ public void testDefaultConfiguration(TestContext testContext) { testContext.assertEquals(config.getRedisHost(), "localhost"); testContext.assertEquals(config.getRedisPort(), 6379); testContext.assertFalse(config.isRedisEnableTls()); + testContext.assertNull(config.getRedisAuth()); + testContext.assertNull(config.getRedisUser()); + testContext.assertNull(config.getRedisPassword()); testContext.assertEquals(config.getExpirablePrefix(), "rest-storage:expirable"); testContext.assertEquals(config.getResourcesPrefix(), "rest-storage:resources"); testContext.assertEquals(config.getCollectionsPrefix(), "rest-storage:collections"); testContext.assertEquals(config.getDeltaResourcesPrefix(), "delta:resources"); testContext.assertEquals(config.getDeltaEtagsPrefix(), "delta:etags"); testContext.assertEquals(config.getResourceCleanupAmount(), 100000L); + testContext.assertNull(config.getResourceCleanupIntervalSec()); testContext.assertEquals(config.getLockPrefix(), "rest-storage:locks"); testContext.assertFalse(config.isConfirmCollectionDelete()); testContext.assertFalse(config.isRejectStorageWriteOnLowMemory()); @@ -60,6 +64,8 @@ public void testOverrideConfiguration(TestContext testContext) { .redisHost("anotherhost") .redisPort(1234) .redisEnableTls(true) + .redisUser("myUser") + .redisPassword("secretPassword") .editorConfig(new HashMap<>() {{ put("myKey", "myValue"); }}) @@ -70,6 +76,7 @@ public void testOverrideConfiguration(TestContext testContext) { .httpRequestHandlerPassword("bar") .rejectStorageWriteOnLowMemory(true) .freeMemoryCheckIntervalMs(10000) + .resourceCleanupIntervalSec(15) .maxRedisWaitingHandlers(4096) .return200onDeleteNonExisting(true); @@ -95,6 +102,8 @@ public void testOverrideConfiguration(TestContext testContext) { testContext.assertEquals(config.getRedisHost(), "anotherhost"); testContext.assertEquals(config.getRedisPort(), 1234); testContext.assertTrue(config.isRedisEnableTls()); + testContext.assertEquals(config.getRedisUser(), "myUser"); + testContext.assertEquals(config.getRedisPassword(), "secretPassword"); testContext.assertFalse(config.isHttpRequestHandlerEnabled()); testContext.assertNotNull(config.getEditorConfig()); testContext.assertTrue(config.getEditorConfig().containsKey("myKey")); @@ -102,6 +111,7 @@ public void testOverrideConfiguration(TestContext testContext) { testContext.assertTrue(config.isConfirmCollectionDelete()); testContext.assertTrue(config.isRejectStorageWriteOnLowMemory()); testContext.assertEquals(config.getFreeMemoryCheckIntervalMs(), 10000L); + testContext.assertEquals(config.getResourceCleanupIntervalSec(), 15); testContext.assertTrue(config.isReturn200onDeleteNonExisting()); testContext.assertEquals(config.getMaxRedisWaitingHandlers(), 4096); testContext.assertTrue(config.isHttpRequestHandlerAuthenticationEnabled()); @@ -129,6 +139,8 @@ public void testGetDefaultAsJsonObject(TestContext testContext){ testContext.assertEquals(json.getInteger("redisPort"), 6379); testContext.assertEquals(json.getInteger("maxRedisWaitingHandlers"), 2048); testContext.assertNull(json.getString("redisAuth")); + testContext.assertNull(json.getString("redisPassword")); + testContext.assertNull(json.getString("redisUser")); testContext.assertEquals(json.getString("expirablePrefix"), "rest-storage:expirable"); testContext.assertEquals(json.getString("resourcesPrefix"), "rest-storage:resources"); testContext.assertEquals(json.getString("collectionsPrefix"), "rest-storage:collections"); @@ -158,6 +170,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext){ .httpRequestHandlerPassword("bar") .confirmCollectionDelete(true) .rejectStorageWriteOnLowMemory(true) + .resourceCleanupIntervalSec(15) .freeMemoryCheckIntervalMs(5000); JsonObject json = config.asJsonObject(); @@ -184,8 +197,9 @@ public void testGetOverriddenAsJsonObject(TestContext testContext){ testContext.assertTrue(json.getBoolean("redisEnableTls")); testContext.assertTrue(json.getBoolean("confirmCollectionDelete")); testContext.assertTrue(json.getBoolean("rejectStorageWriteOnLowMemory")); - testContext.assertEquals(config.getFreeMemoryCheckIntervalMs(), 5000L); + testContext.assertEquals(json.getLong("freeMemoryCheckIntervalMs"), 5000L); testContext.assertEquals(json.getInteger("maxRedisWaitingHandlers"), 4096); + testContext.assertEquals(json.getInteger("resourceCleanupIntervalSec"), 15); testContext.assertNotNull(json.getJsonObject("editorConfig")); testContext.assertTrue(json.getJsonObject("editorConfig").containsKey("myKey")); @@ -216,6 +230,7 @@ public void testGetDefaultFromJsonObject(TestContext testContext){ testContext.assertEquals(config.getCollectionsPrefix(), "rest-storage:collections"); testContext.assertEquals(config.getDeltaResourcesPrefix(), "delta:resources"); testContext.assertEquals(config.getDeltaEtagsPrefix(), "delta:etags"); + testContext.assertNull(config.getResourceCleanupIntervalSec()); testContext.assertEquals(config.getResourceCleanupAmount(), 100000L); testContext.assertEquals(config.getLockPrefix(), "rest-storage:locks"); testContext.assertFalse(config.isConfirmCollectionDelete()); @@ -250,6 +265,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext){ json.put("collectionsPrefix", "newCollectionsPrefix"); json.put("deltaResourcesPrefix", "newDeltaResourcesPrefix"); json.put("deltaEtagsPrefix", "newDeltaEtagsPrefix"); + json.put("resourceCleanupIntervalSec", 30); json.put("resourceCleanupAmount", 999L); json.put("lockPrefix", "newLockPrefix"); json.put("confirmCollectionDelete", true); @@ -280,10 +296,45 @@ public void testGetOverriddenFromJsonObject(TestContext testContext){ testContext.assertEquals(config.getCollectionsPrefix(), "newCollectionsPrefix"); testContext.assertEquals(config.getDeltaResourcesPrefix(), "newDeltaResourcesPrefix"); testContext.assertEquals(config.getDeltaEtagsPrefix(), "newDeltaEtagsPrefix"); + testContext.assertEquals(config.getResourceCleanupIntervalSec(), 30); testContext.assertEquals(config.getResourceCleanupAmount(), 999L); testContext.assertEquals(config.getLockPrefix(), "newLockPrefix"); testContext.assertTrue(config.isConfirmCollectionDelete()); testContext.assertTrue(config.isRejectStorageWriteOnLowMemory()); testContext.assertEquals(config.getFreeMemoryCheckIntervalMs(), 30000L); } + + @Test + public void testResourceCleanupIntervalSec(TestContext testContext) { + ModuleConfiguration config = new ModuleConfiguration() + .resourceCleanupIntervalSec(0); + + String json = config.asJsonObject().encodePrettily(); + config = ModuleConfiguration.fromJsonObject(new JsonObject(json)); + testContext.assertNull(config.getResourceCleanupIntervalSec()); + + config = new ModuleConfiguration() + .resourceCleanupIntervalSec(20); + json = config.asJsonObject().encodePrettily(); + config = ModuleConfiguration.fromJsonObject(new JsonObject(json)); + testContext.assertEquals(20, config.getResourceCleanupIntervalSec()); + + config = new ModuleConfiguration() + .resourceCleanupIntervalSec(0); + json = config.asJsonObject().encodePrettily(); + config = ModuleConfiguration.fromJsonObject(new JsonObject(json)); + testContext.assertNull(config.getResourceCleanupIntervalSec()); + + config = new ModuleConfiguration() + .resourceCleanupIntervalSec(-50); + json = config.asJsonObject().encodePrettily(); + config = ModuleConfiguration.fromJsonObject(new JsonObject(json)); + testContext.assertNull(config.getResourceCleanupIntervalSec()); + + config = new ModuleConfiguration() + .resourceCleanupIntervalSec(null); + json = config.asJsonObject().encodePrettily(); + config = ModuleConfiguration.fromJsonObject(new JsonObject(json)); + testContext.assertNull(config.getResourceCleanupIntervalSec()); + } } \ No newline at end of file