diff --git a/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java b/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java index 47ee54e2..b8962297 100644 --- a/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java +++ b/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java @@ -49,7 +49,7 @@ public class DefaultPoolConfig implements PoolConfig { protected final PoolMetricsRecorder metricsRecorder; protected final Clock clock; protected final boolean isIdleLRU; - protected final int warmupConcurrency; + protected final boolean parallelizeWarmup; public DefaultPoolConfig(Mono allocator, AllocationStrategy allocationStrategy, @@ -64,7 +64,7 @@ public DefaultPoolConfig(Mono allocator, PoolMetricsRecorder metricsRecorder, Clock clock, boolean isIdleLRU, - int warmupConcurrency) { + boolean parallelizeWarmup) { this.pendingAcquireTimer = pendingAcquireTimer; this.allocator = allocator; this.allocationStrategy = allocationStrategy; @@ -78,12 +78,12 @@ public DefaultPoolConfig(Mono allocator, this.metricsRecorder = metricsRecorder; this.clock = clock; this.isIdleLRU = isIdleLRU; - this.warmupConcurrency = warmupConcurrency; + this.parallelizeWarmup = parallelizeWarmup; } /** - * @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, BiFunction, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean, int) other constructor} - * with explicit setting of warmupConcurrency, to be removed in 1.0.2 at the earliest. + * @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, BiFunction, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean, boolean) other constructor} + * with explicit setting of parallelizeWarmup, to be removed in 1.0.2 at the earliest. * @since 1.0.1 */ @Deprecated @@ -103,7 +103,7 @@ public DefaultPoolConfig(Mono allocator, this(allocator, allocationStrategy, maxPending, pendingAcquireTimer, releaseHandler, destroyHandler, evictionPredicate, evictInBackgroundInterval, evictInBackgroundScheduler, acquisitionScheduler, metricsRecorder, clock, isIdleLRU, - PoolBuilder.DEFAULT_WARMUP_CONCURRENCY); + PoolBuilder.DEFAULT_PARALLELIZE_WARMUP); } /** @@ -128,7 +128,7 @@ protected DefaultPoolConfig(PoolConfig toCopy) { this.metricsRecorder = toCopyDpc.metricsRecorder; this.clock = toCopyDpc.clock; this.isIdleLRU = toCopyDpc.isIdleLRU; - this.warmupConcurrency = toCopyDpc.warmupConcurrency; + this.parallelizeWarmup = toCopyDpc.parallelizeWarmup; } else { this.allocator = toCopy.allocator(); @@ -144,7 +144,7 @@ protected DefaultPoolConfig(PoolConfig toCopy) { this.metricsRecorder = toCopy.metricsRecorder(); this.clock = toCopy.clock(); this.isIdleLRU = toCopy.reuseIdleResourcesInLruOrder(); - this.warmupConcurrency = toCopy.warmupConcurrency(); + this.parallelizeWarmup = toCopy.parallelizeWarmup(); } } @@ -169,8 +169,8 @@ public BiFunction pendingAcquireTimer() { } @Override - public int warmupConcurrency() { - return this.warmupConcurrency; + public boolean parallelizeWarmup() { + return this.parallelizeWarmup; } @Override diff --git a/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java b/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java index 257bd8e5..64c9f27f 100644 --- a/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java +++ b/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java @@ -78,7 +78,7 @@ public static PoolBuilder> from(Publisher allo PoolMetricsRecorder metricsRecorder = NoOpPoolMetricsRecorder.INSTANCE; boolean idleLruOrder = true; BiFunction pendingAcquireTimer = DEFAULT_PENDING_ACQUIRE_TIMER; - int warmupConcurrency = DEFAULT_WARMUP_CONCURRENCY; + boolean parallelizeWarmup = DEFAULT_PARALLELIZE_WARMUP; PoolBuilder(Mono allocator, Function, CONF> configModifier) { this.allocator = allocator; @@ -432,23 +432,24 @@ public > PoolBuilder extraConfiguration(Fu } /** - * Specifies the concurrency level used when the allocator is subscribed to during the warmup phase. - * During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code concurrency} resources are - * subscribed to at the same time. - * A concurrency level of 1 means that warmed-up resources will be pre-allocated one after the other, not concurrently. - * A concurrency level of {@code Integer.MAX_VALUE} means that all pre-allocated resources will be created eagerly, with all resources being - * subscribed to from the current thread. - * By default, the concurrency level is set to {@code Integer.MAX_VALUE}, meaning that the allocator is subscribed to with the - * highest possible concurrency level. + * Specifies if the allocator should be subscribed to eagerly during warmup phase. + *

+ * Setting the {@code enableParallelism} flag to {@code true} means that during warmup, all resources that must be pre-allocated will be + * created eagerly. The allocator will be eagerly subscribed to from the current thread for each pre-allocated resources. + *

+ * Setting the {@code enableParallelism} flag to {@code false} means that pre-allocation of resources is achieved by + * sequentially subscribing to the allocator, waiting for a resource to be created before subscribing a next time to the allocator, + * and so on until the last pre-allocated resource completes. + * + *

+ * By default, the warmup parallelism is disabled. + * + * @see #allocator * - * @param warmupConcurrency The concurrency level used when the allocator is subscribed to during the warmup phase, must be positive, - * {@code Integer.MAX_VALUE} by default + * @param enableParallelism Specifies if the allocator should be subscribed to eagerly during warmup phase, {@code false} by default */ - public PoolBuilder warmupConcurrency(int warmupConcurrency) { - if (warmupConcurrency < 1) { - throw new IllegalArgumentException("warmupConcurrency must be positive"); - } - this.warmupConcurrency = warmupConcurrency; + public PoolBuilder parallelizeWarmup(boolean enableParallelism) { + this.parallelizeWarmup = enableParallelism; return this; } @@ -502,7 +503,7 @@ CONF buildConfig() { metricsRecorder, clock, idleLruOrder, - warmupConcurrency); + parallelizeWarmup); return this.configModifier.apply(baseConfig); } @@ -524,5 +525,5 @@ static BiPredicate idlePredicate(Duration maxIdleTime) static final Function> NOOP_HANDLER = it -> Mono.empty(); static final BiPredicate NEVER_PREDICATE = (ignored1, ignored2) -> false; static final BiFunction DEFAULT_PENDING_ACQUIRE_TIMER = (r, d) -> Schedulers.parallel().schedule(r, d.toNanos(), TimeUnit.NANOSECONDS); - static final int DEFAULT_WARMUP_CONCURRENCY = Integer.MAX_VALUE; + static final boolean DEFAULT_PARALLELIZE_WARMUP = false; } diff --git a/reactor-pool/src/main/java/reactor/pool/PoolConfig.java b/reactor-pool/src/main/java/reactor/pool/PoolConfig.java index 0d1cf84e..96b91371 100644 --- a/reactor-pool/src/main/java/reactor/pool/PoolConfig.java +++ b/reactor-pool/src/main/java/reactor/pool/PoolConfig.java @@ -142,16 +142,23 @@ default BiFunction pendingAcquireTimer() { } /** - * Specifies the concurrency level used when the allocator is subscribed to during the warmup phase. - * During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code concurrency} resources are - * subscribed to at the same time. - * A concurrency level of 1 means that warmed-up resources will be pre-allocated one after the other, not concurrently. - * A concurrency level of {@code Integer.MAX_VALUE} means that all pre-allocated resources will be created eagerly, with all resources being - * subscribed to from the current thread. - * By default, the concurrency level is set to {@code Integer.MAX_VALUE}, meaning that the allocator is subscribed to with the - * highest possible concurrency level. + * Specifies if the allocator should be subscribed to eagerly during warmup phase. + *

+ * Returning {@code true} means that during warmup, all resources that must be pre-allocated will be + * created eagerly. The allocator will be eagerly subscribed to from the current thread for each pre-allocated resources. + *

+ * Returning {@code false} means that pre-allocation of resources is achieved by + * sequentially subscribing to the allocator, waiting for a resource to be created before subscribing a next time to the allocator, + * and so on until the last pre-allocated resource completes. + * + *

+ * By default, the warmup parallelism is disabled. + * + * @see #allocator + * + * @return {@code true} if the allocator should be subscribed to eagerly during warmup phase */ - default int warmupConcurrency() { - return PoolBuilder.DEFAULT_WARMUP_CONCURRENCY; + default boolean parallelizeWarmup() { + return PoolBuilder.DEFAULT_PARALLELIZE_WARMUP; } } diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 6e25dd68..6ed4349e 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -251,9 +251,9 @@ public Mono warmup() { }); } // merge will eagerly subscribe to all warmups from the current thread, but - // the concurrency can be controlled from configuration. - int warmupConcurrency = Math.min(allWarmups.length, poolConfig.warmupConcurrency()); - return Flux.merge(Flux.fromArray(allWarmups), warmupConcurrency) + // the parallelism can be controlled from configuration. + int mergeConcurrency = poolConfig.parallelizeWarmup() ? allWarmups.length : 1; + return Flux.merge(Flux.fromArray(allWarmups), mergeConcurrency) .reduce(0, (count, p) -> count + 1); }); } @@ -451,8 +451,8 @@ else if (sig.isOnError()) { // merge will eagerly subscribe to the allocator from the current thread, but the concurrency // can be controlled from configuration - int warmupConcurrency = Math.min(toWarmup + 1, poolConfig.warmupConcurrency()); - Flux.merge(monos, warmupConcurrency, Queues.XS_BUFFER_SIZE) + int mergeConcurrency = poolConfig.parallelizeWarmup() ? toWarmup + 1 : 1; + Flux.merge(monos, mergeConcurrency, Queues.XS_BUFFER_SIZE) .onErrorResume(e -> Mono.empty()) .subscribe(poolable -> drain(), alreadyPropagatedOrLogged -> drain(), () -> drain()); } diff --git a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java index b7f86abe..59a79303 100644 --- a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java +++ b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java @@ -1401,6 +1401,40 @@ void recordsAllocationLatenciesInWarmup(PoolStyle configAdjuster) { .clock(recorder.getClock()); AbstractPool pool = configAdjuster.apply(builder); + assertThatIllegalStateException() + .isThrownBy(() -> pool.warmup().block()); + + assertThat(recorder.getAllocationTotalCount()).isEqualTo(2); + + long minSuccess = recorder.getAllocationSuccessHistogram().getMinValue(); + long minError = recorder.getAllocationErrorHistogram().getMinValue(); + + assertThat(minSuccess).as("allocation success latency").isGreaterThanOrEqualTo(100L); + assertThat(minError).as("allocation error latency").isGreaterThanOrEqualTo(200L); + } + + @ParameterizedTestWithName + @MethodSource("allPools") + @Tag("metrics") + void recordsAllocationLatenciesInEagerWarmup(PoolStyle configAdjuster) { + AtomicBoolean flip = new AtomicBoolean(); + //note the starter method here is irrelevant, only the config is created and passed to createPool + PoolBuilder builder = PoolBuilder + .from(Mono.defer(() -> { + if (flip.compareAndSet(false, true)) { + return Mono.just("foo").delayElement(Duration.ofMillis(100)); + } + else { + flip.compareAndSet(true, false); + return Mono.delay(Duration.ofMillis(200)).then(Mono.error(new IllegalStateException("boom"))); + } + })) + .sizeBetween(10, Integer.MAX_VALUE) + .parallelizeWarmup(true) + .metricsRecorder(recorder) + .clock(recorder.getClock()); + AbstractPool pool = configAdjuster.apply(builder); + // warmup will eagerly subscribe 10 times to the allocator. // The five first subscribtions will success (after around 100 millis), and some allocation should fail after around // 200 millis. diff --git a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java index 0d272ab8..4d94039d 100644 --- a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java +++ b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java @@ -70,10 +70,14 @@ public class PoolWarmupTest { protected static Stream warmupTestArgs() { return Stream.of( - Arguments.of(true, 10, Schedulers.immediate()), - Arguments.of(true, 1, Schedulers.single()), - Arguments.of(false, 10, Schedulers.immediate()), - Arguments.of(false, 1, Schedulers.single()) + Arguments.of(true, true, Schedulers.immediate(), true), + Arguments.of(true, true, Schedulers.single(), true), + Arguments.of(true, false, Schedulers.single(), true), + Arguments.of(true, false, Schedulers.immediate(), false), + Arguments.of(false, true, Schedulers.immediate(), true), + Arguments.of(false, true, Schedulers.single(), true), + Arguments.of(false, false, Schedulers.single(), true), + Arguments.of(false, false, Schedulers.immediate(), false) ); } @@ -140,7 +144,7 @@ final static class DBConnectionPool { final InstrumentedPool pool; final static AtomicInteger roundRobin = new AtomicInteger(); - DBConnectionPool(int poolSize, int warmupConcurrency, Scheduler allocatorSubscribeScheduler) { + DBConnectionPool(int poolSize, boolean enableParallelWarmup, Scheduler allocatorSubscribeScheduler) { this.poolSize = poolSize; this.dbThreads = new DBConnectionThread[poolSize]; IntStream.range(0, poolSize).forEach(i -> dbThreads[i] = new DBConnectionThread("dbthread-" + i)); @@ -160,7 +164,7 @@ final static class DBConnectionPool { .subscribeOn(allocatorSubscribeScheduler)) .sizeBetween(10, 10) .idleResourceReuseOrder(false) - .warmupConcurrency(warmupConcurrency) + .parallelizeWarmup(enableParallelWarmup) .buildPool(); } @@ -182,9 +186,9 @@ void stop() { @ParameterizedTest @MethodSource("warmupTestArgs") - void warmupTest(boolean doWarmup, int warmupConcurrency, Scheduler allocatorSubscribeScheduler) { + void warmupTest(boolean doWarmup, boolean enableParallelWarmup, Scheduler allocatorSubscribeScheduler, boolean expectSuccess) { int poolSize = 10; - DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize, warmupConcurrency, allocatorSubscribeScheduler); + DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize, enableParallelWarmup, allocatorSubscribeScheduler); try { InstrumentedPool pool = dbConnectionPool.getPool(); @@ -212,7 +216,12 @@ void warmupTest(boolean doWarmup, int warmupConcurrency, Scheduler allocatorSubs long elapsed = (System.currentTimeMillis() - startTime); LOGGER.info("Elapsed time: " + elapsed + ", concurrency=" + dbConnectionPool.dbThreadsUsed()); - assertThat(dbConnectionPool.dbThreadsUsed()).isEqualTo(10); + if (expectSuccess) { + assertThat(dbConnectionPool.dbThreadsUsed()).isEqualTo(10); + } + else { + assertThat(dbConnectionPool.dbThreadsUsed()).isLessThan(10); + } } finally { dbConnectionPool.stop(); }