Skip to content

Commit

Permalink
RetryingHttpRequesterFilter: don't wait for LB if it's unhealthy (a…
Browse files Browse the repository at this point in the history
…pple#2648)

Motivation:

Today, `NoAvailableHostException` can be thrown either when the LB is not
ready yet or when it is already ready but all hosts have turned
unhealthy. In the latter case, there is no need to wait for the "ready"
event, because the onHostsAvailable completable has already completed.
There is a risk of never exiting the retry loop if
`RetryingHttpRequesterFilter` is configured with
`maxTotalRetries(Integer.MAX_VALUE)`.

Modifications:

- Introduce `NoActiveHostException` to avoid interacting with
`LoadBalancerReadySubscriber` when the LB turns unhealthy;
- Add tests to verify the expected behavior;
- Adjust `RoundRobinLoadBalancerTest`.

Result:

`RetryingHttpRequesterFilter` doesn't involve
`LoadBalancerReadySubscriber` when LB is unhealthy.
  • Loading branch information
idelpivnitskiy authored Jul 12, 2023
1 parent 140abce commit c30e076
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.client.api;

import io.servicetalk.transport.api.RetryableException;

import java.io.IOException;

/**
* Thrown when no host is active to establish a new connection.
* <p>
* This is an {@link IOException} that is additionally marked as a {@link RetryableException}, so callers that key
* their retry decisions off that marker interface will see it as retryable.
* <p>
* It is a separate type from {@code NoAvailableHostException} (it does not extend it), which lets callers tell the
* two conditions apart by exception type.
*/
public class NoActiveHostException extends IOException implements RetryableException {

// Explicit serialization version identifier for this exception type.
private static final long serialVersionUID = -4764627055167224323L;

/**
* Creates a new instance.
*
* @param message the detail message, passed unchanged to the {@link IOException} constructor.
*/
public NoActiveHostException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
*/
public final class RetryingHttpRequesterFilter
implements StreamingHttpClientFilterFactory, ExecutionStrategyInfluencer<HttpExecutionStrategy> {
private static final int DEFAULT_MAX_TOTAL_RETRIES = 4;
static final int DEFAULT_MAX_TOTAL_RETRIES = 4;
private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES =
new RetryingHttpRequesterFilter(true, false, false, 1, null,
(__, ___) -> NO_RETRIES);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.DeliberateException;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.BlockingHttpClient;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpServerContext;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.internal.ExecutionContextExtension;

import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.net.InetSocketAddress;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.http.netty.BuilderUtils.newClientBuilder;
import static io.servicetalk.http.netty.BuilderUtils.newServerBuilder;
import static java.lang.Integer.MAX_VALUE;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
 * Checks that a client whose {@link RetryingHttpRequesterFilter} is configured with
 * {@code maxTotalRetries(Integer.MAX_VALUE)} still terminates a request with {@link NoActiveHostException} after
 * a series of requests have failed because every connection attempt fails.
 */
class LoadBalancerUnhealthyTest {

    /**
     * Number of requests that are expected to fail with {@link DeliberateException} before the next request is
     * expected to fail with {@link NoActiveHostException}.
     * NOTE(review): presumably mirrors the load balancer's failed-connection threshold - confirm against its default.
     */
    private static final int FAILED_REQUESTS_BEFORE_UNHEALTHY = 5;

    @RegisterExtension
    static final ExecutionContextExtension SERVER_CTX =
            ExecutionContextExtension.cached("server-io", "server-executor")
                    .setClassLevel(true);

    @RegisterExtension
    static final ExecutionContextExtension CLIENT_CTX =
            ExecutionContextExtension.cached("client-io", "client-executor")
                    .setClassLevel(true);

    /**
     * Fails every new connection with {@code DELIBERATE_EXCEPTION}, issues a fixed number of requests that must
     * each surface that exception, and then asserts that one more request fails with
     * {@link NoActiveHostException} (i.e. the call returns instead of retrying forever).
     *
     * @param protocol the HTTP protocol used by both the server and the client under test.
     * @throws Exception if the server or the client cannot be started or closed.
     */
    @ParameterizedTest(name = "{displayName} [{index}] protocol={0}")
    @EnumSource(HttpProtocol.class)
    void doesNotRetryIndefinitely(HttpProtocol protocol) throws Exception {
        // Intentionally set maxTotalRetries to MAX_VALUE
        final RetryingHttpRequesterFilter unboundedRetries =
                new RetryingHttpRequesterFilter.Builder().maxTotalRetries(MAX_VALUE).build();

        try (HttpServerContext server = newServerBuilder(SERVER_CTX, protocol)
                .listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok());
             BlockingHttpClient client = newClientBuilder(server, CLIENT_CTX, protocol)
                     .appendClientFilter(unboundedRetries)
                     .appendConnectionFactoryFilter(original -> new DelegatingConnectionFactory<InetSocketAddress,
                             FilterableStreamingHttpConnection>(original) {
                         @Override
                         public Single<FilterableStreamingHttpConnection> newConnection(
                                 InetSocketAddress address, @Nullable ContextMap context,
                                 @Nullable TransportObserver observer) {
                             // Never delegate: every connection attempt fails.
                             return Single.failed(DELIBERATE_EXCEPTION);
                         }
                     })
                     .buildBlocking()) {
            // Turn LoadBalancer into "unhealthy" state:
            for (int remaining = FAILED_REQUESTS_BEFORE_UNHEALTHY; remaining > 0; remaining--) {
                assertRequestFailsWith(client, DeliberateException.class);
            }
            assertRequestFailsWith(client, NoActiveHostException.class);
        }
    }

    /**
     * Sends {@code GET /} through {@code client} and asserts that the call throws {@code expectedType}.
     */
    private static void assertRequestFailsWith(final BlockingHttpClient client,
                                               final Class<? extends Throwable> expectedType) {
        assertThrows(expectedType, () -> client.request(client.get("/")));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.client.api.RetryableConnectException;
import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.api.BiIntFunction;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.TestCompletable;
import io.servicetalk.concurrent.api.TestPublisher;
Expand Down Expand Up @@ -49,6 +51,7 @@
import static io.servicetalk.http.api.HttpRequestMethod.GET;
import static io.servicetalk.http.api.StreamingHttpRequests.newRequest;
import static io.servicetalk.http.netty.RetryingHttpRequesterFilter.BackOffPolicy.ofNoRetries;
import static io.servicetalk.http.netty.RetryingHttpRequesterFilter.DEFAULT_MAX_TOTAL_RETRIES;
import static io.servicetalk.http.netty.RetryingHttpRequesterFilter.disableAutoRetries;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -68,6 +71,8 @@ class RetryingHttpRequesterFilterAutoRetryStrategiesTest {
new RetryableConnectException("deliberate exception");
private static final NoAvailableHostException NO_AVAILABLE_HOST =
new NoAvailableHostException("deliberate exception");
private static final NoActiveHostException NO_ACTIVE_HOST =
new NoActiveHostException("deliberate exception");
private static final UnknownHostException UNKNOWN_HOST_EXCEPTION =
new UnknownHostException("deliberate exception");

Expand Down Expand Up @@ -242,6 +247,27 @@ void defaultForNoAvailableHostWhenServiceDiscovererTerminated(boolean offloading
verifyRetryResultCompleted();
}

/**
 * Exercises the retry strategy with {@code NO_ACTIVE_HOST} after the load balancer "ready" event was already
 * emitted. Although the filter is built with {@code maxTotalRetries(Integer.MAX_VALUE)}, only attempts below
 * {@code DEFAULT_MAX_TOTAL_RETRIES} are expected to complete; every later attempt is expected to fail with the
 * same exception. On every attempt {@code sdStatus} must remain unsubscribed.
 *
 * @param offloading forwarded to {@code newFilter(...)}.
 */
@ParameterizedTest
@ValueSource(booleans = {false, true})
void noActiveHostException(boolean offloading) {
    final RetryingHttpRequesterFilter.Builder unboundedTotalRetries =
            new RetryingHttpRequesterFilter.Builder().maxTotalRetries(Integer.MAX_VALUE);
    final ContextAwareRetryingHttpClientFilter filter = newFilter(unboundedTotalRetries, offloading);
    // LB is ready before subscribing to the response
    lbEvents.onNext(LOAD_BALANCER_READY_EVENT);
    final BiIntFunction<Throwable, Completable> strategy =
            filter.retryStrategy(REQUEST_META_DATA, filter.executionContext());

    // Go well past the expected bound to check that later attempts keep failing.
    final int lastAttempt = DEFAULT_MAX_TOTAL_RETRIES * 2;
    for (int attempt = 1; attempt <= lastAttempt; ++attempt) {
        final Completable retry = strategy.apply(attempt, NO_ACTIVE_HOST);
        final TestCompletableSubscriber retrySubscriber = new TestCompletableSubscriber();
        toSource(retry).subscribe(retrySubscriber);
        assertThat("Unexpected subscribe for SD errors.", sdStatus.isSubscribed(), is(false));

        final boolean retriesExhausted = attempt >= DEFAULT_MAX_TOTAL_RETRIES;
        if (retriesExhausted) {
            assertThat(retrySubscriber.awaitOnError(), is(NO_ACTIVE_HOST));
        } else {
            retrySubscriber.awaitOnComplete();
        }
    }
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void maxRetriesAreHonored(boolean offloading) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2022 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import io.servicetalk.client.api.ConnectionRejectedException;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.PublisherSource.Processor;
Expand Down Expand Up @@ -554,7 +555,7 @@ private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final
subscribeToEvents(true);
}
}
return failed(StacklessNoAvailableHostException.newInstance("Failed to pick an active host for " +
return failed(StacklessNoActiveHostException.newInstance("Failed to pick an active host for " +
targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts,
RoundRobinLoadBalancer.class, "selectConnection0(...)"));
}
Expand Down Expand Up @@ -1102,6 +1103,24 @@ public static StacklessNoAvailableHostException newInstance(String message, Clas
}
}

/**
 * A {@link NoActiveHostException} that does not capture a stack trace on construction: its
 * {@link #fillInStackTrace()} override returns {@code this} without doing any work. Instances are obtained through
 * {@link #newInstance(String, Class, String)}.
 */
private static final class StacklessNoActiveHostException extends NoActiveHostException {

    private static final long serialVersionUID = 7500474499335155869L;

    /**
     * Creates an instance and passes it through {@code ThrowableUtils.unknownStackTrace(...)} together with the
     * given class and method name.
     * NOTE(review): presumably that helper installs a synthetic stack trace naming {@code clazz}/{@code method} -
     * confirm against {@code ThrowableUtils}.
     *
     * @param message the detail message.
     * @param clazz the class forwarded to {@code ThrowableUtils.unknownStackTrace(...)}.
     * @param method the method name forwarded to {@code ThrowableUtils.unknownStackTrace(...)}.
     * @return the value returned by {@code ThrowableUtils.unknownStackTrace(...)}.
     */
    public static StacklessNoActiveHostException newInstance(String message, Class<?> clazz, String method) {
        final StacklessNoActiveHostException exception = new StacklessNoActiveHostException(message);
        return ThrowableUtils.unknownStackTrace(exception, clazz, method);
    }

    private StacklessNoActiveHostException(final String message) {
        super(message);
    }

    /**
     * Skips stack trace capture.
     *
     * @return this instance, unchanged.
     */
    @Override
    public Throwable fillInStackTrace() {
        return this;
    }
}

private static final class StacklessConnectionRejectedException extends ConnectionRejectedException {
private static final long serialVersionUID = -4940708893680455819L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ConnectionRejectedException;
import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Executors;
Expand Down Expand Up @@ -223,7 +224,7 @@ void handleDiscoveryEventsForExpiredHostBecomingAvailable() throws Exception {
final Predicate<TestLoadBalancedConnection> createNewConnection = alwaysNewConnectionFilter();
Exception e = assertThrows(ExecutionException.class, () ->
lb.selectConnection(createNewConnection, null).toFuture().get());
assertThat(e.getCause(), instanceOf(NoAvailableHostException.class));
assertThat(e.getCause(), instanceOf(NoActiveHostException.class));

// When the host becomes available again, new connections can be created
sendServiceDiscoveryEvents(upEvent("address-1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancerReadyEvent;
import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
Expand Down Expand Up @@ -678,7 +679,7 @@ void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception {
// try to prevent stack overflow
ofMillis(30), executorForRetries)));
} catch (Exception e) {
assertThat(e.getCause(), instanceOf(NoAvailableHostException.class));
assertThat(e.getCause(), instanceOf(NoActiveHostException.class));
} finally {
executorForRetries.closeAsync().toFuture().get();
}
Expand All @@ -691,7 +692,7 @@ void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception {
unhealthyHostConnectionFactory.advanceTime(testExecutor);

// Assert still unhealthy
assertSelectThrows(instanceOf(NoAvailableHostException.class));
assertSelectThrows(instanceOf(NoActiveHostException.class));
}
} finally {
// Shutdown the concurrent validation of unhealthiness.
Expand Down Expand Up @@ -739,7 +740,7 @@ void resubscribeToEventsWhenAllHostsAreUnhealthy(boolean sdReturnsDelta) throws
// Assert the next select attempt after resubscribe internal triggers re-subscribe
testExecutor.advanceTimeBy(DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.toMillis() * 2, MILLISECONDS);
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(2));
assertSelectThrows(instanceOf(NoAvailableHostException.class));
assertSelectThrows(instanceOf(NoActiveHostException.class));
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(3));

// Verify state after re-subscribe
Expand Down Expand Up @@ -796,7 +797,7 @@ void resubscribeToEventsNotTriggeredWhenDisabled() throws Exception {
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(2));
testExecutor.advanceTimeBy(DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.toMillis() * 2, MILLISECONDS);
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(2));
assertSelectThrows(instanceOf(NoAvailableHostException.class));
assertSelectThrows(instanceOf(NoActiveHostException.class));
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(2));
}

Expand Down

0 comments on commit c30e076

Please sign in to comment.