Skip to content

Commit

Permalink
feat(oceanbase-catalog): Support schema operations for OceanBase JDBC…
Browse files Browse the repository at this point in the history
… catalog. (#5013)

### What changes were proposed in this pull request?

- Support schema operations for OceanBase JDBC catalog.
- Add OceanBaseContainer to test schema operations.

### Why are the changes needed?

Fix: #4990 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Add unit test: TestOceanBaseDatabaseOperations.
  • Loading branch information
yuanoOo authored Oct 9, 2024
1 parent 75ec8ae commit 437c41b
Show file tree
Hide file tree
Showing 14 changed files with 524 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@

package org.apache.gravitino.catalog.jdbc.operation;

import com.google.common.collect.ImmutableMap;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import javax.sql.DataSource;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.catalog.jdbc.JdbcSchema;
import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import org.apache.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.meta.AuditInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,6 +62,12 @@ public void initialize(
public void create(String databaseName, String comment, Map<String, String> properties)
throws SchemaAlreadyExistsException {
LOG.info("Beginning to create database {}", databaseName);
String originComment = StringIdentifier.removeIdFromComment(comment);
if (!supportSchemaComment() && StringUtils.isNotEmpty(originComment)) {
throw new UnsupportedOperationException(
"Doesn't support setting schema comment: " + originComment);
}

try (final Connection connection = getConnection()) {
JdbcConnectorUtils.executeUpdate(
connection, generateCreateDatabaseSql(databaseName, comment, properties));
Expand Down Expand Up @@ -101,20 +116,80 @@ protected void dropDatabase(String databaseName, boolean cascade) {
}

/**
* The default implementation of this method is based on MySQL syntax, and if the catalog does not
* support MySQL syntax, this method needs to be rewritten.
*
* @param databaseName The name of the database.
* @param comment The comment of the database.
* @param properties The properties of the database.
* @return the SQL statement to create a database with the given name and comment.
*/
protected abstract String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties);
protected String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties) {
String createDatabaseSql = String.format("CREATE DATABASE `%s`", databaseName);
if (MapUtils.isNotEmpty(properties)) {
throw new UnsupportedOperationException("Properties are not supported yet.");
}
LOG.info("Generated create database:{} sql: {}", databaseName, createDatabaseSql);
return createDatabaseSql;
}

/**
* The default implementation of this method is based on MySQL syntax, and if the catalog does not
* support MySQL syntax, this method needs to be rewritten.
*
* @param databaseName The name of the database.
* @param cascade cascade If set to true, drops all the tables in the schema as well.
* @return the SQL statement to drop a database with the given name.
*/
protected abstract String generateDropDatabaseSql(String databaseName, boolean cascade);
protected String generateDropDatabaseSql(String databaseName, boolean cascade) {
final String dropDatabaseSql = String.format("DROP DATABASE `%s`", databaseName);
if (cascade) {
return dropDatabaseSql;
}

try (final Connection connection = this.dataSource.getConnection()) {
String query = String.format("SHOW TABLES IN `%s`", databaseName);
try (Statement statement = connection.createStatement()) {
// Execute the query and check if there exists any tables in the database
try (ResultSet resultSet = statement.executeQuery(query)) {
if (resultSet.next()) {
throw new IllegalStateException(
String.format(
"Database %s is not empty, the value of cascade should be true.",
databaseName));
}
}
}
} catch (SQLException sqlException) {
throw this.exceptionMapper.toGravitinoException(sqlException);
}
return dropDatabaseSql;
}

/**
* The default implementation of this method is based on MySQL syntax, and if the catalog does not
* support MySQL syntax, this method needs to be rewritten.
*
* @param databaseName The name of the database to check.
* @return information object of the JDBC database.
*/
@Override
public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
List<String> allDatabases = listDatabases();
String dbName =
allDatabases.stream()
.filter(db -> db.equals(databaseName))
.findFirst()
.orElseThrow(
() -> new NoSuchSchemaException("Database %s could not be found", databaseName));

return JdbcSchema.builder()
.withName(dbName)
.withProperties(ImmutableMap.of())
.withAuditInfo(AuditInfo.EMPTY)
.build();
}

protected Connection getConnection() throws SQLException {
return dataSource.getConnection();
Expand All @@ -124,9 +199,15 @@ protected Connection getConnection() throws SQLException {
* Check whether it is a system database.
*
* @param dbName The name of the database.
* @return false for all cases.
* @return whether it is a system database.
*/
protected boolean isSystemDatabase(String dbName) {
return false;
return createSysDatabaseNameSet().contains(dbName.toLowerCase(Locale.ROOT));
}

/** Check whether support setting schema comment. */
protected abstract boolean supportSchemaComment();

/** Create a set of system database names. */
protected abstract Set<String> createSysDatabaseNameSet();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.gravitino.catalog.jdbc.JdbcSchema;
Expand Down Expand Up @@ -91,6 +93,16 @@ public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
return null;
}

@Override
protected boolean supportSchemaComment() {
return false;
}

@Override
protected Set<String> createSysDatabaseNameSet() {
return Collections.emptySet();
}

@Override
public boolean delete(String databaseName, boolean cascade) throws NoSuchSchemaException {
return delete(databaseName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@
public class DorisDatabaseOperations extends JdbcDatabaseOperations {
public static final String COMMENT_KEY = "comment";

private static final Set<String> DORIS_SYSTEM_DATABASE_NAMES =
ImmutableSet.of("information_schema");

@Override
public String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties) {
Expand Down Expand Up @@ -137,7 +134,12 @@ protected Map<String, String> getDatabaseProperties(String databaseName) {
}

@Override
protected boolean isSystemDatabase(String dbName) {
return DORIS_SYSTEM_DATABASE_NAMES.contains(dbName);
protected boolean supportSchemaComment() {
return true;
}

@Override
protected Set<String> createSysDatabaseNameSet() {
return ImmutableSet.of("information_schema");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,106 +18,20 @@
*/
package org.apache.gravitino.catalog.mysql.operation;

import com.google.common.collect.ImmutableMap;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.catalog.jdbc.JdbcSchema;
import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.meta.AuditInfo;

/** Database operations for MySQL. */
public class MysqlDatabaseOperations extends JdbcDatabaseOperations {

public static final Set<String> SYS_MYSQL_DATABASE_NAMES = createSysMysqlDatabaseNames();

private static Set<String> createSysMysqlDatabaseNames() {
Set<String> set = new HashSet<>();
set.add("information_schema");
set.add("mysql");
set.add("performance_schema");
set.add("sys");
return Collections.unmodifiableSet(set);
}

@Override
public String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties) {
String originComment = StringIdentifier.removeIdFromComment(comment);
if (StringUtils.isNotEmpty(originComment)) {
throw new UnsupportedOperationException(
"MySQL doesn't support set schema comment: " + originComment);
}
StringBuilder sqlBuilder = new StringBuilder("CREATE DATABASE ");

// Append database name
sqlBuilder.append("`").append(databaseName).append("`");
// Append options
if (MapUtils.isNotEmpty(properties)) {
// TODO #804 Properties will be unified in the future.
throw new UnsupportedOperationException("Properties are not supported yet");
}
String result = sqlBuilder.toString();
LOG.info("Generated create database:{} sql: {}", databaseName, result);
return result;
}

@Override
public String generateDropDatabaseSql(String databaseName, boolean cascade) {
final String dropDatabaseSql = "DROP DATABASE `" + databaseName + "`";
if (cascade) {
return dropDatabaseSql;
}

try (final Connection connection = this.dataSource.getConnection()) {
String query = "SHOW TABLES IN `" + databaseName + "`";
try (Statement statement = connection.createStatement()) {
// Execute the query and check if there exists any tables in the database
try (ResultSet resultSet = statement.executeQuery(query)) {
if (resultSet.next()) {
throw new IllegalStateException(
String.format(
"Database %s is not empty, the value of cascade should be true.",
databaseName));
}
}
}
} catch (SQLException sqlException) {
throw this.exceptionMapper.toGravitinoException(sqlException);
}
return dropDatabaseSql;
}

@Override
public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
List<String> allDatabases = listDatabases();
String dbName =
allDatabases.stream()
.filter(db -> db.equals(databaseName))
.findFirst()
.orElseThrow(
() -> new NoSuchSchemaException("Database %s could not be found", databaseName));

return JdbcSchema.builder()
.withName(dbName)
.withProperties(ImmutableMap.of())
.withAuditInfo(AuditInfo.EMPTY)
.build();
protected boolean supportSchemaComment() {
return false;
}

@Override
protected boolean isSystemDatabase(String dbName) {
return SYS_MYSQL_DATABASE_NAMES.contains(dbName.toLowerCase(Locale.ROOT));
protected Set<String> createSysDatabaseNameSet() {
return ImmutableSet.of("information_schema", "mysql", "sys", "performance_schema");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ public void testSchemaComment() {
UnsupportedOperationException.class,
() -> catalog.asSchemas().createSchema(testSchemaName, "comment", null));
Assertions.assertTrue(
exception.getMessage().contains("MySQL doesn't support set schema comment: comment"));
exception.getMessage().contains("Doesn't support setting schema comment: comment"));

// test null comment
String testSchemaName2 = "test2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.gravitino.catalog.mysql.operation;

import static org.apache.gravitino.catalog.mysql.operation.MysqlDatabaseOperations.SYS_MYSQL_DATABASE_NAMES;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -45,8 +43,11 @@ public void testBaseOperationDatabase() {
// Mysql database creation does not support incoming comments.
String comment = null;
List<String> databases = DATABASE_OPERATIONS.listDatabases();
SYS_MYSQL_DATABASE_NAMES.forEach(
sysMysqlDatabaseName -> Assertions.assertFalse(databases.contains(sysMysqlDatabaseName)));
((MysqlDatabaseOperations) DATABASE_OPERATIONS)
.createSysDatabaseNameSet()
.forEach(
sysMysqlDatabaseName ->
Assertions.assertFalse(databases.contains(sysMysqlDatabaseName)));
testBaseOperation(databaseName, properties, comment);
testDropDatabase(databaseName);
}
Expand Down
Loading

0 comments on commit 437c41b

Please sign in to comment.