Skip to content

Commit

Permalink
Experiment work stealing pools
Browse files Browse the repository at this point in the history
  • Loading branch information
pderop committed Nov 25, 2023
1 parent ea5a599 commit 88ced6e
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 27 deletions.
4 changes: 4 additions & 0 deletions reactor-netty-core/src/main/java/reactor/netty/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ public class Metrics {
*/
public static final String PENDING_STREAMS = ".pending.streams";

/**
* The number of HTTP/2 stream acquisitions steal count.
*/
public static final String STEAL_STREAMS = ".steal.streams";

// ByteBufAllocator Metrics
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ public interface Builder {
* @return {@code this}
*/
Builder minConnections(int minConnections);

/**
* Enables or disables work stealing mode for managing HTTP2 Connection Pools.
* <p>
* By default, a single Connection Pool is used by multiple Netty event loop threads.
* When work stealing is enabled, each Netty event loop will maintain its own
* HTTP2 Connection Pool, and HTTP2 streams allocation will be distributed over all available
* pools using a work stealing strategy. This approach maximizes throughput and
* resource utilization in a multithreaded environment.
*
* @return {@code this}
*/
Builder enableWorkStealing();
}

/**
Expand All @@ -77,6 +90,18 @@ public static Http2AllocationStrategy.Builder builder() {
return new Http2AllocationStrategy.Build();
}

/**
* Creates a builder for {@link Http2AllocationStrategy} and initialize it
* with an existing strategy. This method can be used to create a mutated version
* of an existing strategy.
*
* @return a new {@link Http2AllocationStrategy.Builder} initialized with an existing http2
* allocation strategy.
*/
public static Http2AllocationStrategy.Builder builder(Http2AllocationStrategy existing) {
return new Http2AllocationStrategy.Build(existing);
}

@Override
public Http2AllocationStrategy copy() {
return new Http2AllocationStrategy(this);
Expand Down Expand Up @@ -141,9 +166,14 @@ public void returnPermits(int returned) {
}
}

public boolean enableWorkStealing() {
return enableWorkStealing;
}

final long maxConcurrentStreams;
final int maxConnections;
final int minConnections;
final boolean enableWorkStealing;

volatile int permits;
static final AtomicIntegerFieldUpdater<Http2AllocationStrategy> PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits");
Expand All @@ -152,13 +182,15 @@ public void returnPermits(int returned) {
this.maxConcurrentStreams = build.maxConcurrentStreams;
this.maxConnections = build.maxConnections;
this.minConnections = build.minConnections;
this.enableWorkStealing = build.enableWorkStealing;
PERMITS.lazySet(this, this.maxConnections);
}

Http2AllocationStrategy(Http2AllocationStrategy copy) {
this.maxConcurrentStreams = copy.maxConcurrentStreams;
this.maxConnections = copy.maxConnections;
this.minConnections = copy.minConnections;
this.enableWorkStealing = copy.enableWorkStealing;
PERMITS.lazySet(this, this.maxConnections);
}

Expand All @@ -170,6 +202,17 @@ static final class Build implements Builder {
long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
int maxConnections = DEFAULT_MAX_CONNECTIONS;
int minConnections = DEFAULT_MIN_CONNECTIONS;
boolean enableWorkStealing = Boolean.getBoolean("reactor.netty.pool.h2.enableworkstealing");

Build() {
}

Build(Http2AllocationStrategy existing) {
this.maxConcurrentStreams = existing.maxConcurrentStreams;
this.minConnections = existing.minConnections;
this.maxConnections = existing.maxConnections;
this.enableWorkStealing = existing.enableWorkStealing;
}

@Override
public Http2AllocationStrategy build() {
Expand Down Expand Up @@ -206,5 +249,11 @@ public Builder minConnections(int minConnections) {
this.minConnections = minConnections;
return this;
}

@Override
public Builder enableWorkStealing() {
this.enableWorkStealing = true;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import reactor.netty.ConnectionObserver;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.internal.shaded.reactor.pool.decorators.InstrumentedPoolDecorators;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.PooledConnectionProvider;
import reactor.netty.transport.TransportConfig;
Expand All @@ -50,13 +51,19 @@
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.function.Tuples;

import java.io.IOException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static reactor.netty.ReactorNetty.format;
import static reactor.netty.ReactorNetty.getChannelContext;
Expand Down Expand Up @@ -536,12 +543,46 @@ static final class PooledConnectionAllocator {
this.config = (HttpClientConfig) config;
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = id == null ?
poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));

Http2AllocationStrategy http2Strategy = poolFactory.allocationStrategy() instanceof Http2AllocationStrategy ?
(Http2AllocationStrategy) poolFactory.allocationStrategy() : null;

if (http2Strategy == null || !http2Strategy.enableWorkStealing) {
this.pool = id == null ?
poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
}
else {
// Create one connection allocator (it will be shared by all Http2Pool instances)
Publisher<Connection> connPublisher = connectChannel();

List<Executor> execs = StreamSupport.stream(config.loopResources().onClient(true).spliterator(), false)
.limit(http2Strategy.maxConnections)
.collect(Collectors.toList());
Iterator<Executor> execsIter = execs.iterator();

MicrometerPoolMetricsRecorder micrometerRecorder = id == null ? null : new MicrometerPoolMetricsRecorder(id, name, remoteAddress);
this.pool = InstrumentedPoolDecorators.concurrentPools(execs.size(),
http2Strategy.minConnections, http2Strategy.maxConnections, (minConn, maxConn) -> {
Http2AllocationStrategy adaptedH2Strategy = Http2AllocationStrategy.builder(http2Strategy)
.minConnections(minConn)
.maxConnections(maxConn)
.build();

InstrumentedPool<Connection> pool =
id == null ?
poolFactory.newPool(connPublisher, null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)) :
poolFactory.newPool(connPublisher, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
micrometerRecorder,
poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy));

return Tuples.of(pool, execsIter.next());
});
}
}

Publisher<Connection> connectChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ public Meter.Type getType() {
}
},

/**
* The number of HTTP/2 stream acquisition steal count.
*/
STEAL_STREAMS {
@Override
public String getName() {
return "reactor.netty.connection.provider.steal.streams";
}

@Override
public KeyName[] getKeyNames() {
return Http2ConnectionProviderMetersTags.values();
}

@Override
public Meter.Type getType() {
return Meter.Type.COUNTER;
}
},

/**
* The number of the idle connections in the connection pool.
*/
Expand Down
Loading

0 comments on commit 88ced6e

Please sign in to comment.