diff --git a/flink-catalog-aws-glue/pom.xml b/flink-catalog-aws-glue/pom.xml new file mode 100644 index 000000000..4505dc366 --- /dev/null +++ b/flink-catalog-aws-glue/pom.xml @@ -0,0 +1,115 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-connector-aws-parent + 4.1-SNAPSHOT + + + flink-catalog-aws-glue + Flink : Catalog : AWS : Glue + jar + + + + + org.apache.flink + flink-table-api-java + ${flink.version} + provided + + + + org.apache.flink + flink-connector-aws-base + ${project.version} + + + + software.amazon.awssdk + glue + ${aws.sdkv2.version} + + + + software.amazon.awssdk + apache-client + ${aws.sdkv2.version} + compile + + + + software.amazon.awssdk + url-connection-client + ${aws.sdkv2.version} + + + + + + org.apache.flink + flink-architecture-tests-test + test + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-api-java + ${flink.version} + test-jar + test + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + META-INF/services/org.junit.jupiter.api.extension.Extension + + + + + + + + diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java new file mode 100644 index 000000000..33cdc7408 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java @@ -0,0 +1,1031 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.util.AwsClientFactories; +import org.apache.flink.table.catalog.glue.util.AwsProperties; +import org.apache.flink.table.catalog.glue.util.GlueOperator; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.flink.table.catalog.glue.GlueCatalogConfig.LOCATION_URI; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A Glue catalog implementation that uses glue catalog. + */ +public class GlueCatalog extends AbstractCatalog { + + /** + * instance of GlueOperator to facilitate glue related actions. + */ + public GlueOperator glueOperator; + + /** + * Default database name if not passed as part of catalog. + */ + public static final String DEFAULT_DB = "default"; + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + public GlueCatalog(String catalogName, String databaseName, Map glueProperties) { + super(catalogName, databaseName); + checkNotNull(glueProperties); + initialize(glueProperties); + } + + public GlueCatalog(String catalogName) { + super(catalogName, DEFAULT_DB); + Map properties = new HashMap<>(); + initialize(properties); + } + + public void initialize(Map catalogProperties) { + // setLocationUri for the database level + String locationUri = catalogProperties.getOrDefault(LOCATION_URI, ""); + // initialize aws client factories + AwsProperties awsProperties = new AwsProperties(catalogProperties); + + // create glue client + GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue(); + this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient); + } + + /** + * Open the catalog. Used for any required preparation in initialization phase. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void open() throws CatalogException { + } + + /** + * Close the catalog when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void close() throws CatalogException { + try { + glueOperator.closeClient(); + } catch (Exception e) { + LOG.warn("Glue Client is not closed properly!"); + } + } + + // ------ databases ------ + + /** + * Create a database. + * + * @param name Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. + * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(name)); + checkNotNull(database); + + // glue supports lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + if (databaseExists(name)) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), name); + } + } else { + glueOperator.createGlueDatabase(name, database); + } + } + + /** + * Drop a database. + * + * @param name Name of the database to be dropped. + * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @param cascade Flag to specify behavior when the database contains table or function: if set + * to true, delete all tables and functions in the database and then delete the database, if + * set to false, throw an exception. + * @throws DatabaseNotExistException if the given database does not exist + * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + if (databaseExists(name)) { + if (cascade) { + // delete all tables in database + glueOperator.deleteTablesFromDatabase(name, listTables(name)); + // delete all functions in database + glueOperator.deleteFunctionsFromDatabase(name, listFunctions(name)); + } + + if (glueOperator.isDatabaseEmpty(name)) { + glueOperator.dropGlueDatabase(name); + } else { + throw new DatabaseNotEmptyException(getName(), name); + } + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Modify an existing database. + * + * @param name Name of the database to be modified + * @param newDatabase The new database definition + * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if + * set to false, throw an exception, if set to true, do nothing. + * @throws DatabaseNotExistException if the given database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + checkNotNull(newDatabase, "Database cannot be Empty"); + + CatalogDatabase existingDatabase = glueOperator.getDatabase(name); + if (existingDatabase != null) { + if (existingDatabase.getClass() != newDatabase.getClass()) { + throw new CatalogException( + String.format( + "Database types don't match. Existing database is '%s' and new database is '%s'.", + existingDatabase.getClass().getName(), + newDatabase.getClass().getName())); + } + glueOperator.updateGlueDatabase(name, newDatabase); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Get the names of all databases in this catalog. + * + * @return a list of the names of all databases + * @throws CatalogException in case of any runtime exception + */ + @Override + public List listDatabases() throws CatalogException { + return glueOperator.listGlueDatabases(); + } + + /** + * Get a database from this catalog. + * + * @param databaseName Name of the database + * @return The requested database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + databaseName = databaseName.toLowerCase(Locale.ROOT); + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + return glueOperator.getDatabase(databaseName); + } + + /** + * Check if a database exists in this catalog. + * + * @param databaseName Name of the database + * @return true if the given database exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + try { + CatalogDatabase database = getDatabase(databaseName); + return database != null; + } catch (DatabaseNotExistException e) { + return false; + } + } + + // ------ tables ------ + + /** + * Creates a new table or view. + * + *

The framework will make sure to call this method with fully validated {@link + * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances are easy to serialize + * for a durable catalog implementation. + * + * @param tablePath path of the table or view to be created + * @param table the table definition + * @param ignoreIfExists flag to specify behavior when a table or view already exists at the + * given path: if set to false, it throws a TableAlreadyExistException, if set to true, do + * nothing. + * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false + * @throws DatabaseNotExistException if the database in tablePath doesn't exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + checkNotNull(tablePath); + checkNotNull(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName()); + } + + if (tableExists(tablePath)) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(getName(), tablePath); + } + } else { + glueOperator.createGlueTable(tablePath, table, false); + } + } + + /** + * Modifies an existing table or view. Note that the new and old {@link CatalogBaseTable} must + * be of the same kind. For example, this doesn't allow altering a regular table to partitioned + * table, or altering a view to a table, and vice versa. + * + *

The framework will make sure to call this method with fully validated {@link + * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances are easy to serialize + * for a durable catalog implementation. + * + * @param tablePath path of the table or view to be modified + * @param newTable the new table definition + * @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if + * set to false, throw an exception, if set to true, do nothing. + * @throws TableNotExistException if the table does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterTable( + ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + + checkNotNull(tablePath); + checkNotNull(newTable); + + CatalogBaseTable existingTable = getTable(tablePath); + + if (existingTable != null) { + if (existingTable.getTableKind() != newTable.getTableKind()) { + throw new CatalogException( + String.format( + "Table types don't match. Existing table is '%s' and new table is '%s'.", + existingTable.getTableKind(), newTable.getTableKind())); + } + glueOperator.alterGlueTable(tablePath, newTable, false); + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(getName(), tablePath); + } + } + + // ------ tables and views ------ + + /** + * Drop a table or view. + * + * @param tablePath Path of the table or view to be dropped + * @param ignoreIfNotExists Flag to specify behavior when the table or view does not exist: if + * set to false, throw an exception, if set to true, do nothing. + * @throws TableNotExistException if the table or view does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + checkNotNull(tablePath); + if (tableExists(tablePath)) { + glueOperator.deleteGlueTable(tablePath); + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(getName(), tablePath); + } + } + + /** + * Rename an existing table or view. + * + * @param tablePath Path of the table or view to be renamed + * @param newTableName the new name of the table or view + * @param ignoreIfNotExists Flag to specify behavior when the table or view does not exist: if + * set to false, throw an exception, if set to true, do nothing. + * @throws TableNotExistException if the table does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + checkNotNull(tablePath); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(newTableName)); + + if (tableExists(tablePath)) { + ObjectPath newTablePath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + if (tableExists(newTablePath)) { + throw new TableAlreadyExistException(getName(), newTablePath); + } + glueOperator.renameGlueTable(tablePath, newTablePath); + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(getName(), tablePath); + } + } + + /** + * Get names of all tables and views under this database. An empty list is returned if none + * exists. + * + * @param databaseName fully qualified database name. + * @return a list of the names of all tables and views in this database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public List listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName)); + + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + List results = glueOperator.getGlueTableList(databaseName, CatalogBaseTable.TableKind.TABLE.name()); + results.addAll(listViews(databaseName)); + return results; + } + + /** + * Get names of all views under this database. An empty list is returned if none exists. + * + * @param databaseName the name of the given database + * @return a list of the names of all views in the given database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public List listViews(String databaseName) + throws DatabaseNotExistException, CatalogException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + return glueOperator.getGlueTableList(databaseName, CatalogBaseTable.TableKind.VIEW.name()); + } + + /** + * Returns a {@link CatalogTable} or {@link CatalogView} identified by the given {@link + * ObjectPath}. The framework will resolve the metadata objects when necessary. + * + * @param tablePath Path of the table or view + * @return The requested table or view + * @throws TableNotExistException if the target does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + checkNotNull(tablePath); + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + return glueOperator.getCatalogTableFromGlueTable(glueOperator.getGlueTable(tablePath)); + } + + /** + * Check if a table or view exists in this catalog. + * + * @param tablePath Path of the table or view + * @return true if the given table exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + checkNotNull(tablePath); + return databaseExists(tablePath.getDatabaseName()) && glueOperator.glueTableExists(tablePath); + } + + // ------ functions ------ + + /** + * Create a function. Function name should be handled in a case-insensitive way. + * + * @param path path of the function + * @param function the function to be created + * @param ignoreIfExists flag to specify behavior if a function with the given name already + * exists: if set to false, it throws a FunctionAlreadyExistException, if set to true, + * nothing happens. + * @throws FunctionAlreadyExistException if the function already exist + * @throws DatabaseNotExistException if the given database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createFunction(ObjectPath path, CatalogFunction function, boolean ignoreIfExists) + throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + checkNotNull(path); + checkNotNull(function); + + ObjectPath functionPath = normalize(path); + if (!databaseExists(functionPath.getDatabaseName())) { + throw new DatabaseNotExistException(getName(), functionPath.getDatabaseName()); + } + + if (functionExists(functionPath)) { + if (!ignoreIfExists) { + throw new FunctionAlreadyExistException(getName(), functionPath); + } + } + glueOperator.createGlueFunction(functionPath, function); + } + + private ObjectPath normalize(ObjectPath path) { + return new ObjectPath( + path.getDatabaseName(), FunctionIdentifier.normalizeName(path.getObjectName())); + } + + /** + * Modify an existing function. Function name should be handled in a case-insensitive way. + * + * @param path path of the function + * @param newFunction the function to be modified + * @param ignoreIfNotExists flag to specify behavior if the function does not exist: if set to + * false, throw an exception if set to true, nothing happens + * @throws FunctionNotExistException if the function does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterFunction( + ObjectPath path, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + checkNotNull(path); + checkNotNull(newFunction); + + ObjectPath functionPath = normalize(path); + + CatalogFunction existingFunction = getFunction(functionPath); + + if (existingFunction != null) { + if (existingFunction.getClass() != newFunction.getClass()) { + throw new CatalogException( + String.format( + "Function types don't match. Existing function is '%s' and new function is '%s'.", + existingFunction.getClass().getName(), + newFunction.getClass().getName())); + } + + glueOperator.alterGlueFunction(functionPath, newFunction); + } else if (!ignoreIfNotExists) { + throw new FunctionNotExistException(getName(), functionPath); + } + } + + /** + * Drop a function. Function name should be handled in a case-insensitive way. + * + * @param path path of the function to be dropped + * @param ignoreIfNotExists flag to specify behavior if the function does not exist: if set to + * false, throw an exception if set to true, nothing happens + * @throws FunctionNotExistException if the function does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropFunction(ObjectPath path, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + checkNotNull(path); + + ObjectPath functionPath = normalize(path); + + if (functionExists(functionPath)) { + glueOperator.dropGlueFunction(functionPath); + } else if (!ignoreIfNotExists) { + throw new FunctionNotExistException(getName(), functionPath); + } + } + + + /** + * List the names of all functions in the given database. An empty list is returned if none is + * registered. + * + * @param databaseName name of the database. + * @return a list of the names of the functions in this database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public List listFunctions(String databaseName) + throws DatabaseNotExistException, CatalogException { + + databaseName = databaseName.toLowerCase(Locale.ROOT); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "databaseName cannot be null or empty"); + + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return glueOperator.listGlueFunctions(databaseName); + } + + /** + * Get the function. Function name should be handled in a case-insensitive way. + * + * @param path path of the function + * @return the requested function + * @throws FunctionNotExistException if the function does not exist in the catalog + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogFunction getFunction(ObjectPath path) + throws FunctionNotExistException, CatalogException { + checkNotNull(path); + + ObjectPath functionPath = normalize(path); + + if (!functionExists(functionPath)) { + throw new FunctionNotExistException(getName(), functionPath); + } else { + return glueOperator.getGlueFunction(functionPath); + } + } + + /** + * Check whether a function exists or not. Function name should be handled in a case-insensitive + * way. + * + * @param path path of the function + * @return true if the function exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + @Override + public boolean functionExists(ObjectPath path) throws CatalogException { + checkNotNull(path); + ObjectPath functionPath = normalize(path); + return databaseExists(functionPath.getDatabaseName()) && glueOperator.glueFunctionExists(functionPath); + } + + /** + * Create a partition. + * + * @param tablePath path of the table. + * @param partitionSpec partition spec of the partition + * @param partition the partition to add. + * @param ignoreIfExists flag to specify behavior if a table with the given name already exists: + * if set to false, it throws a TableAlreadyExistException, if set to true, nothing happens. + * @throws TableNotExistException thrown if the target table does not exist + * @throws TableNotPartitionedException thrown if the target table is not partitioned + * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid + * @throws PartitionAlreadyExistsException thrown if the target partition already exists + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition partition, + boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, PartitionAlreadyExistsException, + CatalogException { + checkNotNull(tablePath); + checkNotNull(partitionSpec); + checkNotNull(partition); + + Table glueTable = glueOperator.getGlueTable(tablePath); + glueOperator.ensurePartitionedTable(tablePath, glueTable); + + if (partitionExists(tablePath, partitionSpec)) { + if (!ignoreIfExists) { + throw new PartitionAlreadyExistsException (getName(), tablePath, partitionSpec); + } + } + glueOperator.createGluePartition(glueTable, partitionSpec, partition); + } + + /** + * Get CatalogPartitionSpec of all partitions of the table. + * + * @param tablePath path of the table + * @return a list of CatalogPartitionSpec of the table + * @throws TableNotExistException thrown if the table does not exist in the catalog + * @throws TableNotPartitionedException thrown if the table is not partitioned + * @throws CatalogException in case of any runtime exception + */ + @Override + public List listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + + checkNotNull(tablePath); + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + if (!isPartitionedTable(tablePath)) { + throw new TableNotPartitionedException(getName(), tablePath); + } + + return glueOperator.listAllGluePartitions(tablePath); + } + + private boolean isPartitionedTable(ObjectPath tablePath) { + CatalogTable table; + try { + table = getTable(tablePath); + } catch (TableNotExistException e) { + return false; + } + return (table != null) && table.isPartitioned(); + } + + /** + * Get CatalogPartitionSpec of all partitions that is under the given CatalogPartitionSpec in + * the table. + * + * @param tablePath path of the table + * @param partitionSpec the partition spec to list + * @return a list of CatalogPartitionSpec that is under the given CatalogPartitionSpec in the + * table + * @throws TableNotExistException thrown if the table does not exist in the catalog + * @throws TableNotPartitionedException thrown if the table is not partitioned + * @throws CatalogException in case of any runtime exception + */ + @Override + public List listPartitions( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, CatalogException { + + checkNotNull(tablePath); + checkNotNull(partitionSpec); + + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + if (!isPartitionedTable(tablePath)) { + throw new TableNotPartitionedException(getName(), tablePath); + } + return glueOperator.listGluePartitions(tablePath, partitionSpec); + } + + /** + * Get CatalogPartitionSpec of partitions by expression filters in the table. + * + *

NOTE: For FieldReferenceExpression, the field index is based on schema of this table + * instead of partition columns only. + * + *

The passed in predicates have been translated in conjunctive form. + * + *

If catalog does not support this interface at present, throw an {@link + * UnsupportedOperationException} directly. If the catalog does not have a valid filter, throw + * the {@link UnsupportedOperationException} directly. Planner will fallback to get all + * partitions and filter by itself. + * + * @param tablePath path of the table + * @param filters filters to push down filter to catalog + * @return a list of CatalogPartitionSpec that is under the given CatalogPartitionSpec in the + * table + * @throws TableNotExistException thrown if the table does not exist in the catalog + * @throws TableNotPartitionedException thrown if the table is not partitioned + * @throws CatalogException in case of any runtime exception + */ + @Override + public List listPartitionsByFilter( + ObjectPath tablePath, List filters) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + if (!isPartitionedTable(tablePath)) { + throw new TableNotPartitionedException(getName(), tablePath); + } + + return glueOperator.listGluePartitionsByFilter(tablePath, filters); + } + + /** + * Get a partition of the given table. The given partition spec keys and values need to be + * matched exactly for a result. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of partition to get + * @return the requested partition + * @throws PartitionNotExistException thrown if the partition doesn't exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); + + Partition gluePartition = glueOperator.getGluePartition(tablePath, partitionSpec); + + Map properties = gluePartition.parameters(); + + properties.put( + GlueCatalogConfig.TABLE_LOCATION_URI, gluePartition.storageDescriptor().location()); + + String comment = properties.remove(GlueCatalogConfig.COMMENT); + return new CatalogPartitionImpl(properties, comment); + } + + /** + * Check whether a partition exists or not. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition to check + * @throws CatalogException in case of any runtime exception + */ + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + checkNotNull(tablePath); + + if (!databaseExists(tablePath.getDatabaseName()) || !tableExists(tablePath)) { + throw new CatalogException("Database/Table Doesn't exists."); + } + return glueOperator.gluePartitionExists(tablePath, partitionSpec); + + } + + /** + * Drop a partition. + * + * @param tablePath path of the table. + * @param partitionSpec partition spec of the partition to drop + * @param ignoreIfNotExists flag to specify behavior if the database does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws PartitionNotExistException thrown if the target partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + checkNotNull(tablePath); + checkNotNull(partitionSpec); + CatalogPartition partition = getPartition(tablePath, partitionSpec); + + if (partition != null) { + glueOperator.dropGluePartition(tablePath, partitionSpec); + } else if (!ignoreIfNotExists) { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec); + } + } + + /** + * Alter a partition. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition + * @param newPartition new partition to replace the old one + * @param ignoreIfNotExists flag to specify behavior if the database does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws PartitionNotExistException thrown if the target partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + + checkNotNull(tablePath, "Table path cannot be null"); + checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); + checkNotNull(newPartition, "New partition cannot be null"); + CatalogPartition existingPartition = getPartition(tablePath, partitionSpec); + if (existingPartition != null) { + glueOperator.alterGluePartition(tablePath, partitionSpec, newPartition); + + } else if (!ignoreIfNotExists) { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec); + } + } + + /** + * Get the statistics of a table. + * + * @param tablePath path of the table + * @return statistics of the given table + * @throws TableNotExistException if the table does not exist in the catalog + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return null; + + } + + /** + * Get the column statistics of a table. + * + * @param tablePath path of the table + * @return column statistics of the given table + * @throws TableNotExistException if the table does not exist in the catalog + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return null; + } + + /** + * Get the statistics of a partition. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition + * @return statistics of the given partition + * @throws PartitionNotExistException if the partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return null; + } + + /** + * Get the column statistics of a partition. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition + * @return column statistics of the given partition + * @throws PartitionNotExistException if the partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return null; + } + + /** + * Update the statistics of a table. + * + * @param tablePath path of the table + * @param tableStatistics new statistics to update + * @param ignoreIfNotExists flag to specify behavior if the table does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws TableNotExistException if the table does not exist in the catalog + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterTableStatistics( + ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException("Operation with Statistics not supported."); + + } + + /** + * Update the column statistics of a table. + * + * @param tablePath path of the table + * @param columnStatistics new column statistics to update + * @param ignoreIfNotExists flag to specify behavior if the table does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws TableNotExistException if the table does not exist in the catalog + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterTableColumnStatistics( + ObjectPath tablePath, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException, TablePartitionedException { + throw new UnsupportedOperationException("Operation with Statistics not supported."); + } + + /** + * Update the statistics of a table partition. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition + * @param partitionStatistics new statistics to update + * @param ignoreIfNotExists flag to specify behavior if the partition does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws PartitionNotExistException if the partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException("Operation with Statistics not supported."); + } + + /** + * Update the column statistics of a table partition. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition @@param columnStatistics new column + * statistics to update + * @param columnStatistics column related statistics + * @param ignoreIfNotExists flag to specify behavior if the partition does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws PartitionNotExistException if the partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterPartitionColumnStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException("Operation with Statistics not supported."); + } +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConfig.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConfig.java new file mode 100644 index 000000000..1f8879964 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConfig.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import java.util.regex.Pattern; + +/** Configs for catalog meta-objects in {@link GlueCatalog}. */ +public class GlueCatalogConfig { + // Table related config. + public static final String COMMENT = "comment"; + public static final String DEFAULT_SEPARATOR = ":"; + public static final String LOCATION_SEPARATOR = "/"; + public static final String TABLE_OWNER = "owner"; + + // Location related config. + public static final String TABLE_LOCATION_URI = "glue.table.location-uri"; + public static final String TABLE_INPUT_FORMAT = "glue.table.input.format"; + public static final String TABLE_OUTPUT_FORMAT = "glue.table.output.format"; + public static final String LOCATION_URI = "location-uri"; + + + public static final String FLINK_SCALA_FUNCTION_PREFIX = "flink:scala:"; + public static final String FLINK_PYTHON_FUNCTION_PREFIX = "flink:python:"; + public static final String FLINK_JAVA_FUNCTION_PREFIX = "flink:java:"; + + public static final String FLINK_CATALOG = "FLINK_CATALOG"; + public static final String LAST_ACCESS_TIME = "last_access_time"; + public static final Pattern GLUE_DB_PATTERN = Pattern.compile("^[a-z0-9_]{1,252}$"); + + public static final String AND = "and"; + public static final String NEXT_LINE = "\n"; + + public static final CharSequence SPACE = " "; + public static final String GLUE_EXCEPTION_MSG_IDENTIFIER = "GLUE EXCEPTION"; + + public static final String TABLE_NOT_EXISTS_IDENTIFIER = "TABLE DOESN'T EXISTS"; + public static final String DATABASE_LOCATION_URI = "glue.database.location-uri"; + public static final String DEFAULT_PARTITION_NAME = "__GLUE_DEFAULT_PARTITION__"; +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java new file mode 100644 index 000000000..475543104 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.glue.GlueCatalogFactoryOptions.DEFAULT_DATABASE; +import static org.apache.flink.table.catalog.glue.GlueCatalogFactoryOptions.INPUT_FORMAT; +import static org.apache.flink.table.catalog.glue.GlueCatalogFactoryOptions.LOCATION_URI; +import static org.apache.flink.table.catalog.glue.GlueCatalogFactoryOptions.OUTPUT_FORMAT; +import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; + +/** Catalog factory for {@link GlueCatalog}. */ +public class GlueCatalogFactory implements CatalogFactory { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalogFactory.class); + + @Override + public String factoryIdentifier() { + return GlueCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(DEFAULT_DATABASE); + options.add(PROPERTY_VERSION); + options.add(LOCATION_URI); + options.add(INPUT_FORMAT); + options.add(OUTPUT_FORMAT); + return options; + } + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); + LOG.info("Config Info"); + String msg = context.getOptions().entrySet().stream() + .map(entry -> entry.getKey() + "-> " + entry.getValue()).collect(Collectors.joining("\n")); + LOG.info("Config Map: " + msg); + return new GlueCatalog( + context.getName(), helper.getOptions().get(DEFAULT_DATABASE), context.getOptions()); + } +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactoryOptions.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactoryOptions.java new file mode 100644 index 000000000..70e0c8511 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactoryOptions.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import java.util.Map; + +/** {@link ConfigOption}s for {@link GlueCatalog}. */ +@Internal +public class GlueCatalogFactoryOptions { + + public static final String IDENTIFIER = "glue"; + + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(GlueCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue(GlueCatalog.DEFAULT_DB); + + public static final ConfigOption> CATALOG_PROPERTIES = + ConfigOptions.key(GlueCatalogOptions.CATALOG_PROPERTIES_KEY.key()) + .mapType() + .noDefaultValue(); + + public static final ConfigOption LOCATION_URI = ConfigOptions.key(GlueCatalogOptions.LOCATION_KEY.key()) + .stringType().noDefaultValue(); + + public static final ConfigOption INPUT_FORMAT = ConfigOptions.key(GlueCatalogOptions.INPUT_FORMAT.key()) + .stringType().noDefaultValue(); + + public static final ConfigOption OUTPUT_FORMAT = ConfigOptions.key(GlueCatalogOptions.INPUT_FORMAT.key()) + .stringType().noDefaultValue(); + private GlueCatalogFactoryOptions() {} +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java new file mode 100644 index 000000000..9d2167f37 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.CommonCatalogOptions; + +import java.util.HashMap; +import java.util.Map; + +/** A collection of {@link ConfigOption} which is used in GlueCatalog. */ +public class GlueCatalogOptions extends CommonCatalogOptions { + /** {@link ConfigOption} This is used for getting aws-related properties. */ + public static final ConfigOption> CATALOG_PROPERTIES_KEY = + ConfigOptions.key("aws-properties").mapType().defaultValue(new HashMap<>()); + + public static final ConfigOption LOCATION_KEY = + ConfigOptions.key(GlueCatalogConfig.LOCATION_URI).stringType().defaultValue(""); + + public static final ConfigOption INPUT_FORMAT = + ConfigOptions.key(GlueCatalogConfig.TABLE_INPUT_FORMAT).stringType().defaultValue(""); + + public static final ConfigOption OUTPUT_FORMAT = + ConfigOptions.key(GlueCatalogConfig.TABLE_OUTPUT_FORMAT).stringType().defaultValue(""); +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java new file mode 100644 index 000000000..5f4866e09 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import software.amazon.awssdk.services.glue.GlueClient; + +import java.util.Map; + +/** Default factories. */ +public class AwsClientFactories { + + private AwsClientFactories() {} + + public static AwsClientFactory factory(AwsProperties properties) { + return new DefaultAwsClientFactory(properties); + } + + static class DefaultAwsClientFactory implements AwsClientFactory { + + /** instance that holds provides. */ + private AwsProperties awsProperties; + + DefaultAwsClientFactory(AwsProperties properties) { + awsProperties = properties; + } + + @Override + public GlueClient glue() { + return GlueClient.builder() + .applyMutation(awsProperties::applyHttpClientConfigurations) + .applyMutation(awsProperties::applyGlueEndpointConfigurations) + .build(); + } + + @Override + public void initialize(Map properties) { + this.awsProperties = new AwsProperties(properties); + } + } +} + diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java new file mode 100644 index 000000000..d74caf076 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import software.amazon.awssdk.services.glue.GlueClient; + +import java.io.Serializable; +import java.util.Map; + +/** + * Interface to customize AWS clients used by Flink. A custom factory must have a no-arg. + * constructor, and use {@link #initialize(Map)} to initialize the factory. + */ +public interface AwsClientFactory extends Serializable { + + /** + * create a AWS Glue client. + * + * @return glue client + */ + GlueClient glue(); + + /** + * Initialize AWS client factory from catalog properties. + * + * @param properties catalog properties + */ + void initialize(Map properties); +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java new file mode 100644 index 000000000..939f00f0d --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + +import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder; +import software.amazon.awssdk.core.client.builder.SdkClientBuilder; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.glue.GlueClientBuilder; + +import java.net.URI; +import java.time.Duration; +import java.util.Map; + +/** Aws properties for glue and other clients. */ +public class AwsProperties { + + /** + * The type of {@link software.amazon.awssdk.http.SdkHttpClient} implementation used by {@link + * AwsClientFactory} If set, all AWS clients will use this specified HTTP client. If not set, + * {@link #HTTP_CLIENT_TYPE_DEFAULT} will be used. For specific types supported, see + * HTTP_CLIENT_TYPE_* defined below. + */ + public static final String HTTP_CLIENT_TYPE = "http-client.type"; + + private Long httpClientUrlConnectionConnectionTimeoutMs; + + private Long httpClientUrlConnectionSocketTimeoutMs; + + private Long httpClientApacheConnectionAcquisitionTimeoutMs; + + private Long httpClientApacheConnectionMaxIdleTimeMs; + + private Long httpClientApacheConnectionTimeToLiveMs; + + private Long httpClientApacheConnectionTimeoutMs; + + private Boolean httpClientApacheExpectContinueEnabled; + + private Integer httpClientApacheMaxConnections; + + private Long httpClientApacheSocketTimeoutMs; + + private Boolean httpClientApacheTcpKeepAliveEnabled; + + private Boolean httpClientApacheUseIdleConnectionReaperEnabled; + + private String glueEndpoint; + + private String glueCatalogId; + + private Boolean glueCatalogSkipArchive; + + private Boolean glueCatalogSkipNameValidation; + + /** http client. */ + private String httpClientType; + + /** + * If this is set under {@link #HTTP_CLIENT_TYPE}, {@link + * software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient} will be used as the HTTP. + * Client in {@link AwsClientFactory} + */ + public static final String HTTP_CLIENT_TYPE_URLCONNECTION = "urlconnection"; + + /** + * If this is set under {@link #HTTP_CLIENT_TYPE}, {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient} will be used as the HTTP Client in + * {@linkAwsClientFactory}. + */ + public static final String HTTP_CLIENT_TYPE_APACHE = "apache"; + + /** + * Used to configure the connection timeout in milliseconds for {@link + * UrlConnectionHttpClient.Builder}. This flag only works when {@link #HTTP_CLIENT_TYPE} is set + * to {@link #HTTP_CLIENT_TYPE_URLCONNECTION} + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/urlconnection/UrlConnectionHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_URLCONNECTION_CONNECTION_TIMEOUT_MS = + "http-client.urlconnection.connection-timeout-ms"; + + public static final String HTTP_CLIENT_TYPE_DEFAULT = HTTP_CLIENT_TYPE_URLCONNECTION; + + /** + * The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue + * automatically uses the caller's AWS account ID by default. + * + *

For more details, see + * https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html + */ + public static final String GLUE_CATALOG_ID = "glue.id"; + + /** + * The account ID used in a Glue resource ARN, e.g. + * arn:aws:glue:us-east-1:1000000000000:table/db1/table1 + */ + public static final String GLUE_ACCOUNT_ID = "glue.account-id"; + + /** + * If Glue should skip archiving an old table version when creating a new version in a commit. + * By default Glue archives all old table versions after an UpdateTable call, but Glue has a + * default max number of archived table versions (can be increased). So for streaming use case + * with lots of commits, it is recommended to set this value to true. + */ + public static final String GLUE_CATALOG_SKIP_ARCHIVE = "glue.skip-archive"; + + public static final String GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT = "false"; + + /** + * Used to configure the socket timeout in milliseconds for {@link + * UrlConnectionHttpClient.Builder}. This flag only works when {@link #HTTP_CLIENT_TYPE} is set + * to {@link #HTTP_CLIENT_TYPE_URLCONNECTION} + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/urlconnection/UrlConnectionHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_URLCONNECTION_SOCKET_TIMEOUT_MS = + "http-client.urlconnection.socket-timeout-ms"; + + /** + * Used to configure the connection acquisition timeout in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE} + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS = + "http-client.apache.connection-acquisition-timeout-ms"; + + /** + * Used to configure the connection max idle time in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE} + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS = + "http-client.apache.connection-max-idle-time-ms"; + + /** + * Used to configure the connection time to live in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE} + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS = + "http-client.apache.connection-time-to-live-ms"; + + /** + * Used to configure whether to enable the expect continue setting for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE} + * + *

In default, this is disabled. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_EXPECT_CONTINUE_ENABLED = + "http-client.apache.expect-continue-enabled"; + + /** + * Used to configure the max connections number for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE} + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_MAX_CONNECTIONS = + "http-client.apache.max-connections"; + + /** + * Used to configure whether to enable the tcp keep alive setting for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE}. + * + *

In default, this is disabled. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_TCP_KEEP_ALIVE_ENABLED = + "http-client.apache.tcp-keep-alive-enabled"; + + /** + * Used to configure the connection timeout in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE} + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS = + "http-client.apache.connection-timeout-ms"; + + /** + * Used to configure whether to use idle connection reaper for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE}. + * + *

In default, this is enabled. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED = + "http-client.apache.use-idle-connection-reaper-enabled"; + + /** + * Used to configure the socket timeout in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_APACHE} + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS = + "http-client.apache.socket-timeout-ms"; + + /** + * Configure an alternative endpoint of the Glue service for GlueCatalog to access. + * + *

This could be used to use GlueCatalog with any glue-compatible metastore service that has + * a different endpoint + */ + public static final String GLUE_CATALOG_ENDPOINT = "glue.endpoint"; + + /** + * If Glue should skip name validations It is recommended to stick to Glue best practice in + * https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html to make sure operations + * are Hive compatible. This is only added for users that have existing conventions using + * non-standard characters. When database name and table name validation are skipped, there is + * no guarantee that downstream systems would all support the names. + */ + public static final String GLUE_CATALOG_SKIP_NAME_VALIDATION = "glue.skip-name-validation"; + + public static final String GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT = "false"; + + public AwsProperties(Map properties) { + this.httpClientType = properties.getOrDefault(HTTP_CLIENT_TYPE, HTTP_CLIENT_TYPE_DEFAULT); + this.httpClientUrlConnectionConnectionTimeoutMs = + Long.parseLong( + properties.getOrDefault( + HTTP_CLIENT_URLCONNECTION_CONNECTION_TIMEOUT_MS, "0")); + this.httpClientUrlConnectionSocketTimeoutMs = + Long.parseLong( + properties.getOrDefault(HTTP_CLIENT_URLCONNECTION_SOCKET_TIMEOUT_MS, "0")); + + this.httpClientApacheConnectionAcquisitionTimeoutMs = + Long.parseLong( + properties.getOrDefault( + HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS, "0")); + this.httpClientApacheConnectionMaxIdleTimeMs = + Long.parseLong( + properties.getOrDefault( + HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS, "0")); + this.httpClientApacheConnectionTimeToLiveMs = + Long.parseLong( + properties.getOrDefault( + HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS, "0")); + this.httpClientApacheConnectionTimeoutMs = + Long.parseLong( + properties.getOrDefault(HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS, "0")); + this.httpClientApacheExpectContinueEnabled = + Boolean.parseBoolean( + properties.getOrDefault( + HTTP_CLIENT_APACHE_EXPECT_CONTINUE_ENABLED, "false")); + this.httpClientApacheMaxConnections = + Integer.parseInt(properties.getOrDefault(HTTP_CLIENT_APACHE_MAX_CONNECTIONS, "1")); + this.httpClientApacheSocketTimeoutMs = + Long.parseLong(properties.getOrDefault(HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS, "0")); + this.httpClientApacheTcpKeepAliveEnabled = + Boolean.parseBoolean( + properties.getOrDefault( + HTTP_CLIENT_APACHE_TCP_KEEP_ALIVE_ENABLED, "false")); + this.httpClientApacheUseIdleConnectionReaperEnabled = + Boolean.parseBoolean( + properties.getOrDefault( + HTTP_CLIENT_APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED, "false")); + + this.glueEndpoint = properties.get(GLUE_CATALOG_ENDPOINT); + this.glueCatalogId = properties.get(GLUE_CATALOG_ID); + this.glueCatalogSkipArchive = + Boolean.parseBoolean( + properties.getOrDefault( + GLUE_CATALOG_SKIP_ARCHIVE, GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT)); + this.glueCatalogSkipNameValidation = + Boolean.parseBoolean( + properties.getOrDefault( + GLUE_CATALOG_SKIP_NAME_VALIDATION, + GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT)); + } + + // public AwsProperties() { + // this.httpClientType = + // } + + /** + * Configure the httpClient for a client according to the HttpClientType. The two supported + * HttpClientTypes are urlconnection and apache + * + *

Sample usage: + * + *

+     *     S3Client.builder().applyMutation(awsProperties::applyHttpClientConfigurations)
+     * 
+ */ + public void applyHttpClientConfigurations(T builder) { + if (Strings.isNullOrEmpty(httpClientType)) { + httpClientType = HTTP_CLIENT_TYPE_DEFAULT; + } + switch (httpClientType) { + case HTTP_CLIENT_TYPE_URLCONNECTION: + builder.httpClientBuilder( + UrlConnectionHttpClient.builder() + .applyMutation(this::configureUrlConnectionHttpClientBuilder)); + break; + case HTTP_CLIENT_TYPE_APACHE: + builder.httpClientBuilder( + ApacheHttpClient.builder() + .applyMutation(this::configureApacheHttpClientBuilder)); + break; + default: + throw new IllegalArgumentException( + "Unrecognized HTTP client type " + httpClientType); + } + } + + @VisibleForTesting + void configureUrlConnectionHttpClientBuilder( + T builder) { + if (httpClientUrlConnectionConnectionTimeoutMs != null) { + builder.connectionTimeout( + Duration.ofMillis(httpClientUrlConnectionConnectionTimeoutMs)); + } + + if (httpClientUrlConnectionSocketTimeoutMs != null) { + builder.socketTimeout(Duration.ofMillis(httpClientUrlConnectionSocketTimeoutMs)); + } + } + + @VisibleForTesting + void configureApacheHttpClientBuilder(T builder) { + if (httpClientApacheConnectionTimeoutMs != null) { + builder.connectionTimeout(Duration.ofMillis(httpClientApacheConnectionTimeoutMs)); + } + + if (httpClientApacheSocketTimeoutMs != null) { + builder.socketTimeout(Duration.ofMillis(httpClientApacheSocketTimeoutMs)); + } + + if (httpClientApacheConnectionAcquisitionTimeoutMs != null) { + builder.connectionAcquisitionTimeout( + Duration.ofMillis(httpClientApacheConnectionAcquisitionTimeoutMs)); + } + + if (httpClientApacheConnectionMaxIdleTimeMs != null) { + builder.connectionMaxIdleTime( + Duration.ofMillis(httpClientApacheConnectionMaxIdleTimeMs)); + } + + if (httpClientApacheConnectionTimeToLiveMs != null) { + builder.connectionTimeToLive(Duration.ofMillis(httpClientApacheConnectionTimeToLiveMs)); + } + + if (httpClientApacheExpectContinueEnabled != null) { + builder.expectContinueEnabled(httpClientApacheExpectContinueEnabled); + } + + if (httpClientApacheMaxConnections != null) { + builder.maxConnections(httpClientApacheMaxConnections); + } + + if (httpClientApacheTcpKeepAliveEnabled != null) { + builder.tcpKeepAlive(httpClientApacheTcpKeepAliveEnabled); + } + + if (httpClientApacheUseIdleConnectionReaperEnabled != null) { + builder.useIdleConnectionReaper(httpClientApacheUseIdleConnectionReaperEnabled); + } + } + + /** + * Override the endpoint for a glue client. + * + *

Sample usage: + * + *

+     *     GlueClient.builder().applyMutation(awsProperties::applyS3EndpointConfigurations)
+     * 
+ */ + public void applyGlueEndpointConfigurations(T builder) { + configureEndpoint(builder, glueEndpoint); + } + + private void configureEndpoint(T builder, String endpoint) { + if (endpoint != null) { + builder.endpointOverride(URI.create(endpoint)); + } + } + + /* + * Getter for glue catalogId. + */ + public String getGlueCatalogId() { + return glueCatalogId; + } +} + diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java new file mode 100644 index 000000000..7f86e3fff --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java @@ -0,0 +1,1405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogFunctionImpl; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.glue.GlueCatalogConfig; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.mapper.DatatypeMapper; +import org.apache.flink.table.catalog.mapper.GlueDatatypeMapper; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.ManagedTableFactory; +import org.apache.flink.table.resource.ResourceType; +import org.apache.flink.table.types.DataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest; +import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.CreatePartitionRequest; +import software.amazon.awssdk.services.glue.model.CreatePartitionResponse; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.CreateTableResponse; +import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.DatabaseInput; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse; +import software.amazon.awssdk.services.glue.model.DeletePartitionRequest; +import software.amazon.awssdk.services.glue.model.DeletePartitionResponse; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.DeleteTableResponse; +import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetDatabaseResponse; +import software.amazon.awssdk.services.glue.model.GetDatabasesRequest; +import software.amazon.awssdk.services.glue.model.GetDatabasesResponse; +import software.amazon.awssdk.services.glue.model.GetPartitionRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionResponse; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.GlueResponse; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.PartitionInput; +import software.amazon.awssdk.services.glue.model.PrincipalType; +import software.amazon.awssdk.services.glue.model.ResourceUri; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.UpdatePartitionRequest; +import software.amazon.awssdk.services.glue.model.UpdatePartitionResponse; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; +import software.amazon.awssdk.services.glue.model.UpdateTableResponse; +import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; +import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** + Utilities for Glue catalog operations. + Important Note : https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/ +*/ +public class GlueOperator { + + /** + * Defines the location URI for database. + * This locationUri is high level path for catalog + */ + public final String locationUri; + + /** + * Instance of AwsProperties which holds the configs related to configure glue and aws setup. + */ + private final AwsProperties awsProperties; + + /** + * http client for glue client. + * Current implementation for client is sync type. + */ + private final GlueClient glueClient; + + private final DatatypeMapper datatypeMapper; + + private final String catalogName; + + private static final Logger LOG = LoggerFactory.getLogger(GlueOperator.class); + + public GlueOperator(String locationUri, String catalogName, AwsProperties awsProperties, GlueClient glueClient) { + this.locationUri = locationUri; + this.awsProperties = awsProperties; + this.glueClient = glueClient; + this.datatypeMapper = new GlueDatatypeMapper<>(); + this.catalogName = catalogName; + } + + public void closeClient() { + glueClient.close(); + } + + // -------------- Database related operations. + + /** + * List all databases present. + * @return List of fully qualified database names + */ + public List listGlueDatabases() throws CatalogException { + try { + GetDatabasesRequest.Builder databasesRequestBuilder = + GetDatabasesRequest.builder() + .catalogId(getGlueCatalogId()); + + GetDatabasesResponse response = glueClient.getDatabases(databasesRequestBuilder.build()); + List glueDatabases = + response.databaseList().stream() + .map(Database::name) + .collect(Collectors.toList()); + String dbResultNextToken = response.nextToken(); + if (Optional.ofNullable(dbResultNextToken).isPresent()) { + do { + databasesRequestBuilder.nextToken(dbResultNextToken); + response = glueClient.getDatabases(databasesRequestBuilder.build()); + glueDatabases.addAll( + response.databaseList().stream() + .map(Database::name) + .collect(Collectors.toList())); + dbResultNextToken = response.nextToken(); + } while (Optional.ofNullable(dbResultNextToken).isPresent()); + } + return glueDatabases; + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause()); + } + } + + /** + * Create database in glue data catalog. + * + * @param databaseName fully qualified name of database. + * @param database Instance of {@link CatalogDatabase}. + * @throws CatalogException when unknown error from glue servers. + * @throws DatabaseAlreadyExistException when database exists already in glue data catalog. + */ + public void createGlueDatabase(String databaseName, CatalogDatabase database) + throws CatalogException, DatabaseAlreadyExistException { + + validateName(databaseName); + Map properties = database.getProperties(); + DatabaseInput.Builder databaseInputBuilder = DatabaseInput.builder().name(databaseName) + .description(database.getComment()) + // update location and remove location from properties. + .locationUri(extractLocation(properties, databaseName)) + .parameters(properties); + CreateDatabaseRequest.Builder requestBuilder = CreateDatabaseRequest.builder() + .databaseInput(databaseInputBuilder.build()) + .catalogId(getGlueCatalogId()).tags(getFlinkCatalogTag()); + LOG.info(String.format("Database Properties Listing :- %s", + properties.entrySet().stream().map(e -> e.getKey() + e.getValue()).collect(Collectors.joining(",")))); + try { + CreateDatabaseResponse response = glueClient.createDatabase(requestBuilder.build()); + LOG.debug(getDebugLog(response)); + LOG.info(String.format("%s Database created.", databaseName)); + } catch (EntityNotFoundException e) { + throw new DatabaseAlreadyExistException(catalogName, databaseName, e); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * @param databaseProperties database properties. + * @param databaseName fully qualified name for database. + * @return location for database. + */ + private String extractLocation(Map databaseProperties, String databaseName) { + if (databaseProperties.containsKey(GlueCatalogConfig.DATABASE_LOCATION_URI)) { + return databaseProperties.remove(GlueCatalogConfig.DATABASE_LOCATION_URI); + } + + return locationUri + GlueCatalogConfig.LOCATION_SEPARATOR + databaseName; + } + + /** + * Create tag for flink in glue catalog for identification. + * @return Key/Value pair for tags + */ + private Map getFlinkCatalogTag() { + Map tags = new HashMap<>(); + tags.put("source", "flink_catalog"); + return tags; + } + + /** + * Delete a database from Glue data catalog only when database is empty. + * + * @param databaseName fully qualified name of database + * @throws CatalogException Any Exception thrown due to glue error + * @throws DatabaseNotExistException when database doesn't exists in glue catalog. + */ + public void dropGlueDatabase(String databaseName) throws CatalogException, DatabaseNotExistException { + + validateName(databaseName); + + DeleteDatabaseRequest deleteDatabaseRequest = DeleteDatabaseRequest.builder() + .name(databaseName).catalogId(getGlueCatalogId()).build(); + try { + DeleteDatabaseResponse response = glueClient.deleteDatabase(deleteDatabaseRequest); + LOG.debug(getDebugLog(response)); + LOG.info(String.format("Database Dropped %s", databaseName)); + } catch (EntityNotFoundException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (GlueException e) { + throw new CatalogException(catalogName, e); + } + } + + /** + * Drops list of table in database from glue data catalog. + * + * @param databaseName fully qualified name of database + * @param tables List of tables to remove from database. + * @throws CatalogException Any Exception thrown due to glue error + */ + public void deleteTablesFromDatabase(String databaseName, Collection tables) throws CatalogException { + validateName(databaseName); + BatchDeleteTableRequest batchTableRequest = + BatchDeleteTableRequest.builder() + .databaseName(databaseName) + .catalogId(getGlueCatalogId()) + .tablesToDelete(tables) + .build(); + + try { + BatchDeleteTableResponse response = glueClient.batchDeleteTable(batchTableRequest); + if (response.hasErrors()) { + String errorMsg = String.format( + "Glue Table errors:- %s", + response.errors().stream() + .map( + e -> + "Table: " + + e.tableName() + + "\nErrorDetail: " + + e.errorDetail() + .errorMessage()) + .collect(Collectors.joining("\n"))); + LOG.error(errorMsg); + } + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Drops list of user defined function in database from glue data catalog. + * @param databaseName fully qualified name of database + * @param functions List of tables to remove from database. + * @throws CatalogException Any Exception thrown due to glue error + */ + public void deleteFunctionsFromDatabase(String databaseName, Collection functions) + throws CatalogException { + validateName(databaseName); + try { + DeleteUserDefinedFunctionRequest.Builder requestBuilder = DeleteUserDefinedFunctionRequest.builder() + .databaseName(databaseName).catalogId(getGlueCatalogId()); + for (String functionName: functions) { + requestBuilder.functionName(functionName); + DeleteUserDefinedFunctionResponse response = glueClient.deleteUserDefinedFunction(requestBuilder.build()); + LOG.debug(getDebugLog(response)); + validateGlueResponse(response); + LOG.info(String.format("Dropped Function %s", functionName)); + } + + } catch (GlueException e) { + LOG.error(String.format("Error deleting functions in database: %s", databaseName)); + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + + /** + * Check if database is empty. + * i.e. it should not contain + * 1. table + * 2. functions + * @param databaseName name of database. + * @return boolean True/False based on the content of database. + * @throws CatalogException Any Exception thrown due to glue error + */ + public boolean isDatabaseEmpty(String databaseName) throws CatalogException { + checkArgument(!isNullOrWhitespaceOnly(databaseName)); + validateName(databaseName); + GetTablesRequest tablesRequest = + GetTablesRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(databaseName).maxResults(1) + .build(); + try { + GetTablesResponse response = glueClient.getTables(tablesRequest); + if (response.sdkHttpResponse().isSuccessful() && !response.hasTableList()) { + GetUserDefinedFunctionsRequest functionsRequest = GetUserDefinedFunctionsRequest.builder() + .databaseName(databaseName).catalogId(getGlueCatalogId()).maxResults(1).build(); + GetUserDefinedFunctionsResponse functionsResponse = glueClient.getUserDefinedFunctions(functionsRequest); + return !functionsResponse.hasUserDefinedFunctions(); + } + return false; + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + /** + * Get a database from this glue data catalog. + * @param databaseName fully qualified name of database. + * @return Instance of {@link CatalogDatabase } . + * @throws DatabaseNotExistException when database doesn't exists in Glue data catalog. + * @throws CatalogException when any unknown error occurs in glue. + */ + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + + validateName(databaseName); + GetDatabaseRequest getDatabaseRequest = + GetDatabaseRequest.builder() + .name(databaseName) + .catalogId(getGlueCatalogId()) + .build(); + try { + GetDatabaseResponse response = glueClient.getDatabase(getDatabaseRequest); + LOG.debug( + GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER + ": existing database. Client call response :- " + + response.sdkHttpResponse().statusText()); + validateGlueResponse(response); + return getCatalogDatabase(response.database()); + } catch (EntityNotFoundException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Build CatalogDatabase instance using information from glue Database. + * @param glueDatabase {@link Database } + * @return {@link CatalogDatabase } instance. + */ + private CatalogDatabase getCatalogDatabase(Database glueDatabase) { + Map properties = new HashMap<>(glueDatabase.parameters()); + + // retrieve location uri into properties + properties.put(GlueCatalogConfig.DATABASE_LOCATION_URI, glueDatabase.locationUri()); + String comment = glueDatabase.description(); + return new CatalogDatabaseImpl(properties, comment); + } + + public void updateGlueDatabase(String databaseName, CatalogDatabase newDatabase) + throws CatalogException { + + validateName(databaseName); + Map properties = newDatabase.getProperties(); + + String databaseLocationUri = properties.containsKey(GlueCatalogConfig.DATABASE_LOCATION_URI) ? + properties.remove(GlueCatalogConfig.DATABASE_LOCATION_URI) : null; + + DatabaseInput.Builder databaseInputBuilder = DatabaseInput.builder() + .parameters(properties) + .description(newDatabase.getComment()) + .name(databaseName); + + if (databaseLocationUri != null) { + databaseInputBuilder.locationUri(databaseLocationUri); + } + + UpdateDatabaseRequest updateRequest = + UpdateDatabaseRequest.builder() + .databaseInput(databaseInputBuilder.build()) + .name(databaseName) + .catalogId(getGlueCatalogId()) + .build(); + UpdateDatabaseResponse response = glueClient.updateDatabase(updateRequest); + LOG.debug(getDebugLog(response)); + LOG.info(String.format("Database Updated. %s", databaseName)); + validateGlueResponse(response); + } + + // -------------- Table related operations. + + /** + * Create table in glue data catalog. + * + * @param tablePath Fully qualified name of table. {@link ObjectPath} + * @param table instance of {@link CatalogBaseTable} containing table related information. + * @param managedTable identifier if managed table. + * @throws CatalogException Any Exception thrown due to glue error + */ + public void createGlueTable(final ObjectPath tablePath, final CatalogBaseTable table, + final boolean managedTable) throws CatalogException { + + checkNotNull(table); + checkNotNull(tablePath); + Map properties = new HashMap<>(table.getOptions()); + String tableOwner = extractTableOwner(properties); + if (managedTable) { + properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER); + } + Set glueColumns = getGlueColumnsFromCatalogTable(table); + + // create StorageDescriptor for table + StorageDescriptor.Builder storageDescriptorBuilder = StorageDescriptor.builder() + .inputFormat(extractInputFormat(properties)) + .outputFormat(extractOutputFormat(properties)) + .location(extractLocation(properties, tablePath)) + .parameters(properties); + + // create TableInput Builder with available information. + TableInput.Builder tableInputBuilder = TableInput.builder() + .name(tablePath.getObjectName()) + .description(table.getComment()) + .tableType(table.getTableKind().name()) + .lastAccessTime(Instant.now()) + .owner(tableOwner); + + CreateTableRequest.Builder requestBuilder = CreateTableRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()); + + if (table instanceof CatalogTable) { + CatalogTable catalogTable = (CatalogTable) table; + if (catalogTable.isPartitioned()) { + LOG.info("Catalog table is partitioned"); + Collection partitionKeys = getPartitionKeys(catalogTable, glueColumns); + LOG.info("Partition columns are -> " + partitionKeys.stream().map(Column::name).collect(Collectors.joining(","))); + tableInputBuilder.partitionKeys(partitionKeys); + } + } + + try { + // apply storage descriptor and tableInput for request + storageDescriptorBuilder.columns(glueColumns); + tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build()); + requestBuilder.tableInput(tableInputBuilder.build()); + CreateTableResponse response = glueClient.createTable(requestBuilder.build()); + LOG.debug(getDebugLog(response)); + validateGlueResponse(response); + LOG.info(String.format("Table created. %s", tablePath.getFullName())); + + // create partitions if table is partitioned + CatalogTable catalogTable = (CatalogTable) table; + if (catalogTable.isPartitioned()) { + Table glueTable = getGlueTable(tablePath); + CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(catalogTable.getOptions()); + for (Column column: glueTable.partitionKeys()) { + CatalogPartition partition = new CatalogPartitionImpl(column.parameters(), ""); + createGluePartition(glueTable, partitionSpec, partition); + } + } + + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } catch (TableNotExistException | PartitionSpecInvalidException e) { + e.printStackTrace(); + } + } + + /** + * Extract table owner name and remove from properties. + * + * @param properties Map of properties. + * @return fully qualified owner name. + */ + private String extractTableOwner (Map properties) { + return properties.containsKey(GlueCatalogConfig.TABLE_OWNER) ? + properties.remove(GlueCatalogConfig.TABLE_OWNER) : null; + } + + /** + * Build set of {@link Column} associated with table. + * + * @param catalogBaseTable instance of {@link CatalogBaseTable}. + * @return Set of Column + */ + private Set getGlueColumnsFromCatalogTable(CatalogBaseTable catalogBaseTable) { + checkNotNull(catalogBaseTable); + TableSchema tableSchema = catalogBaseTable.getSchema(); + return Arrays.stream(tableSchema.getFieldNames()) + .map(fieldName -> getGlueColumn(catalogBaseTable, tableSchema, fieldName)) + .collect(Collectors.toSet()); + } + + /** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * @param tableProperties table properties. + * @param tablePath fully qualified object for table. + * @return location for table. + */ + private String extractLocation(Map tableProperties, ObjectPath tablePath) { + return tableProperties.containsKey(GlueCatalogConfig.TABLE_LOCATION_URI) ? + tableProperties.remove(GlueCatalogConfig.LOCATION_URI) : locationUri + GlueCatalogConfig.LOCATION_SEPARATOR + + tablePath.getDatabaseName() + GlueCatalogConfig.LOCATION_SEPARATOR + tablePath.getObjectName(); + + } + + /** + * Extract OutputFormat from properties if present and remove outputFormat from properties. + * fallback to default format if not present + * @param tableProperties Key/Value properties + * @return output Format. + */ + private String extractOutputFormat(Map tableProperties) { + return tableProperties.containsKey(GlueCatalogConfig.TABLE_OUTPUT_FORMAT) ? tableProperties.remove(GlueCatalogConfig.TABLE_OUTPUT_FORMAT) + : GlueCatalogOptions.OUTPUT_FORMAT.defaultValue(); + } + + /** + * Extract InputFormat from properties if present and remove inputFormat from properties. + * fallback to default format if not present + * @param tableProperties Key/Value properties + * @return input Format. + */ + private String extractInputFormat(Map tableProperties) { + return tableProperties.containsKey(GlueCatalogConfig.TABLE_INPUT_FORMAT) ? + tableProperties.remove(GlueCatalogConfig.TABLE_INPUT_FORMAT) : GlueCatalogOptions.INPUT_FORMAT.defaultValue(); + + } + + /** + * Get list of filtered columns which are partition columns. + * @param catalogTable {@link CatalogTable} instance. + * @param columns List of all column in table. + * @return List of column marked as partition key. + */ + private Collection getPartitionKeys(CatalogTable catalogTable, Collection columns) { + Set partitionKeys = new HashSet<>(catalogTable.getPartitionKeys()); + return columns.stream().filter(column -> partitionKeys.contains(column.name())).collect(Collectors.toList()); + } + + /** + * + * @param tablePath + * @param newTable + * @param managedTable + * @throws CatalogException + */ + public void alterGlueTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean managedTable) + throws CatalogException { + + Map properties = new HashMap<>(newTable.getOptions()); + String tableOwner = extractTableOwner(properties); + + if (managedTable) { + properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER); + } + + Set glueColumns = getGlueColumnsFromCatalogTable(newTable); + + // create StorageDescriptor for table + StorageDescriptor.Builder storageDescriptorBuilder = StorageDescriptor.builder() + .inputFormat(extractInputFormat(properties)) + .outputFormat(extractOutputFormat(properties)) + .location(extractLocation(properties, tablePath)) + .parameters(properties) + .columns(glueColumns); + + // create TableInput Builder with available information. + TableInput.Builder tableInputBuilder = TableInput.builder() + .name(tablePath.getObjectName()) + .description(newTable.getComment()) + .tableType(newTable.getTableKind().name()) + .lastAccessTime(Instant.now()) + .owner(tableOwner); + + UpdateTableRequest.Builder requestBuilder = UpdateTableRequest.builder().tableInput(tableInputBuilder.build()) + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()); + + if (newTable instanceof CatalogTable) { + CatalogTable catalogTable = (CatalogTable) newTable; + if (catalogTable.isPartitioned()) { + tableInputBuilder.partitionKeys(getPartitionKeys(catalogTable, glueColumns)); + } + } + + // apply storage descriptor and tableInput for request + tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build()); + requestBuilder.tableInput(tableInputBuilder.build()); + + try { + UpdateTableResponse response = glueClient.updateTable(requestBuilder.build()); + LOG.debug(getDebugLog(response)); + validateGlueResponse(response); + LOG.info(String.format("Table updated. %s", tablePath.getFullName())); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + private String getDebugLog(GlueResponse response) { + return String.format("Glue response : status = %s \n " + + "Details = %s \nMetadataResponse = %s", + response.sdkHttpResponse().isSuccessful(), response.sdkHttpResponse().toString(), response.responseMetadata()); + } + + /** + * Get names of all tables or views under this database based on type identifier. + * An empty list is returned if none exists. + * + * @param databaseName fully qualified database name. + * @return a list of the names of all tables or views in this database based on type identifier. + * @throws CatalogException in case of any runtime exception + */ + public List getGlueTableList(String databaseName, String type) throws CatalogException { + GetTablesRequest.Builder tablesRequestBuilder = + GetTablesRequest.builder() + .databaseName(databaseName) + .catalogId(getGlueCatalogId()); + GetTablesResponse response = glueClient.getTables(tablesRequestBuilder.build()); + validateGlueResponse(response); + List finalTableList = response.tableList().stream() + .filter(table -> table.tableType().equalsIgnoreCase(type)) + .map(Table::name).collect(Collectors.toList()); + String tableResultNextToken = response.nextToken(); + + if (Optional.ofNullable(tableResultNextToken).isPresent()) { + do { + // update token in requestBuilder to fetch next batch + tablesRequestBuilder.nextToken(tableResultNextToken); + response = glueClient.getTables(tablesRequestBuilder.build()); + validateGlueResponse(response); + finalTableList.addAll(response.tableList().stream() + .filter(table -> table.tableType().equalsIgnoreCase(type)) + .map(Table::name).collect(Collectors.toList())); + tableResultNextToken = response.nextToken(); + } while (Optional.ofNullable(tableResultNextToken).isPresent()); + } + return finalTableList; + } + + /** + * Returns a {@link Table} identified by the given {@link ObjectPath}. + * + * @param tablePath Path of the table or view + * @return The requested table. Glue encapsulates whether table or view in its attribute called type. + * @throws TableNotExistException if the target does not exist + * @throws CatalogException in case of any runtime exception + */ + public Table getGlueTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { + GetTableRequest tablesRequest = + GetTableRequest.builder() + .databaseName(tablePath.getDatabaseName()) + .name(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()) + .build(); + try { + GetTableResponse response = glueClient.getTable(tablesRequest); + LOG.info(String.format("Glue table Found %s", response.table() != null)); + validateGlueResponse(response); + return response.table(); + } catch (EntityNotFoundException e) { + throw new TableNotExistException(catalogName, tablePath, e); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + /** + * Check if a table or view exists in glue data catalog. + * + * @param tablePath Path of the table or view + * @return true if the given table exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + public boolean glueTableExists(ObjectPath tablePath) throws CatalogException { + try { + Table glueTable = getGlueTable(tablePath); + return glueTable != null; + } catch (TableNotExistException e) { + LOG.warn(String.format("%s\nDatabase: %s Table: %s", GlueCatalogConfig.TABLE_NOT_EXISTS_IDENTIFIER, + tablePath.getDatabaseName(), tablePath.getObjectName())); + return false; + } catch (CatalogException e) { + LOG.error(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause()); + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + /** + * Modify an existing function. Function name should be handled in a case-insensitive way. + * + * @param functionPath path of function. + * @param newFunction modified function. + * @throws CatalogException on runtime errors. + */ + public void alterGlueFunction(ObjectPath functionPath, CatalogFunction newFunction) throws CatalogException { + UserDefinedFunctionInput functionInput = createFunctionInput(functionPath, newFunction); + UpdateUserDefinedFunctionRequest updateUserDefinedFunctionRequest = UpdateUserDefinedFunctionRequest.builder() + .functionName(functionPath.getObjectName()) + .databaseName(functionPath.getDatabaseName()) + .catalogId(getGlueCatalogId()) + .functionInput(functionInput).build(); + UpdateUserDefinedFunctionResponse response = glueClient.updateUserDefinedFunction(updateUserDefinedFunctionRequest); + validateGlueResponse(response); + LOG.info(String.format("Function altered. %s", functionPath.getFullName())); + } + + public void deleteGlueTable(ObjectPath tablePath) throws CatalogException { + DeleteTableRequest.Builder tableRequestBuilder = + DeleteTableRequest.builder() + .databaseName(tablePath.getDatabaseName()) + .name(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()); + + DeleteTableResponse response = glueClient.deleteTable(tableRequestBuilder.build()); + validateGlueResponse(response); + LOG.info(String.format("Dropped Table %s.", tablePath.getObjectName())); + } + + private void validateGlueResponse(GlueResponse response) { + if (!response.sdkHttpResponse().isSuccessful()) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER); + } + } + + public Schema getSchemaFromGlueTable(Table glueTable) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + for (Column col : glueTable.storageDescriptor().columns()) { + schemaBuilder.column(col.name(), col.type()); + } + return schemaBuilder.build(); + } + + // -------------- Function related operations. + + /** + * Create a function. Function name should be handled in a case-insensitive way. + * @param functionPath path of the function + * @param function Flink function to be created + * @throws CatalogException in case of any runtime exception + */ + public void createGlueFunction(ObjectPath functionPath, CatalogFunction function) throws CatalogException, FunctionAlreadyExistException { + UserDefinedFunctionInput functionInput = createFunctionInput(functionPath, function); + CreateUserDefinedFunctionRequest.Builder requestBuilder = CreateUserDefinedFunctionRequest.builder() + .databaseName(functionPath.getDatabaseName()) + .catalogId(getGlueCatalogId()) + .functionInput(functionInput); + try { + CreateUserDefinedFunctionResponse response = glueClient.createUserDefinedFunction(requestBuilder.build()); + validateGlueResponse(response); + LOG.info(String.format("Function created. %s", functionPath)); + } catch (AlreadyExistsException e){ + LOG.error(String.format("%s.%s already Exists. Function language of type: %s", + functionPath.getDatabaseName(), functionPath.getObjectName(), function.getFunctionLanguage())); + throw new FunctionAlreadyExistException(catalogName, functionPath); + } catch (GlueException e) { + LOG.error("Error creating glue function.", e.getCause()); + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Get the user defined function from glue Catalog. Function name should be handled in a case-insensitive way. + * + * @param functionPath path of the function + * @return the requested function + * @throws CatalogException in case of any runtime exception + */ + public CatalogFunction getGlueFunction(ObjectPath functionPath) { + GetUserDefinedFunctionRequest request = GetUserDefinedFunctionRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(functionPath.getDatabaseName()) + .functionName(functionPath.getObjectName()) + .build(); + GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request); + validateGlueResponse(response); + List resourceUris = new LinkedList<>(); + for (ResourceUri resourceUri : response.userDefinedFunction().resourceUris()) { + resourceUris.add(new org.apache.flink.table.resource.ResourceUri( + ResourceType.valueOf(resourceUri.resourceType().name()), resourceUri.uri())); + } + + return new CatalogFunctionImpl(getFunctionClassName(response.userDefinedFunction()), + getFunctionalLanguage(response.userDefinedFunction()), resourceUris); + } + + public List listGlueFunctions(String databaseName) { + GetUserDefinedFunctionsRequest.Builder requestBuilder = GetUserDefinedFunctionsRequest.builder() + .databaseName(databaseName) + .catalogId(getGlueCatalogId()).maxResults(100); + + List glueFunctions; + try { + GetUserDefinedFunctionsResponse response = glueClient.getUserDefinedFunctions(requestBuilder.build()); + String token = response.nextToken(); + glueFunctions = response + .userDefinedFunctions() + .stream() + .map(this::getFunctionClassName) + .collect(Collectors.toCollection(LinkedList::new)); + if (Optional.ofNullable(token).isPresent()) { + do { + requestBuilder.nextToken(token); + response = glueClient.getUserDefinedFunctions(requestBuilder.build()); + glueFunctions.addAll(response + .userDefinedFunctions() + .stream() + .map(UserDefinedFunction::functionName) + .collect(Collectors.toCollection(LinkedList::new))); + token = response.nextToken(); + } while (Optional.ofNullable(token).isPresent()); + } + + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + return glueFunctions; + } + + public boolean glueFunctionExists(ObjectPath functionPath) { + GetUserDefinedFunctionRequest request = GetUserDefinedFunctionRequest.builder() + .functionName(functionPath.getObjectName()).databaseName(functionPath.getDatabaseName()) + .catalogId(getGlueCatalogId()) + .build(); + + try { + GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request); + validateGlueResponse(response); + return response.userDefinedFunction().functionName().equalsIgnoreCase(functionPath.getObjectName()); + } catch (EntityNotFoundException e) { + LOG.warn(String.format("Entry not found for function %s.%s", functionPath.getObjectName(), functionPath.getDatabaseName())); + return false; + } catch (GlueException e) { + LOG.error(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + public void dropGlueFunction(ObjectPath functionPath) throws CatalogException { + DeleteUserDefinedFunctionRequest request = DeleteUserDefinedFunctionRequest.builder() + .catalogId(getGlueCatalogId()) + .functionName(functionPath.getObjectName()) + .databaseName(functionPath.getDatabaseName()) + .build(); + DeleteUserDefinedFunctionResponse response = glueClient.deleteUserDefinedFunction(request); + validateGlueResponse(response); + LOG.info(String.format("Dropped Function. %s", functionPath.getFullName())); + } + + // -------------- Partition related operations. + + public void ensurePartitionedTable(ObjectPath tablePath, Table glueTable) + throws TableNotPartitionedException { + if (!glueTable.hasPartitionKeys()) { + throw new TableNotPartitionedException(catalogName, tablePath); + } + } + + /** + * create partition in glue. + * @param glueTable glue table + * @param partitionSpec partition spec + * @param catalogPartition partition to add + */ + public void createGluePartition(Table glueTable, CatalogPartitionSpec partitionSpec, + CatalogPartition catalogPartition) throws CatalogException, PartitionSpecInvalidException { + + // ---- + List partCols = getColumnNames(glueTable.partitionKeys()); + LOG.info("Partition Column size " + partCols.size()); + LOG.info(String.format("Partition Columns are : %s", String.join(", ", partCols))); + List partitionValues = + getOrderedFullPartitionValues( + partitionSpec, + partCols, + new ObjectPath(glueTable.databaseName(), glueTable.name())); + + LOG.info("Partition value size " + partitionValues.size()); + LOG.info(String.format("Partition Values are : %s", String.join(", ", partitionValues))); + // validate partition values + for (int i = 0; i < partCols.size(); i++) { + if (isNullOrWhitespaceOnly(partitionValues.get(i))) { + throw new PartitionSpecInvalidException( + catalogName, + partCols, + new ObjectPath(glueTable.databaseName(), glueTable.name()), + partitionSpec); + } + } + // TODO: handle GenericCatalogPartition + StorageDescriptor.Builder sdBuilder = glueTable.storageDescriptor().toBuilder(); + sdBuilder.location( + catalogPartition.getProperties().remove(GlueCatalogConfig.TABLE_LOCATION_URI)); + + Map properties = new HashMap<>(catalogPartition.getProperties()); + String comment = catalogPartition.getComment(); + if (comment != null) { + properties.put(GlueCatalogConfig.COMMENT, comment); + } + + PartitionInput.Builder partitionInput = PartitionInput.builder() + .parameters(properties) + .lastAccessTime(Instant.now()) + .storageDescriptor(sdBuilder.build()) + .values(partitionValues); + CreatePartitionRequest createPartitionRequest = CreatePartitionRequest.builder() + .partitionInput(partitionInput.build()) + .catalogId(getGlueCatalogId()) + .databaseName(glueTable.databaseName()) + .tableName(glueTable.name()) + .build(); + + try { + CreatePartitionResponse response = glueClient.createPartition(createPartitionRequest); + validateGlueResponse(response); + LOG.info("Partition Created."); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + /** Get column names from List of column. */ + private static List getColumnNames(List columns) { + return columns.stream().map(Column::name).collect(Collectors.toList()); + } + + /** + * Get a list of ordered partition values by re-arranging them based on the given list of + * partition keys. If the partition value is null, it'll be converted into default partition + * name. + * + * @param partitionSpec a partition spec. + * @param partitionKeys a list of partition keys. + * @param tablePath path of the table to which the partition belongs. + * @return A list of partition values ordered according to partitionKeys. + * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have + * different sizes, or any key in partitionKeys doesn't exist in partitionSpec. + */ + private List getOrderedFullPartitionValues( + CatalogPartitionSpec partitionSpec, List partitionKeys, ObjectPath tablePath) + throws PartitionSpecInvalidException { + Map spec = partitionSpec.getPartitionSpec(); + if (spec.size() != partitionKeys.size()) { + throw new PartitionSpecInvalidException( + catalogName, partitionKeys, tablePath, partitionSpec); + } + + List values = new ArrayList<>(spec.size()); + for (String key : partitionKeys) { + if (!spec.containsKey(key)) { + throw new PartitionSpecInvalidException( + catalogName, partitionKeys, tablePath, partitionSpec); + } else { + String value = spec.get(key); + if (value == null) { + value = GlueCatalogConfig.DEFAULT_PARTITION_NAME; + } + values.add(value); + } + } + + return values; + } + + /** + * Update glue table. + * @param tablePath contains database name and table name. + * @param partitionSpec Existing partition information. + * @param newPartition Partition information with new changes. + * @throws CatalogException Exception in failure. + */ + public void alterGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition) throws CatalogException { + PartitionInput partitionInput = PartitionInput.builder() + .storageDescriptor(getPartitionStorageDescriptor(newPartition)) + .parameters(newPartition.getProperties()) + .lastAccessTime(Instant.now()) + .build(); + UpdatePartitionRequest updatePartitionRequest = UpdatePartitionRequest.builder() + .partitionInput(partitionInput) + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .build(); + try { + UpdatePartitionResponse response = glueClient.updatePartition(updatePartitionRequest); + validateGlueResponse(response); + LOG.info(String.format("Partition updated. %s", tablePath.getFullName())); + } catch (GlueException e){ + throw new CatalogException("Glue Client failed to execute", e.getCause()); + } + + } + + public List listAllGluePartitions(ObjectPath tablePath) { + GetPartitionsRequest request = GetPartitionsRequest.builder().catalogId(awsProperties.getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()).tableName(tablePath.getObjectName()).build(); + GetPartitionsResponse response = glueClient.getPartitions(request); + // GlueOperator.validateGlueResponse(response); + List finalPartitionsList = response.partitions().stream() + .map(this::buildCatalogPartitionSpec).collect(Collectors.toList()); + String partitionsResultNextToken = response.nextToken(); + if (Optional.ofNullable(partitionsResultNextToken).isPresent()) { + do { + // creating a new GetPartitionsResult using next token. + request = + GetPartitionsRequest.builder() + .databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .nextToken(partitionsResultNextToken) + .catalogId(awsProperties.getGlueCatalogId()) + .build(); + response = glueClient.getPartitions(request); + finalPartitionsList.addAll(response.partitions().stream() + .map(this::buildCatalogPartitionSpec).collect(Collectors.toList())); + partitionsResultNextToken = response.nextToken(); + } while (Optional.ofNullable(partitionsResultNextToken).isPresent()); + } + + return finalPartitionsList; + + } + + public void dropGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + DeletePartitionRequest deletePartitionRequest = DeletePartitionRequest.builder() + .catalogId(awsProperties.getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .partitionValues(partitionSpec.getPartitionSpec().values()) + .build(); + + try { + DeletePartitionResponse response = glueClient.deletePartition(deletePartitionRequest); + // GlueOperator.validateGlueResponse(response); + } catch (GlueException e) { + throw new CatalogException("Glue Client failed to execute", e.getCause()); + } + } + + private StorageDescriptor getPartitionStorageDescriptor(CatalogPartition partition) { + return StorageDescriptor.builder() + .parameters(partition.getProperties()) + // todo get columns for partition spec + // .columns(getCols(partition, partitionSpec)) + .build(); + } + + private CatalogPartitionSpec buildCatalogPartitionSpec(Partition partition) { + Map params = partition.parameters(); + params.put(GlueCatalogConfig.LAST_ACCESS_TIME, partition.lastAccessTime().toString()); + return new CatalogPartitionSpec(params); + } + + public List listGluePartitionsByFilter(ObjectPath tablePath, List filters) { + String expressionString = filters.stream().map(x -> getExpressionString(x, new StringBuilder())) + .collect(Collectors.joining(GlueCatalogConfig.SPACE + GlueCatalogConfig.AND)); + try { + GetPartitionsRequest request = GetPartitionsRequest.builder().databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()) + .expression(expressionString) + .build(); + GetPartitionsResponse response = glueClient.getPartitions(request); + List catalogPartitionSpecList = response.partitions().stream() + .map(this::buildCatalogPartitionSpec).collect(Collectors.toList()); + // GlueOperator.validateGlueResponse(response); + String nextToken = response.nextToken(); + if (Optional.ofNullable(nextToken).isPresent()) { + do { + // creating a new GetPartitionsResult using next token. + request = + GetPartitionsRequest.builder().databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()) + .expression(expressionString) + .nextToken(nextToken) + .build(); + response = glueClient.getPartitions(request); + catalogPartitionSpecList.addAll(response.partitions().stream() + .map(this::buildCatalogPartitionSpec).collect(Collectors.toList())); + nextToken = response.nextToken(); + } while (Optional.ofNullable(nextToken).isPresent()); + } + return catalogPartitionSpecList; + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + private String getExpressionString(Expression expression, StringBuilder sb) { + + for (Expression childExpression: expression.getChildren()) { + if (childExpression.getChildren() != null && childExpression.getChildren().size() > 0) { + getExpressionString(childExpression, sb); + } + } + return sb.insert(0, expression.asSummaryString() + GlueCatalogConfig.SPACE + + GlueCatalogConfig.AND).toString(); + + } + + public Partition getGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException { + try { + GetPartitionRequest request = GetPartitionRequest.builder().catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()).tableName(tablePath.getObjectName()) + .partitionValues(partitionSpec.getPartitionSpec().values()) + .build(); + GetPartitionResponse response = glueClient.getPartition(request); + validateGlueResponse(response); + return response.partition(); + } catch (EntityNotFoundException e) { + throw new PartitionNotExistException(catalogName, tablePath, partitionSpec); + } + } + + public boolean gluePartitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException { + GetPartitionRequest request = GetPartitionRequest.builder().catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()).tableName(tablePath.getObjectName()) + .partitionValues(partitionSpec.getPartitionSpec().values()).build(); + try { + GetPartitionResponse response = glueClient.getPartition(request); + // GlueOperator.validateGlueResponse(response); + return response.partition().parameters().keySet().containsAll(partitionSpec.getPartitionSpec().keySet()); + } catch (EntityNotFoundException e) { + LOG.warn(String.format("%s is not found", partitionSpec.getPartitionSpec())); + } catch (GlueException e) { + throw new CatalogException(catalogName, e); + } + return false; + } + + private Column getGlueColumn(CatalogBaseTable catalogBaseTable, TableSchema tableSchema, String fieldName) throws CatalogException { + Optional dataType = tableSchema.getFieldDataType(fieldName); + if (dataType.isPresent()) { + String glueDataType = datatypeMapper.mapFlinkTypeToGlueType(dataType.get()); + return Column.builder().comment(catalogBaseTable.getComment()) + .type(glueDataType) // datatype of column + // .parameters(catalogBaseTable.getOptions()) remove it for error occurs due to partition column + .name(fieldName) + .build(); + } else { + throw new CatalogException("DataType information missing from table schema"); + } + + } + + private void validateUDFClassName(String name) { + checkArgument(!isNullOrWhitespaceOnly(name)); + if (name.split(GlueCatalogConfig.DEFAULT_SEPARATOR).length != 3) { + throw new ValidationException("Improper classname"); + } + } + + private String getGlueCatalogId(){ + return awsProperties.getGlueCatalogId(); + } + + private StorageDescriptor getTableStorageDescriptor( + ObjectPath tablePath, Collection glueColumns, Map options) { + return StorageDescriptor.builder() + .columns(glueColumns) + .location(locationUri + GlueCatalogConfig.LOCATION_SEPARATOR + tablePath.getFullName()) + .parameters(options) + .build(); + } + + private UserDefinedFunctionInput createFunctionInput(ObjectPath functionPath, CatalogFunction function) + throws UnsupportedOperationException{ + Collection resourceUris = new LinkedList<>(); + for (org.apache.flink.table.resource.ResourceUri resourceUri : function.getFunctionResources()) { + switch (resourceUri.getResourceType()) { + case JAR: + case FILE: + case ARCHIVE: + resourceUris.add(ResourceUri.builder().resourceType(resourceUri.getResourceType().name()) + .uri(resourceUri.getUri()).build()); + break; + default: + throw new UnsupportedOperationException("GlueCatalog supports only creating resources JAR/FILE or ARCHIVE for now"); + } + + } + return UserDefinedFunctionInput.builder().functionName(functionPath.getObjectName()) + .className(getGlueFunctionClassName(function)) + .ownerType(PrincipalType.USER) + .ownerName(GlueCatalogConfig.FLINK_CATALOG) + .resourceUris(resourceUris) + .build(); + } + + private String getGlueFunctionClassName(CatalogFunction function) { + if (function.getFunctionLanguage().equals(FunctionLanguage.JAVA)) { + return GlueCatalogConfig.FLINK_JAVA_FUNCTION_PREFIX + GlueCatalogConfig.DEFAULT_SEPARATOR + function.getClassName(); + } else if (function.getFunctionLanguage().equals(FunctionLanguage.SCALA)) { + return GlueCatalogConfig.FLINK_SCALA_FUNCTION_PREFIX + GlueCatalogConfig.DEFAULT_SEPARATOR + function.getClassName(); + } else if (function.getFunctionLanguage().equals(FunctionLanguage.PYTHON)) { + return GlueCatalogConfig.FLINK_PYTHON_FUNCTION_PREFIX + GlueCatalogConfig.DEFAULT_SEPARATOR + function.getClassName(); + } else { + throw new UnsupportedOperationException(String.format("GlueCatalog supports only creating: [%s]", + Arrays.stream(FunctionLanguage.values()).map(FunctionLanguage::name) + .collect(Collectors.joining(GlueCatalogConfig.NEXT_LINE)))); + } + } + + private FunctionLanguage getFunctionalLanguage(UserDefinedFunction glueFunction) { + if (glueFunction.className().startsWith(GlueCatalogConfig.FLINK_JAVA_FUNCTION_PREFIX)) { + return FunctionLanguage.JAVA; + } else if (glueFunction.className().startsWith(GlueCatalogConfig.FLINK_PYTHON_FUNCTION_PREFIX)) { + return FunctionLanguage.PYTHON; + } else if (glueFunction.className().startsWith(GlueCatalogConfig.FLINK_SCALA_FUNCTION_PREFIX)) { + return FunctionLanguage.SCALA; + } else { + throw new CatalogException("Invalid Functional Language"); + } + } + + /** + * + * @param udf Instance of UserDefinedFunction + * @return ClassName for function + */ + private String getFunctionClassName(UserDefinedFunction udf){ + validateUDFClassName(udf.className()); + return udf.functionName().split(GlueCatalogConfig.DEFAULT_SEPARATOR)[1]; + + } + + private boolean filterGlueUDF(UserDefinedFunction udf) { + try { + validateUDFClassName(udf.functionName()); + return true; + } catch (ValidationException e) { + return false; + } + } + + /** + * Rename glue table. + * Direct renaming is not supported in glue. + * It has to be done in 3 step. + * 1. fetch existing table info from glue + * 2. Create a table with new-name and info fetched in step 1 + * 3. Delete existing table + * @param oldTablePath old table name + * @param newTablePath new renamed table + */ + public void renameGlueTable(ObjectPath oldTablePath, ObjectPath newTablePath) throws CatalogException, TableNotExistException { + // Glue catalog don't support renaming table. + // no need to handle crashes + // update user about this in proper information + // Probable steps to do + // 1. Get Current Glue Table + // 2. Derive input with renamed values + // 3. create new table + // 4. create partitions with renamed values + // 5. delete old partitions + // 6. delete old table + // todo: when statistics features are implemented in GlueCatalog + // 1. update table statistics + // 2. update table column statistics + // 3. partition statistics + } + + /** + * Create a {@link CatalogTable} using all the information from {@link Table}. + * @param glueTable Instance of Table from glue Data catalog. + * @return {@link CatalogTable}. + */ + public CatalogTable getCatalogTableFromGlueTable(Table glueTable) { + + checkNotNull(glueTable, "Fetched Glue table is null"); + Schema schemaInfo = getSchemaFromGlueTable(glueTable); + + List partitionKeys = glueTable.partitionKeys().stream().map(Column::name).collect(Collectors.toList()); + LOG.info("Testing Here" + String.join(", ", partitionKeys)); + Map properties = new HashMap<>(glueTable.parameters()); + if (glueTable.owner() != null) { + properties.put(GlueCatalogConfig.TABLE_OWNER, glueTable.owner()); + } + + if (glueTable.storageDescriptor().parameters().size() > 0) { + properties.putAll(glueTable.storageDescriptor().parameters()); + } + + return CatalogTable.of(schemaInfo, glueTable.description(), partitionKeys, properties); + } + + /** + * Get CatalogPartitionSpec of all partitions that is under the given CatalogPartitionSpec in + * the table. + * @param tablePath + * @param partitionSpec + * @return + */ + public List listGluePartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException { + Table glueTable = getGlueTable(tablePath); + + if (glueTable.hasPartitionKeys()) { + return glueTable.partitionKeys().stream() + .filter (partition -> specSubset(partitionSpec.getPartitionSpec(), partition.parameters())) + .map(col -> new CatalogPartitionSpec(col.parameters())) + .collect(Collectors.toList()); + + } else { + throw new TableNotPartitionedException(catalogName, tablePath); + } + } + + private boolean specSubset(Map spec1, Map spec2) { + return spec1.entrySet().stream() + .allMatch(e -> e.getValue().equals(spec2.get(e.getKey()))); + } + + /** + * A Glue database name cannot be longer than 252 characters. The only acceptable characters are + * lowercase letters, numbers, and the underscore character. More details: + * https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html + * + * @param name name + */ + static void validateName(String name) { + checkArgument(name != null && GlueCatalogConfig.GLUE_DB_PATTERN.matcher(name).find(), + "Database name is not according to Glue Norms, " + + "check here https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html"); + } + +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/DatatypeMapper.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/DatatypeMapper.java new file mode 100644 index 000000000..7ae6004f7 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/DatatypeMapper.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.mapper; + +import org.apache.flink.table.types.DataType; + +/** + * Interface to define mapping datatype from flink to other framework and vice-versa. + */ +public interface DatatypeMapper { + + /** + * Takes the Column information and retrieves datatype name. + * @param column Type that extends Schema.UnresolvedColumn + * @return stringify name of datatype for column + */ + default String mapFlinkTypeToGlueType(final T column) { + throw new UnsupportedOperationException("Provide Implementation for using the functionality"); + } + +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/GlueDatatypeMapper.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/GlueDatatypeMapper.java new file mode 100644 index 000000000..52877b5ae --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/GlueDatatypeMapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.mapper; + +import org.apache.flink.table.types.DataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation to define datatype from flink to glue catalog and vice-versa. + */ +public class GlueDatatypeMapper implements DatatypeMapper { + + private static final Logger LOG = LoggerFactory.getLogger(GlueDatatypeMapper.class); + + @Override + public String mapFlinkTypeToGlueType(T type) { + StringBuilder sb = new StringBuilder(); + if (type != null) { + getStringifiedDatatype(type, sb); + + } + return sb.toString(); + } + + private void getStringifiedDatatype(DataType dataType, StringBuilder sb) { + sb.append(dataType.toString()); + } + +} diff --git a/flink-catalog-aws-glue/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-catalog-aws-glue/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 000000000..c8308d477 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.table.catalog.glue.GlueCatalogFactory \ No newline at end of file diff --git a/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/GlueCatalogTest.java b/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/GlueCatalogTest.java new file mode 100644 index 000000000..ec790b22e --- /dev/null +++ b/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/GlueCatalogTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.glue.GlueCatalog; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.catalog.stats.Date; +import org.apache.flink.table.functions.TestGenericUDF; +import org.apache.flink.table.functions.TestSimpleUDF; +import org.apache.flink.table.utils.TableEnvironmentMock; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for GlueCatalog. */ +class GlueCatalogTest extends CatalogTestBase { + + @BeforeAll + static void init() { + catalog = new GlueCatalog(TEST_CATALOG_NAME); + catalog.open(); + } + + // ------ tables ------ + + @Test + void testDropTable_partitionedTable() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.createTable(path1, createPartitionedTable(), false); + CatalogPartition catalogPartition = createPartition(); + CatalogPartitionSpec catalogPartitionSpec = createPartitionSpec(); + catalog.createPartition(path1, catalogPartitionSpec, catalogPartition, false); + + assertThat(catalog.tableExists(path1)).isTrue(); + + catalog.dropTable(path1, false); + + assertThat(catalog.tableExists(path1)).isFalse(); + assertThat(catalog.partitionExists(path1, catalogPartitionSpec)).isFalse(); + } + + @Test + void testRenameTable_partitionedTable() throws Exception { + catalog.createDatabase(db1, createDb(), false); + CatalogTable table = createPartitionedTable(); + catalog.createTable(path1, table, false); + CatalogPartition catalogPartition = createPartition(); + CatalogPartitionSpec catalogPartitionSpec = createPartitionSpec(); + catalog.createPartition(path1, catalogPartitionSpec, catalogPartition, false); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1)); + assertThat(catalog.partitionExists(path1, catalogPartitionSpec)).isTrue(); + + catalog.renameTable(path1, t2, false); + + CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3)); + assertThat(catalog.partitionExists(path3, catalogPartitionSpec)).isTrue(); + assertThat(catalog.tableExists(path1)).isFalse(); + assertThat(catalog.partitionExists(path1, catalogPartitionSpec)).isFalse(); + } + + // ------ statistics ------ + + @Test + void testStatistics() throws Exception { + // Table related + catalog.createDatabase(db1, createDb(), false); + CatalogTable table = createTable(); + catalog.createTable(path1, table, false); + + CatalogTestUtil.checkEquals( + catalog.getTableStatistics(path1), CatalogTableStatistics.UNKNOWN); + CatalogTestUtil.checkEquals( + catalog.getTableColumnStatistics(path1), CatalogColumnStatistics.UNKNOWN); + + CatalogTableStatistics tableStatistics = new CatalogTableStatistics(5, 2, 100, 575); + catalog.alterTableStatistics(path1, tableStatistics, false); + CatalogTestUtil.checkEquals(tableStatistics, catalog.getTableStatistics(path1)); + CatalogColumnStatistics columnStatistics = createColumnStats(); + catalog.alterTableColumnStatistics(path1, columnStatistics, false); + CatalogTestUtil.checkEquals(columnStatistics, catalog.getTableColumnStatistics(path1)); + + // Partition related + catalog.createDatabase(db2, createDb(), false); + CatalogTable table2 = createPartitionedTable(); + catalog.createTable(path2, table2, false); + CatalogPartitionSpec partitionSpec = createPartitionSpec(); + catalog.createPartition(path2, partitionSpec, createPartition(), false); + + CatalogTestUtil.checkEquals( + catalog.getPartitionStatistics(path2, partitionSpec), + CatalogTableStatistics.UNKNOWN); + CatalogTestUtil.checkEquals( + catalog.getPartitionColumnStatistics(path2, partitionSpec), + CatalogColumnStatistics.UNKNOWN); + + catalog.alterPartitionStatistics(path2, partitionSpec, tableStatistics, false); + CatalogTestUtil.checkEquals( + tableStatistics, catalog.getPartitionStatistics(path2, partitionSpec)); + catalog.alterPartitionColumnStatistics(path2, partitionSpec, columnStatistics, false); + CatalogTestUtil.checkEquals( + columnStatistics, catalog.getPartitionColumnStatistics(path2, partitionSpec)); + + // Clean up + catalog.dropTable(path1, false); + catalog.dropDatabase(db1, false, false); + catalog.dropTable(path2, false); + catalog.dropDatabase(db2, false, false); + } + + @Test + void testBulkGetPartitionStatistics() throws Exception { + // create table + catalog.createDatabase(db1, createDb(), false); + CatalogTable table1 = createPartitionedTable(); + catalog.createTable(path1, table1, false); + // create two partition specs + CatalogPartitionSpec partitionSpec = createPartitionSpec(); + catalog.createPartition(path1, partitionSpec, createPartition(), false); + CatalogPartitionSpec anotherPartitionSpec = createAnotherPartitionSpec(); + catalog.createPartition(path1, anotherPartitionSpec, createPartition(), false); + + List catalogTableStatistics = + catalog.bulkGetPartitionStatistics( + path1, Arrays.asList(partitionSpec, anotherPartitionSpec)); + // got statistic from catalog should be unknown since no statistic has been put into + // partition + for (CatalogTableStatistics statistics : catalogTableStatistics) { + CatalogTestUtil.checkEquals(statistics, CatalogTableStatistics.UNKNOWN); + } + + // put statistic for partition + CatalogTableStatistics tableStatistics = new CatalogTableStatistics(5, 2, 100, 575); + CatalogTableStatistics anotherTableStatistics = new CatalogTableStatistics(1, 1, 1, 5); + catalog.alterPartitionStatistics(path1, partitionSpec, tableStatistics, false); + catalog.alterPartitionStatistics( + path1, anotherPartitionSpec, anotherTableStatistics, false); + + catalogTableStatistics = + catalog.bulkGetPartitionStatistics( + path1, Arrays.asList(partitionSpec, anotherPartitionSpec)); + CatalogTestUtil.checkEquals(catalogTableStatistics.get(0), tableStatistics); + CatalogTestUtil.checkEquals(catalogTableStatistics.get(1), anotherTableStatistics); + } + + @Test + void testBulkGetPartitionColumnStatistics() throws Exception { + // create table + catalog.createDatabase(db1, createDb(), false); + CatalogTable table1 = createPartitionedTable(); + catalog.createTable(path1, table1, false); + // create two partition specs + CatalogPartitionSpec partitionSpec = createPartitionSpec(); + catalog.createPartition(path1, partitionSpec, createPartition(), false); + CatalogPartitionSpec anotherPartitionSpec = createAnotherPartitionSpec(); + catalog.createPartition(path1, anotherPartitionSpec, createPartition(), false); + List catalogPartitionSpecs = + Arrays.asList(partitionSpec, anotherPartitionSpec); + + List catalogColumnStatistics = + catalog.bulkGetPartitionColumnStatistics(path1, catalogPartitionSpecs); + // got statistic from catalog should be unknown since no statistic has been put into + // partition + for (CatalogColumnStatistics statistics : catalogColumnStatistics) { + CatalogTestUtil.checkEquals(statistics, CatalogColumnStatistics.UNKNOWN); + } + + // put statistic for partition + CatalogColumnStatistics columnStatistics = createColumnStats(); + catalog.alterPartitionColumnStatistics(path1, partitionSpec, columnStatistics, false); + catalog.alterPartitionColumnStatistics( + path1, anotherPartitionSpec, columnStatistics, false); + + catalogColumnStatistics = + catalog.bulkGetPartitionColumnStatistics(path1, catalogPartitionSpecs); + for (CatalogColumnStatistics statistics : catalogColumnStatistics) { + CatalogTestUtil.checkEquals(statistics, columnStatistics); + } + } + + // ------ utilities ------ + + @Override + protected boolean isGeneric() { + return true; + } + + private CatalogColumnStatistics createColumnStats() { + CatalogColumnStatisticsDataBoolean booleanColStats = + new CatalogColumnStatisticsDataBoolean(55L, 45L, 5L); + CatalogColumnStatisticsDataLong longColStats = + new CatalogColumnStatisticsDataLong(-123L, 763322L, 23L, 79L); + CatalogColumnStatisticsDataString stringColStats = + new CatalogColumnStatisticsDataString(152L, 43.5D, 20L, 0L); + CatalogColumnStatisticsDataDate dateColStats = + new CatalogColumnStatisticsDataDate(new Date(71L), new Date(17923L), 1321L, 0L); + CatalogColumnStatisticsDataDouble doubleColStats = + new CatalogColumnStatisticsDataDouble(-123.35D, 7633.22D, 23L, 79L); + CatalogColumnStatisticsDataBinary binaryColStats = + new CatalogColumnStatisticsDataBinary(755L, 43.5D, 20L); + Map colStatsMap = new HashMap<>(6); + colStatsMap.put("b1", booleanColStats); + colStatsMap.put("l2", longColStats); + colStatsMap.put("s3", stringColStats); + colStatsMap.put("d4", dateColStats); + colStatsMap.put("dd5", doubleColStats); + colStatsMap.put("bb6", binaryColStats); + return new CatalogColumnStatistics(colStatsMap); + } + + @Override + protected CatalogFunction createFunction() { + return new CatalogFunctionImpl(TestGenericUDF.class.getCanonicalName()); + } + + @Override + protected CatalogFunction createAnotherFunction() { + return new CatalogFunctionImpl( + TestSimpleUDF.class.getCanonicalName(), FunctionLanguage.SCALA); + } + + @Override + protected CatalogFunction createPythonFunction() { + return new CatalogFunctionImpl("test.func1", FunctionLanguage.PYTHON); + } + + @Test + void testRegisterCatalog() { + final TableEnvironmentMock tableEnv = TableEnvironmentMock.getStreamingInstance(); + try { + tableEnv.registerCatalog(TEST_CATALOG_NAME, new MyCatalog(TEST_CATALOG_NAME)); + } catch (CatalogException e) { + } + assertThat(tableEnv.getCatalog(TEST_CATALOG_NAME)).isNotPresent(); + } + + class MyCatalog extends GlueCatalog { + + public MyCatalog(String name) { + super(name); + } + + @Override + public void open() { + throw new CatalogException("open catalog failed."); + } + } +} diff --git a/pom.xml b/pom.xml index 072c20037..04d488e05 100644 --- a/pom.xml +++ b/pom.xml @@ -83,6 +83,8 @@ under the License. flink-sql-connector-aws-kinesis-streams flink-sql-connector-kinesis + flink-catalog-aws-glue + flink-connector-aws-e2e-tests