Skip to content

Commit

Permalink
feat: support configurable keepalive for managed channel. (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Jan 14, 2025
1 parent f2cce5e commit be15f41
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 64 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,6 +57,10 @@ static OxiaClientBuilder create(String serviceAddress) {

OxiaClientBuilder maxConnectionPerNode(int connections);

OxiaClientBuilder connectionKeepAliveTimeout(Duration connectionKeepAliveTimeout);

OxiaClientBuilder connectionKeepAliveTime(Duration connectionKeepAlive);

/**
* Configure the authentication plugin and its parameters.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -78,23 +78,15 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
final var oxiaBackoffProvider =
OxiaBackoffProvider.create(
config.connectionBackoffMinDelay(), config.connectionBackoffMaxDelay());
var stubManager =
new OxiaStubManager(
config.authentication(),
config.enableTls(),
oxiaBackoffProvider,
config.maxConnectionPerNode());
var stubManager = new OxiaStubManager(config, oxiaBackoffProvider);

var instrumentProvider = new InstrumentProvider(config.openTelemetry(), config.namespace());
var serviceAddrStub = stubManager.getStub(config.serviceAddress());
var shardManager =
new ShardManager(executor, serviceAddrStub, instrumentProvider, config.namespace());
var notificationManager =
new NotificationManager(executor, stubManager, shardManager, instrumentProvider);

OxiaStubProvider stubProvider =
new OxiaStubProvider(config.namespace(), stubManager, shardManager);

final var stubProvider = new OxiaStubProvider(config.namespace(), stubManager, shardManager);
shardManager.addCallback(notificationManager);
var readBatchManager =
BatchManager.newReadBatchManager(config, stubProvider, instrumentProvider);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,4 +35,6 @@ public record ClientConfig(
boolean enableTls,
@NonNull Duration connectionBackoffMinDelay,
@NonNull Duration connectionBackoffMaxDelay,
Duration connectionKeepAliveTime,
Duration connectionKeepAliveTimeout,
int maxConnectionPerNode) {}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,6 +71,9 @@ public class OxiaClientBuilderImpl implements OxiaClientBuilder {
@NonNull protected Duration connectionBackoffMinDelay = Duration.ofMillis(100);
@NonNull protected Duration connectionBackoffMaxDelay = Duration.ofSeconds(30);

protected Duration connectionKeepAliveTime = Duration.ofSeconds(10);
protected Duration connectionKeepAliveTimeout = Duration.ofSeconds(5);

protected int maxConnectionsPerNode = DefaultMaxConnectionPerNode;

@Override
Expand Down Expand Up @@ -164,6 +167,18 @@ public OxiaClientBuilder maxConnectionPerNode(int connections) {
return this;
}

@Override
public OxiaClientBuilder connectionKeepAliveTimeout(Duration connectionKeepAliveTimeout) {
this.connectionKeepAliveTimeout = connectionKeepAliveTimeout;
return this;
}

@Override
public OxiaClientBuilder connectionKeepAliveTime(Duration keepAliveTime) {
this.connectionKeepAliveTime = keepAliveTime;
return this;
}

@Override
public OxiaClientBuilder authentication(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException {
Expand Down Expand Up @@ -238,23 +253,27 @@ public OxiaClientBuilder loadConfig(Properties properties) {

@Override
public @NonNull CompletableFuture<AsyncOxiaClient> asyncClient() {
var config =
new ClientConfig(
serviceAddress,
requestTimeout,
batchLinger,
maxRequestsPerBatch,
DefaultMaxBatchSize,
sessionTimeout,
clientIdentifierSupplier.get(),
openTelemetry,
namespace,
authentication,
enableTls,
connectionBackoffMinDelay,
connectionBackoffMaxDelay,
maxConnectionsPerNode);
return AsyncOxiaClientImpl.newInstance(config);
return AsyncOxiaClientImpl.newInstance(getClientConfig());
}

public ClientConfig getClientConfig() {
return new ClientConfig(
serviceAddress,
requestTimeout,
batchLinger,
maxRequestsPerBatch,
DefaultMaxBatchSize,
sessionTimeout,
clientIdentifierSupplier.get(),
openTelemetry,
namespace,
authentication,
enableTls,
connectionBackoffMinDelay,
connectionBackoffMaxDelay,
connectionKeepAliveTime,
connectionKeepAliveTimeout,
maxConnectionsPerNode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,11 +25,13 @@
import io.grpc.ManagedChannel;
import io.grpc.TlsChannelCredentials;
import io.grpc.internal.BackoffPolicy;
import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.api.Authentication;
import io.streamnative.oxia.proto.OxiaClientGrpc;

import java.lang.reflect.Field;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

import lombok.NonNull;
Expand All @@ -56,14 +58,15 @@ static ChannelCredentials getChannelCredential(String address, boolean tlsEnable

public OxiaStub(
String address,
@Nullable Authentication authentication,
boolean enableTls,
ClientConfig clientConfig,
@Nullable BackoffPolicy.Provider backoffProvider) {

this(Grpc.newChannelBuilder(getAddress(address), getChannelCredential(address, enableTls))
this(Grpc.newChannelBuilder(getAddress(address), getChannelCredential(address, clientConfig.enableTls()))
.keepAliveTime(clientConfig.connectionKeepAliveTime().toMillis(), MILLISECONDS)
.keepAliveTimeout(clientConfig.connectionKeepAliveTimeout().toMillis(), MILLISECONDS)
.keepAliveWithoutCalls(true)
.directExecutor()
.build(),
authentication, backoffProvider);
.build(), clientConfig.authentication(), backoffProvider);
}

public OxiaStub(ManagedChannel channel) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,30 +17,22 @@

import com.google.common.annotations.VisibleForTesting;
import io.grpc.internal.BackoffPolicy;
import io.streamnative.oxia.client.api.Authentication;
import io.streamnative.oxia.client.ClientConfig;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;

public class OxiaStubManager implements AutoCloseable {
@VisibleForTesting final Map<Key, OxiaStub> stubs = new ConcurrentHashMap<>();

@Nullable private final Authentication authentication;
private final boolean enableTls;
@Nullable private final BackoffPolicy.Provider backoffProvider;

private final BackoffPolicy.Provider backoffProvider;
private final int maxConnectionPerNode;
private final ClientConfig clientConfig;

public OxiaStubManager(
@Nullable Authentication authentication,
boolean enableTls,
@Nullable BackoffPolicy.Provider backoffProvider,
int maxConnectionPerNode) {
this.authentication = authentication;
this.enableTls = enableTls;
public OxiaStubManager(ClientConfig clientConfig, BackoffPolicy.Provider backoffProvider) {
this.backoffProvider = backoffProvider;
this.maxConnectionPerNode = maxConnectionPerNode;
this.clientConfig = clientConfig;
this.maxConnectionPerNode = clientConfig.maxConnectionPerNode();
}

public OxiaStub getStub(String address) {
Expand All @@ -50,8 +42,7 @@ public OxiaStub getStub(String address) {
modKey += maxConnectionPerNode;
}
return stubs.computeIfAbsent(
new Key(address, modKey),
key -> new OxiaStub(key.address, authentication, enableTls, backoffProvider));
new Key(address, modKey), key -> new OxiaStub(key.address, clientConfig, backoffProvider));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import io.streamnative.oxia.client.auth.TokenAuthentication;
import java.time.Duration;
import java.util.Properties;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class OxiaClientBuilderTest {
Expand Down Expand Up @@ -132,4 +133,15 @@ void loadConfigFromFile() {
String token = metadata.get(Metadata.Key.of("Authorization", ASCII_STRING_MARSHALLER));
assertThat(token).isEqualTo("Bearer 1234");
}

@Test
void connectionKeepAlive() {
final var keepAliveTime = Duration.ofMillis(10);
final var keepAliveTimeout = Duration.ofMillis(10);
builder.connectionKeepAliveTime(keepAliveTime);
builder.connectionKeepAliveTimeout(keepAliveTimeout);
final var impl = (OxiaClientBuilderImpl) builder;
Assertions.assertEquals(keepAliveTimeout, impl.connectionKeepAliveTimeout);
Assertions.assertEquals(keepAliveTime, impl.connectionKeepAliveTime);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -104,6 +104,8 @@ class BatchTest {
authentication != null,
Duration.ofMillis(100),
Duration.ofSeconds(30),
Duration.ofSeconds(10),
Duration.ofSeconds(5),
1);

private final OxiaClientImplBase serviceImpl =
Expand Down Expand Up @@ -498,6 +500,8 @@ class FactoryTests {
false,
Duration.ofMillis(100),
Duration.ofSeconds(30),
Duration.ofSeconds(10),
Duration.ofSeconds(5),
1);

@Nested
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,6 +66,8 @@ class BatcherTest {
false,
Duration.ofMillis(100),
Duration.ofSeconds(30),
Duration.ofSeconds(10),
Duration.ofSeconds(5),
1);

BatchedArrayBlockingQueue<Operation<?>> queue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,8 @@
*/
package io.streamnative.oxia.client.grpc;

import static io.streamnative.oxia.client.util.ConfigUtils.*;

import io.grpc.InsecureChannelCredentials;
import io.grpc.TlsChannelCredentials;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -52,9 +54,9 @@ public enum BackoffType {
public void testOxiaReconnectBackoff(BackoffType type) throws Exception {
final OxiaStubManager stubManager;
if (type == BackoffType.Oxia) {
stubManager = new OxiaStubManager(null, false, OxiaBackoffProvider.DEFAULT, 1);
stubManager = new OxiaStubManager(getDefaultClientConfig(), OxiaBackoffProvider.DEFAULT);
} else {
stubManager = new OxiaStubManager(null, false, null, 1);
stubManager = new OxiaStubManager(getDefaultClientConfig(), null);
}

final OxiaStub stub = stubManager.getStub(oxia.getServiceAddress());
Expand Down Expand Up @@ -131,9 +133,12 @@ public void onCompleted() {
@SneakyThrows
public void testMaxConnectionPerNode() {
final var maxConnectionPerNode = 10;
@Cleanup
var stubManager =
new OxiaStubManager(null, false, OxiaBackoffProvider.DEFAULT, maxConnectionPerNode);
final var clientConfig =
getDefaultClientConfig(
builder -> {
builder.maxConnectionPerNode(maxConnectionPerNode);
});
@Cleanup var stubManager = new OxiaStubManager(clientConfig, OxiaBackoffProvider.DEFAULT);
for (int i = 0; i < 1000; i++) {
stubManager.getStub(oxia.getServiceAddress());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -82,6 +82,8 @@ void setup() throws IOException {
false,
Duration.ofMillis(100),
Duration.ofSeconds(30),
Duration.ofSeconds(10),
Duration.ofSeconds(5),
1);

String serverName = InProcessServerBuilder.generateName();
Expand Down
Loading

0 comments on commit be15f41

Please sign in to comment.