Skip to content

Commit

Permalink
Merge branch 'apache:master' into ISSUE_31437_WITH_KEYWORD_QUERY_ERRO…
Browse files Browse the repository at this point in the history
…R_MYSQL
  • Loading branch information
Yash-cor authored Dec 31, 2024
2 parents 893cea5 + 6ecd873 commit c340ae3
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class LockSQLException extends KernelSQLException {

private static final int KERNEL_CODE = 5;

public LockSQLException(final SQLState sqlState, final int errorCode, final String reason, final Object... messageArguments) {
protected LockSQLException(final SQLState sqlState, final int errorCode, final String reason, final Object... messageArguments) {
super(sqlState, KERNEL_CODE, errorCode, reason, messageArguments);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.Setter;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
* Transaction connection context.
Expand All @@ -41,7 +42,7 @@ public final class TransactionConnectionContext implements AutoCloseable {
@Setter
private volatile String readWriteSplitReplicaRoute;

private volatile TransactionManager transactionManager;
private AtomicReference<TransactionManager> transactionManager;

/**
* Begin transaction.
Expand All @@ -52,7 +53,7 @@ public final class TransactionConnectionContext implements AutoCloseable {
public void beginTransaction(final String transactionType, final TransactionManager transactionManager) {
this.transactionType = transactionType;
inTransaction = true;
this.transactionManager = transactionManager;
this.transactionManager = new AtomicReference<>(transactionManager);
}

/**
Expand Down Expand Up @@ -88,7 +89,7 @@ public Optional<String> getReadWriteSplitReplicaRoute() {
* @return transaction manager
*/
public Optional<TransactionManager> getTransactionManager() {
return Optional.ofNullable(transactionManager);
return null == transactionManager ? Optional.empty() : Optional.ofNullable(transactionManager.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import lombok.Setter;
import org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.EmptyStorageUnitException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor;
import org.apache.shardingsphere.distsql.statement.ral.updatable.RefreshTableMetaDataStatement;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.kernel.metadata.SchemaNotFoundException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.TableNotFoundException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.EmptyStorageUnitException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.mode.manager.ContextManager;
Expand All @@ -43,8 +45,8 @@ public final class RefreshTableMetaDataExecutor implements DistSQLUpdateExecutor

@Override
public void executeUpdate(final RefreshTableMetaDataStatement sqlStatement, final ContextManager contextManager) throws SQLException {
checkStorageUnit(contextManager.getStorageUnits(database.getName()), sqlStatement);
String schemaName = getSchemaName(sqlStatement);
checkBeforeUpdate(sqlStatement, schemaName);
if (sqlStatement.getStorageUnitName().isPresent()) {
if (sqlStatement.getTableName().isPresent()) {
contextManager.reloadTable(database, schemaName, sqlStatement.getStorageUnitName().get(), sqlStatement.getTableName().get());
Expand All @@ -60,6 +62,16 @@ public void executeUpdate(final RefreshTableMetaDataStatement sqlStatement, fina
}
}

private String getSchemaName(final RefreshTableMetaDataStatement sqlStatement) {
return sqlStatement.getSchemaName().isPresent() ? sqlStatement.getSchemaName().get() : new DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(database.getName());
}

private void checkBeforeUpdate(final RefreshTableMetaDataStatement sqlStatement, final String schemaName) {
checkStorageUnit(database.getResourceMetaData().getStorageUnits(), sqlStatement);
checkSchema(schemaName);
checkTable(sqlStatement, schemaName);
}

private void checkStorageUnit(final Map<String, StorageUnit> storageUnits, final RefreshTableMetaDataStatement sqlStatement) {
ShardingSpherePreconditions.checkNotEmpty(storageUnits, () -> new EmptyStorageUnitException(database.getName()));
if (sqlStatement.getStorageUnitName().isPresent()) {
Expand All @@ -68,8 +80,15 @@ private void checkStorageUnit(final Map<String, StorageUnit> storageUnits, final
}
}

private String getSchemaName(final RefreshTableMetaDataStatement sqlStatement) {
return sqlStatement.getSchemaName().isPresent() ? sqlStatement.getSchemaName().get() : new DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(database.getName());
private void checkSchema(final String schemaName) {
ShardingSpherePreconditions.checkState(database.containsSchema(schemaName), () -> new SchemaNotFoundException(schemaName));
}

private void checkTable(final RefreshTableMetaDataStatement sqlStatement, final String schemaName) {
if (sqlStatement.getTableName().isPresent()) {
String tableName = sqlStatement.getTableName().get();
ShardingSpherePreconditions.checkState(database.getSchema(schemaName).containsTable(tableName), () -> new TableNotFoundException(tableName));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@

import org.apache.shardingsphere.distsql.statement.ral.updatable.RefreshTableMetaDataStatement;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.kernel.metadata.SchemaNotFoundException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.TableNotFoundException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.EmptyStorageUnitException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
Expand All @@ -30,16 +35,19 @@
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;

import java.sql.SQLException;
import java.util.Collections;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -49,29 +57,62 @@ class DistSQLUpdateBackendHandlerTest {

private final DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "FIXTURE");

@Test
void assertEmptyResource() {
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ContextManager contextManager;

@BeforeEach
void setUp() {
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
when(contextManager.getStorageUnits("foo_db")).thenReturn(Collections.emptyMap());
when(contextManager.getDatabase("foo_db")).thenReturn(new ShardingSphereDatabase("foo_db", mock(), mock(), mock(), Collections.emptyList()));
}

@Test
void assertEmptyStorageUnit() {
when(contextManager.getDatabase("foo_db")).thenReturn(new ShardingSphereDatabase("foo_db", databaseType, mock(), mock(), Collections.emptyList()));
DistSQLUpdateBackendHandler backendHandler = new DistSQLUpdateBackendHandler(new RefreshTableMetaDataStatement(), mockConnectionSession("foo_db"));
assertThrows(EmptyStorageUnitException.class, backendHandler::execute);
}

@Test
void assertMissingRequiredResources() {
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
void assertMissingRequiredStorageUnit() {
ResourceMetaData resourceMetaData = mock(ResourceMetaData.class);
when(resourceMetaData.getStorageUnits()).thenReturn(Collections.singletonMap("ds_0", mock(StorageUnit.class)));
when(contextManager.getDatabase("foo_db")).thenReturn(new ShardingSphereDatabase("foo_db", databaseType, resourceMetaData, mock(), Collections.emptyList()));
DistSQLUpdateBackendHandler backendHandler = new DistSQLUpdateBackendHandler(new RefreshTableMetaDataStatement("t_order", "ds_1", null), mockConnectionSession("foo_db"));
assertThrows(MissingRequiredStorageUnitsException.class, backendHandler::execute);
}

@Test
void assertSchemaNotFound() {
ResourceMetaData resourceMetaData = mock(ResourceMetaData.class);
when(resourceMetaData.getStorageUnits()).thenReturn(Collections.singletonMap("ds_0", mock(StorageUnit.class)));
when(contextManager.getDatabase("foo_db")).thenReturn(new ShardingSphereDatabase("foo_db", databaseType, resourceMetaData, mock(), Collections.emptyList()));
DistSQLUpdateBackendHandler backendHandler = new DistSQLUpdateBackendHandler(new RefreshTableMetaDataStatement("t_order", "ds_0", "bar_db"), mockConnectionSession("foo_db"));
assertThrows(SchemaNotFoundException.class, backendHandler::execute);
}

@Test
void assertTableNotFound() {
ResourceMetaData resourceMetaData = mock(ResourceMetaData.class);
when(resourceMetaData.getStorageUnits()).thenReturn(Collections.singletonMap("ds_0", mock(StorageUnit.class)));
ShardingSphereSchema schema = mock(ShardingSphereSchema.class);
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
when(database.containsSchema("foo_db")).thenReturn(true);
when(database.getSchema("foo_db")).thenReturn(schema);
when(database.getResourceMetaData()).thenReturn(resourceMetaData);
when(contextManager.getDatabase("foo_db")).thenReturn(database);
DistSQLUpdateBackendHandler backendHandler = new DistSQLUpdateBackendHandler(new RefreshTableMetaDataStatement("t_order", "ds_0", "foo_db"), mockConnectionSession("foo_db"));
assertThrows(TableNotFoundException.class, backendHandler::execute);
}

@Test
void assertUpdate() throws SQLException {
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
when(contextManager.getDatabase("foo_db")).thenReturn(new ShardingSphereDatabase("foo_db", databaseType, mock(), mock(), Collections.emptyList()));
ResourceMetaData resourceMetaData = mock(ResourceMetaData.class);
when(resourceMetaData.getStorageUnits()).thenReturn(Collections.singletonMap("ds_0", mock(StorageUnit.class)));
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
when(database.containsSchema(any())).thenReturn(true);
when(database.getProtocolType()).thenReturn(databaseType);
when(database.getResourceMetaData()).thenReturn(resourceMetaData);
when(contextManager.getDatabase("foo_db")).thenReturn(database);
ResponseHeader actual = new DistSQLUpdateBackendHandler(new RefreshTableMetaDataStatement(), mockConnectionSession("foo_db")).execute();
assertThat(actual, instanceOf(UpdateResponseHeader.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,18 @@ private static class AlterTransactionRuleTask implements Runnable {

private final CommonSQLCommand commonSQL;

@SneakyThrows({SQLException.class, InterruptedException.class})
@SneakyThrows(SQLException.class)
@Override
public void run() {
while (!IS_FINISHED.get()) {
alterLocalTransactionRule();
TimeUnit.SECONDS.sleep(20);
Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(19L, TimeUnit.SECONDS).until(() -> true);
alterXaTransactionRule("Narayana");
if (SWITCH_COUNT.incrementAndGet() >= MAX_SWITCH_COUNT) {
IS_FINISHED.set(true);
break;
}
TimeUnit.SECONDS.sleep(20);
Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(19L, TimeUnit.SECONDS).until(() -> true);
}
}

Expand Down Expand Up @@ -220,7 +220,8 @@ private static void executeOneTransaction(final Connection connection) throws SQ
PreparedStatement deleteStatement = connection.prepareStatement("delete from account where id = ?");
deleteStatement.setObject(1, id);
deleteStatement.execute();
Thread.sleep(random.nextInt(900) + 100);
long time = random.nextLong(900) + 100;
Awaitility.await().atMost(time + 10L, TimeUnit.MILLISECONDS).pollInterval(time, TimeUnit.MILLISECONDS).until(() -> true);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
Expand Down

0 comments on commit c340ae3

Please sign in to comment.