Skip to content

Commit

Permalink
Replaced PoolBuilder.warmupConcurrenty(int concurrency) method by Poo…
Browse files Browse the repository at this point in the history
…lBuilder.parallelizeWarmup(boolean).

Warmups are not parallel (as before).
Reverted CommonPoolTest.recordsAllocationLatenciesInWarmup test, because warmup is like before, by default: no concurrency.
Improved PoolWarmupTest.
  • Loading branch information
pderop committed Jul 5, 2023
1 parent e484c26 commit c4a5af8
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 52 deletions.
20 changes: 10 additions & 10 deletions reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class DefaultPoolConfig<POOLABLE> implements PoolConfig<POOLABLE> {
protected final PoolMetricsRecorder metricsRecorder;
protected final Clock clock;
protected final boolean isIdleLRU;
protected final int warmupConcurrency;
protected final boolean parallelizeWarmup;

public DefaultPoolConfig(Mono<POOLABLE> allocator,
AllocationStrategy allocationStrategy,
Expand All @@ -64,7 +64,7 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
PoolMetricsRecorder metricsRecorder,
Clock clock,
boolean isIdleLRU,
int warmupConcurrency) {
boolean parallelizeWarmup) {
this.pendingAcquireTimer = pendingAcquireTimer;
this.allocator = allocator;
this.allocationStrategy = allocationStrategy;
Expand All @@ -78,12 +78,12 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
this.metricsRecorder = metricsRecorder;
this.clock = clock;
this.isIdleLRU = isIdleLRU;
this.warmupConcurrency = warmupConcurrency;
this.parallelizeWarmup = parallelizeWarmup;
}

/**
* @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, BiFunction, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean, int) other constructor}
* with explicit setting of warmupConcurrency, to be removed in 1.0.2 at the earliest.
* @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, BiFunction, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean, boolean) other constructor}
* with explicit setting of parallelizeWarmup, to be removed in 1.0.2 at the earliest.
* @since 1.0.1
*/
@Deprecated
Expand All @@ -103,7 +103,7 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
this(allocator, allocationStrategy, maxPending, pendingAcquireTimer, releaseHandler,
destroyHandler, evictionPredicate, evictInBackgroundInterval, evictInBackgroundScheduler,
acquisitionScheduler, metricsRecorder, clock, isIdleLRU,
PoolBuilder.DEFAULT_WARMUP_CONCURRENCY);
PoolBuilder.DEFAULT_PARALLELIZE_WARMUP);
}

/**
Expand All @@ -128,7 +128,7 @@ protected DefaultPoolConfig(PoolConfig<POOLABLE> toCopy) {
this.metricsRecorder = toCopyDpc.metricsRecorder;
this.clock = toCopyDpc.clock;
this.isIdleLRU = toCopyDpc.isIdleLRU;
this.warmupConcurrency = toCopyDpc.warmupConcurrency;
this.parallelizeWarmup = toCopyDpc.parallelizeWarmup;
}
else {
this.allocator = toCopy.allocator();
Expand All @@ -144,7 +144,7 @@ protected DefaultPoolConfig(PoolConfig<POOLABLE> toCopy) {
this.metricsRecorder = toCopy.metricsRecorder();
this.clock = toCopy.clock();
this.isIdleLRU = toCopy.reuseIdleResourcesInLruOrder();
this.warmupConcurrency = toCopy.warmupConcurrency();
this.parallelizeWarmup = toCopy.parallelizeWarmup();
}
}

Expand All @@ -169,8 +169,8 @@ public BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer() {
}

@Override
public int warmupConcurrency() {
return this.warmupConcurrency;
public boolean parallelizeWarmup() {
return this.parallelizeWarmup;
}

@Override
Expand Down
37 changes: 19 additions & 18 deletions reactor-pool/src/main/java/reactor/pool/PoolBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static <T> PoolBuilder<T, PoolConfig<T>> from(Publisher<? extends T> allo
PoolMetricsRecorder metricsRecorder = NoOpPoolMetricsRecorder.INSTANCE;
boolean idleLruOrder = true;
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer = DEFAULT_PENDING_ACQUIRE_TIMER;
int warmupConcurrency = DEFAULT_WARMUP_CONCURRENCY;
boolean parallelizeWarmup = DEFAULT_PARALLELIZE_WARMUP;

PoolBuilder(Mono<T> allocator, Function<PoolConfig<T>, CONF> configModifier) {
this.allocator = allocator;
Expand Down Expand Up @@ -432,23 +432,24 @@ public <CONF2 extends PoolConfig<T>> PoolBuilder<T, CONF2> extraConfiguration(Fu
}

/**
* Specifies the concurrency level used when the allocator is subscribed to during the warmup phase.
* During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code concurrency} resources are
* subscribed to at the same time.
* A concurrency level of 1 means that warmed-up resources will be pre-allocated one after the other, not concurrently.
* A concurrency level of {@code Integer.MAX_VALUE} means that all pre-allocated resources will be created eagerly, with all resources being
* subscribed to from the current thread.
* By default, the concurrency level is set to {@code Integer.MAX_VALUE}, meaning that the allocator is subscribed to with the
* highest possible concurrency level.
* Specifies if the allocator should be subscribed to eagerly during warmup phase.
* <p>
* Setting the {@code enableParallelism} flag to {@code true} means that during warmup, all resources that must be pre-allocated will be
* created eagerly. The allocator will be eagerly subscribed to from the current thread for each pre-allocated resources.
* <p>
* Setting the {@code enableParallelism} flag to {@code false} means that pre-allocation of resources is achieved by
* sequentially subscribing to the allocator, waiting for a resource to be created before subscribing a next time to the allocator,
* and so on until the last pre-allocated resource completes.
*
* <p>
* By default, the warmup parallelism is disabled.
*
* @see #allocator
*
* @param warmupConcurrency The concurrency level used when the allocator is subscribed to during the warmup phase, must be positive,
* {@code Integer.MAX_VALUE} by default
* @param enableParallelism Specifies if the allocator should be subscribed to eagerly during warmup phase, {@code false} by default
*/
public PoolBuilder<T, CONF> warmupConcurrency(int warmupConcurrency) {
if (warmupConcurrency < 1) {
throw new IllegalArgumentException("warmupConcurrency must be positive");
}
this.warmupConcurrency = warmupConcurrency;
public PoolBuilder<T, CONF> parallelizeWarmup(boolean enableParallelism) {
this.parallelizeWarmup = enableParallelism;
return this;
}

Expand Down Expand Up @@ -502,7 +503,7 @@ CONF buildConfig() {
metricsRecorder,
clock,
idleLruOrder,
warmupConcurrency);
parallelizeWarmup);

return this.configModifier.apply(baseConfig);
}
Expand All @@ -524,5 +525,5 @@ static <T> BiPredicate<T, PooledRefMetadata> idlePredicate(Duration maxIdleTime)
static final Function<?, Mono<Void>> NOOP_HANDLER = it -> Mono.empty();
static final BiPredicate<?, ?> NEVER_PREDICATE = (ignored1, ignored2) -> false;
static final BiFunction<Runnable, Duration, Disposable> DEFAULT_PENDING_ACQUIRE_TIMER = (r, d) -> Schedulers.parallel().schedule(r, d.toNanos(), TimeUnit.NANOSECONDS);
static final int DEFAULT_WARMUP_CONCURRENCY = Integer.MAX_VALUE;
static final boolean DEFAULT_PARALLELIZE_WARMUP = false;
}
27 changes: 17 additions & 10 deletions reactor-pool/src/main/java/reactor/pool/PoolConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,23 @@ default BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer() {
}

/**
* Specifies the concurrency level used when the allocator is subscribed to during the warmup phase.
* During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code concurrency} resources are
* subscribed to at the same time.
* A concurrency level of 1 means that warmed-up resources will be pre-allocated one after the other, not concurrently.
* A concurrency level of {@code Integer.MAX_VALUE} means that all pre-allocated resources will be created eagerly, with all resources being
* subscribed to from the current thread.
* By default, the concurrency level is set to {@code Integer.MAX_VALUE}, meaning that the allocator is subscribed to with the
* highest possible concurrency level.
* Specifies if the allocator should be subscribed to eagerly during warmup phase.
* <p>
* Returning {@code true} means that during warmup, all resources that must be pre-allocated will be
* created eagerly. The allocator will be eagerly subscribed to from the current thread for each pre-allocated resources.
* <p>
* Returning {@code false} means that pre-allocation of resources is achieved by
* sequentially subscribing to the allocator, waiting for a resource to be created before subscribing a next time to the allocator,
* and so on until the last pre-allocated resource completes.
*
* <p>
* By default, the warmup parallelism is disabled.
*
* @see #allocator
*
* @return {@code true} if the allocator should be subscribed to eagerly during warmup phase
*/
default int warmupConcurrency() {
return PoolBuilder.DEFAULT_WARMUP_CONCURRENCY;
default boolean parallelizeWarmup() {
return PoolBuilder.DEFAULT_PARALLELIZE_WARMUP;
}
}
10 changes: 5 additions & 5 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ public Mono<Integer> warmup() {
});
}
// merge will eagerly subscribe to all warmups from the current thread, but
// the concurrency can be controlled from configuration.
int warmupConcurrency = Math.min(allWarmups.length, poolConfig.warmupConcurrency());
return Flux.merge(Flux.fromArray(allWarmups), warmupConcurrency)
// the parallelism can be controlled from configuration.
int mergeConcurrency = poolConfig.parallelizeWarmup() ? allWarmups.length : 1;
return Flux.merge(Flux.fromArray(allWarmups), mergeConcurrency)
.reduce(0, (count, p) -> count + 1);
});
}
Expand Down Expand Up @@ -451,8 +451,8 @@ else if (sig.isOnError()) {

// merge will eagerly subscribe to the allocator from the current thread, but the concurrency
// can be controlled from configuration
int warmupConcurrency = Math.min(toWarmup + 1, poolConfig.warmupConcurrency());
Flux.merge(monos, warmupConcurrency, Queues.XS_BUFFER_SIZE)
int mergeConcurrency = poolConfig.parallelizeWarmup() ? toWarmup + 1 : 1;
Flux.merge(monos, mergeConcurrency, Queues.XS_BUFFER_SIZE)
.onErrorResume(e -> Mono.empty())
.subscribe(poolable -> drain(), alreadyPropagatedOrLogged -> drain(), () -> drain());
}
Expand Down
34 changes: 34 additions & 0 deletions reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,40 @@ void recordsAllocationLatenciesInWarmup(PoolStyle configAdjuster) {
.clock(recorder.getClock());
AbstractPool<String> pool = configAdjuster.apply(builder);

assertThatIllegalStateException()
.isThrownBy(() -> pool.warmup().block());

assertThat(recorder.getAllocationTotalCount()).isEqualTo(2);

long minSuccess = recorder.getAllocationSuccessHistogram().getMinValue();
long minError = recorder.getAllocationErrorHistogram().getMinValue();

assertThat(minSuccess).as("allocation success latency").isGreaterThanOrEqualTo(100L);
assertThat(minError).as("allocation error latency").isGreaterThanOrEqualTo(200L);
}

@ParameterizedTestWithName
@MethodSource("allPools")
@Tag("metrics")
void recordsAllocationLatenciesInEagerWarmup(PoolStyle configAdjuster) {
AtomicBoolean flip = new AtomicBoolean();
//note the starter method here is irrelevant, only the config is created and passed to createPool
PoolBuilder<String, ?> builder = PoolBuilder
.from(Mono.defer(() -> {
if (flip.compareAndSet(false, true)) {
return Mono.just("foo").delayElement(Duration.ofMillis(100));
}
else {
flip.compareAndSet(true, false);
return Mono.delay(Duration.ofMillis(200)).then(Mono.error(new IllegalStateException("boom")));
}
}))
.sizeBetween(10, Integer.MAX_VALUE)
.parallelizeWarmup(true)
.metricsRecorder(recorder)
.clock(recorder.getClock());
AbstractPool<String> pool = configAdjuster.apply(builder);

// warmup will eagerly subscribe 10 times to the allocator.
// The five first subscribtions will success (after around 100 millis), and some allocation should fail after around
// 200 millis.
Expand Down
27 changes: 18 additions & 9 deletions reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,14 @@ public class PoolWarmupTest {

protected static Stream<Arguments> warmupTestArgs() {
return Stream.of(
Arguments.of(true, 10, Schedulers.immediate()),
Arguments.of(true, 1, Schedulers.single()),
Arguments.of(false, 10, Schedulers.immediate()),
Arguments.of(false, 1, Schedulers.single())
Arguments.of(true, true, Schedulers.immediate(), true),
Arguments.of(true, true, Schedulers.single(), true),
Arguments.of(true, false, Schedulers.single(), true),
Arguments.of(true, false, Schedulers.immediate(), false),
Arguments.of(false, true, Schedulers.immediate(), true),
Arguments.of(false, true, Schedulers.single(), true),
Arguments.of(false, false, Schedulers.single(), true),
Arguments.of(false, false, Schedulers.immediate(), false)
);
}

Expand Down Expand Up @@ -140,7 +144,7 @@ final static class DBConnectionPool {
final InstrumentedPool<DBConnection> pool;
final static AtomicInteger roundRobin = new AtomicInteger();

DBConnectionPool(int poolSize, int warmupConcurrency, Scheduler allocatorSubscribeScheduler) {
DBConnectionPool(int poolSize, boolean enableParallelWarmup, Scheduler allocatorSubscribeScheduler) {
this.poolSize = poolSize;
this.dbThreads = new DBConnectionThread[poolSize];
IntStream.range(0, poolSize).forEach(i -> dbThreads[i] = new DBConnectionThread("dbthread-" + i));
Expand All @@ -160,7 +164,7 @@ final static class DBConnectionPool {
.subscribeOn(allocatorSubscribeScheduler))
.sizeBetween(10, 10)
.idleResourceReuseOrder(false)
.warmupConcurrency(warmupConcurrency)
.parallelizeWarmup(enableParallelWarmup)
.buildPool();
}

Expand All @@ -182,9 +186,9 @@ void stop() {

@ParameterizedTest
@MethodSource("warmupTestArgs")
void warmupTest(boolean doWarmup, int warmupConcurrency, Scheduler allocatorSubscribeScheduler) {
void warmupTest(boolean doWarmup, boolean enableParallelWarmup, Scheduler allocatorSubscribeScheduler, boolean expectSuccess) {
int poolSize = 10;
DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize, warmupConcurrency, allocatorSubscribeScheduler);
DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize, enableParallelWarmup, allocatorSubscribeScheduler);

try {
InstrumentedPool<DBConnection> pool = dbConnectionPool.getPool();
Expand Down Expand Up @@ -212,7 +216,12 @@ void warmupTest(boolean doWarmup, int warmupConcurrency, Scheduler allocatorSubs
long elapsed = (System.currentTimeMillis() - startTime);
LOGGER.info("Elapsed time: " + elapsed + ", concurrency=" + dbConnectionPool.dbThreadsUsed());

assertThat(dbConnectionPool.dbThreadsUsed()).isEqualTo(10);
if (expectSuccess) {
assertThat(dbConnectionPool.dbThreadsUsed()).isEqualTo(10);
}
else {
assertThat(dbConnectionPool.dbThreadsUsed()).isLessThan(10);
}
} finally {
dbConnectionPool.stop();
}
Expand Down

0 comments on commit c4a5af8

Please sign in to comment.