From c0a9207dccccf1727432db523d7bcb519e010a6c Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Thu, 29 Feb 2024 12:26:46 +0800
Subject: [PATCH] fixed

---
 docs/content/how-to/creating-catalogs.md    | 147 ------------------
 .../org/apache/paimon/jdbc/JdbcCatalog.java |  31 +---
 .../org/apache/paimon/jdbc/JdbcUtils.java   | 102 +-----------
 3 files changed, 3 insertions(+), 277 deletions(-)

diff --git a/docs/content/how-to/creating-catalogs.md b/docs/content/how-to/creating-catalogs.md
index bc170ae933c97..4d12d96ed0af1 100644
--- a/docs/content/how-to/creating-catalogs.md
+++ b/docs/content/how-to/creating-catalogs.md
@@ -176,151 +176,6 @@ Using the table option facilitates the convenient definition of Hive table param
 Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
 For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.
 
-
-## Creating a Catalog with Filesystem Metastore
-
-{{< tabs "filesystem-metastore-example" >}}
-
-{{< tab "Flink" >}}
-
-The following Flink SQL registers and uses a Paimon catalog named `my_catalog`. Metadata and table files are stored under `hdfs:///path/to/warehouse`.
-
-```sql
-CREATE CATALOG my_catalog WITH (
-    'type' = 'paimon',
-    'warehouse' = 'hdfs:///path/to/warehouse'
-);
-
-USE CATALOG my_catalog;
-```
-
-You can define any default table options with the prefix `table-default.` for tables created in the catalog.
-
-{{< /tab >}}
-
-{{< tab "Spark3" >}}
-
-The following shell command registers a paimon catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`.
-
-```bash
-spark-sql ... \
-    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
-    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse
-```
-
-You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
-
-After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
-
-```sql
-USE paimon.default;
-```
-
-{{< /tab >}}
-
-{{< /tabs >}}
-
-## Creating a Catalog with Hive Metastore
-
-By using Paimon Hive catalog, changes to the catalog will directly affect the corresponding Hive metastore. Tables created in such catalog can also be accessed directly from Hive.
-
-To use Hive catalog, Database name, Table name and Field names should be **lower** case.
-
-{{< tabs "hive-metastore-example" >}}
-
-{{< tab "Flink" >}}
-
-Paimon Hive catalog in Flink relies on Flink Hive connector bundled jar. You should first download Hive connector bundled jar and add it to classpath.
-
-| Metastore version | Bundle Name   | SQL Client JAR |
-|:------------------|:--------------|:---------------------------------------------------------------------------------------------------------------------------|
-| 2.3.0 - 3.1.3     | Flink Bundle  | [Download](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/hive/overview/#using-bundled-hive-jar) |
-| 1.2.0 - x.x.x     | Presto Bundle | [Download](https://repo.maven.apache.org/maven2/com/facebook/presto/hive/hive-apache/1.2.2-2/hive-apache-1.2.2-2.jar)        |
-
-The following Flink SQL registers and uses a Paimon Hive catalog named `my_hive`.
-Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in Hive metastore.
-
-If your Hive requires security authentication such as Kerberos, LDAP, Ranger or you want the paimon table to be managed
-by Apache Atlas(Setting 'hive.metastore.event.listeners' in hive-site.xml). You can specify the hive-conf-dir and
-hadoop-conf-dir parameter to the hive-site.xml file path.
-
-```sql
-CREATE CATALOG my_hive WITH (
-    'type' = 'paimon',
-    'metastore' = 'hive',
-    -- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf
-    -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
-    -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
-    -- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf
-);
-
-USE CATALOG my_hive;
-```
-
-You can define any default table options with the prefix `table-default.` for tables created in the catalog.
-
-Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).
-
-{{< /tab >}}
-
-{{< tab "Spark3" >}}
-
-Your Spark installation should be able to detect, or already contains Hive dependencies. See [here](https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html) for more information.
-
-The following shell command registers a Paimon Hive catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in Hive metastore.
-
-```bash
-spark-sql ... \
-    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
-    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
-    --conf spark.sql.catalog.paimon.metastore=hive \
-    --conf spark.sql.catalog.paimon.uri=thrift://<hive-metastore-host-name>:<port>
-```
-
-You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
-
-After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
-
-```sql
-USE paimon.default;
-```
-
-Also, you can create [SparkGenericCatalog]({{< ref "engines/spark" >}}).
-
-{{< /tab >}}
-
-{{< /tabs >}}
-
-> When using hive catalog to change incompatible column types through alter table, you need to configure `hive.metastore.disallow.incompatible.col.type.changes=false`. see [HIVE-17832](https://issues.apache.org/jira/browse/HIVE-17832).
-
-> If you are using Hive3, please disable Hive ACID:
->
-> ```shell
-> hive.strict.managed.tables=false
-> hive.create.as.insert.only=false
-> metastore.create.as.acid=false
-> ```
-
-### Setting Location in Properties
-
-If you are using an object storage , and you don't want that the location of paimon table/database is accessed by the filesystem of hive,
-which may lead to the error such as "No FileSystem for scheme: s3a".
-You can set location in the properties of table/database by the config of `location-in-properties`. See
-[setting the location of table/database in properties ]({{< ref "maintenance/configurations#HiveCatalogOptions" >}})
-
-### Synchronizing Partitions into Hive Metastore
-
-By default, Paimon does not synchronize newly created partitions into Hive metastore. Users will see an unpartitioned table in Hive. Partition push-down will be carried out by filter push-down instead.
-
-If you want to see a partitioned table in Hive and also synchronize newly created partitions into Hive metastore, please set the table property `metastore.partitioned-table` to true.
-Also see [CoreOptions]({{< ref "maintenance/configurations#CoreOptions" >}}).
-
-### Adding Parameters to a Hive Table
-
-Using the table option facilitates the convenient definition of Hive table parameters.
-Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
-For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.
-
-
 ## Creating a Catalog with JDBC Metastore
 
 By using the Paimon JDBC catalog, changes to the catalog will be directly stored in relational databases such as MySQL, postgres, etc.
@@ -353,8 +208,6 @@ You can define any connection parameters for a database with the prefix "jdbc."
 
 You can define any default table options with the prefix `table-default.` for tables created in the catalog.
 
-Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).
-
 {{< /tab >}}
 
 {{< /tabs >}}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 064b50554937f..2a06ea8a2a48f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -56,7 +56,6 @@
 import static org.apache.paimon.jdbc.JdbcUtils.execute;
 import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
 import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
-import static org.apache.paimon.jdbc.JdbcUtils.updateTableMetadataLocation;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 
 /** Support jdbc catalog. */
@@ -270,7 +269,6 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
                         sql.setString(1, catalogName);
                         sql.setString(2, identifier.getDatabaseName());
                         sql.setString(3, identifier.getObjectName());
-                        sql.setString(4, path.toString());
                         return sql.executeUpdate();
                     }
                 });
@@ -312,9 +310,6 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
                                     + " to underlying files.",
                             e);
                 }
-                // Update table metadata
-                updateTableMetadataLocation(
-                        connections, catalogName, toTable, toPath.toString(), fromPath.toString());
             }
         } catch (Exception e) {
             throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e);
         }
@@ -328,30 +323,8 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
             throw new RuntimeException(
                     String.format("Table is not exists {}", identifier.getFullName()));
         }
-        final SchemaManager schemaManager = getSchemaManager(identifier);
-        // first commit changes to underlying files
-        TableSchema schema = schemaManager.commitChanges(changes);
-        try {
-            String newMetadataLocation = getDataTableLocation(identifier).toString();
-            Map<String, String> tableMetadata =
-                    JdbcUtils.getTable(
-                            connections,
-                            catalogName,
-                            identifier.getDatabaseName(),
-                            identifier.getObjectName());
-            String oldMetadataLocation = tableMetadata.get(JdbcUtils.METADATA_LOCATION_PROP);
-            if (!newMetadataLocation.equals(oldMetadataLocation)) {
-                updateTableMetadataLocation(
-                        connections,
-                        catalogName,
-                        identifier,
-                        newMetadataLocation,
-                        oldMetadataLocation);
-            }
-        } catch (Exception te) {
-            schemaManager.deleteSchema(schema.id());
-            throw new RuntimeException(te);
-        }
+        SchemaManager schemaManager = getSchemaManager(identifier);
+        schemaManager.commitChanges(changes);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
index 231f4d44a2983..4c732625716cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
@@ -40,30 +40,11 @@
 /** Util for jdbc catalog. */
 public class JdbcUtils {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class);
 
-    public static final String METADATA_LOCATION_PROP = "metadata_location";
-    public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";
     public static final String CATALOG_TABLE_NAME = "paimon_tables";
     public static final String CATALOG_NAME = "catalog_name";
     public static final String TABLE_DATABASE = "database_name";
     public static final String TABLE_NAME = "table_name";
-    static final String DO_COMMIT_SQL =
-            "UPDATE "
-                    + CATALOG_TABLE_NAME
-                    + " SET "
-                    + METADATA_LOCATION_PROP
-                    + " = ? , "
-                    + PREVIOUS_METADATA_LOCATION_PROP
-                    + " = ? "
-                    + " WHERE "
-                    + CATALOG_NAME
-                    + " = ? AND "
-                    + TABLE_DATABASE
-                    + " = ? AND "
-                    + TABLE_NAME
-                    + " = ? AND "
-                    + METADATA_LOCATION_PROP
-                    + " = ?";
     static final String CREATE_CATALOG_TABLE =
             "CREATE TABLE "
                     + CATALOG_TABLE_NAME
@@ -74,10 +55,6 @@ public class JdbcUtils {
                     + " VARCHAR(255) NOT NULL,"
                     + TABLE_NAME
                     + " VARCHAR(255) NOT NULL,"
-                    + METADATA_LOCATION_PROP
-                    + " VARCHAR(1000),"
-                    + PREVIOUS_METADATA_LOCATION_PROP
-                    + " VARCHAR(1000),"
                     + " PRIMARY KEY ("
                     + CATALOG_NAME
                     + ", "
@@ -166,12 +143,8 @@ public class JdbcUtils {
                     + TABLE_DATABASE
                     + ", "
                     + TABLE_NAME
-                    + ", "
-                    + METADATA_LOCATION_PROP
-                    + ", "
-                    + PREVIOUS_METADATA_LOCATION_PROP
                     + ") "
-                    + " VALUES (?,?,?,?,null)";
+                    + " VALUES (?,?,?)";
 
     // Catalog database Properties
     static final String DATABASE_PROPERTIES_TABLE_NAME = "paimon_database_properties";
@@ -336,10 +309,6 @@ public static Map<String, String> getTable(
                 table.put(CATALOG_NAME, rs.getString(CATALOG_NAME));
                 table.put(TABLE_DATABASE, rs.getString(TABLE_DATABASE));
                 table.put(TABLE_NAME, rs.getString(TABLE_NAME));
-                table.put(METADATA_LOCATION_PROP, rs.getString(METADATA_LOCATION_PROP));
-                table.put(
-                        PREVIOUS_METADATA_LOCATION_PROP,
-                        rs.getString(PREVIOUS_METADATA_LOCATION_PROP));
             }
             rs.close();
         }
@@ -355,8 +324,6 @@ public static void updateTable(
         int updatedRecords =
                 execute(
                         err -> {
-                            // SQLite doesn't set SQLState or throw
-                            // SQLIntegrityConstraintViolationException
                             if (err instanceof SQLIntegrityConstraintViolationException
                                     || (err.getMessage() != null
                                             && err.getMessage().contains("constraint failed"))) {
@@ -383,40 +350,6 @@ public static void updateTable(
         }
     }
 
-    /** Update table metadata location. */
-    public static void updateTableMetadataLocation(
-            JdbcClientPool connections,
-            String catalogName,
-            Identifier identifier,
-            String newMetadataLocation,
-            String oldMetadataLocation)
-            throws SQLException, InterruptedException {
-        int updatedRecords =
-                connections.run(
-                        conn -> {
-                            try (PreparedStatement sql =
-                                    conn.prepareStatement(JdbcUtils.DO_COMMIT_SQL)) {
-                                // UPDATE
-                                sql.setString(1, newMetadataLocation);
-                                sql.setString(2, oldMetadataLocation);
-                                // WHERE
-                                sql.setString(3, catalogName);
-                                sql.setString(4, identifier.getDatabaseName());
-                                sql.setString(5, identifier.getObjectName());
-                                sql.setString(6, oldMetadataLocation);
-                                return sql.executeUpdate();
-                            }
-                        });
-        if (updatedRecords == 1) {
-            LOG.debug("Successfully committed to existing table: {}", identifier.getFullName());
-        } else {
-            throw new RuntimeException(
-                    String.format(
-                            "Failed to update table %s from catalog %s",
-                            identifier.getFullName(), catalogName));
-        }
-    }
-
     public static boolean databaseExists(
             JdbcClientPool connections, String catalogName, String databaseName) {
@@ -517,39 +450,6 @@ public static boolean insertProperties(
                         insertedRecords, properties.size()));
     }
 
-    public static boolean updateProperties(
-            JdbcClientPool connections,
-            String catalogName,
-            String databaseName,
-            Map<String, String> properties) {
-        Stream<String> caseArgs =
-                properties.entrySet().stream()
-                        .flatMap(entry -> Stream.of(entry.getKey(), entry.getValue()));
-        Stream<String> whereArgs =
-                Stream.concat(Stream.of(catalogName, databaseName), properties.keySet().stream());
-        String[] args = Stream.concat(caseArgs, whereArgs).toArray(String[]::new);
-        int updatedRecords =
-                execute(connections, JdbcUtils.updatePropertiesStatement(properties.size()), args);
-        if (updatedRecords == properties.size()) {
-            return true;
-        }
-        throw new IllegalStateException(
-                String.format(
-                        "Failed to update: %d of %d succeeded", updatedRecords, properties.size()));
-    }
-
-    public static boolean deleteProperties(
-            JdbcClientPool connections,
-            String catalogName,
-            String databaseName,
-            Set<String> properties) {
-        String[] args =
-                Stream.concat(Stream.of(catalogName, databaseName), properties.stream())
-                        .toArray(String[]::new);
-
-        return execute(connections, JdbcUtils.deletePropertiesStatement(properties), args) > 0;
-    }
-
     private static String insertPropertiesStatement(int size) {
         StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_DATABASE_PROPERTIES_SQL);
         for (int i = 0; i < size; i++) {
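
A note for reviewers, illustrative and not part of the patch: after this change the `paimon_tables` registry created by `CREATE_CATALOG_TABLE` stores only `catalog_name`, `database_name` and `table_name`, and schema changes are committed through `SchemaManager` against the warehouse path rather than being mirrored into a `metadata_location` column. The sketch below shows how a plain-JDBC existence check against that simplified schema might look. The class name, JDBC URL, and credentials are placeholders, and it assumes a suitable JDBC driver (for example MySQL) is on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical helper for illustration; not part of the Paimon code base. */
public class PaimonTablesLookup {

    // Mirrors the simplified paimon_tables schema: catalog_name, database_name, table_name.
    private static final String TABLE_EXISTS_SQL =
            "SELECT 1 FROM paimon_tables"
                    + " WHERE catalog_name = ? AND database_name = ? AND table_name = ?";

    /** Returns true if the given table is registered in the JDBC catalog backend. */
    public static boolean tableExists(
            Connection conn, String catalogName, String databaseName, String tableName)
            throws SQLException {
        try (PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTS_SQL)) {
            stmt.setString(1, catalogName);
            stmt.setString(2, databaseName);
            stmt.setString(3, tableName);
            try (ResultSet rs = stmt.executeQuery()) {
                return rs.next();
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        // Placeholder connection settings; point these at the database backing the JDBC catalog.
        try (Connection conn =
                DriverManager.getConnection("jdbc:mysql://localhost:3306/paimon", "user", "pw")) {
            System.out.println(tableExists(conn, "my_jdbc_catalog", "default", "orders"));
        }
    }
}
```

The design intent reflected by the patch is that the JDBC backend acts as a name registry plus lock and database-property store, while table metadata continues to live in the file system under the warehouse path.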