Skip to content

Commit

Permalink
Allow TransactionOutbox#scheduler ThreadFactory customization
Browse files Browse the repository at this point in the history
  • Loading branch information
reda-alaoui committed Aug 12, 2024
1 parent 1253bb7 commit beee88b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;
import lombok.ToString;
import org.slf4j.MDC;
Expand Down Expand Up @@ -152,6 +153,7 @@ abstract class TransactionOutboxBuilder {
protected Boolean serializeMdc;
protected Duration retentionThreshold;
protected Boolean initializeImmediately;
protected ThreadFactory schedulerThreadFactory;

protected TransactionOutboxBuilder() {}

Expand Down Expand Up @@ -298,6 +300,15 @@ public TransactionOutboxBuilder initializeImmediately(boolean initializeImmediat
return this;
}

/**
* @param schedulerThreadFactory The {@link ThreadFactory} that will be used to build the scheduler executor.
* @return Builder.
*/
public TransactionOutboxBuilder schedulerThreadFactory(ThreadFactory schedulerThreadFactory) {
this.schedulerThreadFactory = schedulerThreadFactory;
return this;
}

/**
* Creates and initialises the {@link TransactionOutbox}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
Expand Down Expand Up @@ -49,9 +50,10 @@ final class TransactionOutboxImpl implements TransactionOutbox, Validatable {
private final boolean serializeMdc;
private final Validator validator;
private final Duration retentionThreshold;
private final ThreadFactory schedulerThreadFactory;
private final AtomicBoolean initialized = new AtomicBoolean();
private final ProxyFactory proxyFactory = new ProxyFactory();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService scheduler = schedulerThreadFactory == null? Executors.newSingleThreadScheduledExecutor() : Executors.newSingleThreadScheduledExecutor(schedulerThreadFactory);

@Override
public void validate(Validator validator) {
Expand Down Expand Up @@ -435,7 +437,8 @@ public TransactionOutboxImpl build() {
Utils.firstNonNull(listener, () -> TransactionOutboxListener.EMPTY),
serializeMdc == null || serializeMdc,
validator,
retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold);
retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold,
schedulerThreadFactory);
validator.validate(impl);
if (initializeImmediately == null || initializeImmediately) {
impl.initialize();
Expand Down

0 comments on commit beee88b

Please sign in to comment.