Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proxy_random to it.cluster.adapters #32371

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.test.e2e.env.container.atomic;

import java.util.Collection;

/**
* Combo IT container.
*/
public interface ComboITContainer extends ITContainer {

/**
* Get containers.
*
* @return containers
*/
Collection<ITContainer> getContainers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public final class ITContainers implements Startable {
* @return registered container
*/
public <T extends ITContainer> T registerContainer(final T container) {
if (container instanceof EmbeddedITContainer) {
if (container instanceof ComboITContainer) {
((ComboITContainer) container).getContainers().forEach(this::registerContainer);
} else if (container instanceof EmbeddedITContainer) {
embeddedContainers.add((EmbeddedITContainer) container);
} else {
DockerITContainer dockerContainer = (DockerITContainer) container;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.test.e2e.env.container.atomic.adapter.config.AdaptorContainerConfiguration;
import org.apache.shardingsphere.test.e2e.env.container.atomic.adapter.impl.ShardingSphereJdbcContainer;
import org.apache.shardingsphere.test.e2e.env.container.atomic.adapter.impl.ShardingSphereMultiProxyClusterContainer;
import org.apache.shardingsphere.test.e2e.env.container.atomic.adapter.impl.ShardingSphereProxyClusterContainer;
import org.apache.shardingsphere.test.e2e.env.container.atomic.adapter.impl.ShardingSphereProxyStandaloneContainer;
import org.apache.shardingsphere.test.e2e.env.container.atomic.enums.AdapterMode;
Expand Down Expand Up @@ -53,6 +54,8 @@ public static AdapterContainer newInstance(final AdapterMode mode, final Adapter
return AdapterMode.CLUSTER == mode
? new ShardingSphereProxyClusterContainer(databaseType, containerConfig)
: new ShardingSphereProxyStandaloneContainer(databaseType, containerConfig);
case PROXY_RANDOM:
return new ShardingSphereMultiProxyClusterContainer(databaseType, containerConfig);
case JDBC:
return new ShardingSphereJdbcContainer(storageContainer, scenario, databaseType);
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.test.e2e.env.container.atomic.adapter.impl;

import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.test.e2e.env.container.atomic.ComboITContainer;
import org.apache.shardingsphere.test.e2e.env.container.atomic.ITContainer;
import org.apache.shardingsphere.test.e2e.env.container.atomic.adapter.AdapterContainer;
import org.apache.shardingsphere.test.e2e.env.container.atomic.adapter.config.AdaptorContainerConfiguration;
import org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
import org.testcontainers.lifecycle.Startable;

import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
* ShardingSphere proxy container for cluster mode.
*/
public final class ShardingSphereMultiProxyClusterContainer implements AdapterContainer, ComboITContainer {

private final AtomicReference<DataSource> targetDataSourceProvider = new AtomicReference<>();

private final Collection<ShardingSphereProxyClusterContainer> proxyClusterContainers = new LinkedList<>();

public ShardingSphereMultiProxyClusterContainer(final DatabaseType databaseType, final AdaptorContainerConfiguration config) {
ShardingSphereProxyClusterContainer proxy1 = new ShardingSphereProxyClusterContainer(databaseType, config);
proxy1.setAbbreviation("proxy1");
proxy1.setName("proxy1");
proxyClusterContainers.add(proxy1);
ShardingSphereProxyClusterContainer proxy2 = new ShardingSphereProxyClusterContainer(databaseType, config);
proxy1.setAbbreviation("proxy2");
proxy1.setName("proxy2");
proxyClusterContainers.add(proxy2);
}

@Override
public DataSource getTargetDataSource(final String serverLists) {
DataSource dataSource = targetDataSourceProvider.get();
if (null == dataSource) {
targetDataSourceProvider.set(new RandomDataSourceAdapter(proxyClusterContainers.stream().map(each -> each.getTargetDataSource(serverLists)).collect(Collectors.toSet())));
}
return targetDataSourceProvider.get();
}

@Override
public String getAbbreviation() {
return ProxyContainerConstants.PROXY_CONTAINER_ABBREVIATION;
}

@Override
public void start() {
proxyClusterContainers.forEach(Startable::start);
}

@Override
public Collection<ITContainer> getContainers() {
return proxyClusterContainers.stream().map(each -> (ITContainer) each).collect(Collectors.toList());
}

private static class RandomDataSourceAdapter implements DataSource {

private final DataSource[] dataSources;

RandomDataSourceAdapter(final Set<DataSource> dataSources) {
this.dataSources = dataSources.toArray(new DataSource[0]);
}

private DataSource getDataSource() {
return dataSources[ThreadLocalRandom.current().nextInt(dataSources.length)];
}

@Override
public Connection getConnection() throws SQLException {
return getDataSource().getConnection();
}

@Override
public Connection getConnection(final String username, final String password) throws SQLException {
return getDataSource().getConnection(username, password);
}

@Override
public PrintWriter getLogWriter() throws SQLException {
return getDataSource().getLogWriter();
}

@Override
public void setLogWriter(final PrintWriter out) throws SQLException {
getDataSource().setLogWriter(out);
}

@Override
public void setLoginTimeout(final int seconds) throws SQLException {
for (DataSource each : dataSources) {
each.setLoginTimeout(seconds);
}
}

@Override
public int getLoginTimeout() throws SQLException {
return getDataSource().getLoginTimeout();
}

@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return getDataSource().getParentLogger();
}

@Override
public <T> T unwrap(final Class<T> iface) throws SQLException {
return getDataSource().unwrap(iface);
}

@Override
public boolean isWrapperFor(final Class<?> iface) throws SQLException {
return getDataSource().isWrapperFor(iface);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.test.e2e.env.container.atomic.adapter.impl;

import com.google.common.base.Strings;
import lombok.Setter;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.test.e2e.env.container.atomic.DockerITContainer;
import org.apache.shardingsphere.test.e2e.env.container.atomic.adapter.AdapterContainer;
Expand Down Expand Up @@ -45,6 +46,9 @@ public final class ShardingSphereProxyClusterContainer extends DockerITContainer

private final AtomicReference<DataSource> targetDataSourceProvider = new AtomicReference<>();

@Setter
private String abbreviation = ProxyContainerConstants.PROXY_CONTAINER_ABBREVIATION;

public ShardingSphereProxyClusterContainer(final DatabaseType databaseType, final AdaptorContainerConfiguration config) {
super(ProxyContainerConstants.PROXY_CONTAINER_NAME_PREFIX, config.getAdapterContainerImage());
this.databaseType = databaseType;
Expand Down Expand Up @@ -94,6 +98,6 @@ public DataSource getTargetDataSource(final String serverLists) {

@Override
public String getAbbreviation() {
return ProxyContainerConstants.PROXY_CONTAINER_ABBREVIATION;
return abbreviation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public enum AdapterType {

JDBC("jdbc"),

PROXY("proxy");
PROXY("proxy"),

PROXY_RANDOM("proxy_random");

private final String value;
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public ContainerComposer getContainerComposer(final String key, final String sce

private boolean isClusterMode(final AdapterMode adapterMode, final AdapterType adapterType) {
// TODO cluster mode often throw exception sometimes, issue is #15517
return AdapterMode.CLUSTER == adapterMode && AdapterType.PROXY == adapterType;
return AdapterMode.CLUSTER == adapterMode && AdapterType.PROXY == adapterType || AdapterType.PROXY_RANDOM == adapterType;
}

private ContainerComposer createContainerComposer(final boolean clusterMode, final String scenario, final DatabaseType databaseType, final AdapterMode adapterMode, final AdapterType adapterType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.test.e2e.env.container.atomic.enums.AdapterType;
import org.apache.shardingsphere.test.e2e.env.container.atomic.enums.AdapterMode;
import org.apache.shardingsphere.test.e2e.env.container.atomic.enums.AdapterType;
import org.apache.shardingsphere.test.e2e.env.runtime.E2ETestEnvironment;

/**
Expand Down Expand Up @@ -66,6 +66,7 @@ private static boolean isRunDCL() {
}

private static boolean isRunProxy() {
return ENV.getRunModes().contains(AdapterMode.CLUSTER.getValue()) && ENV.getClusterEnvironment().getAdapters().contains(AdapterType.PROXY.getValue());
return ENV.getRunModes().contains(AdapterMode.CLUSTER.getValue()) && ENV.getClusterEnvironment().getAdapters().contains(AdapterType.PROXY.getValue()) || ENV.getClusterEnvironment()
.getAdapters().contains(AdapterType.PROXY_RANDOM.getValue());
}
}
2 changes: 1 addition & 1 deletion test/e2e/sql/src/test/resources/env/it-env.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ it.scenarios=db,tbl,readwrite_splitting,encrypt,shadow,dbtbl_with_readwrite_spli
# it.cluster.env.type=DOCKER,NATIVE
it.cluster.env.type=DOCKER

# it.cluster.adapters=jdbc,proxy
# it.cluster.adapters=jdbc,proxy,proxy_random
it.cluster.adapters=proxy

# it.cluster.databases=MySQL,PostgreSQL,openGauss
Expand Down