Skip to content

Commit

Permalink
Merge pull request #197 from swisspost/aws-s3-supports
Browse files Browse the repository at this point in the history
AWS S3 support added
  • Loading branch information
dominik-cnx authored Nov 18, 2024
2 parents b2ab80e + c320063 commit 4b2c665
Show file tree
Hide file tree
Showing 8 changed files with 1,066 additions and 6 deletions.
23 changes: 21 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[![GitHub release](https://img.shields.io/github/v/release/swisspost/vertx-rest-storage)](https://github.com/swisspost/vertx-rest-storage/releases/latest)
[![Maven Central](https://img.shields.io/maven-central/v/org.swisspush/rest-storage.svg)]()

Persistence for REST resources in the filesystem or a redis database.
Persistence for REST resources in the filesystem, Aws S3 storage or a redis database.

Stores resources in a hierarchical way according to their URI. It actually implements a generic CRUD REST service.

Expand Down Expand Up @@ -223,6 +223,15 @@ The following configuration values are available:
| rejectStorageWriteOnLowMemory | redis | false | When set to _true_, PUT requests with the x-importance-level header can be rejected when memory gets low |
| freeMemoryCheckIntervalMs | redis | 60000 | The interval in milliseconds to calculate the actual memory usage |
| redisReadyCheckIntervalMs | redis | -1 | The interval in milliseconds to calculate the "ready state" of redis. When value < 1, no "ready state" will be calculated |
| awsS3Region | s3 | | The region of AWS S3 server, with local service such localstack, also need set a valid region |
| s3BucketName | s3 | | The S3 bucket name |
| s3AccessKeyId | s3 | | The s3 access key Id |
| s3SecretAccessKey | s3 | | The s3 secret access key |
| localS3 | s3 | | Set to true in order to use a local S3 instance instead of AWS |
| localS3Endpoint | s3 | | The endpoint/host to use in case that localS3 is set to true, e.g. 127.0.0.1 (in my case it had to be an IP) |
| localS3Port | s3 | | The port to use in case that localS3 is set to true, e.g. 4566 |
| createBucketIfNotExist | s3 | | create bucket if bucket not exist, related permission required |


### Configuration util

Expand All @@ -249,11 +258,21 @@ JsonObject json = new ModuleConfiguration().asJsonObject();
```

## Storage types
Currently, there are two storage types supported. File system storage and redis storage.
Currently, there are three storage types supported. File system storage, S3 storage and redis storage.

### File System Storage
The data is stored hierarchically on the file system. This is the default storage type when not overridden in the configuration.

### S3 storage
The data is stored in a S3 instance.

#### AWS S3
See https://aws.amazon.com/s3 it is also possible to use a local instance using https://docs.localstack.cloud/user-guide/aws/s3/


docker run --rm -p 4566:4566 -v ./s3:/var/lib/localstack localstack/localstack:s3-latest


### Redis Storage
The data is stored in a redis database.
Caution: The redis storage implementation does not currently support streaming. Avoid transferring too big payloads since they will be entirely copied in memory.
Expand Down
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${awssdk.version}</version>
</dependency>
<!-- TEST dependencies -->
<dependency>
<!--
Expand Down Expand Up @@ -133,6 +137,11 @@
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.nio.s3</groupId>
<artifactId>aws-java-nio-spi-for-s3</artifactId>
<version>${nio.spi.s3.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
Expand Down Expand Up @@ -404,6 +413,8 @@
</profiles>
<properties>
<vertx.version>4.5.2</vertx.version>
<awssdk.version>2.27.17</awssdk.version>
<nio.spi.s3.version>2.1.0</nio.spi.s3.version>
<slf4j.version>2.0.10</slf4j.version>
<junit.version>4.13.2</junit.version>
<jackson.version>2.16.0</jackson.version>
Expand Down
17 changes: 15 additions & 2 deletions src/main/java/org/swisspush/reststorage/RestStorageMod.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package org.swisspush.reststorage;

import io.vertx.core.*;
;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import org.slf4j.Logger;
Expand All @@ -9,6 +13,7 @@
import org.swisspush.reststorage.redis.DefaultRedisProvider;
import org.swisspush.reststorage.redis.RedisProvider;
import org.swisspush.reststorage.redis.RedisStorage;
import org.swisspush.reststorage.s3.S3FileSystemStorage;
import org.swisspush.reststorage.util.ModuleConfiguration;

import static org.swisspush.reststorage.exception.RestStorageExceptionFactory.newRestStorageThriftyExceptionFactory;
Expand Down Expand Up @@ -73,6 +78,14 @@ private Future<Storage> createStorage(ModuleConfiguration moduleConfiguration) {
case filesystem:
promise.complete(new FileSystemStorage(vertx, exceptionFactory, moduleConfiguration.getRoot()));
break;
case s3:
promise.complete(new S3FileSystemStorage(vertx, exceptionFactory, moduleConfiguration.getRoot(),
moduleConfiguration.getAwsS3Region(), moduleConfiguration.getS3BucketName(),
moduleConfiguration.getS3AccessKeyId(), moduleConfiguration.getS3SecretAccessKey(),
moduleConfiguration.getS3UseTlsConnection(), moduleConfiguration.isLocalS3(),
moduleConfiguration.getLocalS3Endpoint(), moduleConfiguration.getLocalS3Port(),
moduleConfiguration.getCreateBucketIfNotPresentYet()));
break;
case redis:
createRedisStorage(vertx, moduleConfiguration).onComplete(event -> {
if(event.succeeded()){
Expand Down
274 changes: 274 additions & 0 deletions src/main/java/org/swisspush/reststorage/s3/FileReadStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package org.swisspush.reststorage.s3;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import org.slf4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

import static io.vertx.core.file.impl.AsyncFileImpl.DEFAULT_READ_BUFFER_SIZE;
import static org.slf4j.LoggerFactory.getLogger;


public class FileReadStream<T> implements ReadStream<T>, Closeable {

private static final Logger log = getLogger(FileReadStream.class);
private final long expectedSize;
private final String path;

private final ReadableByteChannel ch;
private final Vertx vertx;
private final Context context;

private boolean closed;
private boolean readInProgress;

private Handler<Buffer> dataHandler;
private Handler<Void> endHandler;
private Handler<Throwable> exceptionHandler;
private final InboundBuffer<Buffer> queue;

private final int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
private long writtenBytes = 0;

/**
* @param expectedSize Actual file size which is expected to be streamed through that stream
* in bytes.
* @param path Token printed alongside the logs so when reading logs, we can see which
* log belongs to which file. A possible candidate is to use the file path
* but it theoretically can be anything which helps you to find logs
* related to your observed file.
* @param stream The file (or stream) we wanna observe.
*/
public FileReadStream(Vertx vertx, long expectedSize, String path, InputStream stream) {
this.vertx = vertx;
this.context = vertx.getOrCreateContext();
this.expectedSize = expectedSize;
this.path = path;
this.ch = Channels.newChannel(stream);
this.queue = new InboundBuffer<>(context, 0);
queue.handler(buff -> {
if (buff.length() > 0) {
handleData(buff);
} else {
handleEnd();
}
});
queue.drainHandler(v -> {
doRead();
});
}

public void close() {
closeInternal(null);
}

public void close(Handler<AsyncResult<Void>> handler) {
closeInternal(handler);
}

@Override
public ReadStream<T> exceptionHandler(Handler<Throwable> exceptionHandler) {
log.trace("exceptionHandler registered for reading '{}'", path);
check();
this.exceptionHandler = exceptionHandler;
return this;
}

@Override
public ReadStream<T> handler(Handler handler) {
log.trace("handler registered");
check();
this.dataHandler = handler;
if (this.dataHandler != null && !this.closed) {
this.doRead();
} else {
queue.clear();
}
return this;
}

@Override
public ReadStream<T> pause() {
log.debug("Pause reading at offset {} for '{}'", writtenBytes, path);
check();
queue.pause();
return this;
}

@Override
public ReadStream<T> resume() {
log.debug("Resume reading at offset {} for '{}'", writtenBytes, path);
check();
if (!closed) {
queue.resume();
}
return this;
}

@Override
public ReadStream<T> fetch(long amount) {
log.debug("fetch amount {}", amount);
queue.fetch(amount);
return this;
}

@Override
public ReadStream<T> endHandler(Handler<Void> endHandler) {
log.trace("endHandler registered.");
check();
this.endHandler = endHandler;
log.debug("End handler called ({} bytes remaining) for '{}'", expectedSize - writtenBytes, path);
return this;
}

private void doRead() {
check();
doRead(ByteBuffer.allocate(readBufferSize));
}

private synchronized void doRead(ByteBuffer bb) {
if (!readInProgress) {
readInProgress = true;
Buffer buff = Buffer.buffer(readBufferSize);
doRead(buff, 0, bb, writtenBytes, ar -> {
if (ar.succeeded()) {
readInProgress = false;
Buffer buffer = ar.result();
writtenBytes += buffer.length();
// Empty buffer represents end of file
if (queue.write(buffer) && buffer.length() > 0) {
doRead(bb);
}
} else {
handleException(ar.cause());
}
});
}
}

private void doRead(Buffer writeBuff, int offset, ByteBuffer buff, long position, Handler<AsyncResult<Buffer>> handler) {
// ReadableByteChannel doesn't have a completion handler, so we wrap it into
// an executeBlocking and use the future there
vertx.executeBlocking(future -> {
try {
Integer bytesRead = ch.read(buff);
future.complete(bytesRead);
} catch (IOException e) {
log.error("Failed to read data from buffer.", e);
future.fail(e);
}

}, res -> {

if (res.failed()) {
log.error("Failed to read data from buffer.", res.cause());
context.runOnContext((v) -> handler.handle(Future.failedFuture(res.cause())));
} else {
// Do the completed check
Integer bytesRead = (Integer) res.result();
if (bytesRead == -1) {
//End of file
context.runOnContext((v) -> {
buff.flip();
writeBuff.setBytes(offset, buff);
buff.compact();
handler.handle(Future.succeededFuture(writeBuff));
});
} else if (buff.hasRemaining()) {
long pos = position;
pos += bytesRead;
// resubmit
doRead(writeBuff, offset, buff, pos, handler);
} else {
// It's been fully written

context.runOnContext((v) -> {
buff.flip();
writeBuff.setBytes(offset, buff);
buff.compact();
handler.handle(Future.succeededFuture(writeBuff));
});
}
}
});
}

private void handleData(Buffer buff) {
Handler<Buffer> handler;
synchronized (this) {
handler = this.dataHandler;
}
if (handler != null) {
checkContext();
handler.handle(buff);
}
}

private synchronized void handleEnd() {
Handler<Void> endHandler;
synchronized (this) {
dataHandler = null;
endHandler = this.endHandler;
}
if (endHandler != null) {
checkContext();
endHandler.handle(null);
}
}

private void handleException(Throwable t) {
if (exceptionHandler != null && t instanceof Exception) {
exceptionHandler.handle(t);
} else {
log.error("Unhandled exception", t);
}
}

private void check() {
if (this.closed) {
throw new IllegalStateException("Inputstream is closed");
}
}

public boolean isClosed() {
return this.closed;
}

private void checkContext() {
if (!vertx.getOrCreateContext().equals(context)) {
throw new IllegalStateException("AsyncInputStream must only be used in the context that created it, expected: " + this.context
+ " actual " + vertx.getOrCreateContext());
}
}

private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) {
check();
closed = true;
doClose(handler);
}

private void doClose(Handler<AsyncResult<Void>> handler) {
try {
ch.close();
if (handler != null) {
this.vertx.runOnContext(v -> handler.handle(Future.succeededFuture()));
}
} catch (IOException e) {
if (handler != null) {
this.vertx.runOnContext(v -> handler.handle(Future.failedFuture(e)));
}
}
}
}
Loading

0 comments on commit 4b2c665

Please sign in to comment.