Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 provider with sdk v2 #2398

Merged
merged 1 commit into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
<module>providers/spanner/shedlock-provider-spanner</module>
<module>providers/neo4j/shedlock-provider-neo4j</module>
<module>providers/s3/shedlock-provider-s3</module>
<module>providers/s3/shedlock-provider-s3v2</module>
</modules>

<properties>
Expand Down
77 changes: 77 additions & 0 deletions providers/s3/shedlock-provider-s3v2/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>shedlock-parent</artifactId>
<groupId>net.javacrumbs.shedlock</groupId>
<version>6.2.1-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>shedlock-provider-s3v2</artifactId>
<version>6.2.1-SNAPSHOT</version>

<properties>
<aws-java-sdk2-s3.version>2.30.19</aws-java-sdk2-s3.version>
</properties>

<dependencies>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${aws-java-sdk2-s3.version}</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${test-containers.ver}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<version>${test-containers.ver}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-test-support</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.ver}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Automatic-Module-Name>
net.javacrumbs.shedlock.provider.s3v2
</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package net.javacrumbs.shedlock.provider.s3v2;

import java.time.Instant;

record Lock(Instant lockUntil, Instant lockedAt, String lockedBy, String eTag) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package net.javacrumbs.shedlock.provider.s3v2;

import net.javacrumbs.shedlock.support.StorageBasedLockProvider;
import software.amazon.awssdk.services.s3.S3Client;

/**
* Lock provider implementation for S3.
*/
public class S3LockProvider extends StorageBasedLockProvider {

/**
* Constructs an S3LockProvider.
*
* @param s3Client S3 client used to interact with the S3 bucket.
* @param bucketName The name of the S3 bucket where locks are stored.
* @param objectPrefix The prefix of the S3 object lock.
*/
public S3LockProvider(S3Client s3Client, String bucketName, String objectPrefix) {
super(new S3StorageAccessor(s3Client, bucketName, objectPrefix));
}

/**
* Constructs an S3LockProvider.
*
* @param s3Client S3 client used to interact with the S3 bucket.
* @param bucketName The name of the S3 bucket where locks are stored.
*/
public S3LockProvider(S3Client s3Client, String bucketName) {
this(s3Client, bucketName, "shedlock/");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package net.javacrumbs.shedlock.provider.s3v2;

import static net.javacrumbs.shedlock.core.ClockProvider.now;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.support.AbstractStorageAccessor;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

/**
* Implementation of StorageAccessor for S3 as a lock storage backend.
* Manages locks using S3 objects with metadata for expiration and conditional writes.
*/
class S3StorageAccessor extends AbstractStorageAccessor {

private static final String LOCK_UNTIL = "lock-until";
private static final String LOCKED_AT = "locked-at";
private static final String LOCKED_BY = "locked-by";
private static final int PRECONDITION_FAILED = 412;

private final S3Client s3Client;
private final String bucketName;
private final String objectPrefix;

public S3StorageAccessor(S3Client s3Client, String bucketName, String objectPrefix) {
this.s3Client = s3Client;
this.bucketName = bucketName;
this.objectPrefix = objectPrefix;
}

/**
* Finds the lock in the S3 bucket.
*/
Optional<Lock> find(String name, String action) {
try {
HeadObjectResponse metadataResponse = this.s3Client.headObject(HeadObjectRequest.builder()
.bucket(this.bucketName)
.key(this.objectName(name))
.build());

Map<String, String> metadata = metadataResponse.metadata();

Instant lockUntil = Instant.parse(metadata.get(LOCK_UNTIL));
Instant lockedAt = Instant.parse(metadata.get(LOCKED_AT));
String lockedBy = metadata.get(LOCKED_BY);
String eTag = metadataResponse.eTag();

logger.debug("Lock found. action: {}, name: {}, lockUntil: {}, e-tag: {}", action, name, lockUntil, eTag);
return Optional.of(new Lock(lockUntil, lockedAt, lockedBy, eTag));
} catch (AwsServiceException e) {
if (e.statusCode() == 404) {
logger.debug("Lock not found. action: {}, name: {}", action, name);
return Optional.empty();
}
throw e;
}
}

@Override
public boolean insertRecord(LockConfiguration lockConfiguration) {
String name = lockConfiguration.getName();
if (find(name, "insertRecord").isPresent()) {
logger.debug("Lock already exists. name: {}", name);
return false;
}

try {
var lockContent = getLockContent();
Map<String, String> metadata = createMetadata(lockConfiguration.getLockAtMostUntil(), now(), getHostname());

PutObjectRequest request = PutObjectRequest.builder()
.bucket(this.bucketName)
.key(this.objectName(name))
.metadata(metadata)
.ifNoneMatch("*")
.build();

s3Client.putObject(request, RequestBody.fromBytes(lockContent));
logger.debug("Lock created successfully. name: {}, metadata: {}", name, metadata);
return true;
} catch (AwsServiceException e) {
if (e.statusCode() == PRECONDITION_FAILED) {
logger.debug("Lock already in use. name: {}", name);
} else {
logger.warn("Failed to create lock. name: {}", name, e);
}
return false;
}
}

@Override
public boolean updateRecord(LockConfiguration lockConfiguration) {
Optional<Lock> lock = find(lockConfiguration.getName(), "updateRecord");
if (lock.isEmpty()) {
logger.warn("Update skipped. Lock not found. name: {}, lock: {}", lockConfiguration.getName(), lock);
return false;
}
if (lock.get().lockUntil().isAfter(now())) {
logger.debug("Update skipped. Lock still valid. name: {}, lock: {}", lockConfiguration.getName(), lock);
return false;
}

Map<String, String> newMetadata = createMetadata(lockConfiguration.getLockAtMostUntil(), now(), getHostname());
return replaceObjectMetadata(
lockConfiguration.getName(), newMetadata, lock.get().eTag(), "updateRecord");
}

@Override
public void unlock(LockConfiguration lockConfiguration) {
Optional<Lock> lock = find(lockConfiguration.getName(), "unlock");
if (lock.isEmpty()) {
logger.warn("Unlock skipped. Lock not found. name: {}, lock: {}", lockConfiguration.getName(), lock);
return;
}

updateUntil(lockConfiguration.getName(), lock.get(), lockConfiguration.getUnlockTime(), "unlock");
}

@Override
public boolean extend(LockConfiguration lockConfiguration) {
Optional<Lock> lock = find(lockConfiguration.getName(), "extend");
if (lock.isEmpty()
|| lock.get().lockUntil().isBefore(now())
|| !lock.get().lockedBy().equals(getHostname())) {
logger.debug(
"Extend skipped. Lock invalid or not owned by host. name: {}, lock: {}",
lockConfiguration.getName(),
lock);
return false;
}

return updateUntil(lockConfiguration.getName(), lock.get(), lockConfiguration.getLockAtMostUntil(), "extend");
}

private boolean updateUntil(String name, Lock lock, Instant until, String action) {

var existingMetadata = this.s3Client.headObject(HeadObjectRequest.builder()
.bucket(this.bucketName)
.key(this.objectName(name))
.build());

Map<String, String> newMetadata =
createMetadata(until, Instant.parse(existingMetadata.metadata().get(LOCKED_AT)), getHostname());

return replaceObjectMetadata(name, newMetadata, lock.eTag(), action);
}

private boolean replaceObjectMetadata(String name, Map<String, String> newMetadata, String eTag, String action) {
var lockContent = getLockContent();

PutObjectRequest request = PutObjectRequest.builder()
.bucket(this.bucketName)
.key(this.objectName(name))
.metadata(newMetadata)
.ifMatch(eTag)
.build();

try {
PutObjectResponse response = s3Client.putObject(request, RequestBody.fromBytes(lockContent));
logger.debug(
"Lock {} successfully. name: {}, old e-tag: {}, new e-tag: {}",
action,
name,
eTag,
response.eTag());
return true;
} catch (AwsServiceException e) {
if (e.statusCode() == PRECONDITION_FAILED) {
logger.debug("Lock not exists to {}. name: {}, e-tag {}", action, name, eTag);
} else {
logger.warn("Failed to {} lock. name: {}", action, name, e);
}
return false;
}
}

private static byte[] getLockContent() {
var uuid = UUID.randomUUID();
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
return bb.array();
}

private Map<String, String> createMetadata(Instant lockUntil, Instant lockedAt, String lockedBy) {
Map<String, String> metadata = new HashMap<>();
metadata.put(LOCK_UNTIL, lockUntil.toString());
metadata.put(LOCKED_AT, lockedAt.toString());
metadata.put(LOCKED_BY, lockedBy);
return metadata;
}

private String objectName(String name) {
return objectPrefix + name;
}
}
Loading
Loading