Skip to content

Commit

Permalink
- reverted delivery of borrower through the acquisition scheduler: th…
Browse files Browse the repository at this point in the history
…is can be done in a separate PR.

- resources allocated during warmup are subscribed to eagerly, but now you can configure the level of concurrency using `PoolBuilder.warmupConcurrency(int concurrency)`
- improved the warpmupTest
  • Loading branch information
pderop committed Jul 5, 2023
1 parent 3ce0667 commit e484c26
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 163 deletions.
40 changes: 37 additions & 3 deletions reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,6 @@
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* A default {@link PoolConfig} that can be extended to bear more configuration options
Expand All @@ -50,6 +49,7 @@ public class DefaultPoolConfig<POOLABLE> implements PoolConfig<POOLABLE> {
protected final PoolMetricsRecorder metricsRecorder;
protected final Clock clock;
protected final boolean isIdleLRU;
protected final int warmupConcurrency;

public DefaultPoolConfig(Mono<POOLABLE> allocator,
AllocationStrategy allocationStrategy,
Expand All @@ -63,7 +63,8 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
Scheduler acquisitionScheduler,
PoolMetricsRecorder metricsRecorder,
Clock clock,
boolean isIdleLRU) {
boolean isIdleLRU,
int warmupConcurrency) {
this.pendingAcquireTimer = pendingAcquireTimer;
this.allocator = allocator;
this.allocationStrategy = allocationStrategy;
Expand All @@ -77,6 +78,32 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
this.metricsRecorder = metricsRecorder;
this.clock = clock;
this.isIdleLRU = isIdleLRU;
this.warmupConcurrency = warmupConcurrency;
}

/**
* @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, BiFunction, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean, int) other constructor}
* with explicit setting of warmupConcurrency, to be removed in 1.0.2 at the earliest.
* @since 1.0.1
*/
@Deprecated
public DefaultPoolConfig(Mono<POOLABLE> allocator,
AllocationStrategy allocationStrategy,
int maxPending,
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer,
Function<POOLABLE, ? extends Publisher<Void>> releaseHandler,
Function<POOLABLE, ? extends Publisher<Void>> destroyHandler,
BiPredicate<POOLABLE, PooledRefMetadata> evictionPredicate,
Duration evictInBackgroundInterval,
Scheduler evictInBackgroundScheduler,
Scheduler acquisitionScheduler,
PoolMetricsRecorder metricsRecorder,
Clock clock,
boolean isIdleLRU) {
this(allocator, allocationStrategy, maxPending, pendingAcquireTimer, releaseHandler,
destroyHandler, evictionPredicate, evictInBackgroundInterval, evictInBackgroundScheduler,
acquisitionScheduler, metricsRecorder, clock, isIdleLRU,
PoolBuilder.DEFAULT_WARMUP_CONCURRENCY);
}

/**
Expand All @@ -101,6 +128,7 @@ protected DefaultPoolConfig(PoolConfig<POOLABLE> toCopy) {
this.metricsRecorder = toCopyDpc.metricsRecorder;
this.clock = toCopyDpc.clock;
this.isIdleLRU = toCopyDpc.isIdleLRU;
this.warmupConcurrency = toCopyDpc.warmupConcurrency;
}
else {
this.allocator = toCopy.allocator();
Expand All @@ -116,6 +144,7 @@ protected DefaultPoolConfig(PoolConfig<POOLABLE> toCopy) {
this.metricsRecorder = toCopy.metricsRecorder();
this.clock = toCopy.clock();
this.isIdleLRU = toCopy.reuseIdleResourcesInLruOrder();
this.warmupConcurrency = toCopy.warmupConcurrency();
}
}

Expand All @@ -139,6 +168,11 @@ public BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer() {
return this.pendingAcquireTimer;
}

@Override
public int warmupConcurrency() {
return this.warmupConcurrency;
}

@Override
public Function<POOLABLE, ? extends Publisher<Void>> releaseHandler() {
return this.releaseHandler;
Expand Down
29 changes: 26 additions & 3 deletions reactor-pool/src/main/java/reactor/pool/PoolBuilder.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -78,6 +78,7 @@ public static <T> PoolBuilder<T, PoolConfig<T>> from(Publisher<? extends T> allo
PoolMetricsRecorder metricsRecorder = NoOpPoolMetricsRecorder.INSTANCE;
boolean idleLruOrder = true;
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer = DEFAULT_PENDING_ACQUIRE_TIMER;
int warmupConcurrency = DEFAULT_WARMUP_CONCURRENCY;

PoolBuilder(Mono<T> allocator, Function<PoolConfig<T>, CONF> configModifier) {
this.allocator = allocator;
Expand Down Expand Up @@ -430,6 +431,27 @@ public <CONF2 extends PoolConfig<T>> PoolBuilder<T, CONF2> extraConfiguration(Fu
return new PoolBuilder<>(this, this.configModifier.andThen(configModifier));
}

/**
* Specifies the concurrency level used when the allocator is subscribed to during the warmup phase.
* During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code concurrency} resources are
* subscribed to at the same time.
* A concurrency level of 1 means that warmed-up resources will be pre-allocated one after the other, not concurrently.
* A concurrency level of {@code Integer.MAX_VALUE} means that all pre-allocated resources will be created eagerly, with all resources being
* subscribed to from the current thread.
* By default, the concurrency level is set to {@code Integer.MAX_VALUE}, meaning that the allocator is subscribed to with the
* highest possible concurrency level.
*
* @param warmupConcurrency The concurrency level used when the allocator is subscribed to during the warmup phase, must be positive,
* {@code Integer.MAX_VALUE} by default
*/
public PoolBuilder<T, CONF> warmupConcurrency(int warmupConcurrency) {
if (warmupConcurrency < 1) {
throw new IllegalArgumentException("warmupConcurrency must be positive");
}
this.warmupConcurrency = warmupConcurrency;
return this;
}

/**
* Construct a default reactor pool with the builder's configuration.
*
Expand Down Expand Up @@ -479,7 +501,8 @@ CONF buildConfig() {
acquisitionScheduler,
metricsRecorder,
clock,
idleLruOrder);
idleLruOrder,
warmupConcurrency);

return this.configModifier.apply(baseConfig);
}
Expand All @@ -501,5 +524,5 @@ static <T> BiPredicate<T, PooledRefMetadata> idlePredicate(Duration maxIdleTime)
static final Function<?, Mono<Void>> NOOP_HANDLER = it -> Mono.empty();
static final BiPredicate<?, ?> NEVER_PREDICATE = (ignored1, ignored2) -> false;
static final BiFunction<Runnable, Duration, Disposable> DEFAULT_PENDING_ACQUIRE_TIMER = (r, d) -> Schedulers.parallel().schedule(r, d.toNanos(), TimeUnit.NANOSECONDS);

static final int DEFAULT_WARMUP_CONCURRENCY = Integer.MAX_VALUE;
}
15 changes: 14 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/PoolConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -141,4 +141,17 @@ default BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer() {
return PoolBuilder.DEFAULT_PENDING_ACQUIRE_TIMER;
}

/**
* Specifies the concurrency level used when the allocator is subscribed to during the warmup phase.
* During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code concurrency} resources are
* subscribed to at the same time.
* A concurrency level of 1 means that warmed-up resources will be pre-allocated one after the other, not concurrently.
* A concurrency level of {@code Integer.MAX_VALUE} means that all pre-allocated resources will be created eagerly, with all resources being
* subscribed to from the current thread.
* By default, the concurrency level is set to {@code Integer.MAX_VALUE}, meaning that the allocator is subscribed to with the
* highest possible concurrency level.
*/
default int warmupConcurrency() {
return PoolBuilder.DEFAULT_WARMUP_CONCURRENCY;
}
}
22 changes: 12 additions & 10 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import reactor.core.scheduler.Schedulers;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/**
* The {@link SimpleDequePool} is based on {@link Deque} for idle resources and pending {@link Pool#acquire()} Monos,
Expand Down Expand Up @@ -249,8 +250,10 @@ public Mono<Integer> warmup() {
.returnPermits(1);
});
}
// merge will eagerly subscribe to all warmups from the current thread
return Flux.merge(allWarmups.length, allWarmups)
// merge will eagerly subscribe to all warmups from the current thread, but
// the concurrency can be controlled from configuration.
int warmupConcurrency = Math.min(allWarmups.length, poolConfig.warmupConcurrency());
return Flux.merge(Flux.fromArray(allWarmups), warmupConcurrency)
.reduce(0, (count, p) -> count + 1);
});
}
Expand Down Expand Up @@ -419,8 +422,7 @@ private void drainLoop() {
assert newInstance != null;
ACQUIRED.incrementAndGet(this);
metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start);
poolConfig.acquisitionScheduler()
.schedule(() -> borrower.deliver(createSlot(newInstance)));
borrower.deliver(createSlot(newInstance));
}
else if (sig.isOnError()) {
Throwable error = sig.getThrowable();
Expand All @@ -444,13 +446,13 @@ else if (sig.isOnError()) {
logger.debug("should warm up {} extra resources", toWarmup);

final long startWarmupIteration = clock.millis();
Flux<POOLABLE> warmupFlux = Flux.range(1, toWarmup)
//individual warmup failures decrement the permit and are logged
.flatMap(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator));
Flux<Mono<POOLABLE>> monos = Flux.range(0, toWarmup + 1)
.map(n -> (n == 0) ? primary : warmupMono(n, toWarmup, startWarmupIteration, allocator));

// mergeSequential will eagerly subscribe to the primary and to all warmupFlux from the current thread.
// The first completed source will be the primary, then the warmupFlux sources.
Flux.mergeSequential(toWarmup + 1, primary, warmupFlux)
// merge will eagerly subscribe to the allocator from the current thread, but the concurrency
// can be controlled from configuration
int warmupConcurrency = Math.min(toWarmup + 1, poolConfig.warmupConcurrency());
Flux.merge(monos, warmupConcurrency, Queues.XS_BUFFER_SIZE)
.onErrorResume(e -> Mono.empty())
.subscribe(poolable -> drain(), alreadyPropagatedOrLogged -> drain(), () -> drain());
}
Expand Down
Loading

0 comments on commit e484c26

Please sign in to comment.