Flink: Support alter table column (apache#7628)

linyanghao authored Sep 27, 2023
1 parent b7296a7 commit 1e52f2e

Showing 6 changed files with 634 additions and 83 deletions.
@@ -38,6 +38,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -60,15 +61,14 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.Transaction;
-import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.flink.util.FlinkAlterTableUtil;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -91,7 +91,6 @@
* independent of the partition of Flink.
*/
public class FlinkCatalog extends AbstractCatalog {
-
private final CatalogLoader catalogLoader;
private final Catalog icebergCatalog;
private final Namespace baseNamespace;
@@ -439,14 +438,35 @@ private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTab
if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
&& Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
&& equalsPrimary)) {
-throw new UnsupportedOperationException("Altering schema is not supported yet.");
throw new UnsupportedOperationException(
"Altering schema is not supported in the old alterTable API. "
+ "To alter schema, use the other alterTable API and provide a list of TableChange's.");
}

validateTablePartition(ct1, ct2);
}

private static void validateTablePartition(CatalogTable ct1, CatalogTable ct2) {
if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
}
}

/**
* This alterTable API only supports altering table properties.
*
* <p>Support for adding/removing/renaming columns cannot be done by comparing CatalogTable
* instances, unless the Flink schema contains Iceberg column IDs.
*
* <p>To alter columns, use the other alterTable API and provide a list of TableChange's.
*
* @param tablePath path of the table or view to be modified
* @param newTable the new table definition
* @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if set
* to false, throw an exception, if set to true, do nothing.
* @throws CatalogException in case of any runtime exception
* @throws TableNotExistException if the table does not exist
*/
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws CatalogException, TableNotExistException {
@@ -464,12 +484,6 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
}

CatalogTable table = toCatalogTable(icebergTable);
-
-// Currently, Flink SQL only support altering table properties.
-
-// For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by
-// comparing
-// CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
validateTableSchemaAndPartition(table, (CatalogTable) newTable);

Map<String, String> oldProperties = table.getOptions();
@@ -507,7 +521,66 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
}
});

-commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
FlinkAlterTableUtil.commitChanges(
icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
}
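For context, this first alterTable path reacts only to changed table options. A minimal sketch of driving it, assuming catalog is this FlinkCatalog and table is the current CatalogTable (the path, option key, and location value are illustrative):

  Map<String, String> newOptions = new HashMap<>(table.getOptions());
  newOptions.put("write.format.default", "orc");         // ordinary property update
  newOptions.put("location", "s3://bucket/warehouse/t"); // intercepted as a location update
  catalog.alterTable(new ObjectPath("db", "t"), table.copy(newOptions), false);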

@Override
public void alterTable(
ObjectPath tablePath,
CatalogBaseTable newTable,
List<TableChange> tableChanges,
boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
validateFlinkTable(newTable);

Table icebergTable;
try {
icebergTable = loadIcebergTable(tablePath);
} catch (TableNotExistException e) {
if (!ignoreIfNotExists) {
throw e;
} else {
return;
}
}

// Does not support altering partitions yet.
validateTablePartition(toCatalogTable(icebergTable), (CatalogTable) newTable);

String setLocation = null;
String setSnapshotId = null;
String cherrypickSnapshotId = null;

List<TableChange> propertyChanges = Lists.newArrayList();
List<TableChange> schemaChanges = Lists.newArrayList();
for (TableChange change : tableChanges) {
if (change instanceof TableChange.SetOption) {
TableChange.SetOption set = (TableChange.SetOption) change;

if ("location".equalsIgnoreCase(set.getKey())) {
setLocation = set.getValue();
} else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) {
setSnapshotId = set.getValue();
} else if ("cherry-pick-snapshot-id".equalsIgnoreCase(set.getKey())) {
cherrypickSnapshotId = set.getValue();
} else {
propertyChanges.add(change);
}
} else if (change instanceof TableChange.ResetOption) {
propertyChanges.add(change);
} else {
schemaChanges.add(change);
}
}

FlinkAlterTableUtil.commitChanges(
icebergTable,
setLocation,
setSnapshotId,
cherrypickSnapshotId,
schemaChanges,
propertyChanges);
}
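A minimal sketch of exercising the new API (the column name, option key, and snapshot ID are illustrative, and catalog, tablePath, and newTable are assumed to be in scope):

  List<TableChange> changes =
      Arrays.asList(
          TableChange.add(Column.physical("region", DataTypes.STRING())), // ends up in schemaChanges
          TableChange.set("write.format.default", "orc"),                 // ends up in propertyChanges
          TableChange.set("current-snapshot-id", "1234567890123"));       // intercepted as setSnapshotId
  catalog.alterTable(tablePath, newTable, changes, false);

This is also the path that Flink SQL DDL such as ALTER TABLE ... ADD takes through the catalog in recent Flink versions.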

private static void validateFlinkTable(CatalogBaseTable table) {
@@ -552,52 +625,6 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSc
return partitionKeysBuilder.build();
}

-private static void commitChanges(
-Table table,
-String setLocation,
-String setSnapshotId,
-String pickSnapshotId,
-Map<String, String> setProperties) {
-// don't allow setting the snapshot and picking a commit at the same time because order is
-// ambiguous and choosing one order leads to different results
-Preconditions.checkArgument(
-setSnapshotId == null || pickSnapshotId == null,
-"Cannot set the current snapshot ID and cherry-pick snapshot changes");
-
-if (setSnapshotId != null) {
-long newSnapshotId = Long.parseLong(setSnapshotId);
-table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
-}
-
-// if updating the table snapshot, perform that update first in case it fails
-if (pickSnapshotId != null) {
-long newSnapshotId = Long.parseLong(pickSnapshotId);
-table.manageSnapshots().cherrypick(newSnapshotId).commit();
-}
-
-Transaction transaction = table.newTransaction();
-
-if (setLocation != null) {
-transaction.updateLocation().setLocation(setLocation).commit();
-}
-
-if (!setProperties.isEmpty()) {
-UpdateProperties updateProperties = transaction.updateProperties();
-setProperties.forEach(
-(k, v) -> {
-if (v == null) {
-updateProperties.remove(k);
-} else {
-updateProperties.set(k, v);
-}
-});
-updateProperties.commit();
-}
-
-transaction.commitTransaction();
-}
-
static CatalogTable toCatalogTable(Table table) {
TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
@@ -134,6 +134,16 @@ public static LogicalType convert(Type type) {
return TypeUtil.visit(type, new TypeToFlinkType());
}

/**
* Convert a {@link LogicalType Flink type} to a {@link Type}.
*
* @param flinkType a Flink LogicalType
* @return the equivalent Iceberg type
*/
public static Type convert(LogicalType flinkType) {
return flinkType.accept(new FlinkTypeToType());
}
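A quick sketch of the new overload in use (variable names illustrative; Types refers to Iceberg's org.apache.iceberg.types.Types):

  // Flink VARCHAR converts to the Iceberg string type.
  Type icebergType = convert(new VarCharType(VarCharType.MAX_LENGTH));
  // icebergType is Types.StringType.get()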

/**
* Convert a {@link RowType} to a {@link TableSchema}.
*
Expand Down
@@ -49,6 +49,10 @@ class FlinkTypeToType extends FlinkTypeVisitor<Type> {
private final RowType root;
private int nextId;

FlinkTypeToType() {
this.root = null;
}

FlinkTypeToType(RowType root) {
this.root = root;
// the root struct's fields use the first ids
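The new no-arg constructor leaves root null so the visitor can be applied to a standalone type that is not wrapped in a RowType, which is exactly what the new convert(LogicalType) overload above needs. A sketch (package-private access assumed):

  // Standalone conversion: no enclosing row struct, so no root field IDs to assign first.
  Type result = new IntType().accept(new FlinkTypeToType()); // Types.IntegerType.get()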