diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 825816fdf416..f022c8abcb00 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -38,6 +38,7 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -60,8 +61,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.Transaction; -import org.apache.iceberg.UpdateProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -69,6 +68,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.flink.util.FlinkAlterTableUtil; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -91,7 +91,6 @@ * independent of the partition of Flink. */ public class FlinkCatalog extends AbstractCatalog { - private final CatalogLoader catalogLoader; private final Catalog icebergCatalog; private final Namespace baseNamespace; @@ -439,14 +438,35 @@ private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTab if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns()) && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs()) && equalsPrimary)) { - throw new UnsupportedOperationException("Altering schema is not supported yet."); + throw new UnsupportedOperationException( + "Altering schema is not supported in the old alterTable API. " + + "To alter schema, use the other alterTable API and provide a list of TableChange's."); } + validateTablePartition(ct1, ct2); + } + + private static void validateTablePartition(CatalogTable ct1, CatalogTable ct2) { if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) { throw new UnsupportedOperationException("Altering partition keys is not supported yet."); } } + /** + * This alterTable API only supports altering table properties. + * + *

Support for adding/removing/renaming columns cannot be done by comparing CatalogTable + * instances, unless the Flink schema contains Iceberg column IDs. + * + *

To alter columns, use the other alterTable API and provide a list of TableChange's. + * + * @param tablePath path of the table or view to be modified + * @param newTable the new table definition + * @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if set + * to false, throw an exception, if set to true, do nothing. + * @throws CatalogException in case of any runtime exception + * @throws TableNotExistException if the table does not exist + */ @Override public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws CatalogException, TableNotExistException { @@ -464,12 +484,6 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean } CatalogTable table = toCatalogTable(icebergTable); - - // Currently, Flink SQL only support altering table properties. - - // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by - // comparing - // CatalogTable instances, unless the Flink schema contains Iceberg column IDs. validateTableSchemaAndPartition(table, (CatalogTable) newTable); Map oldProperties = table.getOptions(); @@ -507,7 +521,66 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean } }); - commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties); + FlinkAlterTableUtil.commitChanges( + icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties); + } + + @Override + public void alterTable( + ObjectPath tablePath, + CatalogBaseTable newTable, + List tableChanges, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateFlinkTable(newTable); + + Table icebergTable; + try { + icebergTable = loadIcebergTable(tablePath); + } catch (TableNotExistException e) { + if (!ignoreIfNotExists) { + throw e; + } else { + return; + } + } + + // Does not support altering partition yet. + validateTablePartition(toCatalogTable(icebergTable), (CatalogTable) newTable); + + String setLocation = null; + String setSnapshotId = null; + String cherrypickSnapshotId = null; + + List propertyChanges = Lists.newArrayList(); + List schemaChanges = Lists.newArrayList(); + for (TableChange change : tableChanges) { + if (change instanceof TableChange.SetOption) { + TableChange.SetOption set = (TableChange.SetOption) change; + + if ("location".equalsIgnoreCase(set.getKey())) { + setLocation = set.getValue(); + } else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) { + setSnapshotId = set.getValue(); + } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(set.getKey())) { + cherrypickSnapshotId = set.getValue(); + } else { + propertyChanges.add(change); + } + } else if (change instanceof TableChange.ResetOption) { + propertyChanges.add(change); + } else { + schemaChanges.add(change); + } + } + + FlinkAlterTableUtil.commitChanges( + icebergTable, + setLocation, + setSnapshotId, + cherrypickSnapshotId, + schemaChanges, + propertyChanges); } private static void validateFlinkTable(CatalogBaseTable table) { @@ -552,52 +625,6 @@ private static List toPartitionKeys(PartitionSpec spec, Schema icebergSc return partitionKeysBuilder.build(); } - private static void commitChanges( - Table table, - String setLocation, - String setSnapshotId, - String pickSnapshotId, - Map setProperties) { - // don't allow setting the snapshot and picking a commit at the same time because order is - // ambiguous and choosing - // one order leads to different results - Preconditions.checkArgument( - setSnapshotId == null || pickSnapshotId == null, - "Cannot set the current snapshot ID and cherry-pick snapshot changes"); - - if (setSnapshotId != null) { - long newSnapshotId = Long.parseLong(setSnapshotId); - table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit(); - } - - // if updating the table snapshot, perform that update first in case it fails - if (pickSnapshotId != null) { - long newSnapshotId = Long.parseLong(pickSnapshotId); - table.manageSnapshots().cherrypick(newSnapshotId).commit(); - } - - Transaction transaction = table.newTransaction(); - - if (setLocation != null) { - transaction.updateLocation().setLocation(setLocation).commit(); - } - - if (!setProperties.isEmpty()) { - UpdateProperties updateProperties = transaction.updateProperties(); - setProperties.forEach( - (k, v) -> { - if (v == null) { - updateProperties.remove(k); - } else { - updateProperties.set(k, v); - } - }); - updateProperties.commit(); - } - - transaction.commitTransaction(); - } - static CatalogTable toCatalogTable(Table table) { TableSchema schema = FlinkSchemaUtil.toSchema(table.schema()); List partitionKeys = toPartitionKeys(table.spec(), table.schema()); diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java index 25725639c330..a6b53879ad80 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java @@ -134,6 +134,16 @@ public static LogicalType convert(Type type) { return TypeUtil.visit(type, new TypeToFlinkType()); } + /** + * Convert a {@link LogicalType Flink type} to a {@link Type}. + * + * @param flinkType a FlinkType + * @return the equivalent Iceberg type + */ + public static Type convert(LogicalType flinkType) { + return flinkType.accept(new FlinkTypeToType()); + } + /** * Convert a {@link RowType} to a {@link TableSchema}. * diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java index 6f8bfef2ef44..408065f06057 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java @@ -49,6 +49,10 @@ class FlinkTypeToType extends FlinkTypeVisitor { private final RowType root; private int nextId; + FlinkTypeToType() { + this.root = null; + } + FlinkTypeToType(RowType root) { this.root = root; // the root struct's fields use the first ids diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java new file mode 100644 index 000000000000..f0b9bf64fb1a --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.util; + +import java.util.List; +import java.util.Map; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; + +public class FlinkAlterTableUtil { + private FlinkAlterTableUtil() {} + + public static void commitChanges( + Table table, + String setLocation, + String setSnapshotId, + String pickSnapshotId, + Map setProperties) { + commitManageSnapshots(table, setSnapshotId, pickSnapshotId); + + Transaction transaction = table.newTransaction(); + + if (setLocation != null) { + transaction.updateLocation().setLocation(setLocation).commit(); + } + + if (!setProperties.isEmpty()) { + UpdateProperties updateProperties = transaction.updateProperties(); + setProperties.forEach( + (k, v) -> { + if (v == null) { + updateProperties.remove(k); + } else { + updateProperties.set(k, v); + } + }); + updateProperties.commit(); + } + + transaction.commitTransaction(); + } + + public static void commitChanges( + Table table, + String setLocation, + String setSnapshotId, + String pickSnapshotId, + List schemaChanges, + List propertyChanges) { + commitManageSnapshots(table, setSnapshotId, pickSnapshotId); + + Transaction transaction = table.newTransaction(); + + if (setLocation != null) { + transaction.updateLocation().setLocation(setLocation).commit(); + } + + if (!schemaChanges.isEmpty()) { + UpdateSchema updateSchema = transaction.updateSchema(); + FlinkAlterTableUtil.applySchemaChanges(updateSchema, schemaChanges); + updateSchema.commit(); + } + + if (!propertyChanges.isEmpty()) { + UpdateProperties updateProperties = transaction.updateProperties(); + FlinkAlterTableUtil.applyPropertyChanges(updateProperties, propertyChanges); + updateProperties.commit(); + } + + transaction.commitTransaction(); + } + + public static void commitManageSnapshots( + Table table, String setSnapshotId, String cherrypickSnapshotId) { + // don't allow setting the snapshot and picking a commit at the same time because order is + // ambiguous and choosing one order leads to different results + Preconditions.checkArgument( + setSnapshotId == null || cherrypickSnapshotId == null, + "Cannot set the current snapshot ID and cherry-pick snapshot changes"); + + if (setSnapshotId != null) { + long newSnapshotId = Long.parseLong(setSnapshotId); + table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit(); + } + + // if updating the table snapshot, perform that update first in case it fails + if (cherrypickSnapshotId != null) { + long newSnapshotId = Long.parseLong(cherrypickSnapshotId); + table.manageSnapshots().cherrypick(newSnapshotId).commit(); + } + } + + /** + * Applies a list of Flink table changes to an {@link UpdateSchema} operation. + * + * @param pendingUpdate an uncommitted UpdateSchema operation to configure + * @param schemaChanges a list of Flink table changes + */ + public static void applySchemaChanges( + UpdateSchema pendingUpdate, List schemaChanges) { + for (TableChange change : schemaChanges) { + if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + Column flinkColumn = addColumn.getColumn(); + Preconditions.checkArgument( + FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn), + "Unsupported table change: Adding computed column %s.", + flinkColumn.getName()); + Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType()); + if (flinkColumn.getDataType().getLogicalType().isNullable()) { + pendingUpdate.addColumn(flinkColumn.getName(), icebergType); + } else { + pendingUpdate.addRequiredColumn(flinkColumn.getName(), icebergType); + } + } else if (change instanceof TableChange.ModifyColumn) { + TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change; + applyModifyColumn(pendingUpdate, modifyColumn); + } else if (change instanceof TableChange.DropColumn) { + TableChange.DropColumn dropColumn = (TableChange.DropColumn) change; + pendingUpdate.deleteColumn(dropColumn.getColumnName()); + } else if (change instanceof TableChange.AddWatermark) { + throw new UnsupportedOperationException("Unsupported table change: AddWatermark."); + } else if (change instanceof TableChange.ModifyWatermark) { + throw new UnsupportedOperationException("Unsupported table change: ModifyWatermark."); + } else if (change instanceof TableChange.DropWatermark) { + throw new UnsupportedOperationException("Unsupported table change: DropWatermark."); + } else if (change instanceof TableChange.AddUniqueConstraint) { + TableChange.AddUniqueConstraint addPk = (TableChange.AddUniqueConstraint) change; + applyUniqueConstraint(pendingUpdate, addPk.getConstraint()); + } else if (change instanceof TableChange.ModifyUniqueConstraint) { + TableChange.ModifyUniqueConstraint modifyPk = (TableChange.ModifyUniqueConstraint) change; + applyUniqueConstraint(pendingUpdate, modifyPk.getNewConstraint()); + } else if (change instanceof TableChange.DropConstraint) { + throw new UnsupportedOperationException("Unsupported table change: DropConstraint."); + } else { + throw new UnsupportedOperationException("Cannot apply unknown table change: " + change); + } + } + } + + /** + * Applies a list of Flink table property changes to an {@link UpdateProperties} operation. + * + * @param pendingUpdate an uncommitted UpdateProperty operation to configure + * @param propertyChanges a list of Flink table changes + */ + public static void applyPropertyChanges( + UpdateProperties pendingUpdate, List propertyChanges) { + for (TableChange change : propertyChanges) { + if (change instanceof TableChange.SetOption) { + TableChange.SetOption setOption = (TableChange.SetOption) change; + pendingUpdate.set(setOption.getKey(), setOption.getValue()); + } else if (change instanceof TableChange.ResetOption) { + TableChange.ResetOption resetOption = (TableChange.ResetOption) change; + pendingUpdate.remove(resetOption.getKey()); + } else { + throw new UnsupportedOperationException( + "The given table change is not a property change: " + change); + } + } + } + + private static void applyModifyColumn( + UpdateSchema pendingUpdate, TableChange.ModifyColumn modifyColumn) { + if (modifyColumn instanceof TableChange.ModifyColumnName) { + TableChange.ModifyColumnName modifyName = (TableChange.ModifyColumnName) modifyColumn; + pendingUpdate.renameColumn(modifyName.getOldColumnName(), modifyName.getNewColumnName()); + } else if (modifyColumn instanceof TableChange.ModifyColumnPosition) { + TableChange.ModifyColumnPosition modifyPosition = + (TableChange.ModifyColumnPosition) modifyColumn; + applyModifyColumnPosition(pendingUpdate, modifyPosition); + } else if (modifyColumn instanceof TableChange.ModifyPhysicalColumnType) { + TableChange.ModifyPhysicalColumnType modifyType = + (TableChange.ModifyPhysicalColumnType) modifyColumn; + Type type = FlinkSchemaUtil.convert(modifyType.getNewType().getLogicalType()); + String columnName = modifyType.getOldColumn().getName(); + pendingUpdate.updateColumn(columnName, type.asPrimitiveType()); + if (modifyType.getNewColumn().getDataType().getLogicalType().isNullable()) { + pendingUpdate.makeColumnOptional(columnName); + } else { + pendingUpdate.requireColumn(columnName); + } + } else if (modifyColumn instanceof TableChange.ModifyColumnComment) { + TableChange.ModifyColumnComment modifyComment = + (TableChange.ModifyColumnComment) modifyColumn; + pendingUpdate.updateColumnDoc( + modifyComment.getOldColumn().getName(), modifyComment.getNewComment()); + } else { + throw new UnsupportedOperationException( + "Cannot apply unknown modify-column change: " + modifyColumn); + } + } + + private static void applyModifyColumnPosition( + UpdateSchema pendingUpdate, TableChange.ModifyColumnPosition modifyColumnPosition) { + TableChange.ColumnPosition newPosition = modifyColumnPosition.getNewPosition(); + if (newPosition instanceof TableChange.First) { + pendingUpdate.moveFirst(modifyColumnPosition.getOldColumn().getName()); + } else if (newPosition instanceof TableChange.After) { + TableChange.After after = (TableChange.After) newPosition; + pendingUpdate.moveAfter(modifyColumnPosition.getOldColumn().getName(), after.column()); + } else { + throw new UnsupportedOperationException( + "Cannot apply unknown modify-column-position change: " + modifyColumnPosition); + } + } + + private static void applyUniqueConstraint( + UpdateSchema pendingUpdate, UniqueConstraint constraint) { + switch (constraint.getType()) { + case PRIMARY_KEY: + pendingUpdate.setIdentifierFields(constraint.getColumns()); + break; + case UNIQUE_KEY: + throw new UnsupportedOperationException( + "Unsupported table change: setting unique key constraints."); + default: + throw new UnsupportedOperationException( + "Cannot apply unknown unique constraint: " + constraint.getType().name()); + } + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java index 2c5c587f4ebf..f02af894e82b 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; @@ -39,4 +40,8 @@ public static TypeInformation toTypeInfo(RowType rowType) { public static boolean isPhysicalColumn(TableColumn column) { return column.isPhysical(); } + + public static boolean isPhysicalColumn(Column column) { + return column.isPhysical(); + } } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 16dcf4a9f4ce..8f5ddde91851 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.constraints.UniqueConstraint; @@ -297,7 +298,7 @@ public void testLoadTransformPartitionTable() throws TableNotExistException { } @Test - public void testAlterTable() throws TableNotExistException { + public void testAlterTableProperties() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')"); Map properties = Maps.newHashMap(); properties.put("oldK", "oldV"); @@ -313,39 +314,297 @@ public void testAlterTable() throws TableNotExistException { assertThat(table("tl").properties()).containsAllEntriesOf(properties); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); assertThat(table("tl").properties()).containsAllEntriesOf(properties); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - assertThat(table("tl").properties()).containsAllEntriesOf(properties); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - assertThat(table("tl").properties()).containsAllEntriesOf(properties); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - assertThat(table("tl").properties()).containsAllEntriesOf(properties); + // Adding a required field should fail because Iceberg's SchemaUpdate does not allow + // incompatible changes. + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Incompatible change: cannot add required column: pk"); + + // Adding an existing field should fail due to Flink's internal validation. + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Try to add a column `id` which already exists in the table."); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter2.asStruct()); + + // Dropping an non-existing field should fail due to Flink's internal validation. + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("The column `foo` does not exist in the base table."); + + // Dropping an already-deleted field should fail due to Flink's internal validation. + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("The column `dt` does not exist in the base table."); + } + + @Test + public void testAlterTableModifyColumnName() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl RENAME dt TO data"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnType() { + sql("CREATE TABLE tl(id INTEGER, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Promote type from Integer to Long + sql("ALTER TABLE tl MODIFY (id BIGINT)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + + // Type change that doesn't follow the type-promotion rule should fail due to Iceberg's + // validation. + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) + .isInstanceOf(TableException.class) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Cannot change column type: dt: string -> int"); + } + + @Test + public void testAlterTableModifyColumnNullability() { + sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Changing nullability from optional to required should fail + // because Iceberg's SchemaUpdate does not allow incompatible changes. + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) + .isInstanceOf(TableException.class) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Cannot change column nullability: dt: optional -> required"); + + // Set nullability from required to optional + sql("ALTER TABLE tl MODIFY (id INTEGER)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnPosition() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING FIRST)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(1, "id", Types.LongType.get())) + .asStruct(), + schemaAfter.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING AFTER id)"); + Schema schemaAfterAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfterAfter.asStruct()); + + // Modifying the position of a non-existing column should fail due to Flink's internal + // validation. + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (non_existing STRING FIRST)")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Try to modify a column `non_existing` which does not exist in the table."); + + // Moving a column after a non-existing column should fail due to Flink's internal validation. + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING AFTER non_existing)")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Referenced column `non_existing` by 'AFTER' does not exist in the table."); + } + + @Test + public void testAlterTableModifyColumnComment() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'comment for dt field')"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get(), "comment for dt field")) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableConstraint() { + sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL, col1 STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + Assert.assertEquals(ImmutableSet.of(), schemaBefore.identifierFieldNames()); + + sql("ALTER TABLE tl ADD (PRIMARY KEY (id) NOT ENFORCED)"); + Schema schemaAfterAdd = table("tl").schema(); + Assert.assertEquals(ImmutableSet.of("id"), schemaAfterAdd.identifierFieldNames()); + + sql("ALTER TABLE tl MODIFY (PRIMARY KEY (dt) NOT ENFORCED)"); + Schema schemaAfterModify = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaAfterModify.asStruct()); + Assert.assertEquals(ImmutableSet.of("dt"), schemaAfterModify.identifierFieldNames()); + + // Composite primary key + sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, dt) NOT ENFORCED)"); + Schema schemaAfterComposite = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaAfterComposite.asStruct()); + Assert.assertEquals(ImmutableSet.of("id", "dt"), schemaAfterComposite.identifierFieldNames()); + + // Setting an optional field as primary key should fail + // because Iceberg's SchemaUpdate does not allow incompatible changes. + Assertions.assertThatThrownBy( + () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (col1) NOT ENFORCED)")) + .isInstanceOf(TableException.class) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Cannot add field col1 as an identifier field: not a required field"); + + // Setting a composite key containing an optional field should fail + // because Iceberg's SchemaUpdate does not allow incompatible changes. + Assertions.assertThatThrownBy( + () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, col1) NOT ENFORCED)")) + .isInstanceOf(TableException.class) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Cannot add field col1 as an identifier field: not a required field"); + + // Dropping constraints is not supported yet + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP PRIMARY KEY")) + .isInstanceOf(TableException.class) + .hasRootCauseInstanceOf(UnsupportedOperationException.class) + .hasRootCauseMessage("Unsupported table change: DropConstraint."); } @Test