Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature : Blocked entries methods #338

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
**/.classpath
**/.project
**/.settings
**/.factorypath
/.idea
/*.iml
**/target/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,4 +302,33 @@ public void clear(Transaction tx) throws SQLException {
stmt.execute("DELETE FROM " + tableName);
}
}

@Override
public int unblockAll(Transaction tx) throws Exception {
PreparedStatement stmt =
tx.prepareBatchStatement(
"UPDATE "
+ tableName
+ " SET attempts = 0, blocked = false "
+ "WHERE blocked = true AND processed = false");
stmt.setQueryTimeout(writeLockTimeoutSeconds);
return stmt.executeUpdate();
}

@Override
public List<TransactionOutboxEntry> selectBlocked(Transaction tx, int page, int batchSize)
throws Exception {
try (PreparedStatement stmt =
tx.connection()
.prepareStatement(
"SELECT "
+ ALL_FIELDS
+ " FROM "
+ tableName
+ " WHERE blocked = true AND processed = false ORDER BY lastAttemptTime DESC LIMIT ? OFFSET ?")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be updated to handle different dialects (LIMIT isn't supported by Oracle).

Sorry @RomainWilbert; I got around to re-checking this PR after the Oracle PR was merged.

stmt.setInt(1, batchSize);
stmt.setInt(2, page * batchSize);
return gatherResults(batchSize, stmt);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Connection obtainConnection() {
public void validate(Validator validator) {
validator.notBlank("driverClassName", driverClassName);
validator.notBlank("url", url);
validator.notBlank("user",user);
validator.notBlank("user", user);
validator.notBlank("password", password);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,27 @@ static DefaultPersistor forDialect(Dialect dialect) {
*/
boolean unblock(Transaction tx, String entryId) throws Exception;

/**
* Clears the blocked flag and resets the attempt count to zero for all blocked entries.
*
* @param tx The current {@link Transaction}.
* @return unblocked entries count
* @throws Exception Any other exception.
*/
int unblockAll(Transaction tx) throws Exception;

/**
* Selects up to a specified maximum number of blocked records with pagination.
*
* @param tx The current {@link Transaction}.
* @param page the page to get
* @param batchSize The number of records to select.
* @return The records.
* @throws Exception Any exception.
*/
List<TransactionOutboxEntry> selectBlocked(Transaction tx, int page, int batchSize)
throws Exception;

/**
* Selects up to a specified maximum number of non-blocked records which have passed their {@link
* TransactionOutboxEntry#getNextAttemptTime()}. Until a subsequent call to {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,15 @@ public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, I
public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) {
return 0;
}

@Override
public int unblockAll(Transaction tx) throws Exception {
return 0;
}

@Override
public List<TransactionOutboxEntry> selectBlocked(Transaction tx, int page, int batchSize)
throws Exception {
return List.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import lombok.ToString;
Expand Down Expand Up @@ -38,8 +39,9 @@ static TransactionOutboxBuilder builder() {
*
* <p>Usage:
*
* <pre>transactionOutbox.schedule(MyService.class)
* .runMyMethod("with", "some", "arguments");</pre>
* <pre>
* transactionOutbox.schedule(MyService.class).runMyMethod("with", "some", "arguments");
* </pre>
*
* <p>This will write a record to the database using the supplied {@link Persistor} and {@link
* Instantiator}, using the current database transaction, which will get rolled back if the rest
Expand Down Expand Up @@ -112,6 +114,26 @@ static TransactionOutboxBuilder builder() {
@SuppressWarnings({"unchecked", "rawtypes"})
boolean unblock(String entryId, Object transactionContext);

/**
* Unblocks all blocked entries and resets the attempt count so that they will be retried again.
* Requires an active transaction and a transaction manager that supports thread local context.
*
* @return unblocked entries count.
*/
int unblockAll();

/**
* Clears failed entries of their failed state and resets the attempt count so that they will be
* retried again. Requires an active transaction and a transaction manager that supports supplied
* context.
*
* @param transactionContext The transaction context ({@link TransactionManager} implementation
* specific).
* @return unblocked entries count.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
int unblockAll(Object transactionContext);

/**
* Processes an entry immediately in the current thread. Intended for use in custom
* implementations of {@link Submitter} and should not generally otherwise be called.
Expand All @@ -121,22 +143,43 @@ static TransactionOutboxBuilder builder() {
@SuppressWarnings("WeakerAccess")
void processNow(TransactionOutboxEntry entry);

/**
* Get blocked entries with pagination.
*
* @param page page number
* @param batchSize The number of records to select.
* @return blocked entries
*/
List<TransactionOutboxEntry> getBlockedEntries(int page, int batchSize);

/** Builder for {@link TransactionOutbox}. */
@ToString
abstract class TransactionOutboxBuilder {

protected TransactionManager transactionManager;

protected Instantiator instantiator;

protected Submitter submitter;

protected Duration attemptFrequency;

protected int blockAfterAttempts;

protected int flushBatchSize;

protected Supplier<Clock> clockProvider;

protected TransactionOutboxListener listener;

protected Persistor persistor;

protected Level logLevelTemporaryFailure;

protected Boolean serializeMdc;

protected Duration retentionThreshold;

protected Boolean initializeImmediately;

protected TransactionOutboxBuilder() {}
Expand Down Expand Up @@ -316,10 +359,9 @@ interface ParameterizedScheduleBuilder {
*
* <p>Usage example:
*
* <pre>transactionOutbox.with()
* .uniqueRequestId("my-request")
* .schedule(MyService.class)
* .runMyMethod("with", "some", "arguments");</pre>
* <pre>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lost formatting here

* transactionOutbox.with().uniqueRequestId("my-request").schedule(MyService.class).runMyMethod("with", "some", "arguments");
* </pre>
*
* @param clazz The class to proxy.
* @param <T> The type to proxy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,33 @@ class TransactionOutboxImpl implements TransactionOutbox, Validatable {
private static final int DEFAULT_FLUSH_BATCH_SIZE = 4096;

private final TransactionManager transactionManager;

private final Persistor persistor;

private final Instantiator instantiator;

private final Submitter submitter;

private final Duration attemptFrequency;

private final Level logLevelTemporaryFailure;

private final int blockAfterAttempts;

private final int flushBatchSize;

private final Supplier<Clock> clockProvider;

private final TransactionOutboxListener listener;

private final boolean serializeMdc;

private final Validator validator;

private final Duration retentionThreshold;

private final AtomicBoolean initialized = new AtomicBoolean();

private final ProxyFactory proxyFactory = new ProxyFactory();

private TransactionOutboxImpl(
Expand Down Expand Up @@ -416,4 +430,66 @@ public <T> T schedule(Class<T> clazz) {
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId);
}
}

@Override
public int unblockAll() {
if (!initialized.get()) {
throw new IllegalStateException("Not initialized");
}
if (!(transactionManager instanceof ThreadLocalContextTransactionManager)) {
throw new UnsupportedOperationException(
"This method requires a ThreadLocalContextTransactionManager");
}
log.info("Unblocking entries for retry.");
try {
return ((ThreadLocalContextTransactionManager) transactionManager)
.requireTransactionReturns(tx -> persistor.unblockAll(tx));
} catch (Exception e) {
throw (RuntimeException) Utils.uncheckAndThrow(e);
}
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public int unblockAll(Object transactionContext) {
if (!initialized.get()) {
throw new IllegalStateException("Not initialized");
}
if (!(transactionManager instanceof ParameterContextTransactionManager)) {
throw new UnsupportedOperationException(
"This method requires a ParameterContextTransactionManager");
}
log.info("Unblocking entries for retry");
try {
if (transactionContext instanceof Transaction) {
return persistor.unblockAll((Transaction) transactionContext);
}
Transaction transaction =
((ParameterContextTransactionManager) transactionManager)
.transactionFromContext(transactionContext);
return persistor.unblockAll(transaction);
} catch (Exception e) {
throw (RuntimeException) Utils.uncheckAndThrow(e);
}
}

@Override
public List<TransactionOutboxEntry> getBlockedEntries(int page, int batchSize) {
return transactionManager.inTransactionReturns(
transaction -> {
List<TransactionOutboxEntry> result = new ArrayList<>(batchSize);
uncheckedly(() -> persistor.selectBlocked(transaction, page, batchSize))
.forEach(
entry -> {
log.debug("Reprocessing {}", entry.description());
try {
pushBack(transaction, entry);
result.add(entry);
} catch (OptimisticLockException e) {
log.debug("Beaten to optimistic lock on {}", entry.description());
}
});
return result;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public void valid(String propertyName, Object object) {
if (!(object instanceof Validatable)) {
return;
}
((Validatable)object).validate(new Validator(path.isEmpty() ? propertyName : (path + "." + propertyName), this));
((Validatable) object)
.validate(new Validator(path.isEmpty() ? propertyName : (path + "." + propertyName), this));
}

public void notNull(String propertyName, Object object) {
Expand Down Expand Up @@ -68,6 +69,7 @@ public void min(String propertyName, int object, int minimumValue) {
}

private void error(String propertyName, String message) {
throw new IllegalArgumentException((path.isEmpty() ? "" : path + ".") + propertyName + " " + message);
throw new IllegalArgumentException(
(path.isEmpty() ? "" : path + ".") + propertyName + " " + message);
}
}
Loading