Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3919] feat(catalog-lakehouse-paimon): Support hive backend for Paimon Catalog #5092

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion catalogs/catalog-lakehouse-paimon/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ dependencies {
exclude("com.sun.jersey")
exclude("javax.servlet")
exclude("org.apache.curator")
exclude("org.apache.hive")
caican00 marked this conversation as resolved.
Show resolved Hide resolved
exclude("org.apache.hbase")
exclude("org.apache.zookeeper")
exclude("org.eclipse.jetty.aggregate:jetty-all")
Expand Down Expand Up @@ -122,12 +121,14 @@ dependencies {
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.mysql.driver)
testImplementation(libs.postgresql.driver)
testImplementation(libs.h2db)
testImplementation(libs.bundles.log4j)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.paimon.s3)
testImplementation(libs.paimon.spark)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.localstack)
testImplementation(libs.testcontainers.mysql)

testRuntimeOnly(libs.junit.jupiter.engine)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@

/** The type of Apache Paimon catalog backend. */
public enum PaimonCatalogBackend {
FILESYSTEM
FILESYSTEM,
JDBC,
HIVE
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,26 @@ public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
public static final String PAIMON_METASTORE = "metastore";
public static final String WAREHOUSE = "warehouse";
public static final String URI = "uri";
public static final String JDBC_USER = "jdbc.user";
public static final String JDBC_PASSWORD = "jdbc.password";

// S3 properties needed by Paimon
public static final String S3_ENDPOINT = "s3.endpoint";
public static final String S3_ACCESS_KEY = "s3.access-key";
public static final String S3_SECRET_KEY = "s3.secret-key";

public static final Map<String, String> GRAVITINO_CONFIG_TO_PAIMON =
ImmutableMap.of(GRAVITINO_CATALOG_BACKEND, PAIMON_METASTORE, WAREHOUSE, WAREHOUSE, URI, URI);
ImmutableMap.of(
GRAVITINO_CATALOG_BACKEND,
PAIMON_METASTORE,
WAREHOUSE,
WAREHOUSE,
URI,
URI,
JDBC_USER,
JDBC_USER,
JDBC_PASSWORD,
JDBC_PASSWORD);
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
private static final Map<String, String> KERBEROS_CONFIGURATION =
ImmutableMap.of(
Expand Down Expand Up @@ -96,6 +108,18 @@ public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
"Paimon catalog uri config",
false /* immutable */,
null /* defaultValue */,
false /* hidden */),
stringOptionalPropertyEntry(
JDBC_USER,
"Paimon catalog jdbc user",
false /* immutable */,
null /* defaultValue */,
false /* hidden */),
stringOptionalPropertyEntry(
JDBC_PASSWORD,
"Paimon catalog jdbc password",
false /* immutable */,
null /* defaultValue */,
false /* hidden */));
HashMap<String, PropertyEntry<?>> result = Maps.newHashMap();
result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ public class PaimonConfig extends Config {
.stringConf()
.create();

public static final ConfigEntry<String> CATALOG_JDBC_USER =
new ConfigBuilder(PaimonCatalogPropertiesMetadata.JDBC_USER)
.doc("Paimon catalog jdbc user")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();

public static final ConfigEntry<String> CATALOG_JDBC_PASSWORD =
new ConfigBuilder(PaimonCatalogPropertiesMetadata.JDBC_PASSWORD)
.doc("Paimon catalog jdbc password")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();

public PaimonConfig() {
super(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata.S3_CONFIGURATION;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_BACKEND;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_JDBC_PASSWORD;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_JDBC_USER;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_URI;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_WAREHOUSE;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
Expand Down Expand Up @@ -121,6 +123,17 @@ private static void checkPaimonConfig(PaimonConfig paimonConfig) {
Preconditions.checkArgument(
StringUtils.isNotBlank(uri), "Paimon Catalog uri can not be null or empty.");
}

if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(metastore)) {
String jdbcUser = paimonConfig.get(CATALOG_JDBC_USER);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcUser), "Paimon Catalog jdbc user can not be null or empty.");

String jdbcPassword = paimonConfig.get(CATALOG_JDBC_PASSWORD);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcPassword),
"Paimon Catalog jdbc password can not be null or empty.");
}
}

public static Map<String, String> toPaimonCatalogProperties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.integration.test.container.ContainerSuite;
import org.apache.gravitino.integration.test.container.MySQLContainer;
import org.apache.gravitino.integration.test.util.AbstractIT;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.integration.test.util.TestDatabaseName;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
Expand Down Expand Up @@ -87,29 +89,35 @@
public abstract class CatalogPaimonBaseIT extends AbstractIT {

protected static final ContainerSuite containerSuite = ContainerSuite.getInstance();
protected static final TestDatabaseName TEST_DB_NAME =
TestDatabaseName.PG_TEST_ICEBERG_CATALOG_MULTIPLE_JDBC_LOAD;
protected static MySQLContainer mySQLContainer;
protected String WAREHOUSE;
protected String TYPE;
protected String URI;
protected String jdbcUser;
protected String jdbcPassword;
protected Catalog catalog;
protected org.apache.paimon.catalog.Catalog paimonCatalog;
protected SparkSession spark;
protected String metalakeName = GravitinoITUtils.genRandomName("paimon_it_metalake");
protected String catalogName = GravitinoITUtils.genRandomName("paimon_it_catalog");
protected String schemaName = GravitinoITUtils.genRandomName("paimon_it_schema");
protected static final String schema_comment = "schema_comment";

private static final String provider = "lakehouse-paimon";
private static final String catalog_comment = "catalog_comment";
private static final String schema_comment = "schema_comment";
private static final String table_comment = "table_comment";
private static final String PAIMON_COL_NAME1 = "paimon_col_name1";
private static final String PAIMON_COL_NAME2 = "paimon_col_name2";
private static final String PAIMON_COL_NAME3 = "paimon_col_name3";
private static final String PAIMON_COL_NAME4 = "paimon_col_name4";
private static final String PAIMON_COL_NAME5 = "paimon_col_name5";
private static final String alertTableName = "alert_table_name";
private String metalakeName = GravitinoITUtils.genRandomName("paimon_it_metalake");
private String catalogName = GravitinoITUtils.genRandomName("paimon_it_catalog");
private String schemaName = GravitinoITUtils.genRandomName("paimon_it_schema");
private String tableName = GravitinoITUtils.genRandomName("paimon_it_table");
private static String INSERT_BATCH_WITHOUT_PARTITION_TEMPLATE = "INSERT INTO paimon.%s VALUES %s";
private static final String SELECT_ALL_TEMPLATE = "SELECT * FROM paimon.%s";
private static final String DEFAULT_DB = "default";
private GravitinoMetalake metalake;
private Catalog catalog;
private org.apache.paimon.catalog.Catalog paimonCatalog;
protected SparkSession spark;
private Map<String, String> catalogProperties;

@BeforeAll
Expand Down Expand Up @@ -163,9 +171,7 @@ void testPaimonSchemaOperations() throws DatabaseNotExistException {

// load schema check.
Schema schema = schemas.loadSchema(schemaIdent.name());
// database properties is empty for Paimon FilesystemCatalog.
Assertions.assertTrue(schema.properties().isEmpty());
Assertions.assertTrue(paimonCatalog.loadDatabaseProperties(schemaIdent.name()).isEmpty());
Assertions.assertEquals(testSchemaName, schema.name());

Map<String, String> emptyMap = Collections.emptyMap();
Assertions.assertThrows(
Expand Down Expand Up @@ -202,6 +208,7 @@ void testPaimonSchemaOperations() throws DatabaseNotExistException {

@Test
void testCreateTableWithNullComment() {
String tableName = GravitinoITUtils.genRandomName("paimon_table_with_null_comment");
Column[] columns = createColumns();
NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);

Expand All @@ -217,6 +224,8 @@ void testCreateTableWithNullComment() {
@Test
void testCreateAndLoadPaimonTable()
throws org.apache.paimon.catalog.Catalog.TableNotExistException {
String tableName = GravitinoITUtils.genRandomName("create_and_load_paimon_table");

// Create table from Gravitino API
Column[] columns = createColumns();

Expand Down Expand Up @@ -301,6 +310,8 @@ void testCreateAndLoadPaimonTable()
@Test
void testCreateAndLoadPaimonPartitionedTable()
throws org.apache.paimon.catalog.Catalog.TableNotExistException {
String tableName = GravitinoITUtils.genRandomName("create_and_load_paimon_partitioned_table");

// Create table from Gravitino API
Column[] columns = createColumns();

Expand Down Expand Up @@ -390,6 +401,8 @@ void testCreateAndLoadPaimonPartitionedTable()
@Test
void testCreateAndLoadPaimonPrimaryKeyTable()
throws org.apache.paimon.catalog.Catalog.TableNotExistException {
String tableName = GravitinoITUtils.genRandomName("create_and_load_paimon_primary_key_table");

// Create table from Gravitino API
Column[] columns = createColumns();
ArrayList<Column> newColumns = new ArrayList<>(Arrays.asList(columns));
Expand Down Expand Up @@ -615,6 +628,8 @@ void testListAndDropPaimonTable() throws DatabaseNotExistException {

@Test
public void testAlterPaimonTable() {
String tableName = GravitinoITUtils.genRandomName("alter_paimon_table");

Column[] columns = createColumns();
catalog
.asTableCatalog()
Expand Down Expand Up @@ -712,11 +727,11 @@ public void testAlterPaimonTable() {
// update column position
Column col1 = Column.of("name", Types.StringType.get(), "comment");
Column col2 = Column.of("address", Types.StringType.get(), "comment");
Column col3 = Column.of("date_of_birth", Types.DateType.get(), "comment");
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
Column col3 = Column.of("date_of_birth", Types.StringType.get(), "comment");

Column[] newColumns = new Column[] {col1, col2, col3};
NameIdentifier tableIdentifier =
NameIdentifier.of(schemaName, GravitinoITUtils.genRandomName("PaimonAlterTableIT"));
NameIdentifier.of(schemaName, GravitinoITUtils.genRandomName("new_alter_paimon_table"));
catalog
.asTableCatalog()
.createTable(
Expand Down Expand Up @@ -857,9 +872,15 @@ void testOperationDataOfPaimonTable() {
}

private void clearTableAndSchema() {
if (catalog.asSchemas().schemaExists(schemaName)) {
catalog.asSchemas().dropSchema(schemaName, true);
}
SupportsSchemas supportsSchema = catalog.asSchemas();
Arrays.stream(supportsSchema.listSchemas())
.forEach(
schema -> {
// can not drop default database for hive backend.
if (!DEFAULT_DB.equalsIgnoreCase(schema)) {
supportsSchema.dropSchema(schema, true);
}
});
}

private void createMetalake() {
Expand Down Expand Up @@ -903,10 +924,8 @@ private void createSchema() {
prop.put("key2", "val2");

Schema createdSchema = catalog.asSchemas().createSchema(ident.name(), schema_comment, prop);
// database properties is empty for Paimon FilesystemCatalog.
Schema loadSchema = catalog.asSchemas().loadSchema(ident.name());
Assertions.assertEquals(createdSchema.name(), loadSchema.name());
Assertions.assertTrue(loadSchema.properties().isEmpty());
}

private Column[] createColumns() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SupportsSchemas;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.paimon.catalog.Catalog;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@Tag("gravitino-docker-test")
Expand All @@ -48,4 +55,22 @@ protected Map<String, String> initPaimonCatalogProperties() {

return catalogProperties;
}

@Test
void testPaimonSchemaProperties() throws Catalog.DatabaseNotExistException {
SupportsSchemas schemas = catalog.asSchemas();

// create schema.
String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1");
NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
Map<String, String> schemaProperties = Maps.newHashMap();
schemaProperties.put("key1", "val1");
schemaProperties.put("key2", "val2");
schemas.createSchema(schemaIdent.name(), schema_comment, schemaProperties);

// load schema check, database properties is empty for Paimon FilesystemCatalog.
Schema schema = schemas.loadSchema(schemaIdent.name());
Assertions.assertTrue(schema.properties().isEmpty());
Assertions.assertTrue(paimonCatalog.loadDatabaseProperties(schemaIdent.name()).isEmpty());
}
}
Loading
Loading