Skip to content

Commit

Permalink
Warmup throws IllegalArgumentException in case no permits are availab…
Browse files Browse the repository at this point in the history
…le (#181)

This fixes a regression that comes from #171, from 1.0.1 version: since the pool warmup method allows to be called at anytime, then if no more permits are available from the allocation strategy, the Flux.merge method is then called with a mergeConcurrencypameter that is set to 0, causing the following exception:

maxConcurrency > 0 required but it was 0
java.lang.IllegalArgumentException: maxConcurrency > 0 required but it was 0
	at reactor.core.publisher.FluxFlatMap.<init>(FluxFlatMap.java:74)
	at reactor.core.publisher.Flux.merge(Flux.java:1406)
	at reactor.core.publisher.Flux.merge(Flux.java:1375)
Added a check in warmup to immediately return in case no permits are available.
Also added a test case.

Fixes #180
  • Loading branch information
pderop authored Jan 4, 2024
1 parent 74bfb01 commit 3dae6b7
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
3 changes: 3 additions & 0 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ public Mono<Integer> warmup() {
recordInteractionTimestamp();
int initSize = poolConfig.allocationStrategy()
.getPermits(0);
if (initSize <= 0) {
return Mono.just(0);
}
@SuppressWarnings({ "unchecked", "rawtypes" }) //rawtypes added since javac actually complains
Mono<POOLABLE>[] allWarmups = new Mono[initSize];
for (int i = 0; i < initSize; i++) {
Expand Down
39 changes: 39 additions & 0 deletions reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2730,4 +2730,43 @@ void recordsPendingCountAndLatencies(PoolStyle configAdjuster) {
.as("pending error latency")
.isGreaterThanOrEqualTo(1L);
}


@ParameterizedTestWithName
@MethodSource("allPools")
void gh180_warmupIdempotent(PoolStyle configAdjuster) {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();

AtomicInteger allocCounter = new AtomicInteger();
Mono<Integer> allocator = Mono.fromCallable(allocCounter::incrementAndGet);

PoolBuilder<Integer, ?> builder =
PoolBuilder.from(allocator)
.sizeBetween(10, 10)
.evictionPredicate((poolable, metadata) -> metadata.idleTime() >= 4000)
.evictInBackground(Duration.ofSeconds(5), vts)
.clock(SchedulerClock.of(vts));

InstrumentedPool<Integer> pool = configAdjuster.apply(builder);
pool.warmup().block();
assertThat(allocCounter).as("allocations").hasValue(10);
assertThat(pool.metrics().allocatedSize()).as("allocatedSize").isEqualTo(10);

assertThatCode(() -> vts.advanceTimeBy(Duration.ofSeconds(10))).doesNotThrowAnyException();

assertThat(pool.metrics().allocatedSize()).as("allocatedSize").isEqualTo(0);
assertThat(allocCounter).as("allocations").hasValue(10);

pool.warmup().block();
assertThat(allocCounter).as("allocations").hasValue(20);
assertThat(pool.metrics().allocatedSize()).as("allocatedSize").isEqualTo(10);

// Since warmup can be called at anytime, calling warmup again should not cause any troubles and should keep the
// pool unchanged.
pool.warmup().block();
assertThat(allocCounter).as("allocations").hasValue(20);
assertThat(pool.metrics().allocatedSize()).as("allocatedSize").isEqualTo(10);

pool.disposeLater().block(Duration.ofSeconds(3));
}
}

0 comments on commit 3dae6b7

Please sign in to comment.