From 311d1497f3e7e42c065cc93368498a9f21774675 Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Fri, 27 Dec 2024 16:55:21 +0800 Subject: [PATCH 1/6] Use same transaction manager in one transaction --- .../TransactionConnectionContext.java | 18 +- .../transaction/TransactionManager.java | 24 ++ .../DriverDatabaseConnectionManager.java | 2 +- ...ngSphereDistributedTransactionManager.java | 3 +- .../transaction/ConnectionTransaction.java | 7 +- .../transaction/TransactionXAHandler.java | 9 +- .../SwitchingTransactionRuleTestCase.java | 220 ++++++++++++++++++ 7 files changed, 274 insertions(+), 9 deletions(-) create mode 100644 infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionManager.java create mode 100644 test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java diff --git a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java index 1b62261baf772..67f058db78db6 100644 --- a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java +++ b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java @@ -41,14 +41,18 @@ public final class TransactionConnectionContext implements AutoCloseable { @Setter private volatile String readWriteSplitReplicaRoute; + private volatile TransactionManager transactionManager; + /** * Begin transaction. * - * @param transactionType transaction type + * @param transactionType transaction type + * @param transactionManager transaction manager */ - public void beginTransaction(final String transactionType) { + public void beginTransaction(final String transactionType, final TransactionManager transactionManager) { this.transactionType = transactionType; inTransaction = true; + this.transactionManager = transactionManager; } /** @@ -78,6 +82,15 @@ public Optional getReadWriteSplitReplicaRoute() { return Optional.ofNullable(readWriteSplitReplicaRoute); } + /** + * Get transaction manager. + * + * @return transaction manager + */ + public Optional getTransactionManager() { + return Optional.ofNullable(transactionManager); + } + @Override public void close() { transactionType = null; @@ -85,5 +98,6 @@ public void close() { beginMills = 0L; exceptionOccur = false; readWriteSplitReplicaRoute = null; + transactionManager = null; } } diff --git a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionManager.java b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionManager.java new file mode 100644 index 0000000000000..b38dcbc195a00 --- /dev/null +++ b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionManager.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.session.connection.transaction; + +/** + * Transaction manager. + */ +public interface TransactionManager { +} diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java index bc4779a15039f..5218b69f5a800 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java @@ -118,7 +118,7 @@ public void begin() throws SQLException { close(); connectionTransaction.begin(); } - connectionContext.getTransactionContext().beginTransaction(String.valueOf(connectionTransaction.getTransactionType())); + connectionContext.getTransactionContext().beginTransaction(String.valueOf(connectionTransaction.getTransactionType()), connectionTransaction.getDistributedTransactionManager()); } /** diff --git a/kernel/transaction/api/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereDistributedTransactionManager.java b/kernel/transaction/api/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereDistributedTransactionManager.java index 8a61f60b89803..94c53b99ae178 100644 --- a/kernel/transaction/api/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereDistributedTransactionManager.java +++ b/kernel/transaction/api/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereDistributedTransactionManager.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.transaction.spi; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.session.connection.transaction.TransactionManager; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI; import org.apache.shardingsphere.transaction.api.TransactionType; @@ -29,7 +30,7 @@ /** * ShardingSphere distributed transaction manager. */ -public interface ShardingSphereDistributedTransactionManager extends TypedSPI, AutoCloseable { +public interface ShardingSphereDistributedTransactionManager extends TypedSPI, AutoCloseable, TransactionManager { /** * Initialize distributed transaction manager. diff --git a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java index c0456ef3ab3c4..aa82ded550159 100644 --- a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java +++ b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java @@ -35,6 +35,7 @@ public final class ConnectionTransaction { @Getter private final TransactionType transactionType; + @Getter private final ShardingSphereDistributedTransactionManager distributedTransactionManager; private final TransactionConnectionContext transactionContext; @@ -42,7 +43,11 @@ public final class ConnectionTransaction { public ConnectionTransaction(final TransactionRule rule, final TransactionConnectionContext transactionContext) { transactionType = transactionContext.getTransactionType().isPresent() ? TransactionType.valueOf(transactionContext.getTransactionType().get()) : rule.getDefaultType(); this.transactionContext = transactionContext; - distributedTransactionManager = TransactionType.LOCAL == transactionType ? null : rule.getResource().getTransactionManager(rule.getDefaultType()); + if (transactionContext.getTransactionManager().isPresent()) { + distributedTransactionManager = (ShardingSphereDistributedTransactionManager) transactionContext.getTransactionManager().get(); + } else { + distributedTransactionManager = TransactionType.LOCAL == this.transactionType ? null : rule.getResource().getTransactionManager(rule.getDefaultType()); + } } /** diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java index 2cce1c63bac7d..6b404c50e2219 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionXAHandler.java @@ -29,13 +29,13 @@ import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow; import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader; import org.apache.shardingsphere.proxy.backend.session.ConnectionSession; -import org.apache.shardingsphere.proxy.backend.util.TransactionUtils; import org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.xa.XABeginStatement; import org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.xa.XACommitStatement; import org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.xa.XARecoveryStatement; import org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.xa.XARollbackStatement; import org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.xa.XAStatement; -import org.apache.shardingsphere.transaction.api.TransactionType; +import org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine; +import org.apache.shardingsphere.transaction.rule.TransactionRule; import org.apache.shardingsphere.transaction.xa.jta.exception.XATransactionNestedBeginException; import java.sql.SQLException; @@ -88,8 +88,9 @@ public ResponseHeader execute() throws SQLException { private ResponseHeader begin() throws SQLException { ShardingSpherePreconditions.checkState(!connectionSession.getTransactionStatus().isInTransaction(), XATransactionNestedBeginException::new); ResponseHeader result = backendHandler.execute(); - TransactionType transactionType = TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext()); - connectionSession.getConnectionContext().getTransactionContext().beginTransaction(String.valueOf(transactionType)); + TransactionRule transactionRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class); + ShardingSphereTransactionManagerEngine engine = transactionRule.getResource(); + connectionSession.getConnectionContext().getTransactionContext().beginTransaction(transactionRule.getDefaultType().name(), engine.getTransactionManager(transactionRule.getDefaultType())); return result; } diff --git a/test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java b/test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java new file mode 100644 index 0000000000000..18c0a44c843dd --- /dev/null +++ b/test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.test.e2e.transaction.cases.alterresource; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase; +import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionContainerComposer; +import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionTestCase; +import org.apache.shardingsphere.test.e2e.transaction.engine.command.CommonSQLCommand; +import org.apache.shardingsphere.test.e2e.transaction.engine.constants.TransactionTestConstants; +import org.apache.shardingsphere.transaction.api.TransactionType; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Switching transaction rule test case. + */ +@Slf4j +@TransactionTestCase(adapters = TransactionTestConstants.PROXY, scenario = "addResource") +public final class SwitchingTransactionRuleTestCase extends BaseTransactionTestCase { + + private static final int THREAD_SIZE = 100; + + private static final int TRANSACTION_SIZE = 100_0000; + + private static final AtomicBoolean IS_FINISHED = new AtomicBoolean(false); + + public SwitchingTransactionRuleTestCase(final TransactionTestCaseParameter testCaseParam) { + super(testCaseParam); + } + + @Override + protected void executeTest(final TransactionContainerComposer containerComposer) throws SQLException { + innerRun(containerComposer); + } + + @SneakyThrows(InterruptedException.class) + private void innerRun(final TransactionContainerComposer containerComposer) { + List tasks = new ArrayList<>(THREAD_SIZE); + for (int i = 0; i < THREAD_SIZE; i++) { + Thread updateThread = new Thread(new TransactionOperationsTask(getDataSource())); + updateThread.start(); + tasks.add(updateThread); + } + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(new AlterTransactionRuleTask(containerComposer, getBaseTransactionITCase().getCommonSQL())); + for (Thread task : tasks) { + task.join(); + } + IS_FINISHED.set(true); + executor.shutdown(); + } + + @RequiredArgsConstructor + @Getter + private static class AlterTransactionRuleTask implements Runnable { + + private final TransactionContainerComposer containerComposer; + + private final CommonSQLCommand commonSQL; + + @SneakyThrows({SQLException.class, InterruptedException.class}) + @Override + public void run() { + while (!IS_FINISHED.get()) { + alterLocalTransactionRule(); + TimeUnit.MINUTES.sleep(1); + alterXaTransactionRule("Narayana"); + TimeUnit.MINUTES.sleep(1); + } + } + + private void alterLocalTransactionRule() throws SQLException { + try (Connection connection = containerComposer.getDataSource().getConnection()) { + if (isExpectedTransactionRule(connection, TransactionType.LOCAL, "")) { + return; + } + String alterLocalTransactionRule = commonSQL.getAlterLocalTransactionRule(); + executeWithLog(connection, alterLocalTransactionRule); + } + assertTrue(waitExpectedTransactionRule(TransactionType.LOCAL, "", containerComposer)); + } + + private void alterXaTransactionRule(final String providerType) throws SQLException { + try (Connection connection = containerComposer.getDataSource().getConnection()) { + if (isExpectedTransactionRule(connection, TransactionType.XA, providerType)) { + return; + } + String alterXaTransactionRule = commonSQL.getAlterXATransactionRule().replace("${providerType}", providerType); + executeWithLog(connection, alterXaTransactionRule); + } + assertTrue(waitExpectedTransactionRule(TransactionType.XA, providerType, containerComposer)); + } + + private boolean isExpectedTransactionRule(final Connection connection, final TransactionType expectedTransType, final String expectedProviderType) throws SQLException { + Map transactionRuleMap = executeShowTransactionRule(connection); + return Objects.equals(transactionRuleMap.get(TransactionTestConstants.DEFAULT_TYPE), expectedTransType.toString()) + && Objects.equals(transactionRuleMap.get(TransactionTestConstants.PROVIDER_TYPE), expectedProviderType); + } + + private Map executeShowTransactionRule(final Connection connection) throws SQLException { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery("SHOW TRANSACTION RULE;"); + Map result = new HashMap<>(); + while (resultSet.next()) { + String defaultType = resultSet.getString(TransactionTestConstants.DEFAULT_TYPE); + String providerType = resultSet.getString(TransactionTestConstants.PROVIDER_TYPE); + result.put(TransactionTestConstants.DEFAULT_TYPE, defaultType); + result.put(TransactionTestConstants.PROVIDER_TYPE, providerType); + } + statement.close(); + return result; + } + + private boolean waitExpectedTransactionRule(final TransactionType expectedTransType, final String expectedProviderType, + final TransactionContainerComposer containerComposer) throws SQLException { + Awaitility.await().pollDelay(5L, TimeUnit.SECONDS).until(() -> true); + try (Connection connection = containerComposer.getDataSource().getConnection()) { + int waitTimes = 0; + do { + if (isExpectedTransactionRule(connection, expectedTransType, expectedProviderType)) { + return true; + } + Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> true); + waitTimes++; + } while (waitTimes <= 3); + return false; + } + } + } + + @RequiredArgsConstructor + @Getter + private static class TransactionOperationsTask implements Runnable { + + private final DataSource dataSource; + + private static final AtomicInteger ID_COUNTER = new AtomicInteger(); + + @SneakyThrows(SQLException.class) + public void run() { + Connection connection = dataSource.getConnection(); + for (int i = 0; i < TRANSACTION_SIZE; i++) { + executeOneTransaction(connection); + } + connection.close(); + } + + private static void executeOneTransaction(final Connection connection) throws SQLException { + boolean isErrorOccured = false; + ThreadLocalRandom random = ThreadLocalRandom.current(); + try { + connection.setAutoCommit(false); + int id = ID_COUNTER.incrementAndGet(); + PreparedStatement insertStatement = connection.prepareStatement("insert into account(id, balance, transaction_id) values(?, ?, ?)"); + insertStatement.setObject(1, id); + insertStatement.setObject(2, id); + insertStatement.setObject(3, id); + insertStatement.execute(); + PreparedStatement updateStatement = connection.prepareStatement("update account set balance = balance - 1 where id = ?"); + updateStatement.setObject(1, id); + updateStatement.execute(); + PreparedStatement selectStatement = connection.prepareStatement("select * from account where id = ?"); + selectStatement.setObject(1, id); + selectStatement.executeQuery(); + PreparedStatement deleteStatement = connection.prepareStatement("delete from account where id = ?"); + deleteStatement.setObject(1, id); + deleteStatement.execute(); + Thread.sleep(random.nextInt(900) + 100); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + log.error("Execute transaction exception occurred", ex); + isErrorOccured = true; + connection.rollback(); + } + if (!isErrorOccured) { + connection.commit(); + } + connection.setAutoCommit(true); + } + } +} From ffeb1c1dc5321a15fe5398354ce6bb63a70c3163 Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Fri, 27 Dec 2024 16:57:55 +0800 Subject: [PATCH 2/6] Use same transaction manager in one transaction --- .../SwitchingTransactionRuleTestCase.java | 220 ------------------ 1 file changed, 220 deletions(-) delete mode 100644 test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java diff --git a/test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java b/test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java deleted file mode 100644 index 18c0a44c843dd..0000000000000 --- a/test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.test.e2e.transaction.cases.alterresource; - -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase; -import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionContainerComposer; -import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionTestCase; -import org.apache.shardingsphere.test.e2e.transaction.engine.command.CommonSQLCommand; -import org.apache.shardingsphere.test.e2e.transaction.engine.constants.TransactionTestConstants; -import org.apache.shardingsphere.transaction.api.TransactionType; -import org.testcontainers.shaded.org.awaitility.Awaitility; - -import javax.sql.DataSource; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * Switching transaction rule test case. - */ -@Slf4j -@TransactionTestCase(adapters = TransactionTestConstants.PROXY, scenario = "addResource") -public final class SwitchingTransactionRuleTestCase extends BaseTransactionTestCase { - - private static final int THREAD_SIZE = 100; - - private static final int TRANSACTION_SIZE = 100_0000; - - private static final AtomicBoolean IS_FINISHED = new AtomicBoolean(false); - - public SwitchingTransactionRuleTestCase(final TransactionTestCaseParameter testCaseParam) { - super(testCaseParam); - } - - @Override - protected void executeTest(final TransactionContainerComposer containerComposer) throws SQLException { - innerRun(containerComposer); - } - - @SneakyThrows(InterruptedException.class) - private void innerRun(final TransactionContainerComposer containerComposer) { - List tasks = new ArrayList<>(THREAD_SIZE); - for (int i = 0; i < THREAD_SIZE; i++) { - Thread updateThread = new Thread(new TransactionOperationsTask(getDataSource())); - updateThread.start(); - tasks.add(updateThread); - } - ExecutorService executor = Executors.newSingleThreadExecutor(); - executor.submit(new AlterTransactionRuleTask(containerComposer, getBaseTransactionITCase().getCommonSQL())); - for (Thread task : tasks) { - task.join(); - } - IS_FINISHED.set(true); - executor.shutdown(); - } - - @RequiredArgsConstructor - @Getter - private static class AlterTransactionRuleTask implements Runnable { - - private final TransactionContainerComposer containerComposer; - - private final CommonSQLCommand commonSQL; - - @SneakyThrows({SQLException.class, InterruptedException.class}) - @Override - public void run() { - while (!IS_FINISHED.get()) { - alterLocalTransactionRule(); - TimeUnit.MINUTES.sleep(1); - alterXaTransactionRule("Narayana"); - TimeUnit.MINUTES.sleep(1); - } - } - - private void alterLocalTransactionRule() throws SQLException { - try (Connection connection = containerComposer.getDataSource().getConnection()) { - if (isExpectedTransactionRule(connection, TransactionType.LOCAL, "")) { - return; - } - String alterLocalTransactionRule = commonSQL.getAlterLocalTransactionRule(); - executeWithLog(connection, alterLocalTransactionRule); - } - assertTrue(waitExpectedTransactionRule(TransactionType.LOCAL, "", containerComposer)); - } - - private void alterXaTransactionRule(final String providerType) throws SQLException { - try (Connection connection = containerComposer.getDataSource().getConnection()) { - if (isExpectedTransactionRule(connection, TransactionType.XA, providerType)) { - return; - } - String alterXaTransactionRule = commonSQL.getAlterXATransactionRule().replace("${providerType}", providerType); - executeWithLog(connection, alterXaTransactionRule); - } - assertTrue(waitExpectedTransactionRule(TransactionType.XA, providerType, containerComposer)); - } - - private boolean isExpectedTransactionRule(final Connection connection, final TransactionType expectedTransType, final String expectedProviderType) throws SQLException { - Map transactionRuleMap = executeShowTransactionRule(connection); - return Objects.equals(transactionRuleMap.get(TransactionTestConstants.DEFAULT_TYPE), expectedTransType.toString()) - && Objects.equals(transactionRuleMap.get(TransactionTestConstants.PROVIDER_TYPE), expectedProviderType); - } - - private Map executeShowTransactionRule(final Connection connection) throws SQLException { - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery("SHOW TRANSACTION RULE;"); - Map result = new HashMap<>(); - while (resultSet.next()) { - String defaultType = resultSet.getString(TransactionTestConstants.DEFAULT_TYPE); - String providerType = resultSet.getString(TransactionTestConstants.PROVIDER_TYPE); - result.put(TransactionTestConstants.DEFAULT_TYPE, defaultType); - result.put(TransactionTestConstants.PROVIDER_TYPE, providerType); - } - statement.close(); - return result; - } - - private boolean waitExpectedTransactionRule(final TransactionType expectedTransType, final String expectedProviderType, - final TransactionContainerComposer containerComposer) throws SQLException { - Awaitility.await().pollDelay(5L, TimeUnit.SECONDS).until(() -> true); - try (Connection connection = containerComposer.getDataSource().getConnection()) { - int waitTimes = 0; - do { - if (isExpectedTransactionRule(connection, expectedTransType, expectedProviderType)) { - return true; - } - Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> true); - waitTimes++; - } while (waitTimes <= 3); - return false; - } - } - } - - @RequiredArgsConstructor - @Getter - private static class TransactionOperationsTask implements Runnable { - - private final DataSource dataSource; - - private static final AtomicInteger ID_COUNTER = new AtomicInteger(); - - @SneakyThrows(SQLException.class) - public void run() { - Connection connection = dataSource.getConnection(); - for (int i = 0; i < TRANSACTION_SIZE; i++) { - executeOneTransaction(connection); - } - connection.close(); - } - - private static void executeOneTransaction(final Connection connection) throws SQLException { - boolean isErrorOccured = false; - ThreadLocalRandom random = ThreadLocalRandom.current(); - try { - connection.setAutoCommit(false); - int id = ID_COUNTER.incrementAndGet(); - PreparedStatement insertStatement = connection.prepareStatement("insert into account(id, balance, transaction_id) values(?, ?, ?)"); - insertStatement.setObject(1, id); - insertStatement.setObject(2, id); - insertStatement.setObject(3, id); - insertStatement.execute(); - PreparedStatement updateStatement = connection.prepareStatement("update account set balance = balance - 1 where id = ?"); - updateStatement.setObject(1, id); - updateStatement.execute(); - PreparedStatement selectStatement = connection.prepareStatement("select * from account where id = ?"); - selectStatement.setObject(1, id); - selectStatement.executeQuery(); - PreparedStatement deleteStatement = connection.prepareStatement("delete from account where id = ?"); - deleteStatement.setObject(1, id); - deleteStatement.execute(); - Thread.sleep(random.nextInt(900) + 100); - // CHECKSTYLE:OFF - } catch (final Exception ex) { - // CHECKSTYLE:ON - log.error("Execute transaction exception occurred", ex); - isErrorOccured = true; - connection.rollback(); - } - if (!isErrorOccured) { - connection.commit(); - } - connection.setAutoCommit(true); - } - } -} From cd39ff91a00a51a07de7994a254b1fb58dd4c611 Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Fri, 27 Dec 2024 17:08:48 +0800 Subject: [PATCH 3/6] Fix test --- .../transaction/TransactionConnectionContextTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/infra/session/src/test/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContextTest.java b/infra/session/src/test/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContextTest.java index eb4bc69cc2623..0d0dd0d0993e4 100644 --- a/infra/session/src/test/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContextTest.java +++ b/infra/session/src/test/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContextTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; class TransactionConnectionContextTest { @@ -32,7 +33,7 @@ class TransactionConnectionContextTest { @Test void assertBeginTransaction() { - transactionConnectionContext.beginTransaction("XA"); + transactionConnectionContext.beginTransaction("XA", mock(TransactionManager.class)); assertThat(transactionConnectionContext.getTransactionType(), is(Optional.of("XA"))); assertTrue(transactionConnectionContext.isInTransaction()); } @@ -44,19 +45,19 @@ void assertIsNotInDistributedTransactionWhenNotBegin() { @Test void assertIsNotInDistributedTransactionWithLocal() { - transactionConnectionContext.beginTransaction("LOCAL"); + transactionConnectionContext.beginTransaction("LOCAL", mock(TransactionManager.class)); assertFalse(transactionConnectionContext.isInDistributedTransaction()); } @Test void assertIsInDistributedTransactionWithXA() { - transactionConnectionContext.beginTransaction("XA"); + transactionConnectionContext.beginTransaction("XA", mock(TransactionManager.class)); assertTrue(transactionConnectionContext.isInDistributedTransaction()); } @Test void assertIsInDistributedTransactionWithBASE() { - transactionConnectionContext.beginTransaction("BASE"); + transactionConnectionContext.beginTransaction("BASE", mock(TransactionManager.class)); assertTrue(transactionConnectionContext.isInDistributedTransaction()); } @@ -68,7 +69,7 @@ void assertGetReadWriteSplitReplicaRoute() { @Test void assertClose() { - transactionConnectionContext.beginTransaction("XA"); + transactionConnectionContext.beginTransaction("XA", mock(TransactionManager.class)); transactionConnectionContext.close(); assertFalse(transactionConnectionContext.getTransactionType().isPresent()); assertFalse(transactionConnectionContext.isInTransaction()); From 792fa0e430f2e733564fede4ae76e69222204b47 Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Fri, 27 Dec 2024 17:13:18 +0800 Subject: [PATCH 4/6] Fix test --- .../core/connection/ShardingSphereConnection.java | 3 ++- .../transaction/ConnectionTransactionTest.java | 11 ++++++----- .../jdbc/transaction/BackendTransactionManager.java | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java index 7da8af0ac3531..c41917e774504 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java @@ -171,7 +171,8 @@ private void processLocalTransaction() throws SQLException { return; } if (!autoCommit && !transactionContext.isInTransaction()) { - transactionContext.beginTransaction(String.valueOf(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType())); + transactionContext.beginTransaction(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType().name(), + databaseConnectionManager.getConnectionTransaction().getDistributedTransactionManager()); } } diff --git a/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java b/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java index 3fde297470451..0e9e6279cb8d1 100644 --- a/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java +++ b/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.transaction; import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext; +import org.apache.shardingsphere.infra.session.connection.transaction.TransactionManager; import org.apache.shardingsphere.transaction.ConnectionTransaction.DistributedTransactionOperationType; import org.apache.shardingsphere.transaction.api.TransactionType; import org.apache.shardingsphere.transaction.rule.TransactionRule; @@ -48,21 +49,21 @@ void assertIsNotInDistributedTransactionWhenTransactionIsNotBegin() { @Test void assertIsNotInDistributedTransactionWhenIsNotDistributedTransaction() { TransactionConnectionContext context = new TransactionConnectionContext(); - context.beginTransaction("LOCAL"); + context.beginTransaction("LOCAL", mock(TransactionManager.class)); assertFalse(new ConnectionTransaction(mock(TransactionRule.class), context).isInDistributedTransaction(context)); } @Test void assertIsNotInDistributedTransactionWhenDistributedTransactionIsNotBegin() { TransactionConnectionContext context = new TransactionConnectionContext(); - context.beginTransaction("XA"); + context.beginTransaction("XA", mock(TransactionManager.class)); assertFalse(new ConnectionTransaction(mock(TransactionRule.class, RETURNS_DEEP_STUBS), context).isInDistributedTransaction(context)); } @Test void assertIsInDistributedTransaction() { TransactionConnectionContext context = new TransactionConnectionContext(); - context.beginTransaction("XA"); + context.beginTransaction("XA", mock(TransactionManager.class)); TransactionRule rule = mock(TransactionRule.class, RETURNS_DEEP_STUBS); when(rule.getResource().getTransactionManager(rule.getDefaultType()).isInTransaction()).thenReturn(true); assertTrue(new ConnectionTransaction(rule, context).isInDistributedTransaction(context)); @@ -95,7 +96,7 @@ void assertIsHoldTransactionWithXAAndAutoCommit() { when(rule.getDefaultType()).thenReturn(TransactionType.XA); when(rule.getResource().getTransactionManager(TransactionType.XA).isInTransaction()).thenReturn(true); TransactionConnectionContext context = new TransactionConnectionContext(); - context.beginTransaction("XA"); + context.beginTransaction("XA", mock(TransactionManager.class)); assertTrue(new ConnectionTransaction(rule, context).isHoldTransaction(true)); } @@ -144,7 +145,7 @@ void assertGetConnectionWithoutInDistributeTransaction() throws SQLException { @Test void assertGetConnectionWithInDistributeTransaction() throws SQLException { TransactionConnectionContext context = new TransactionConnectionContext(); - context.beginTransaction("XA"); + context.beginTransaction("XA", mock(TransactionManager.class)); TransactionRule rule = mock(TransactionRule.class, RETURNS_DEEP_STUBS); when(rule.getResource().getTransactionManager(rule.getDefaultType()).isInTransaction()).thenReturn(true); when(rule.getResource().getTransactionManager(rule.getDefaultType()).getConnection("foo_db", "foo_ds")).thenReturn(mock(Connection.class)); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java index 384c0e3445062..53bad50aceca1 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java @@ -70,7 +70,7 @@ public BackendTransactionManager(final ProxyDatabaseConnectionManager databaseCo public void begin() { if (!connection.getConnectionSession().getTransactionStatus().isInTransaction()) { connection.getConnectionSession().getTransactionStatus().setInTransaction(true); - getTransactionContext().beginTransaction(String.valueOf(transactionType)); + getTransactionContext().beginTransaction(transactionType.name(), distributedTransactionManager); connection.closeHandlers(true); connection.closeConnections(false); } From f06b1a2fd0151d5406d323e788c9ab1416efe0e0 Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Fri, 27 Dec 2024 17:49:06 +0800 Subject: [PATCH 5/6] fix --- .../transaction/ConnectionTransactionTest.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java b/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java index 0e9e6279cb8d1..23a832a7e370d 100644 --- a/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java +++ b/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.transaction; import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext; -import org.apache.shardingsphere.infra.session.connection.transaction.TransactionManager; import org.apache.shardingsphere.transaction.ConnectionTransaction.DistributedTransactionOperationType; import org.apache.shardingsphere.transaction.api.TransactionType; import org.apache.shardingsphere.transaction.rule.TransactionRule; @@ -49,21 +48,23 @@ void assertIsNotInDistributedTransactionWhenTransactionIsNotBegin() { @Test void assertIsNotInDistributedTransactionWhenIsNotDistributedTransaction() { TransactionConnectionContext context = new TransactionConnectionContext(); - context.beginTransaction("LOCAL", mock(TransactionManager.class)); + context.beginTransaction("LOCAL", mock(ShardingSphereDistributedTransactionManager.class)); assertFalse(new ConnectionTransaction(mock(TransactionRule.class), context).isInDistributedTransaction(context)); } @Test void assertIsNotInDistributedTransactionWhenDistributedTransactionIsNotBegin() { TransactionConnectionContext context = new TransactionConnectionContext(); - context.beginTransaction("XA", mock(TransactionManager.class)); + context.beginTransaction("XA", mock(ShardingSphereDistributedTransactionManager.class)); assertFalse(new ConnectionTransaction(mock(TransactionRule.class, RETURNS_DEEP_STUBS), context).isInDistributedTransaction(context)); } @Test void assertIsInDistributedTransaction() { TransactionConnectionContext context = new TransactionConnectionContext(); - context.beginTransaction("XA", mock(TransactionManager.class)); + ShardingSphereDistributedTransactionManager distributedTransactionManager = mock(ShardingSphereDistributedTransactionManager.class); + when(distributedTransactionManager.isInTransaction()).thenReturn(true); + context.beginTransaction("XA", distributedTransactionManager); TransactionRule rule = mock(TransactionRule.class, RETURNS_DEEP_STUBS); when(rule.getResource().getTransactionManager(rule.getDefaultType()).isInTransaction()).thenReturn(true); assertTrue(new ConnectionTransaction(rule, context).isInDistributedTransaction(context)); @@ -96,7 +97,9 @@ void assertIsHoldTransactionWithXAAndAutoCommit() { when(rule.getDefaultType()).thenReturn(TransactionType.XA); when(rule.getResource().getTransactionManager(TransactionType.XA).isInTransaction()).thenReturn(true); TransactionConnectionContext context = new TransactionConnectionContext(); - context.beginTransaction("XA", mock(TransactionManager.class)); + ShardingSphereDistributedTransactionManager distributedTransactionManager = mock(ShardingSphereDistributedTransactionManager.class); + when(distributedTransactionManager.isInTransaction()).thenReturn(true); + context.beginTransaction("XA", distributedTransactionManager); assertTrue(new ConnectionTransaction(rule, context).isHoldTransaction(true)); } @@ -145,7 +148,10 @@ void assertGetConnectionWithoutInDistributeTransaction() throws SQLException { @Test void assertGetConnectionWithInDistributeTransaction() throws SQLException { TransactionConnectionContext context = new TransactionConnectionContext(); - context.beginTransaction("XA", mock(TransactionManager.class)); + ShardingSphereDistributedTransactionManager distributedTransactionManager = mock(ShardingSphereDistributedTransactionManager.class); + when(distributedTransactionManager.isInTransaction()).thenReturn(true); + when(distributedTransactionManager.getConnection("foo_db", "foo_ds")).thenReturn(mock(Connection.class)); + context.beginTransaction("XA", distributedTransactionManager); TransactionRule rule = mock(TransactionRule.class, RETURNS_DEEP_STUBS); when(rule.getResource().getTransactionManager(rule.getDefaultType()).isInTransaction()).thenReturn(true); when(rule.getResource().getTransactionManager(rule.getDefaultType()).getConnection("foo_db", "foo_ds")).thenReturn(mock(Connection.class)); From 68560d824bb5a6f1e74b970f0d5f8117c93ce117 Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Fri, 27 Dec 2024 18:06:35 +0800 Subject: [PATCH 6/6] fix --- .../handler/transaction/TransactionBackendHandlerTest.java | 5 ++++- .../proxy/backend/session/ConnectionSessionTest.java | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionBackendHandlerTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionBackendHandlerTest.java index c08e8350e919d..4ab0b3005b65f 100644 --- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionBackendHandlerTest.java +++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionBackendHandlerTest.java @@ -26,6 +26,7 @@ import org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.TCLStatement; import org.apache.shardingsphere.test.mock.AutoMockExtension; import org.apache.shardingsphere.test.mock.StaticMockSettings; +import org.apache.shardingsphere.transaction.api.TransactionType; import org.apache.shardingsphere.transaction.core.TransactionOperationType; import org.apache.shardingsphere.transaction.rule.TransactionRule; import org.junit.jupiter.api.Test; @@ -58,7 +59,9 @@ void assertExecute() throws SQLException { private ContextManager mockContextManager() { ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS); - when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(new RuleMetaData(Collections.singleton(mock(TransactionRule.class)))); + TransactionRule transactionRule = mock(TransactionRule.class); + when(transactionRule.getDefaultType()).thenReturn(TransactionType.LOCAL); + when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(new RuleMetaData(Collections.singleton(transactionRule))); return result; } } diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java index 96da820ed6335..9ae58ecc1b721 100644 --- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java +++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java @@ -27,6 +27,7 @@ import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import org.apache.shardingsphere.test.mock.AutoMockExtension; import org.apache.shardingsphere.test.mock.StaticMockSettings; +import org.apache.shardingsphere.transaction.api.TransactionType; import org.apache.shardingsphere.transaction.rule.TransactionRule; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -82,7 +83,9 @@ void assertSwitchSchemaWhileBegin() { private ContextManager mockContextManager() { ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS); - when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(new RuleMetaData(Collections.singleton(mock(TransactionRule.class)))); + TransactionRule transactionRule = mock(TransactionRule.class); + when(transactionRule.getDefaultType()).thenReturn(TransactionType.LOCAL); + when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(new RuleMetaData(Collections.singleton(transactionRule))); return result; }