diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 7adf4fceeb..2481ef0547 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -28,15 +28,19 @@ import io.servicetalk.concurrent.api.AsyncCloseable; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.CompositeCloseable; +import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.internal.DelayedCancellable; import io.servicetalk.concurrent.internal.SequentialCancellable; import io.servicetalk.concurrent.internal.ThrowableUtils; +import io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.SharedExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Arrays; @@ -48,6 +52,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Stream; +import javax.annotation.Nullable; import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT; import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_READY_EVENT; @@ -56,11 +61,15 @@ import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow; import static io.servicetalk.concurrent.api.Publisher.from; +import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffFullJitter; import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_INTERVAL; import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.EAGER_CONNECTION_SHUTDOWN_ENABLED; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -129,7 +138,9 @@ public final class RoundRobinLoadBalancer> eventPublisher, final ConnectionFactory connectionFactory) { - this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED); + this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, + new HealthCheckConfig(SharedExecutor.getInstance(), + DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD)); } /** @@ -141,10 +152,15 @@ public RoundRobinLoadBalancer(final Publisher> eventPublisher, final ConnectionFactory connectionFactory, - final boolean eagerConnectionShutdown) { + final boolean eagerConnectionShutdown, + @Nullable final HealthCheckConfig healthCheckConfig) { Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); this.eventStream = fromSource(eventStreamProcessor); this.connectionFactory = requireNonNull(connectionFactory); @@ -221,7 +237,7 @@ private List> markHostAsExpired( } private Host createHost(ResolvedAddress addr) { - Host host = new Host<>(addr); + Host host = new Host<>(addr, healthCheckConfig); if (!eagerConnectionShutdown) { host.onClose().afterFinally(() -> usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> { @@ -244,9 +260,9 @@ private List> addHostToList( // duplicates are not allowed for (Host host : oldHostsTyped) { if (host.address.equals(addr)) { - if (handleExpired && !host.tryToMarkActive()) { - // If the new state is not ACTIVE, the host is already in CLOSED state, we should create - // a new entry. For duplicate ACTIVE events or for repeated activation due to failed CAS + if (handleExpired && !host.markActiveIfNotClosed()) { + // If the host is already in CLOSED state, we should create a new entry. + // For duplicate ACTIVE events or for repeated activation due to failed CAS // of replacing the usedHosts array the marking succeeds so we will not add a new entry. break; } @@ -324,6 +340,10 @@ RoundRobinLoadBalancerFactory newRoundRobinFactory() { return new RoundRobinLoadBalancerFactory<>(); } + private static Single failedLBClosed() { + return failed(new IllegalStateException("LoadBalancer has closed")); + } + @Override public Single selectConnection(Predicate selector) { return defer(() -> selectConnection0(selector).subscribeShareContext()); @@ -367,8 +387,9 @@ private Single selectConnection0(Predicate selector) { } } - // don't open new connections for expired hosts, try a different one - if (host.isActive()) { + // Don't open new connections for expired or unhealthy hosts, try a different one. + // Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress. + if (host.isActiveAndHealthy()) { pickedHost = host; break; } @@ -380,9 +401,15 @@ private Single selectConnection0(Predicate selector) { } // No connection was selected: create a new one. final Host host = pickedHost; + // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. - return connectionFactory.newConnection(host.address, null) + Single establishConnection = connectionFactory.newConnection(host.address, null); + if (host.healthCheckConfig != null) { + // Schedule health check before returning + establishConnection = establishConnection.beforeOnError(t -> host.markUnhealthy(connectionFactory)); + } + return establishConnection .flatMap(newCnx -> { // Invoke the selector before adding the connection to the pool, otherwise, connection can be // used concurrently and hence a new connection can be rejected by the selector. @@ -416,6 +443,11 @@ public Completable closeAsyncGracefully() { return asyncCloseable.closeAsyncGracefully(); } + // Visible for testing + List>> usedAddresses() { + return usedHosts.stream().map(Host::asEntry).collect(toList()); + } + /** * Please use {@link io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory} instead of this factory. * @@ -432,63 +464,73 @@ public static final class RoundRobinLoadBalancerFactory LoadBalancer newLoadBalancer( final Publisher> eventPublisher, final ConnectionFactory connectionFactory) { - return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED); + return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, + new HealthCheckConfig(SharedExecutor.getInstance(), + DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD)); } } - // Visible for testing - List>> usedAddresses() { - return usedHosts.stream().map(Host::asEntry).collect(toList()); - } + static final class HealthCheckConfig { + private final Executor executor; + private final Duration healthCheckInterval; + private final int failedThreshold; - private static final class Host implements ListenableAsyncCloseable { - private static final ConnState EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, State.ACTIVE); - private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED); + HealthCheckConfig(final Executor executor, final Duration healthCheckInterval, final int failedThreshold) { + this.executor = executor; + this.healthCheckInterval = healthCheckInterval; + this.failedThreshold = failedThreshold; + } + } - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater connStateUpdater = - AtomicReferenceFieldUpdater.newUpdater(Host.class, ConnState.class, "connState"); + private static final class Host implements ListenableAsyncCloseable { private enum State { - ACTIVE, + // The enum is not exhaustive, as other states have dynamic properties. + // For clarity, the other state classes are listed as comments: + // ACTIVE - see ActiveState + // UNHEALTHY - see HealthCheck EXPIRED, CLOSED } - private static final class ConnState { - final Object[] connections; - final State state; + private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState(); + private static final ConnState ACTIVE_EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE_NO_FAILURES); + private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED); - ConnState(final Object[] connections, final State state) { - this.connections = connections; - this.state = state; - } - } + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater connStateUpdater = + AtomicReferenceFieldUpdater.newUpdater(Host.class, ConnState.class, "connState"); final Addr address; + @Nullable + private final HealthCheckConfig healthCheckConfig; private final ListenableAsyncCloseable closeable; + private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE; - private volatile ConnState connState = EMPTY_CONN_STATE; - - Host(Addr address) { + Host(Addr address, @Nullable HealthCheckConfig healthCheckConfig) { this.address = requireNonNull(address); + this.healthCheckConfig = healthCheckConfig; this.closeable = toAsyncCloseable(graceful -> graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); } - boolean tryToMarkActive() { - return connStateUpdater.updateAndGet(this, oldConnState -> { + boolean markActiveIfNotClosed() { + final Object oldState = connStateUpdater.getAndUpdate(this, oldConnState -> { if (oldConnState.state == State.EXPIRED) { - return new ConnState(oldConnState.connections, State.ACTIVE); + return new ConnState(oldConnState.connections, STATE_ACTIVE_NO_FAILURES); } // If oldConnState.state == State.ACTIVE this could mean either a duplicate event, // or a repeated CAS operation. We could issue a warning, but as we don't know, we don't log anything. + // UNHEALTHY state cannot transition to ACTIVE without passing the health check. return oldConnState; - }).state == State.ACTIVE; + }).state; + return oldState != State.CLOSED; } void markClosed() { - final Object[] toRemove = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE).connections; + final ConnState oldState = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE); + final Object[] toRemove = oldState.connections; + cancelIfHealthCheck(oldState.state); LOGGER.debug("Closing {} connection(s) gracefully to closed address: {}", toRemove.length, address); for (Object conn : toRemove) { @SuppressWarnings("unchecked") @@ -498,30 +540,96 @@ void markClosed() { } void markExpired() { - final ConnState newState = connStateUpdater.updateAndGet(this, - oldConnState -> oldConnState.connections.length == 0 ? - CLOSED_CONN_STATE : new ConnState(oldConnState.connections, State.EXPIRED)); - if (newState == CLOSED_CONN_STATE) { - // Trigger the callback to remove the host from usedHosts array. - this.closeAsync().subscribe(); + for (;;) { + ConnState oldState = connStateUpdater.get(this); + if (oldState.state == State.EXPIRED || oldState.state == State.CLOSED) { + break; + } + Object nextState = oldState.connections.length == 0 ? State.CLOSED : State.EXPIRED; + + if (connStateUpdater.compareAndSet(this, oldState, + new ConnState(oldState.connections, nextState))) { + cancelIfHealthCheck(oldState.state); + if (nextState == State.CLOSED) { + // Trigger the callback to remove the host from usedHosts array. + this.closeAsync().subscribe(); + } + break; + } } } - boolean isActive() { - return connState.state == State.ACTIVE; + void markHealthy() { + // Marking healthy is generally called from a successful health check, after a connection was added. + // However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed + // to open connections and entered the UNHEALTHY state before the original thread continues execution here. + // In such case, the flipped state is not the same as the one that just succeeded to open a connection. + // In an unlikely scenario that the following connection attempts fail indefinitely, a health check task + // would leak and would not be cancelled. Therefore, we cancel it here and allow failures to trigger a new + // health check. + Object oldState = connStateUpdater.getAndUpdate(this, previous -> { + if (HealthCheck.class.equals(previous.state.getClass())) { + return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES); + } + return previous; + }).state; + cancelIfHealthCheck(oldState); + } + + void markUnhealthy(ConnectionFactory connectionFactory) { + assert healthCheckConfig != null; + for (;;) { + ConnState previous = connStateUpdater.get(this); + + if (!ActiveState.class.equals(previous.state.getClass()) || previous.connections.length > 0) { + break; + } + + ActiveState previousState = (ActiveState) previous.state; + if (previousState.failedConnections + 1 < this.healthCheckConfig.failedThreshold) { + final ActiveState nextState = previousState.forNextFailedConnection(); + if (connStateUpdater.compareAndSet(this, previous, + new ConnState(previous.connections, nextState))) { + LOGGER.debug("Active host for address {} failed to open {} connections" + + " ({} consecutive failures trigger health check).", + address, nextState.failedConnections, healthCheckConfig.failedThreshold); + break; + } + // another thread won the race, try again + continue; + } + + final HealthCheck healthCheck = new HealthCheck<>(connectionFactory, this); + final ConnState nextState = new ConnState(previous.connections, healthCheck); + if (connStateUpdater.compareAndSet(this, previous, nextState)) { + LOGGER.debug("Triggering health check for address {} after {} failed attempts" + + " to open a new connection", address, previousState.failedConnections); + healthCheck.schedule(); + break; + } + } + } + + boolean isActiveAndHealthy() { + return ActiveState.class.equals(connState.state.getClass()); } boolean addConnection(C connection) { for (;;) { - final ConnState currentConnState = this.connState; - if (currentConnState == CLOSED_CONN_STATE) { + final ConnState previous = connStateUpdater.get(this); + if (previous == CLOSED_CONN_STATE) { return false; } - final Object[] existing = currentConnState.connections; + + final Object[] existing = previous.connections; Object[] newList = Arrays.copyOf(existing, existing.length + 1); newList[existing.length] = connection; + + Object newState = ActiveState.class.equals(previous.state.getClass()) ? + STATE_ACTIVE_NO_FAILURES : previous.state; + if (connStateUpdater.compareAndSet(this, - currentConnState, new ConnState(newList, currentConnState.state))) { + previous, new ConnState(newList, newState))) { break; } } @@ -543,8 +651,9 @@ currentConnState, new ConnState(newList, currentConnState.state))) { if (i == connections.length) { break; } else if (connections.length == 1) { - if (currentConnState.state == State.ACTIVE) { - if (connStateUpdater.compareAndSet(this, currentConnState, EMPTY_CONN_STATE)) { + if (ActiveState.class.equals(currentConnState.state.getClass())) { + if (connStateUpdater.compareAndSet(this, currentConnState, + new ConnState(EMPTY_ARRAY, currentConnState.state))) { break; } } else if (currentConnState.state == State.EXPIRED @@ -572,8 +681,8 @@ currentConnState, new ConnState(newList, currentConnState.state))) { }).subscribe(); return true; } - // Used for testing only + // Used for testing only @SuppressWarnings("unchecked") Entry> asEntry() { return new SimpleImmutableEntry<>(address, @@ -598,12 +707,23 @@ public Completable onClose() { @SuppressWarnings("unchecked") private Completable doClose(final Function closeFunction) { return Completable.defer(() -> { - final Object[] connections = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE).connections; + final ConnState oldState = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE); + cancelIfHealthCheck(oldState.state); + final Object[] connections = oldState.connections; return connections.length == 0 ? completed() : from(connections).flatMapCompletableDelayError(conn -> closeFunction.apply((C) conn)); }); } + private void cancelIfHealthCheck(Object o) { + if (HealthCheck.class.equals(o.getClass())) { + @SuppressWarnings("unchecked") + HealthCheck healthCheck = (HealthCheck) o; + LOGGER.debug("Health check for address {} cancelled.", healthCheck.host.address); + healthCheck.cancel(); + } + } + @Override public String toString() { final ConnState connState = this.connState; @@ -613,6 +733,91 @@ public String toString() { ", #connections=" + connState.connections.length + '}'; } + + private static final class ActiveState { + private final int failedConnections; + + ActiveState() { + this(0); + } + + private ActiveState(int failedConnections) { + this.failedConnections = failedConnections; + } + + ActiveState forNextFailedConnection() { + return new ActiveState(addWithOverflowProtection(this.failedConnections, 1)); + } + + @Override + public String toString() { + return "ACTIVE(failedConnections=" + failedConnections + ')'; + } + } + + private static final class HealthCheck + extends DelayedCancellable { + private static final Exception RESCHEDULE_SIGNAL = ThrowableUtils.unknownStackTrace( + new ConnectionRejectedException("Connection rejected during health check."), + HealthCheck.class, "run()"); + private final ConnectionFactory connectionFactory; + private final Host host; + + private HealthCheck(final ConnectionFactory connectionFactory, + final Host host) { + this.connectionFactory = connectionFactory; + this.host = host; + } + + public void schedule() { + assert host.healthCheckConfig != null; + delayedCancellable( + retryWithConstantBackoffFullJitter(cause -> true, + host.healthCheckConfig.healthCheckInterval, + host.healthCheckConfig.executor) + .apply(0, RESCHEDULE_SIGNAL) + .concat(reconnect() + .retryWhen(retryWithConstantBackoffFullJitter( + cause -> cause == RESCHEDULE_SIGNAL, + host.healthCheckConfig.healthCheckInterval, + host.healthCheckConfig.executor))) + .subscribe()); + } + + public Completable reconnect() { + return connectionFactory.newConnection(host.address, null) + .onErrorMap(cause -> { + LOGGER.debug("Health check failed for address {}.", host.address, cause); + return RESCHEDULE_SIGNAL; + }) + .flatMapCompletable(newCnx -> { + if (host.addConnection(newCnx)) { + LOGGER.debug("Health check passed for address {}.", host.address); + host.markHealthy(); + } else { + LOGGER.debug("Health check finished for address {}. Host rejected connection.", + host.address); + return newCnx.closeAsync(); + } + return completed(); + }); + } + + @Override + public String toString() { + return "UNHEALTHY"; + } + } + + private static final class ConnState { + final Object[] connections; + final Object state; + + ConnState(final Object[] connections, final Object state) { + this.connections = connections; + this.state = state; + } + } } private static final class StacklessNoAvailableHostException extends NoAvailableHostException { @@ -631,8 +836,4 @@ public static StacklessNoAvailableHostException newInstance(String message, Clas return ThrowableUtils.unknownStackTrace(new StacklessNoAvailableHostException(message), clazz, method); } } - - private static Single failedLBClosed() { - return failed(new IllegalStateException("LoadBalancer has closed")); - } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 797507eef4..5c2513b66f 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -20,9 +20,17 @@ import io.servicetalk.client.api.LoadBalancer; import io.servicetalk.client.api.LoadBalancerFactory; import io.servicetalk.client.api.ServiceDiscovererEvent; +import io.servicetalk.concurrent.api.DefaultThreadFactory; +import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.loadbalancer.RoundRobinLoadBalancer.HealthCheckConfig; +import java.time.Duration; import java.util.function.Predicate; +import javax.annotation.Nullable; + +import static java.util.Objects.requireNonNull; /** * {@link LoadBalancerFactory} that creates {@link LoadBalancer} instances which use a round robin strategy @@ -40,6 +48,12 @@ * to addresses marked as {@link ServiceDiscovererEvent#isAvailable() unavailable} are used for requests, * but no new connections are created for them. In case the address' connections are busy, another host is tried. * If all hosts are busy, selection fails with a {@link io.servicetalk.client.api.ConnectionRejectedException}. + *
  • For hosts to which consecutive connection attempts fail, a background health checking task is created and + * the host is not considered for opening new connections until the background check succeeds to create a connection. + * Upon such event, the connection can immediately be reused and future attempts will again consider this host. + * This behaviour can be disabled using a negative argument for + * {@link Builder#healthCheckFailedConnectionsThreshold(int)} and the failing host will take part in the regular + * round robin cycle for trying to establish a connection on the request path.
  • * * * @param The resolved address type. @@ -49,17 +63,26 @@ public final class RoundRobinLoadBalancerFactory { static final boolean EAGER_CONNECTION_SHUTDOWN_ENABLED = true; + static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = Duration.ofSeconds(1); + static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy + private final boolean eagerConnectionShutdown; - private RoundRobinLoadBalancerFactory(boolean eagerConnectionShutdown) { + @Nullable + private final HealthCheckConfig healthCheckConfig; + + private RoundRobinLoadBalancerFactory(boolean eagerConnectionShutdown, + @Nullable HealthCheckConfig healthCheckConfig) { this.eagerConnectionShutdown = eagerConnectionShutdown; + this.healthCheckConfig = healthCheckConfig; } @Override public LoadBalancer newLoadBalancer( final Publisher> eventPublisher, final ConnectionFactory connectionFactory) { - return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, eagerConnectionShutdown); + return new RoundRobinLoadBalancer<>( + eventPublisher, connectionFactory, eagerConnectionShutdown, healthCheckConfig); } /** @@ -70,6 +93,10 @@ public LoadBalancer newLoadBalancer( */ public static final class Builder { private boolean eagerConnectionShutdown = EAGER_CONNECTION_SHUTDOWN_ENABLED; + @Nullable + private Executor backgroundExecutor; + private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL; + private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; /** * Creates a new instance with default settings. @@ -95,13 +122,95 @@ public RoundRobinLoadBalancerFactory.Builder eagerConnection return this; } + /** + * This {@link LoadBalancer} may monitor hosts to which connection establishment has failed + * using health checks that run in the background. The health check tries to establish a new connection + * and if it succeeds, the host is returned to the load balancing pool. As long as the connection + * establishment fails, the host is not considered for opening new connections for processed requests. + * If an {@link Executor} is not provided using this method, a default shared instance is used + * for all {@link LoadBalancer LoadBalancers} created by this factory. + *

    + * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable this mechanism and always + * consider all hosts for establishing new connections. + * + * @param backgroundExecutor {@link Executor} on which to schedule health checking. + * @return {@code this}. + * @see #healthCheckFailedConnectionsThreshold(int) + */ + public RoundRobinLoadBalancerFactory.Builder backgroundExecutor( + Executor backgroundExecutor) { + this.backgroundExecutor = requireNonNull(backgroundExecutor); + return this; + } + + /** + * Configure an interval for health checking a host that failed to open connections. If no interval is provided + * using this method, a default value will be used. + *

    + * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism + * and always consider all hosts for establishing new connections. + * @param interval interval at which a background health check will be scheduled. + * @return {@code this}. + * @see #healthCheckFailedConnectionsThreshold(int) + */ + public RoundRobinLoadBalancerFactory.Builder healthCheckInterval(Duration interval) { + if (interval.isNegative() || interval.isZero()) { + throw new IllegalArgumentException("Health check interval should be greater than 0"); + } + this.healthCheckInterval = interval; + return this; + } + + /** + * Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer} + * consecutively fails to open connections in the amount greater or equal to the specified value, + * the host will be marked as unhealthy and connection establishment will take place in the background + * repeatedly until a connection is established. During that time, the host will not take part in + * load balancing selection. + *

    + * Use a negative value of the argument to disable health checking. + * @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for + * background health checking. Use negative value to disable the health checking mechanism. + * @return {@code this}. + * @see #backgroundExecutor(Executor) + * @see #healthCheckInterval(Duration) + */ + public RoundRobinLoadBalancerFactory.Builder healthCheckFailedConnectionsThreshold( + int threshold) { + if (threshold == 0) { + throw new IllegalArgumentException("Health check failed connections threshold should not be 0"); + } + this.healthCheckFailedConnectionsThreshold = threshold; + return this; + } + /** * Builds the {@link RoundRobinLoadBalancerFactory} configured by this builder. * * @return a new instance of {@link RoundRobinLoadBalancerFactory} with settings from this builder. */ public RoundRobinLoadBalancerFactory build() { - return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown); + if (this.healthCheckFailedConnectionsThreshold < 0) { + return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, null); + } + + HealthCheckConfig healthCheckConfig = new HealthCheckConfig( + this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor, + healthCheckInterval, healthCheckFailedConnectionsThreshold); + + return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, healthCheckConfig); + } + } + + static final class SharedExecutor { + private static final Executor INSTANCE = Executors.newFixedSizeExecutor(1, + new DefaultThreadFactory("round-robin-load-balancer-executor")); + + private SharedExecutor() { + } + + static Executor getInstance() { + return INSTANCE; } } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java index c21e3dc629..ee51ebd05c 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java @@ -106,13 +106,7 @@ private void validateConnectionClosedGracefully(final TestLoadBalancedConnection } @Override - protected RoundRobinLoadBalancer defaultLb() { - return newTestLoadBalancer(true); - } - - @Override - protected RoundRobinLoadBalancer defaultLb( - RoundRobinLoadBalancerTest.DelegatingConnectionFactory connectionFactory) { - return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory, true); + protected boolean eagerConnectionShutdown() { + return true; } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java index 7770370e23..5c79fb7b02 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java @@ -309,13 +309,7 @@ public void handleDiscoveryEventsForNotConnectedHosts() { } @Override - protected RoundRobinLoadBalancer defaultLb() { - return newTestLoadBalancer(false); - } - - @Override - protected RoundRobinLoadBalancer defaultLb( - RoundRobinLoadBalancerTest.DelegatingConnectionFactory connectionFactory) { - return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory, false); + protected boolean eagerConnectionShutdown() { + return false; } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index 07fbca0d08..eda50e7991 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -25,10 +25,13 @@ import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.ExecutorRule; import io.servicetalk.concurrent.api.LegacyTestSingle; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.api.TestExecutor; import io.servicetalk.concurrent.api.TestPublisher; import io.servicetalk.concurrent.api.TestSubscription; import io.servicetalk.concurrent.internal.DeliberateException; @@ -44,6 +47,7 @@ import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; +import java.time.Duration; import java.util.AbstractMap; import java.util.Arrays; import java.util.List; @@ -65,14 +69,18 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable; import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely; +import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffFullJitter; +import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static io.servicetalk.concurrent.internal.TestTimeoutConstants.DEFAULT_TIMEOUT_SECONDS; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; @@ -90,6 +98,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -105,6 +114,9 @@ abstract class RoundRobinLoadBalancerTest { @Rule public final ExpectedException thrown = ExpectedException.none(); + @Rule + public final ExecutorRule executor = ExecutorRule.withTestExecutor(); + protected final TestSingleSubscriber selectConnectionListener = new TestSingleSubscriber<>(); protected final List connectionsCreated = new CopyOnWriteArrayList<>(); @@ -116,6 +128,8 @@ abstract class RoundRobinLoadBalancerTest { protected RoundRobinLoadBalancer lb; + protected TestExecutor testExecutor; + protected static Predicate any() { return __ -> true; } @@ -124,13 +138,20 @@ protected Predicate alwaysNewConnectionFilter() { return cnx -> lb.usedAddresses().stream().noneMatch(addr -> addr.getValue().stream().anyMatch(cnx::equals)); } - protected abstract RoundRobinLoadBalancer defaultLb(); + protected RoundRobinLoadBalancer defaultLb() { + return newTestLoadBalancer(eagerConnectionShutdown()); + } + + protected RoundRobinLoadBalancer defaultLb( + DelegatingConnectionFactory connectionFactory) { + return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory, eagerConnectionShutdown()); + } - protected abstract RoundRobinLoadBalancer defaultLb( - DelegatingConnectionFactory connectionFactory); + protected abstract boolean eagerConnectionShutdown(); @Before public void initialize() { + testExecutor = executor.executor(); lb = defaultLb(); connectionsCreated.clear(); connectionRealizers.clear(); @@ -381,6 +402,182 @@ public void newConnectionIsClosedWhenSelectorRejects() throws Exception { awaitIndefinitely(connection.onClose()); } + @Test + public void unhealthyHostTakenOutOfPoolForSelection() throws Exception { + serviceDiscoveryPublisher.onComplete(); + + final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); + + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + lb = defaultLb(connectionFactory); + + sendServiceDiscoveryEvents(upEvent("address-1")); + sendServiceDiscoveryEvents(upEvent("address-2")); + + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD * 2; ++i) { + try { + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } catch (Exception e) { + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); + } + } + + for (int i = 0; i < 10; ++i) { + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + + assertThat(testExecutor.scheduledTasksPending(), equalTo(1)); + + for (int i = 0; i < timeAdvancementsTillHealthy; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + } + + assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); + + for (int i = 0; i < 10; ++i) { + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + } + + @Test + public void disabledHealthCheckDoesntRun() throws Exception { + serviceDiscoveryPublisher.onComplete(); + + final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + + lb = (RoundRobinLoadBalancer) + new RoundRobinLoadBalancerFactory.Builder() + .healthCheckFailedConnectionsThreshold(-1) + .build() + .newLoadBalancer(serviceDiscoveryPublisher, connectionFactory); + + sendServiceDiscoveryEvents(upEvent("address-1")); + + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); + } + + assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); + + for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); + } + + unhealthyHostConnectionFactory.advanceTime(testExecutor); + assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); + + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + + @Test + public void hostUnhealthyIsHealthChecked() throws Exception { + serviceDiscoveryPublisher.onComplete(); + final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + + lb = defaultLb(connectionFactory); + + sendServiceDiscoveryEvents(upEvent("address-1")); + + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); + } + + for (int i = 0; i < timeAdvancementsTillHealthy; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + } + + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + + // 5 failed attempts trigger health check, 2 health check attempts fail, 3rd health check attempt + // uses the proper connection, final selection reuses that connection. 8 total creation attempts. + int expectedConnectionAttempts = 8; + assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(expectedConnectionAttempts)); + } + + // Concurrency test, run multiple times (at least 1000). + @Test + public void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { + serviceDiscoveryPublisher.onComplete(); + + final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + + lb = defaultLb(connectionFactory); + sendServiceDiscoveryEvents(upEvent("address-1")); + + // Imitate concurrency by running multiple threads attempting to establish connections. + ExecutorService executor = Executors.newFixedThreadPool(3); + try { + final Runnable runnable = () -> + assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + + for (int i = 0; i < 1000; i++) { + executor.submit(runnable); + } + + // From test main thread, wait until the host becomes UNHEALTHY, which is apparent from + // NoHostAvailableException being thrown from selection AFTER a health check was scheduled by any thread. + final Executor executorForRetries = io.servicetalk.concurrent.api.Executors.newFixedSizeExecutor(1); + try { + awaitIndefinitely(lb.selectConnection(any()).retryWhen(retryWithConstantBackoffFullJitter((t) -> + // DeliberateException comes from connection opening, check for that first + // Next, NoAvailableHostException is thrown when the host is unhealthy, + // but we still wait until the health check is scheduled and only then stop retrying. + t instanceof DeliberateException || testExecutor.scheduledTasksPending() == 0, + // try to prevent stack overflow + Duration.ofMillis(30), executorForRetries))); + } catch (Exception e) { + assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); + } finally { + executorForRetries.closeAsync().toFuture().get(); + } + + // At this point, either the above selection caused the host to be marked as UNHEALTHY, + // or any background thread. We also know that a health check is pending to be executed. + // Now we can validate if there is just one health check happening and confirm that by asserting the host + // is not selected. If our assumption doesn't hold, it means more than one health check was scheduled. + for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + + // Assert still unhealthy + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); + } + } finally { + // Shutdown the concurrent validation of unhealthiness. + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + + unhealthyHostConnectionFactory.advanceTime(testExecutor); + + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + @SuppressWarnings("unchecked") protected void sendServiceDiscoveryEvents(final ServiceDiscovererEvent... events) { sendServiceDiscoveryEvents(serviceDiscoveryPublisher, events); @@ -408,7 +605,12 @@ protected RoundRobinLoadBalancer newTestLoad protected RoundRobinLoadBalancer newTestLoadBalancer( final TestPublisher> serviceDiscoveryPublisher, final DelegatingConnectionFactory connectionFactory, final boolean eagerConnectionShutdown) { - return new RoundRobinLoadBalancer<>(serviceDiscoveryPublisher, connectionFactory, eagerConnectionShutdown); + return (RoundRobinLoadBalancer) + new RoundRobinLoadBalancerFactory.Builder() + .eagerConnectionShutdown(eagerConnectionShutdown) + .backgroundExecutor(testExecutor) + .build() + .newLoadBalancer(serviceDiscoveryPublisher, connectionFactory); } @SafeVarargs @@ -510,4 +712,48 @@ boolean isClosed() { return closed.get(); } } + + protected static class UnhealthyHostConnectionFactory { + private final String failingHost; + private final AtomicInteger momentInTime = new AtomicInteger(); + final AtomicInteger requests = new AtomicInteger(); + final Single properConnection; + final List> connections; + + Function> factory = + new Function>() { + + @Override + public Single apply(final String s) { + return defer(() -> { + if (s.equals(failingHost)) { + requests.incrementAndGet(); + if (momentInTime.get() >= connections.size()) { + return properConnection; + } + return connections.get(momentInTime.get()); + } + return properConnection; + }); + } + }; + + UnhealthyHostConnectionFactory(final String failingHost, int timeAdvancementsTillHealthy, + Single properConnection) { + this.failingHost = failingHost; + this.connections = IntStream.range(0, timeAdvancementsTillHealthy) + .>mapToObj(__ -> failed(DELIBERATE_EXCEPTION)) + .collect(Collectors.toList()); + this.properConnection = properConnection; + } + + DelegatingConnectionFactory createFactory() { + return new DelegatingConnectionFactory(this.factory); + } + + void advanceTime(TestExecutor executor) { + momentInTime.incrementAndGet(); + executor.advanceTimeBy(1, SECONDS); + } + } }