Skip to content

Commit

Permalink
Add feature to gracefully shutdown connections in HttpClientFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
seonWKim committed Mar 6, 2024
1 parent 75d5af1 commit c9b85ba
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,30 @@ public ClientFactoryBuilder connectionPoolListener(
return this;
}

/**
* Sets the graceful connection shutdown drain duration. When the client factory is closed, it waits for
* the specified {@code duration} to allow pre-existing connections to be closed gracefully.
*/
public ClientFactoryBuilder connectionDrainDuration(Duration duration) {
requireNonNull(duration, "duration");
option(ClientFactoryOptions.CLIENT_CONNECTION_DRAIN_DURATION_MILLIS, duration.toMillis());
return this;
}

/**
* Sets the graceful connection shutdown drain duration in milliseconds. When the client factory is closed,
* it waits for the specified {@code duration} to allow pre-existing connections to be closed gracefully.
*/
public ClientFactoryBuilder clientConnectionDrainDurationMillis(
long clientConnectionDrainDurationMillis) {
checkArgument(clientConnectionDrainDurationMillis >= 0,
"clientConnectionDrainDurationMillis: %s (expected: >= 0)",
clientConnectionDrainDurationMillis);
option(ClientFactoryOptions.CLIENT_CONNECTION_DRAIN_DURATION_MILLIS,
clientConnectionDrainDurationMillis);
return this;
}

/**
* Sets the {@link MeterRegistry} which collects various stats.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ private static long clampedDefaultMaxClientConnectionAge() {
public static final ClientFactoryOption<ConnectionPoolListener> CONNECTION_POOL_LISTENER =
ClientFactoryOption.define("CONNECTION_POOL_LISTENER", ConnectionPoolListener.noop());

/**
* The client-side maximum allowed duration of a connection drain in milliseconds.
*/
public static final ClientFactoryOption<Long> CLIENT_CONNECTION_DRAIN_DURATION_MILLIS =
ClientFactoryOption.define("CLIENT_CONNECTION_DRAIN_DURATION_MILLIS",
Flags.defaultClientConnectionDrainDurationMillis());

/**
* The {@link MeterRegistry} which collects various stats.
*/
Expand Down Expand Up @@ -557,6 +564,13 @@ public ConnectionPoolListener connectionPoolListener() {
return get(CONNECTION_POOL_LISTENER);
}

/**
* Returns the client-side maximum allowed duration of a connection drain in milliseconds.
*/
public long clientConnectionDrainDurationMillis() {
return get(CLIENT_CONNECTION_DRAIN_DURATION_MILLIS);
}

/**
* Returns the {@link MeterRegistry} which collects various stats.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand Down Expand Up @@ -88,12 +90,12 @@ final class HttpClientFactory implements ClientFactory {

private static void setupTlsMetrics(List<X509Certificate> certificates, MeterRegistry registry) {
final MeterIdPrefix meterIdPrefix = new MeterIdPrefix("armeria.client");
try {
MoreMeterBinders.certificateMetrics(certificates, meterIdPrefix)
.bindTo(registry);
} catch (Exception ex) {
logger.warn("Failed to set up TLS certificate metrics: {}", certificates, ex);
}
try {
MoreMeterBinders.certificateMetrics(certificates, meterIdPrefix)
.bindTo(registry);
} catch (Exception ex) {
logger.warn("Failed to set up TLS certificate metrics: {}", certificates, ex);
}
}

private final EventLoopGroup workerGroup;
Expand All @@ -120,6 +122,9 @@ private static void setupTlsMetrics(List<X509Certificate> certificates, MeterReg
private final boolean useHttp2WithoutAlpn;
private final boolean useHttp1Pipelining;
private final ConnectionPoolListener connectionPoolListener;
private final long drainDurationMillis;
@Nullable
private ScheduledFuture<?> drainFuture;
private MeterRegistry meterRegistry;
private final ProxyConfigSelector proxyConfigSelector;
private final Http1HeaderNaming http1HeaderNaming;
Expand Down Expand Up @@ -200,6 +205,7 @@ private static void setupTlsMetrics(List<X509Certificate> certificates, MeterReg
useHttp2WithoutAlpn = options.useHttp2WithoutAlpn();
useHttp1Pipelining = options.useHttp1Pipelining();
connectionPoolListener = options.connectionPoolListener();
drainDurationMillis = options.clientConnectionDrainDurationMillis();
meterRegistry = options.meterRegistry();
proxyConfigSelector = options.proxyConfigSelector();
http1HeaderNaming = options.http1HeaderNaming();
Expand Down Expand Up @@ -425,8 +431,35 @@ public CompletableFuture<?> closeAsync() {
}

private void closeAsync(CompletableFuture<?> future) {
if (drainFuture != null) {
final boolean cancelled;
if (drainFuture.getDelay(TimeUnit.MICROSECONDS) > drainDurationMillis) {
// Maybe reschedule below.
cancelled = drainFuture.cancel(false);
} else {
cancelled = false;
}

if (cancelled) {
drainFuture = null;
} else {
// Drain is already scheduled to finish earlier.
return;
}
}

if (drainDurationMillis > 0) {
drainFuture = workerGroup.schedule(() -> finish(future),
drainDurationMillis, TimeUnit.MICROSECONDS);
} else {
finish(future);
}
}

private void finish(CompletableFuture<?> future) {
final List<CompletableFuture<?>> dependencies = new ArrayList<>(pools.size());
for (final Iterator<HttpChannelPool> i = pools.values().iterator(); i.hasNext();) {
// Close all channels gracefully just like Http2serverConnectionHandler
dependencies.add(i.next().closeAsync());
i.remove();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ final class DefaultFlagsProvider implements FlagsProvider {
static final long DEFAULT_MAX_SERVER_CONNECTION_AGE_MILLIS = 0; // Disabled
static final long DEFAULT_MAX_CLIENT_CONNECTION_AGE_MILLIS = 0; // Disabled
static final long DEFAULT_SERVER_CONNECTION_DRAIN_DURATION_MICROS = 1000000;
static final long DEFAULT_CLIENT_CONNECTION_DRAIN_DURATION_MILLIS = 1000;
static final int DEFAULT_HTTP2_INITIAL_CONNECTION_WINDOW_SIZE = 1024 * 1024; // 1MiB
static final int DEFAULT_HTTP2_INITIAL_STREAM_WINDOW_SIZE = 1024 * 1024; // 1MiB
static final int DEFAULT_HTTP2_MAX_FRAME_SIZE = 16384; // From HTTP/2 specification
Expand Down Expand Up @@ -299,6 +300,11 @@ public Long defaultServerConnectionDrainDurationMicros() {
return DEFAULT_SERVER_CONNECTION_DRAIN_DURATION_MICROS;
}

@Override
public Long defaultClientConnectionDrainDurationMillis() {
return DEFAULT_CLIENT_CONNECTION_DRAIN_DURATION_MILLIS;
}

@Override
public Integer defaultHttp2InitialConnectionWindowSize() {
return DEFAULT_HTTP2_INITIAL_CONNECTION_WINDOW_SIZE;
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/Flags.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ private static boolean validateTransportType(TransportType transportType, String
getValue(FlagsProvider::defaultServerConnectionDrainDurationMicros,
"defaultServerConnectionDrainDurationMicros", value -> value >= 0);

private static final long DEFAULT_CLIENT_CONNECTION_DRAIN_DURATION_MILLIS =
getValue(FlagsProvider::defaultClientConnectionDrainDurationMillis,
"defaultClientConnectionDrainDurationMillis", value -> value >= 0);

private static final int DEFAULT_HTTP2_INITIAL_CONNECTION_WINDOW_SIZE =
getValue(FlagsProvider::defaultHttp2InitialConnectionWindowSize,
"defaultHttp2InitialConnectionWindowSize", value -> value > 0);
Expand Down Expand Up @@ -980,6 +984,24 @@ public static long defaultServerConnectionDrainDurationMicros() {
return DEFAULT_SERVER_CONNECTION_DRAIN_DURATION_MICROS;
}

/**
* Returns the default client-side graceful connection shutdown drain duration in milliseconds.
* If the value of this flag is greater than {@code 0}, a connection shutdown will have a drain period.
*
* <p>The default value of this flag is
* {@value DefaultFlagsProvider#DEFAULT_CLIENT_CONNECTION_DRAIN_DURATION_MILLIS}.
* Specify the {@code -Dcom.linecorp.armeria.defaultClientConnectionDrainDurationMillis=<long>}
* JVM option to override the default value.
* After the drain period end client will close all the connections.
* </p>
*
* @see ClientFactoryBuilder#connectionDrainDuration(Duration)
* @see ClientFactoryBuilder#clientConnectionDrainDurationMillis(long)
*/
public static long defaultClientConnectionDrainDurationMillis() {
return DEFAULT_CLIENT_CONNECTION_DRAIN_DURATION_MILLIS;
}

/**
* Returns the default value of the {@link ServerBuilder#http2InitialConnectionWindowSize(int)} and
* {@link ClientFactoryBuilder#http2InitialConnectionWindowSize(int)} option.
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/FlagsProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,23 @@ default Integer defaultMaxClientNumRequestsPerConnection() {
return null;
}

/**
* Returns the graceful connection shutdown drain duration. When the client factory is closed, it waits for
* the specified {@code duration} to allow pre-existing connections to be closed gracefully.
*
* <p></p>Note that this flag has no effect if a user specified the value explicitly via
* {@link ClientFactoryBuilder#clientConnectionDrainDurationMillis(long)}.
*
* <p>The default value of this flag is
* {@value DefaultFlagsProvider#DEFAULT_CLIENT_CONNECTION_DRAIN_DURATION_MILLIS}.
* Specify the {@code -Dcom.linecorp.armeria.defaultClientConnectionDrainDurationMillis=<long>} JVM option
* to override the default value. {@code 0} disables the drain duration.
*/
@Nullable
default Long defaultClientConnectionDrainDurationMillis() {
return null;
}

/**
* Returns the default server-side max age of a connection for keep-alive in milliseconds.
* If the value of this flag is greater than {@code 0}, a connection is disconnected after the specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,17 @@ void defaultKeepAliveOnPingSet() {
assertThat(factory.options().keepAliveOnPing()).isTrue();
}
}

@Test
void clientConnectionDrainDurationMillis() {
try (ClientFactory factory = ClientFactory.builder().build()) {
assertThat(factory.options().clientConnectionDrainDurationMillis()).isEqualTo(1000);
}

try (ClientFactory factory = ClientFactory.builder()
.clientConnectionDrainDurationMillis(500)
.build()) {
assertThat(factory.options().clientConnectionDrainDurationMillis()).isEqualTo(500);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public Long defaultServerConnectionDrainDurationMicros() {
return 500L;
}

@Override
public Long defaultClientConnectionDrainDurationMillis() {
return 500L;
}

@Override
public String routeCacheSpec() {
return "off";
Expand Down

0 comments on commit c9b85ba

Please sign in to comment.