Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(oceanbase-catalog): Support schema operations for OceanBase JDBC catalog. #5013

Merged
merged 4 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@

package org.apache.gravitino.catalog.jdbc.operation;

import com.google.common.collect.ImmutableMap;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import javax.sql.DataSource;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.catalog.jdbc.JdbcSchema;
import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import org.apache.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.meta.AuditInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,6 +62,12 @@ public void initialize(
public void create(String databaseName, String comment, Map<String, String> properties)
throws SchemaAlreadyExistsException {
LOG.info("Beginning to create database {}", databaseName);
String originComment = StringIdentifier.removeIdFromComment(comment);
if (!supportSchemaComment() && StringUtils.isNotEmpty(originComment)) {
throw new UnsupportedOperationException(
"Doesn't support setting schema comment: " + originComment);
}

try (final Connection connection = getConnection()) {
JdbcConnectorUtils.executeUpdate(
connection, generateCreateDatabaseSql(databaseName, comment, properties));
Expand Down Expand Up @@ -101,20 +116,80 @@ protected void dropDatabase(String databaseName, boolean cascade) {
}

/**
* The default implementation of this method is based on MySQL syntax, and if the catalog does not
* support MySQL syntax, this method needs to be rewritten.
*
* @param databaseName The name of the database.
* @param comment The comment of the database.
* @param properties The properties of the database.
* @return the SQL statement to create a database with the given name and comment.
*/
protected abstract String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties);
protected String generateCreateDatabaseSql(
mchades marked this conversation as resolved.
Show resolved Hide resolved
String databaseName, String comment, Map<String, String> properties) {
String createDatabaseSql = String.format("CREATE DATABASE `%s`", databaseName);
if (MapUtils.isNotEmpty(properties)) {
throw new UnsupportedOperationException("Properties are not supported yet.");
}
LOG.info("Generated create database:{} sql: {}", databaseName, createDatabaseSql);
return createDatabaseSql;
}

/**
* The default implementation of this method is based on MySQL syntax, and if the catalog does not
* support MySQL syntax, this method needs to be rewritten.
*
* @param databaseName The name of the database.
* @param cascade cascade If set to true, drops all the tables in the schema as well.
* @return the SQL statement to drop a database with the given name.
*/
protected abstract String generateDropDatabaseSql(String databaseName, boolean cascade);
protected String generateDropDatabaseSql(String databaseName, boolean cascade) {
final String dropDatabaseSql = String.format("DROP DATABASE `%s`", databaseName);
if (cascade) {
return dropDatabaseSql;
}

try (final Connection connection = this.dataSource.getConnection()) {
String query = String.format("SHOW TABLES IN `%s`", databaseName);
try (Statement statement = connection.createStatement()) {
// Execute the query and check if there exists any tables in the database
try (ResultSet resultSet = statement.executeQuery(query)) {
if (resultSet.next()) {
throw new IllegalStateException(
String.format(
"Database %s is not empty, the value of cascade should be true.",
databaseName));
}
}
}
} catch (SQLException sqlException) {
throw this.exceptionMapper.toGravitinoException(sqlException);
}
return dropDatabaseSql;
}

/**
* The default implementation of this method is based on MySQL syntax, and if the catalog does not
* support MySQL syntax, this method needs to be rewritten.
*
* @param databaseName The name of the database to check.
* @return information object of the JDBC database.
*/
@Override
public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
List<String> allDatabases = listDatabases();
String dbName =
allDatabases.stream()
.filter(db -> db.equals(databaseName))
.findFirst()
.orElseThrow(
() -> new NoSuchSchemaException("Database %s could not be found", databaseName));

return JdbcSchema.builder()
.withName(dbName)
.withProperties(ImmutableMap.of())
.withAuditInfo(AuditInfo.EMPTY)
.build();
}

protected Connection getConnection() throws SQLException {
return dataSource.getConnection();
Expand All @@ -124,9 +199,15 @@ protected Connection getConnection() throws SQLException {
* Check whether it is a system database.
*
* @param dbName The name of the database.
* @return false for all cases.
* @return whether it is a system database.
*/
protected boolean isSystemDatabase(String dbName) {
return false;
return createSysDatabaseNameSet().contains(dbName.toLowerCase(Locale.ROOT));
}

/** Check whether support setting schema comment. */
protected abstract boolean supportSchemaComment();

/** Create a set of system database names. */
protected abstract Set<String> createSysDatabaseNameSet();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.gravitino.catalog.jdbc.JdbcSchema;
Expand Down Expand Up @@ -91,6 +93,16 @@ public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
return null;
}

@Override
protected boolean supportSchemaComment() {
return false;
}

@Override
protected Set<String> createSysDatabaseNameSet() {
return Collections.emptySet();
}

@Override
public boolean delete(String databaseName, boolean cascade) throws NoSuchSchemaException {
return delete(databaseName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@
public class DorisDatabaseOperations extends JdbcDatabaseOperations {
public static final String COMMENT_KEY = "comment";

private static final Set<String> DORIS_SYSTEM_DATABASE_NAMES =
ImmutableSet.of("information_schema");

@Override
public String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties) {
Expand Down Expand Up @@ -137,7 +134,12 @@ protected Map<String, String> getDatabaseProperties(String databaseName) {
}

@Override
protected boolean isSystemDatabase(String dbName) {
return DORIS_SYSTEM_DATABASE_NAMES.contains(dbName);
protected boolean supportSchemaComment() {
return true;
}

@Override
protected Set<String> createSysDatabaseNameSet() {
return ImmutableSet.of("information_schema");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,106 +18,20 @@
*/
package org.apache.gravitino.catalog.mysql.operation;

import com.google.common.collect.ImmutableMap;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.catalog.jdbc.JdbcSchema;
import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.meta.AuditInfo;

/** Database operations for MySQL. */
public class MysqlDatabaseOperations extends JdbcDatabaseOperations {

public static final Set<String> SYS_MYSQL_DATABASE_NAMES = createSysMysqlDatabaseNames();

private static Set<String> createSysMysqlDatabaseNames() {
Set<String> set = new HashSet<>();
set.add("information_schema");
set.add("mysql");
set.add("performance_schema");
set.add("sys");
return Collections.unmodifiableSet(set);
}

@Override
public String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties) {
String originComment = StringIdentifier.removeIdFromComment(comment);
if (StringUtils.isNotEmpty(originComment)) {
throw new UnsupportedOperationException(
"MySQL doesn't support set schema comment: " + originComment);
}
StringBuilder sqlBuilder = new StringBuilder("CREATE DATABASE ");

// Append database name
sqlBuilder.append("`").append(databaseName).append("`");
// Append options
if (MapUtils.isNotEmpty(properties)) {
// TODO #804 Properties will be unified in the future.
throw new UnsupportedOperationException("Properties are not supported yet");
}
String result = sqlBuilder.toString();
LOG.info("Generated create database:{} sql: {}", databaseName, result);
return result;
}

@Override
public String generateDropDatabaseSql(String databaseName, boolean cascade) {
final String dropDatabaseSql = "DROP DATABASE `" + databaseName + "`";
if (cascade) {
return dropDatabaseSql;
}

try (final Connection connection = this.dataSource.getConnection()) {
String query = "SHOW TABLES IN `" + databaseName + "`";
try (Statement statement = connection.createStatement()) {
// Execute the query and check if there exists any tables in the database
try (ResultSet resultSet = statement.executeQuery(query)) {
if (resultSet.next()) {
throw new IllegalStateException(
String.format(
"Database %s is not empty, the value of cascade should be true.",
databaseName));
}
}
}
} catch (SQLException sqlException) {
throw this.exceptionMapper.toGravitinoException(sqlException);
}
return dropDatabaseSql;
}

@Override
public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
List<String> allDatabases = listDatabases();
String dbName =
allDatabases.stream()
.filter(db -> db.equals(databaseName))
.findFirst()
.orElseThrow(
() -> new NoSuchSchemaException("Database %s could not be found", databaseName));

return JdbcSchema.builder()
.withName(dbName)
.withProperties(ImmutableMap.of())
.withAuditInfo(AuditInfo.EMPTY)
.build();
protected boolean supportSchemaComment() {
return false;
}

@Override
protected boolean isSystemDatabase(String dbName) {
return SYS_MYSQL_DATABASE_NAMES.contains(dbName.toLowerCase(Locale.ROOT));
protected Set<String> createSysDatabaseNameSet() {
return ImmutableSet.of("information_schema", "mysql", "sys", "performance_schema");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ public void testSchemaComment() {
UnsupportedOperationException.class,
() -> catalog.asSchemas().createSchema(testSchemaName, "comment", null));
Assertions.assertTrue(
exception.getMessage().contains("MySQL doesn't support set schema comment: comment"));
exception.getMessage().contains("Doesn't support setting schema comment: comment"));

// test null comment
String testSchemaName2 = "test2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.gravitino.catalog.mysql.operation;

import static org.apache.gravitino.catalog.mysql.operation.MysqlDatabaseOperations.SYS_MYSQL_DATABASE_NAMES;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -45,8 +43,11 @@ public void testBaseOperationDatabase() {
// Mysql database creation does not support incoming comments.
String comment = null;
List<String> databases = DATABASE_OPERATIONS.listDatabases();
SYS_MYSQL_DATABASE_NAMES.forEach(
sysMysqlDatabaseName -> Assertions.assertFalse(databases.contains(sysMysqlDatabaseName)));
((MysqlDatabaseOperations) DATABASE_OPERATIONS)
.createSysDatabaseNameSet()
.forEach(
sysMysqlDatabaseName ->
Assertions.assertFalse(databases.contains(sysMysqlDatabaseName)));
testBaseOperation(databaseName, properties, comment);
testDropDatabase(databaseName);
}
Expand Down
Loading
Loading