Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spike on fluent ConnectionFactory configuration API #1242

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ public ConnectionFactory setVirtualHost(String virtualHost) {
public ConnectionFactory setUri(URI uri)
throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
{
if ("amqp".equals(uri.getScheme().toLowerCase())) {
if ("amqp".equalsIgnoreCase(uri.getScheme())) {
// nothing special to do
} else if ("amqps".equals(uri.getScheme().toLowerCase())) {
} else if ("amqps".equalsIgnoreCase(uri.getScheme())) {
setPort(DEFAULT_AMQP_OVER_SSL_PORT);
// SSL context factory not set yet, we use the default one
if (this.sslContextFactory == null) {
Expand Down Expand Up @@ -1253,7 +1253,7 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
ConnectionParams params = params(executor);
// set client-provided via a client property
if (clientProvidedName != null) {
Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
Map<String, Object> properties = new HashMap<>(params.getClientProperties());
properties.put("connection_name", clientProvidedName);
params.setClientProperties(properties);
}
Expand All @@ -1277,16 +1277,14 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
conn.start();
this.metricsCollector.newConnection(conn);
return conn;
} catch (IOException e) {
} catch (IOException | TimeoutException e) {
lastException = e;
} catch (TimeoutException te) {
lastException = te;
}
}
if (lastException != null) {
if (lastException instanceof IOException) {
throw (IOException) lastException;
} else if (lastException instanceof TimeoutException) {
} else {
throw (TimeoutException) lastException;
}
}
Expand Down Expand Up @@ -1762,4 +1760,8 @@ public ConnectionFactory setTrafficListener(TrafficListener trafficListener) {
this.trafficListener = trafficListener;
return this;
}

public static ConnectionFactoryConfiguration configure() {
return null;
}
}
207 changes: 207 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactoryConfiguration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package com.rabbitmq.client;

import com.rabbitmq.client.impl.CredentialsProvider;
import com.rabbitmq.client.impl.CredentialsRefreshService;
import com.rabbitmq.client.impl.ErrorOnWriteListener;
import com.rabbitmq.client.impl.nio.ByteBufferFactory;
import com.rabbitmq.client.impl.nio.NioContext;
import com.rabbitmq.client.impl.nio.NioQueue;
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
import com.rabbitmq.client.observation.ObservationCollector;
import java.net.HttpURLConnection;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.net.SocketFactory;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;

public interface ConnectionFactoryConfiguration {

ConnectionFactoryConfiguration host(String name);
ConnectionFactoryConfiguration port(int port);
ConnectionFactoryConfiguration username(String username);
ConnectionFactoryConfiguration password(String username);
ConnectionFactoryConfiguration virtualHost(String virtualHost);
ConnectionFactoryConfiguration uri(URI uri);
ConnectionFactoryConfiguration uri(String uri);

ConnectionFactoryConfiguration requestedChannelMax(int requestedChannelMax);
ConnectionFactoryConfiguration requestedFrameMax(int requestedFrameMax);
ConnectionFactoryConfiguration requestedHeartbeat(Duration heartbeat);
ConnectionFactoryConfiguration connectionTimeout(Duration timeout);
ConnectionFactoryConfiguration handshakeTimeout(Duration timeout);
ConnectionFactoryConfiguration shutdownTimeout(Duration timeout);
ConnectionFactoryConfiguration channelRpcTimeout(Duration timeout);

ConnectionFactoryConfiguration maxInboundMessageBodySize(int maxInboundMessageBodySize);
ConnectionFactoryConfiguration channelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType);
ConnectionFactoryConfiguration workPoolTimeout(Duration timeout);

ConnectionFactoryConfiguration errorOnWriteListener(ErrorOnWriteListener errorOnWriteListener);

ConnectionFactoryConfiguration trafficListener(TrafficListener trafficListener);

// TODO provide helper for client properties
ConnectionFactoryConfiguration clientProperties(Map<String, Object> clientProperties);
ConnectionFactoryConfiguration clientProperty(String name, Object value);

ConnectionFactoryConfiguration saslConfig(SaslConfig saslConfig);

ConnectionFactoryConfiguration socketFactory(SocketFactory socketFactory);

ConnectionFactoryConfiguration socketConfigurator(SocketConfigurator socketConfigurator);

ConnectionFactoryConfiguration sharedExecutor(ExecutorService executorService);
ConnectionFactoryConfiguration shutdownExecutor(ExecutorService executorService);
ConnectionFactoryConfiguration heartbeatExecutor(ExecutorService executorService);
ConnectionFactoryConfiguration threadFactory(ThreadFactory threadFactory);

ConnectionFactoryConfiguration exceptionHandler(ExceptionHandler exceptionHandler);

ConnectionFactoryConfiguration metricsCollector(MetricsCollector metricsCollector);
ConnectionFactoryConfiguration observationCollector(ObservationCollector observationCollector);

// TODO special configuration for credentials, especially for OAuth?
ConnectionFactoryConfiguration credentialsProvider(CredentialsProvider credentialsProvider);
ConnectionFactoryConfiguration credentialsRefreshService(CredentialsRefreshService credentialsRefreshService);

NioConfiguration nio();

TlsConfiguration tls();

OAuth2Configuration oauth2();

ConnectionFactory create();

interface NioConfiguration {

NioConfiguration readByteBufferSize(int readByteBufferSize);

NioConfiguration writeByteBufferSize(int writeByteBufferSize);

NioConfiguration nbIoThreads(int nbIoThreads);

NioConfiguration writeEnqueuingTimeout(Duration writeEnqueuingTimeout);

NioConfiguration writeQueueCapacity(int writeQueueCapacity);

NioConfiguration executor(ExecutorService executorService);

NioConfiguration threadFactory(ThreadFactory threadFactory);

NioConfiguration socketChannelConfigurator(SocketChannelConfigurator configurator);

NioConfiguration sslEngineConfigurator(SslEngineConfigurator configurator);

NioConfiguration connectionShutdownExecutor(ExecutorService executorService);

NioConfiguration byteBufferFactory(ByteBufferFactory byteBufferFactory);

NioConfiguration writeQueueFactory(Function<NioContext, NioQueue> writeQueueFactory);

ConnectionFactoryConfiguration configuration();


}

interface TlsConfiguration {

TlsConfiguration hostnameVerification();

TlsConfiguration hostnameVerification(boolean hostnameVerification);

TlsConfiguration sslContextFactory(SslContextFactory sslContextFactory);

TlsConfiguration protocol(String protocol);

TlsConfiguration trustManager(TrustManager trustManager);

TlsConfiguration trustEverything();

TlsConfiguration sslContext(SSLContext sslContext);

ConnectionFactoryConfiguration configuration();

}

interface RecoveryConfiguration {

RecoveryConfiguration enableConnectionRecovery();
RecoveryConfiguration enableConnectionRecovery(boolean connectionRecovery);

RecoveryConfiguration enableTopologyRecovery();
RecoveryConfiguration enableTopologyRecovery(boolean connectionRecovery);

RecoveryConfiguration topologyRecoveryExecutor(ExecutorService executorService);

RecoveryConfiguration recoveryInterval(Duration interval);

RecoveryConfiguration recoveryDelayHandler(RecoveryDelayHandler recoveryDelayHandler);

RecoveryConfiguration topologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter);

RecoveryConfiguration recoveryTriggeringCondition(Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition);

RecoveryConfiguration recoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier);

ConnectionFactoryConfiguration configuration();
}

interface OAuth2Configuration {

OAuth2Configuration tokenEndpointUri(String tokenEndpointUri);

OAuth2Configuration clientId(String clientId);

OAuth2Configuration clientSecret(String clientSecret);

OAuth2Configuration grantType(String grantType);

OAuth2Configuration parameter(String name, String value);

OAuth2Configuration connectionConfigurator(Consumer<HttpURLConnection> connectionConfigurator);

OAuth2TlsConfiguration tls();

OAuth2CredentialsRefreshConfiguration refresh();

ConnectionFactoryConfiguration configuration();
}

interface OAuth2TlsConfiguration {

OAuth2TlsConfiguration hostnameVerifier(HostnameVerifier hostnameVerifier);

OAuth2TlsConfiguration sslSocketFactory(SSLSocketFactory sslSocketFactory);

OAuth2TlsConfiguration sslContext(SSLContext sslContext);

OAuth2TlsConfiguration trustEverything();

OAuth2Configuration oauth2();

}

interface OAuth2CredentialsRefreshConfiguration {

OAuth2CredentialsRefreshConfiguration refreshDelayStrategy(Function<Duration, Duration> refreshDelayStrategy);

OAuth2CredentialsRefreshConfiguration approachingExpirationStrategy(Function<Duration, Boolean> approachingExpirationStrategy);

OAuth2CredentialsRefreshConfiguration scheduler(ScheduledThreadPoolExecutor scheduler);

OAuth2Configuration oauth2();

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.rabbitmq.client;

import com.rabbitmq.client.impl.CredentialsProvider;
import com.rabbitmq.client.impl.CredentialsRefreshService;
import com.rabbitmq.client.impl.DefaultCredentialsRefreshService;
import com.rabbitmq.client.impl.OAuth2ClientCredentialsGrantCredentialsProvider;
import com.rabbitmq.client.impl.nio.NioParams;

import javax.net.ssl.SSLContext;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import static com.rabbitmq.client.impl.DefaultCredentialsRefreshService.ratioRefreshDelayStrategy;

public class ConnectionFactoryConfigurationDemo {

public static void main(String[] args) throws Exception {
SSLContext sslContext = SSLContext.getDefault();

// historical configuration with ConnectionFactory setters
ConnectionFactory cf = new ConnectionFactory();
cf.setUri("amqp://rabbitmq-1:5672/foo");
cf.setChannelRpcTimeout(10_000); // unit?
Map<String, Object> clientProperties = Collections.singletonMap("foo", "bar");
cf.setClientProperties(clientProperties);
cf.useSslProtocol("TLSv1.3", new TrustEverythingTrustManager());
NioParams nioParams = new NioParams();
nioParams.setNbIoThreads(4);
cf.setNioParams(nioParams);

CredentialsProvider credentialsProvider =
new OAuth2ClientCredentialsGrantCredentialsProvider.OAuth2ClientCredentialsGrantCredentialsProviderBuilder()
.tokenEndpointUri("http://localhost:8080/uaa/oauth/token/")
.clientId("rabbit_client").clientSecret("rabbit_secret")
.grantType("password")
.parameter("username", "rabbit_super")
.parameter("password", "rabbit_super")
.tls()
.sslContext(sslContext)
.builder()
.build();
cf.setCredentialsProvider(credentialsProvider);
CredentialsRefreshService refreshService =
new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder()
.refreshDelayStrategy(ratioRefreshDelayStrategy(0.8))
.build();
cf.setCredentialsRefreshService(refreshService);

// configuration with new configuration API
ConnectionFactory.configure()
.uri("amqp://rabbitmq-1:5672/foo")
.channelRpcTimeout(Duration.ofSeconds(10)) // Duration class instead of int
.clientProperty("foo", "bar")
.tls() // TLS configuration API
.protocol("TLSv1.3")
.trustEverything()
.configuration() // back to main configuration
.nio() // NIO configuration API
.nbIoThreads(4)
.configuration() // back to main configuration
.oauth2() // OAuth 2 configuration API
.tokenEndpointUri("http://localhost:8080/uaa/oauth/token/")
.clientId("rabbit_client").clientSecret("rabbit_secret")
.grantType("password")
.parameter("username", "rabbit_super")
.parameter("password", "rabbit_super")
.tls() // OAuth 2 TLS
.sslContext(sslContext)
.oauth2()
.refresh() // OAuth refresh configuration
.refreshDelayStrategy(ratioRefreshDelayStrategy(0.8))
.oauth2()
.configuration() // back to main configuration
.create();
}

}