
Commit

fixed
sunxiaojian committed Feb 29, 2024
1 parent 5c1fa42 commit c0a9207
Showing 3 changed files with 3 additions and 277 deletions.
147 changes: 0 additions & 147 deletions docs/content/how-to/creating-catalogs.md
@@ -176,151 +176,6 @@ Using the table option facilitates the convenient definition of Hive table parameters.
Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.
## Creating a Catalog with Filesystem Metastore
{{< tabs "filesystem-metastore-example" >}}
{{< tab "Flink" >}}
The following Flink SQL registers and uses a Paimon catalog named `my_catalog`. Metadata and table files are stored under `hdfs:///path/to/warehouse`.
```sql
CREATE CATALOG my_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs:///path/to/warehouse'
);
USE CATALOG my_catalog;
```
You can define any default table options with the prefix `table-default.` for tables created in the catalog.
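For example, a catalog created along these lines (a minimal sketch; `file.format` is just an illustrative option key, not a required setting) would apply `file.format = orc` to every table created in the catalog:
```sql
-- Sketch: the table-default. prefix turns a table option into a catalog-wide default.
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///path/to/warehouse',
    'table-default.file.format' = 'orc'
);
```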
{{< /tab >}}
{{< tab "Spark3" >}}
The following shell command registers a Paimon catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`.
```bash
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse
```
You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
```sql
USE paimon.default;
```
{{< /tab >}}
{{< /tabs >}}
## Creating a Catalog with Hive Metastore
By using the Paimon Hive catalog, changes to the catalog directly affect the corresponding Hive metastore. Tables created in such a catalog can also be accessed directly from Hive.
To use the Hive catalog, database names, table names, and field names should be **lower** case.
{{< tabs "hive-metastore-example" >}}
{{< tab "Flink" >}}
Paimon Hive catalog in Flink relies on the Flink Hive connector bundled jar. You should first download the Hive connector bundled jar and add it to the classpath.
| Metastore version | Bundle Name | SQL Client JAR |
|:------------------|:--------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.3.0 - 3.1.3 | Flink Bundle | [Download](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/hive/overview/#using-bundled-hive-jar) |
| 1.2.0 - x.x.x | Presto Bundle | [Download](https://repo.maven.apache.org/maven2/com/facebook/presto/hive/hive-apache/1.2.2-2/hive-apache-1.2.2-2.jar) |
The following Flink SQL registers and uses a Paimon Hive catalog named `my_hive`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in the Hive metastore.
If your Hive requires security authentication such as Kerberos, LDAP, or Ranger, or you want the Paimon table to be managed
by Apache Atlas (by setting 'hive.metastore.event.listeners' in hive-site.xml), you can point the `hive-conf-dir` and
`hadoop-conf-dir` parameters to the directory containing the hive-site.xml file.
```sql
CREATE CATALOG my_hive WITH (
'type' = 'paimon',
'metastore' = 'hive',
-- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf
-- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
-- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
-- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf
);
USE CATALOG my_hive;
```
You can define any default table options with the prefix `table-default.` for tables created in the catalog.
Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).
{{< /tab >}}
{{< tab "Spark3" >}}
Your Spark installation should be able to detect, or already contain, Hive dependencies. See [here](https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html) for more information.
The following shell command registers a Paimon Hive catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in the Hive metastore.
```bash
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
--conf spark.sql.catalog.paimon.metastore=hive \
--conf spark.sql.catalog.paimon.uri=thrift://<hive-metastore-host-name>:<port>
```
You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
```sql
USE paimon.default;
```
Also, you can create [SparkGenericCatalog]({{< ref "engines/spark" >}}).
{{< /tab >}}
{{< /tabs >}}
> When using the Hive catalog to change incompatible column types through ALTER TABLE, you need to configure `hive.metastore.disallow.incompatible.col.type.changes=false`. See [HIVE-17832](https://issues.apache.org/jira/browse/HIVE-17832).
> If you are using Hive3, please disable Hive ACID:
>
> ```shell
> hive.strict.managed.tables=false
> hive.create.as.insert.only=false
> metastore.create.as.acid=false
> ```
### Setting Location in Properties
If you are using object storage and you don't want the location of the Paimon table/database to be accessed by Hive's filesystem,
which may lead to errors such as "No FileSystem for scheme: s3a",
you can set the location in the table/database properties via the `location-in-properties` config. See
[setting the location of table/database in properties]({{< ref "maintenance/configurations#HiveCatalogOptions" >}}).
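As a minimal sketch (assuming the option key is `location-in-properties`, as listed under HiveCatalogOptions, and an illustrative s3a warehouse path), such a catalog could be declared like this:
```sql
CREATE CATALOG my_hive WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    -- hypothetical object storage warehouse path
    'warehouse' = 's3a://my-bucket/paimon',
    'location-in-properties' = 'true'
);
```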
### Synchronizing Partitions into Hive Metastore
By default, Paimon does not synchronize newly created partitions into Hive metastore. Users will see an unpartitioned table in Hive. Partition push-down will be carried out by filter push-down instead.
If you want to see a partitioned table in Hive and also synchronize newly created partitions into Hive metastore, please set the table property `metastore.partitioned-table` to true. Also see [CoreOptions]({{< ref "maintenance/configurations#CoreOptions" >}}).
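For example (a sketch; the columns are illustrative), a table declared like this will have its newly created partitions synchronized into the Hive metastore:
```sql
CREATE TABLE my_partitioned_table (
    user_id BIGINT,
    behavior STRING,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'metastore.partitioned-table' = 'true'
);
```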
### Adding Parameters to a Hive Table
Using the table option facilitates the convenient definition of Hive table parameters.
Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.
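As a sketch of the `hive.table.owner=Jon` example above (the column definitions are illustrative):
```sql
CREATE TABLE my_table (
    id BIGINT,
    name STRING
) WITH (
    'hive.table.owner' = 'Jon'
);
```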
## Creating a Catalog with JDBC Metastore
By using the Paimon JDBC catalog, changes to the catalog will be directly stored in relational databases such as MySQL, PostgreSQL, etc.
@@ -353,8 +208,6 @@
You can define any connection parameters for a database with the prefix "jdbc.".
You can define any default table options with the prefix `table-default.` for tables created in the catalog.
Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).
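As a rough sketch of registering such a catalog in Flink SQL (the JDBC URL, credentials, and database name below are illustrative assumptions, not values taken from this page):
```sql
CREATE CATALOG my_jdbc WITH (
    'type' = 'paimon',
    'metastore' = 'jdbc',
    -- hypothetical connection settings; any key prefixed with jdbc. is passed to the connection
    'uri' = 'jdbc:mysql://<host>:<port>/<database>',
    'jdbc.user' = '...',
    'jdbc.password' = '...',
    'warehouse' = 'hdfs:///path/to/warehouse'
);
USE CATALOG my_jdbc;
```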

{{< /tab >}}
{{< /tabs >}}
31 changes: 2 additions & 29 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -56,7 +56,6 @@
import static org.apache.paimon.jdbc.JdbcUtils.execute;
import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
import static org.apache.paimon.jdbc.JdbcUtils.updateTableMetadataLocation;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;

/** Support jdbc catalog. */
@@ -270,7 +269,6 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
sql.setString(1, catalogName);
sql.setString(2, identifier.getDatabaseName());
sql.setString(3, identifier.getObjectName());
sql.setString(4, path.toString());
return sql.executeUpdate();
}
});
@@ -312,9 +310,6 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
+ " to underlying files.",
e);
}
// Update table metadata
updateTableMetadataLocation(
connections, catalogName, toTable, toPath.toString(), fromPath.toString());
}
} catch (Exception e) {
throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e);
@@ -328,30 +323,8 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throw new RuntimeException(
String.format("Table is not exists {}", identifier.getFullName()));
}
final SchemaManager schemaManager = getSchemaManager(identifier);
// first commit changes to underlying files
TableSchema schema = schemaManager.commitChanges(changes);
try {
String newMetadataLocation = getDataTableLocation(identifier).toString();
Map<String, String> tableMetadata =
JdbcUtils.getTable(
connections,
catalogName,
identifier.getDatabaseName(),
identifier.getObjectName());
String oldMetadataLocation = tableMetadata.get(JdbcUtils.METADATA_LOCATION_PROP);
if (!newMetadataLocation.equals(oldMetadataLocation)) {
updateTableMetadataLocation(
connections,
catalogName,
identifier,
newMetadataLocation,
oldMetadataLocation);
}
} catch (Exception te) {
schemaManager.deleteSchema(schema.id());
throw new RuntimeException(te);
}
SchemaManager schemaManager = getSchemaManager(identifier);
schemaManager.commitChanges(changes);
}

@Override
102 changes: 1 addition & 101 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
@@ -40,30 +40,11 @@
/** Util for jdbc catalog. */
public class JdbcUtils {
private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class);
public static final String METADATA_LOCATION_PROP = "metadata_location";
public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";
public static final String CATALOG_TABLE_NAME = "paimon_tables";
public static final String CATALOG_NAME = "catalog_name";
public static final String TABLE_DATABASE = "database_name";
public static final String TABLE_NAME = "table_name";

static final String DO_COMMIT_SQL =
"UPDATE "
+ CATALOG_TABLE_NAME
+ " SET "
+ METADATA_LOCATION_PROP
+ " = ? , "
+ PREVIOUS_METADATA_LOCATION_PROP
+ " = ? "
+ " WHERE "
+ CATALOG_NAME
+ " = ? AND "
+ TABLE_DATABASE
+ " = ? AND "
+ TABLE_NAME
+ " = ? AND "
+ METADATA_LOCATION_PROP
+ " = ?";
static final String CREATE_CATALOG_TABLE =
"CREATE TABLE "
+ CATALOG_TABLE_NAME
@@ -74,10 +55,6 @@ public class JdbcUtils {
+ " VARCHAR(255) NOT NULL,"
+ TABLE_NAME
+ " VARCHAR(255) NOT NULL,"
+ METADATA_LOCATION_PROP
+ " VARCHAR(1000),"
+ PREVIOUS_METADATA_LOCATION_PROP
+ " VARCHAR(1000),"
+ " PRIMARY KEY ("
+ CATALOG_NAME
+ ", "
@@ -166,12 +143,8 @@ public class JdbcUtils {
+ TABLE_DATABASE
+ ", "
+ TABLE_NAME
+ ", "
+ METADATA_LOCATION_PROP
+ ", "
+ PREVIOUS_METADATA_LOCATION_PROP
+ ") "
+ " VALUES (?,?,?,?,null)";
+ " VALUES (?,?,?)";

// Catalog database Properties
static final String DATABASE_PROPERTIES_TABLE_NAME = "paimon_database_properties";
@@ -336,10 +309,6 @@ public static Map<String, String> getTable(
table.put(CATALOG_NAME, rs.getString(CATALOG_NAME));
table.put(TABLE_DATABASE, rs.getString(TABLE_DATABASE));
table.put(TABLE_NAME, rs.getString(TABLE_NAME));
table.put(METADATA_LOCATION_PROP, rs.getString(METADATA_LOCATION_PROP));
table.put(
PREVIOUS_METADATA_LOCATION_PROP,
rs.getString(PREVIOUS_METADATA_LOCATION_PROP));
}
rs.close();
}
@@ -355,8 +324,6 @@ public static void updateTable(
int updatedRecords =
execute(
err -> {
// SQLite doesn't set SQLState or throw
// SQLIntegrityConstraintViolationException
if (err instanceof SQLIntegrityConstraintViolationException
|| (err.getMessage() != null
&& err.getMessage().contains("constraint failed"))) {
@@ -383,40 +350,6 @@
}
}

/** Update table metadata location. */
public static void updateTableMetadataLocation(
JdbcClientPool connections,
String catalogName,
Identifier identifier,
String newMetadataLocation,
String oldMetadataLocation)
throws SQLException, InterruptedException {
int updatedRecords =
connections.run(
conn -> {
try (PreparedStatement sql =
conn.prepareStatement(JdbcUtils.DO_COMMIT_SQL)) {
// UPDATE
sql.setString(1, newMetadataLocation);
sql.setString(2, oldMetadataLocation);
// WHERE
sql.setString(3, catalogName);
sql.setString(4, identifier.getDatabaseName());
sql.setString(5, identifier.getObjectName());
sql.setString(6, oldMetadataLocation);
return sql.executeUpdate();
}
});
if (updatedRecords == 1) {
LOG.debug("Successfully committed to existing table: {}", identifier.getFullName());
} else {
throw new RuntimeException(
String.format(
"Failed to update table %s from catalog %s",
identifier.getFullName(), catalogName));
}
}

public static boolean databaseExists(
JdbcClientPool connections, String catalogName, String databaseName) {

@@ -517,39 +450,6 @@ public static boolean insertProperties(
insertedRecords, properties.size()));
}

public static boolean updateProperties(
JdbcClientPool connections,
String catalogName,
String databaseName,
Map<String, String> properties) {
Stream<String> caseArgs =
properties.entrySet().stream()
.flatMap(entry -> Stream.of(entry.getKey(), entry.getValue()));
Stream<String> whereArgs =
Stream.concat(Stream.of(catalogName, databaseName), properties.keySet().stream());
String[] args = Stream.concat(caseArgs, whereArgs).toArray(String[]::new);
int updatedRecords =
execute(connections, JdbcUtils.updatePropertiesStatement(properties.size()), args);
if (updatedRecords == properties.size()) {
return true;
}
throw new IllegalStateException(
String.format(
"Failed to update: %d of %d succeeded", updatedRecords, properties.size()));
}

public static boolean deleteProperties(
JdbcClientPool connections,
String catalogName,
String databaseName,
Set<String> properties) {
String[] args =
Stream.concat(Stream.of(catalogName, databaseName), properties.stream())
.toArray(String[]::new);

return execute(connections, JdbcUtils.deletePropertiesStatement(properties), args) > 0;
}

private static String insertPropertiesStatement(int size) {
StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_DATABASE_PROPERTIES_SQL);
for (int i = 0; i < size; i++) {
