From 732ac7e32049871c9240a105c0e9d6969787a1ba Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 18 Sep 2024 16:48:36 +0900 Subject: [PATCH] Add support for branching in Iceberg --- .../trino/plugin/iceberg/IcebergMetadata.java | 237 +++++++++-- .../trino/plugin/iceberg/IcebergModule.java | 6 + .../iceberg/IcebergPageSinkProvider.java | 3 + .../plugin/iceberg/IcebergTableHandle.java | 19 +- .../iceberg/IcebergTableProperties.java | 12 + .../iceberg/IcebergWritableTableHandle.java | 5 +- .../procedure/CreateBranchProcedure.java | 42 ++ .../procedure/DropBranchProcedure.java | 42 ++ .../procedure/FastForwardProcedure.java | 47 +++ .../procedure/IcebergCreateBranchHandle.java | 25 ++ .../procedure/IcebergDropBranchHandle.java | 25 ++ .../procedure/IcebergFastForwardHandle.java | 26 ++ .../procedure/IcebergProcedureHandle.java | 3 + .../procedure/IcebergTableProcedureId.java | 3 + .../plugin/iceberg/TestIcebergBranching.java | 382 ++++++++++++++++++ ...stIcebergNodeLocalDynamicSplitPruning.java | 4 + .../iceberg/TestIcebergSplitSource.java | 1 + ...TestConnectorPushdownRulesWithIceberg.java | 4 + 18 files changed, 861 insertions(+), 25 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java diff --git 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 316b793c24b6..d7cc5f0e17bd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -49,8 +49,11 @@ import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle; import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle; +import io.trino.plugin.iceberg.procedure.IcebergCreateBranchHandle; +import io.trino.plugin.iceberg.procedure.IcebergDropBranchHandle; import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle; import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle; +import io.trino.plugin.iceberg.procedure.IcebergFastForwardHandle; import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle; import io.trino.plugin.iceberg.procedure.IcebergOptimizeManifestsHandle; import io.trino.plugin.iceberg.procedure.IcebergRemoveOrphanFilesHandle; @@ -237,6 +240,7 @@ import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns; import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables; import static io.trino.plugin.base.util.ExecutorUtil.processWithAdditionalThreads; +import static io.trino.plugin.base.util.Functions.checkFunctionArgument; import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; import static io.trino.plugin.hive.HiveMetadata.TRANSACTIONAL; import static io.trino.plugin.hive.HiveTimestampPrecision.DEFAULT_PRECISION; @@ -329,19 +333,24 @@ import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.TRINO_QUERY_START_TIME; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES; import static 
io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES_FROM_TABLE; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.CREATE_BRANCH; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_BRANCH; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_EXTENDED_STATS; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.FAST_FORWARD; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE_MANIFESTS; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ROLLBACK_TO_SNAPSHOT; import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFiles; import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFilesFromTable; +import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.COLUMN_ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.COLUMN_NOT_FOUND; import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY; import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; +import static io.trino.spi.StandardErrorCode.NOT_FOUND; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED; import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; @@ -375,6 +384,7 @@ import static org.apache.iceberg.MetadataTableType.ENTRIES; import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations; import static org.apache.iceberg.ReachableFileUtil.statisticsFilesLocations; +import static org.apache.iceberg.SnapshotRef.MAIN_BRANCH; import static 
org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP; @@ -517,7 +527,7 @@ public ConnectorTableHandle getTableHandle( BaseTable storageTable = catalog.getMaterializedViewStorageTable(session, materializedViewName) .orElseThrow(() -> new TrinoException(TABLE_NOT_FOUND, "Storage table metadata not found for materialized view " + tableName)); - return tableHandleForCurrentSnapshot(session, tableName, storageTable); + return tableHandleForCurrentSnapshot(session, tableName, storageTable, Optional.empty()); } if (!isDataTable(tableName.getTableName())) { @@ -541,20 +551,29 @@ public ConnectorTableHandle getTableHandle( throw e; } + Optional branch = Optional.empty(); if (endVersion.isPresent()) { - long snapshotId = getSnapshotIdFromVersion(session, table, endVersion.get()); + ConnectorTableVersion version = endVersion.get(); + long snapshotId = getSnapshotIdFromVersion(session, table, version); + Optional partitionSpec = Optional.empty(); + branch = getBranch(version); + if (branch.isPresent()) { + int schemaId = table.snapshot(snapshotId).schemaId(); + partitionSpec = Optional.ofNullable(table.specs().get(schemaId)); + } return tableHandleForSnapshot( session, tableName, table, Optional.of(snapshotId), schemaFor(table, snapshotId), - Optional.empty()); + partitionSpec, + branch); } - return tableHandleForCurrentSnapshot(session, tableName, table); + return tableHandleForCurrentSnapshot(session, tableName, table, branch); } - private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession session, SchemaTableName tableName, BaseTable table) + private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession session, SchemaTableName tableName, BaseTable table, Optional branch) { return tableHandleForSnapshot( session, @@ -562,7 +581,8 @@ private IcebergTableHandle 
tableHandleForCurrentSnapshot(ConnectorSession sessio table, Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId), table.schema(), - Optional.of(table.spec())); + Optional.of(table.spec()), + branch); } private IcebergTableHandle tableHandleForSnapshot( @@ -571,7 +591,8 @@ private IcebergTableHandle tableHandleForSnapshot( BaseTable table, Optional tableSnapshotId, Schema tableSchema, - Optional partitionSpec) + Optional partitionSpec, + Optional branch) { Map tableProperties = table.properties(); return new IcebergTableHandle( @@ -591,6 +612,7 @@ private IcebergTableHandle tableHandleForSnapshot( table.location(), table.properties(), getTablePartitioning(session, table), + branch, false, Optional.empty(), ImmutableSet.of(), @@ -629,7 +651,7 @@ private Optional getTablePartitioning(ConnectorSession IntStream.range(0, partitioningHandle.partitionFunctions().size()).boxed().collect(toImmutableList()))); } - private static long getSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version) + public static long getSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version) { io.trino.spi.type.Type versionType = version.getVersionType(); return switch (version.getPointerType()) { @@ -638,6 +660,21 @@ private static long getSnapshotIdFromVersion(ConnectorSession session, Table tab }; } + public static Optional getBranch(ConnectorTableVersion version) + { + io.trino.spi.type.Type versionType = version.getVersionType(); + return switch (version.getPointerType()) { + case TEMPORAL -> Optional.empty(); + case TARGET_ID -> { + if (versionType instanceof VarcharType) { + String refName = ((Slice) version.getVersion()).toStringUtf8(); + yield Optional.of(refName); + } + yield Optional.empty(); + } + }; + } + private static long getTargetSnapshotIdFromVersion(Table table, ConnectorTableVersion version, io.trino.spi.type.Type versionType) { long snapshotId; @@ -1235,7 +1272,7 @@ public 
ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con "Cannot create a table on a non-empty location: %s, set 'iceberg.unique-table-location=true' in your Iceberg catalog properties " + "to use unique table locations for every table.", location)); } - return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), retryMode); + return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), SchemaParser.toJson(transaction.table().schema()), Optional.empty(), retryMode); } catch (IOException e) { throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking new table's location: " + location, e); @@ -1337,12 +1374,15 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto { IcebergTableHandle table = (IcebergTableHandle) tableHandle; Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + Optional branch = table.getBranch(); - validateNotModifyingOldSnapshot(table, icebergTable); + branch.ifPresentOrElse( + name -> checkBranch(icebergTable, name), + () -> validateNotModifyingOldSnapshot(table, icebergTable)); beginTransaction(icebergTable); - return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + return newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), branch, retryMode); } private List getChildNamespaces(ConnectorSession session, String parentNamespace) @@ -1358,20 +1398,22 @@ private List getChildNamespaces(ConnectorSession session, String parentN .collect(toImmutableList()); } - private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, Table table, RetryMode retryMode) + private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName schemaTableName, Table table, String schemaAsJson, Optional branch, RetryMode retryMode) { + Schema schema = SchemaParser.fromJson(schemaAsJson); return new IcebergWritableTableHandle( - name, - 
SchemaParser.toJson(table.schema()), + schemaTableName, + schemaAsJson, transformValues(table.specs(), PartitionSpecParser::toJson), table.spec().specId(), - getSupportedSortFields(table.schema(), table.sortOrder()), - getProjectedColumns(table.schema(), typeManager), + getSupportedSortFields(schema, table.sortOrder()), + getProjectedColumns(schema, typeManager), table.location(), getFileFormat(table), table.properties(), retryMode, - table.io().properties()); + table.io().properties(), + branch); } private static List getSupportedSortFields(Schema schema, SortOrder sortOrder) @@ -1442,6 +1484,7 @@ public Optional finishInsert( } appendFiles.appendFile(builder.build()); + table.branch().ifPresent(appendFiles::toBranch); writtenFiles.add(task.path()); } @@ -1452,7 +1495,7 @@ public Optional finishInsert( commitUpdateAndTransaction(appendFiles, session, transaction, "insert"); // TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer - long newSnapshotId = transaction.table().currentSnapshot().snapshotId(); + long newSnapshotId = table.branch().isEmpty() ? 
transaction.table().currentSnapshot().snapshotId() : transaction.table().refs().get(table.branch().get()).snapshotId(); transaction = null; // TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats atomically @@ -1604,6 +1647,9 @@ public Optional getTableHandleForExecute( case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties); case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties); case ADD_FILES_FROM_TABLE -> getTableHandleForAddFilesFromTable(session, accessControl, tableHandle, executeProperties); + case CREATE_BRANCH -> getTableHandleForCreateBranch(session, accessControl, tableHandle, executeProperties); + case DROP_BRANCH -> getTableHandleForDropBranch(session, accessControl, tableHandle, executeProperties); + case FAST_FORWARD -> getTableHandleForFastForward(session, tableHandle, executeProperties); }; } @@ -1684,6 +1730,51 @@ private Optional getTableHandleForRemoveOrphanFiles icebergTable.io().properties())); } + private Optional getTableHandleForCreateBranch(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map executeProperties) + { + String name = (String) executeProperties.get("name"); + accessControl.checkCanCreateBranchAndTag(null, tableHandle.getSchemaTableName(), name); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + CREATE_BRANCH, + new IcebergCreateBranchHandle(name), + icebergTable.location(), + icebergTable.io().properties())); + } + + private Optional getTableHandleForDropBranch(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map executeProperties) + { + String name = (String) executeProperties.get("name"); + accessControl.checkCanDropBranchAndTag(null, tableHandle.getSchemaTableName(), name); 
+ Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + DROP_BRANCH, + new IcebergDropBranchHandle(name), + icebergTable.location(), + icebergTable.io().properties())); + } + + private Optional getTableHandleForFastForward(ConnectorSession session, IcebergTableHandle tableHandle, Map executeProperties) + { + String from = (String) executeProperties.get("from"); + String to = (String) executeProperties.get("to"); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + if (!icebergTable.refs().containsKey(from)) { + throw new TrinoException(NOT_FOUND, "Branch '%s' does not exist".formatted(from)); + } + if (!icebergTable.refs().containsKey(to)) { + throw new TrinoException(NOT_FOUND, "Branch '%s' does not exist".formatted(to)); + } + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + FAST_FORWARD, + new IcebergFastForwardHandle(from, to), + icebergTable.location(), + icebergTable.io().properties())); + } + private Optional getTableHandleForAddFiles(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map executeProperties) { if (!addFilesProcedureEnabled) { @@ -1816,6 +1907,9 @@ public Optional getLayoutForTableExecute(ConnectorSession case REMOVE_ORPHAN_FILES: case ADD_FILES: case ADD_FILES_FROM_TABLE: + case CREATE_BRANCH: + case DROP_BRANCH: + case FAST_FORWARD: // handled via executeTableExecute } throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'"); @@ -1847,6 +1941,9 @@ public BeginTableExecuteResult> readerForManifest(Table table, ManifestFile manifest) { return switch (manifest.content()) { @@ -2982,11 +3149,15 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT verifyTableVersionForUpdate(table); Table icebergTable = catalog.loadTable(session, 
table.getSchemaTableName()); - validateNotModifyingOldSnapshot(table, icebergTable); + Optional branch = table.getBranch(); + + branch.ifPresentOrElse( + name -> checkBranch(icebergTable, name), + () -> validateNotModifyingOldSnapshot(table, icebergTable)); beginTransaction(icebergTable); - IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), branch, retryMode); return new IcebergMergeTableHandle(table, insertHandle); } @@ -2995,8 +3166,9 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg { IcebergMergeTableHandle mergeHandle = (IcebergMergeTableHandle) mergeTableHandle; IcebergTableHandle handle = mergeHandle.getTableHandle(); + Optional branch = mergeHandle.getInsertTableHandle().branch(); RetryMode retryMode = mergeHandle.getInsertTableHandle().retryMode(); - finishWrite(session, handle, fragments, retryMode); + finishWrite(session, handle, branch, fragments, retryMode); } private static void verifyTableVersionForUpdate(IcebergTableHandle table) @@ -3013,7 +3185,7 @@ private static void validateNotModifyingOldSnapshot(IcebergTableHandle table, Ta } } - private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection fragments, RetryMode retryMode) + private void finishWrite(ConnectorSession session, IcebergTableHandle table, Optional branch, Collection fragments, RetryMode retryMode) { Table icebergTable = transaction.table(); @@ -3031,6 +3203,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col Schema schema = SchemaParser.fromJson(table.getTableSchemaJson()); RowDelta rowDelta = transaction.newRowDelta(); + branch.ifPresent(rowDelta::toBranch); table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId())); TupleDomain 
dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId())); TupleDomain convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertibleToIcebergExpression(domain)); @@ -3165,6 +3338,10 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle DeleteFiles deleteFiles = icebergTable.newDelete() .deleteFromRowFilter(toIcebergExpression(handle.getEnforcedPredicate())); + handle.getBranch().ifPresent(branch -> { + checkBranch(icebergTable, branch); + deleteFiles.toBranch(branch); + }); commitUpdate(deleteFiles, session, "delete"); Map summary = icebergTable.currentSnapshot().summary(); @@ -3223,6 +3400,7 @@ public Optional> applyLimit(Connect table.getTableLocation(), table.getStorageProperties(), table.getTablePartitioning(), + table.getBranch(), table.isRecordScannedFiles(), table.getMaxScannedFileSize(), table.getConstraintColumns(), @@ -3332,6 +3510,7 @@ else if (isMetadataColumnId(columnHandle.getId())) { table.getTableLocation(), table.getStorageProperties(), table.getTablePartitioning(), + table.getBranch(), table.isRecordScannedFiles(), table.getMaxScannedFileSize(), newConstraintColumns, @@ -3483,6 +3662,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab originalHandle.getTableLocation(), originalHandle.getStorageProperties(), Optional.empty(), // requiredTablePartitioning does not affect stats + originalHandle.getBranch(), false, // recordScannedFiles does not affect stats originalHandle.getMaxScannedFileSize(), ImmutableSet.of(), // constraintColumns do not affect stats @@ -3578,7 +3758,7 @@ && getOnlyElement(sourceTableHandles) instanceof IcebergTableHandle handle fromSnapshotForRefresh = Optional.of(Long.parseLong(sourceTable.getValue())); } - return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + return newWritableTableHandle(table.getSchemaTableName(), icebergTable, 
table.getTableSchemaJson(), Optional.empty(), retryMode); } @Override @@ -3994,4 +4174,15 @@ private static TableStatistics mergeColumnStatistics(TableStatistics currentStat newStats.getColumnStatistics().forEach(statisticsBuilder::setColumnStatistics); return statisticsBuilder.build(); } + + private static void checkBranch(Table table, String branch) + { + SnapshotRef ref = table.refs().get(branch); + if (ref == null) { + throw new TrinoException(NOT_FOUND, "Branch does not exist: '%s'".formatted(branch)); + } + if (ref.isTag()) { + throw new TrinoException(NOT_SUPPORTED, "Branch '%s' does not exist, but a tag with that name exists".formatted(branch)); + } + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index ea88078bf1d8..4067e1437e90 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ -44,8 +44,11 @@ import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider; import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure; import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure; +import io.trino.plugin.iceberg.procedure.CreateBranchProcedure; +import io.trino.plugin.iceberg.procedure.DropBranchProcedure; import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure; import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure; +import io.trino.plugin.iceberg.procedure.FastForwardProcedure; import io.trino.plugin.iceberg.procedure.OptimizeManifestsTableProcedure; import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure; import io.trino.plugin.iceberg.procedure.RegisterTableProcedure; @@ -138,6 +141,9 @@ public void configure(Binder binder) tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON); 
tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON); tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(CreateBranchProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(DropBranchProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(FastForwardProcedure.class).in(Scopes.SINGLETON); newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON); binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java index bc4d694483f7..afdd7ec943a2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java @@ -153,6 +153,9 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa case REMOVE_ORPHAN_FILES: case ADD_FILES: case ADD_FILES_FROM_TABLE: + case CREATE_BRANCH: + case DROP_BRANCH: + case FAST_FORWARD: // handled via ConnectorMetadata.executeTableExecute } throw new IllegalArgumentException("Unknown procedure: " + executeHandle.procedureId()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index 21e1e0535e1e..c4b41761fce7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -49,6 +49,7 @@ public class IcebergTableHandle private final int formatVersion; private final String 
tableLocation; private final Map storageProperties; + private final Optional branch; // Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector private final TupleDomain unenforcedPredicate; @@ -92,7 +93,8 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( @JsonProperty("projectedColumns") Set projectedColumns, @JsonProperty("nameMappingJson") Optional nameMappingJson, @JsonProperty("tableLocation") String tableLocation, - @JsonProperty("storageProperties") Map storageProperties) + @JsonProperty("storageProperties") Map storageProperties, + @JsonProperty("branch") Optional branch) { return new IcebergTableHandle( catalog, @@ -111,6 +113,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( tableLocation, storageProperties, Optional.empty(), + branch, false, Optional.empty(), ImmutableSet.of(), @@ -134,6 +137,7 @@ public IcebergTableHandle( String tableLocation, Map storageProperties, Optional tablePartitioning, + Optional branch, boolean recordScannedFiles, Optional maxScannedFileSize, Set constraintColumns, @@ -155,6 +159,7 @@ public IcebergTableHandle( this.tableLocation = requireNonNull(tableLocation, "tableLocation is null"); this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null")); this.tablePartitioning = requireNonNull(tablePartitioning, "tablePartitioning is null"); + this.branch = requireNonNull(branch, "branch is null"); this.recordScannedFiles = recordScannedFiles; this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null"); this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null")); @@ -261,6 +266,12 @@ public Optional getTablePartitioning() return tablePartitioning; } + @JsonProperty + public Optional getBranch() + { + return branch; + } + @JsonIgnore public boolean isRecordScannedFiles() { @@ -314,6 +325,7 @@ public 
IcebergTableHandle withProjectedColumns(Set projecte tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, maxScannedFileSize, constraintColumns, @@ -339,6 +351,7 @@ public IcebergTableHandle forAnalyze() tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, maxScannedFileSize, constraintColumns, @@ -364,6 +377,7 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, Optional.of(maxScannedFileSize), constraintColumns, @@ -389,6 +403,7 @@ public IcebergTableHandle withTablePartitioning(Optional SUPPORTED_PROPERTIES = ImmutableSet.builder() @@ -76,6 +77,7 @@ public class IcebergTableProperties .add(ORC_BLOOM_FILTER_FPP_PROPERTY) .add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY) .add(DATA_LOCATION_PROPERTY) + .add(TARGET_BRANCH_PROPERTY) .add(EXTRA_PROPERTIES_PROPERTY) .add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY) .build(); @@ -203,6 +205,11 @@ public IcebergTableProperties( "File system location URI for the table's data files", null, false)) + .add(stringProperty( + TARGET_BRANCH_PROPERTY, + "Target branch name", + null, + true)) .build(); checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream() @@ -292,6 +299,11 @@ public static Optional getDataLocation(Map tableProperti return Optional.ofNullable((String) tableProperties.get(DATA_LOCATION_PROPERTY)); } + public static Optional getTargetBranch(Map tableProperties) + { + return Optional.ofNullable((String) tableProperties.get(TARGET_BRANCH_PROPERTY)); + } + public static Optional> getExtraProperties(Map tableProperties) { return Optional.ofNullable((Map) tableProperties.get(EXTRA_PROPERTIES_PROPERTY)); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java index 7f347564c3b7..831deb830771 100644 --- 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -37,7 +38,8 @@ public record IcebergWritableTableHandle( IcebergFileFormat fileFormat, Map storageProperties, RetryMode retryMode, - Map fileIoProperties) + Map fileIoProperties, + Optional branch) implements ConnectorInsertTableHandle, ConnectorOutputTableHandle { public IcebergWritableTableHandle @@ -53,6 +55,7 @@ public record IcebergWritableTableHandle( requireNonNull(retryMode, "retryMode is null"); checkArgument(partitionsSpecsAsJson.containsKey(partitionSpecId), "partitionSpecId missing from partitionSpecs"); fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null")); + requireNonNull(branch, "branch is null"); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java new file mode 100644 index 000000000000..81cdf9f830b6 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.CREATE_BRANCH; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class CreateBranchProcedure + implements Provider +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + CREATE_BRANCH.name(), + coordinatorOnly(), + ImmutableList.>builder() + .add(stringProperty( + "name", + "Branch name", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java new file mode 100644 index 000000000000..123dbe678b90 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_BRANCH; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class DropBranchProcedure + implements Provider<TableProcedureMetadata> +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + DROP_BRANCH.name(), + coordinatorOnly(), + ImmutableList.<PropertyMetadata<?>>builder() + .add(stringProperty( + "name", + "Branch name", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java new file mode 100644 index 000000000000..e3ddd8af7f39 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.FAST_FORWARD; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class FastForwardProcedure + implements Provider<TableProcedureMetadata> +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + FAST_FORWARD.name(), + coordinatorOnly(), + ImmutableList.<PropertyMetadata<?>>builder() + .add(stringProperty( + "from", + "Branch to fast-forward", + null, + false)) + .add(stringProperty( + "to", + "Ref for the from branch to be fast forwarded to", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java new file mode 100644 index 000000000000..3cd9a0986bfd --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergCreateBranchHandle(String name) + implements IcebergProcedureHandle +{ + public IcebergCreateBranchHandle + { + requireNonNull(name, "name is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java new file mode 100644 index 000000000000..27de2deb8385 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergDropBranchHandle(String name) + implements IcebergProcedureHandle +{ + public IcebergDropBranchHandle + { + requireNonNull(name, "name is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java new file mode 100644 index 000000000000..23dc9daf0e45 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergFastForwardHandle(String from, String to) + implements IcebergProcedureHandle +{ + public IcebergFastForwardHandle + { + requireNonNull(from, "from is null"); + requireNonNull(to, "to is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java index bd2c64e0778c..22f970bc9a63 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java @@ -28,5 +28,8 @@ @JsonSubTypes.Type(value = IcebergRemoveOrphanFilesHandle.class, name = "remove_orphan_files"), @JsonSubTypes.Type(value = IcebergAddFilesHandle.class, name = "add_files"), @JsonSubTypes.Type(value = IcebergAddFilesFromTableHandle.class, name = "add_files_from_table"), + @JsonSubTypes.Type(value = IcebergCreateBranchHandle.class, name = "create_branch"), + @JsonSubTypes.Type(value = IcebergDropBranchHandle.class, name = "drop_branch"), + @JsonSubTypes.Type(value = IcebergFastForwardHandle.class, name = "fast_forward"), }) public interface IcebergProcedureHandle {} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java index 6230f8b779b6..b30777caf2ed 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java @@ -23,4 +23,7 @@ public enum IcebergTableProcedureId REMOVE_ORPHAN_FILES, ADD_FILES, ADD_FILES_FROM_TABLE, + CREATE_BRANCH, + DROP_BRANCH, + FAST_FORWARD, } diff --git 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java new file mode 100644 index 000000000000..98fccc939007 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java @@ -0,0 +1,382 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.metastore.HiveMetastore; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import io.trino.testing.sql.TestTable; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.Table; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory; +import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +final class TestIcebergBranching + extends AbstractTestQueryFramework +{ + private HiveMetastore metastore; + private TrinoFileSystemFactory fileSystemFactory; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + QueryRunner queryRunner = IcebergQueryRunner.builder().build(); + metastore = getHiveMetastore(queryRunner); 
+ fileSystemFactory = getFileSystemFactory(queryRunner); + return queryRunner; + } + + @Test + void testCreateBranch() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_branch", "(x int)")) { + assertBranch(table.getName(), "main"); + + createBranch(table.getName(), "test-branch"); + assertBranch(table.getName(), "main", "test-branch"); + + createBranch(table.getName(), "TEST-BRANCH"); + assertBranch(table.getName(), "main", "test-branch", "TEST-BRANCH"); + + assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE create_branch('test-branch')", "Branch 'test-branch' already exists"); + } + } + + @Test + void testDropBranch() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_drop_branch", "(x int)")) { + assertBranch(table.getName(), "main"); + + createBranch(table.getName(), "test-branch"); + createBranch(table.getName(), "TEST-BRANCH"); + assertBranch(table.getName(), "main", "test-branch", "TEST-BRANCH"); + + dropBranch(table.getName(), "test-branch"); + dropBranch(table.getName(), "TEST-BRANCH"); + assertBranch(table.getName(), "main"); + + assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE drop_branch('test-branch')", "Branch 'test-branch' does not exit"); + assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE drop_branch('main')", "Cannot drop 'main' branch"); + } + } + + @Test + void testFastForward() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_fast_forward", "(x int)")) { + assertBranch(table.getName(), "main"); + + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES 1, 2, 3", 3); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(0L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + fastForward(table.getName(), "main", "test-branch"); + 
assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(3L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + assertQueryFails( + "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('non-existing-branch', 'main')", + "Branch 'non-existing-branch' does not exit"); + assertQueryFails( + "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('main', 'non-existing-branch')", + "Branch 'non-existing-branch' does not exit"); + } + } + + @Test + void testFastForwardNotAncestor() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_fast_forward", "(x int)")) { + createBranch(table.getName(), "test-branch"); + + assertUpdate("INSERT INTO " + table.getName() + " VALUES 1", 1); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES 1, 2, 3", 3); + + assertQueryFails( + "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('main', 'test-branch')", + "Branch 'main' is not an ancestor of 'test-branch'"); + } + } + + @Test + void testInsert() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert_into_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + + // insert into main (default) branch + assertUpdate("INSERT INTO " + table.getName() + " @ 'main' VALUES (1, 2)", 1); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(1L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(0L); + + // insert into another branch + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (10, 20), (30, 40)", 2); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(1L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(2L); + + // insert into another branch with a 
partial column + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' (x) VALUES 50", 1); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(1L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + assertQueryFails( + "INSERT INTO " + table.getName() + " @ 'non-existing' VALUES (1, 2, 3)", + "Cannot find snapshot with reference name: non-existing"); + } + } + + @Test + void testInsertAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert_into_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 2)", 1); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN z int"); + + assertQueryFails( + "INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 2, 3)", + "\\Qline 1:1: Insert query has mismatched column types: Table: [integer, integer], Query: [integer, integer, integer]"); + + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' SELECT x + 10, y + 10 FROM " + table.getName(), 1); + assertThat(query("SELECT * FROM " + table.getName())) + .matches("VALUES (1, 2, CAST(NULL AS integer))"); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES (11, 12, CAST(NULL AS integer))"); + } + } + + @Test + void testInsertIntoTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "INSERT INTO " + table.getName() + " @ 'test-tag' VALUES (1, 2)", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + @Test + void testDelete() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_from_branch", "(x int, y int)")) { + 
createBranch(table.getName(), "test-branch"); + + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 10), (2, 20), (3, 30)", 3); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(0L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + assertUpdate("DELETE FROM " + table.getName() + " @ 'test-branch'"); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(0L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(0L); + + assertQueryFails( + "DELETE FROM " + table.getName() + " @ 'non-existing'", + "Cannot find snapshot with reference name: non-existing"); + } + } + + @Test + void testDeleteAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_from_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 10), (2, 20), (3, 30)", 3); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y"); + + // TODO This should be fixed once https://github.com/trinodb/trino/issues/23601 is resolved + assertThat(query("DELETE FROM " + table.getName() + " @ 'test-branch' WHERE y = 30")).nonTrinoExceptionFailure() + .hasMessageContaining("Invalid metadata file") + .hasStackTraceContaining("Cannot find field 'y'"); + + // branch returns the latest schema once a new snapshot is created + assertUpdate("DELETE FROM " + table.getName() + " @ 'test-branch' WHERE x = 1", 1); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 2, 3"); + } + } + + @Test + void testDeleteFromTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { +
createTag(table.getName(), "test-tag"); + assertQueryFails( + "DELETE FROM " + table.getName() + " @ 'test-tag'", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + @Test + void testUpdate() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_branch", "(x int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES 1, 2, 3", 3); + + assertUpdate("UPDATE " + table.getName() + " @ 'test-branch' SET x = x * 2", 3); + assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName()); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 2, 4, 6"); + + assertQueryFails( + "UPDATE " + table.getName() + " @ 'non-existing' SET x = x * 2", + "Cannot find snapshot with reference name: non-existing"); + } + } + + @Test + void testUpdateAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 10), (2, 20), (3, 30)", 3); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y"); + assertUpdate("UPDATE " + table.getName() + " @ 'test-branch' SET y = 10", 3); + + // branch returns the latest schema once a new snapshot is created + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 1, 2, 3"); + } + } + + @Test + void testUpdateTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "UPDATE " + table.getName() + " @ 'test-tag' SET x = 2", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + @Test + void testMerge() + { + try 
(TestTable table = new TestTable(getQueryRunner()::execute, "test_merge_branch", "(x int)")) { + createBranch(table.getName(), "test-branch"); + + assertUpdate("MERGE INTO " + table.getName() + " @ 'test-branch' USING (VALUES 42) t(dummy) ON false " + + " WHEN NOT MATCHED THEN INSERT VALUES (1)", 1); + assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName()); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 1"); + + assertUpdate("MERGE INTO " + table.getName() + " @ 'test-branch' USING (VALUES 42) t(dummy) ON true " + + " WHEN MATCHED THEN UPDATE SET x = 10", 1); + assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName()); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 10"); + + assertQueryFails( + "MERGE INTO " + table.getName() + " @ 'not-existing' USING (VALUES 42) t(dummy) ON false " + + " WHEN NOT MATCHED THEN INSERT VALUES (1)", + "Cannot find snapshot with reference name: not-existing"); + } + } + + @Test + void testMergeAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_merge_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y"); + assertUpdate("MERGE INTO " + table.getName() + " @ 'test-branch' USING (VALUES 42) t(dummy) ON false " + + " WHEN NOT MATCHED THEN INSERT VALUES (1, 2)", 1); + + // branch returns the latest schema once a new snapshot is created + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 1"); + } + } + + @Test + void testMergeIntoTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "MERGE INTO " + table.getName() + " @ 'test-tag' USING 
(VALUES 42) t(dummy) ON false WHEN NOT MATCHED THEN INSERT VALUES (1, 2)", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + private void createBranch(String table, String branch) + { + assertUpdate("ALTER TABLE " + table + " EXECUTE create_branch('" + branch + "')"); + } + + private void dropBranch(String table, String branch) + { + assertUpdate("ALTER TABLE " + table + " EXECUTE drop_branch('" + branch + "')"); + } + + private void fastForward(String table, String from, String to) + { + assertUpdate("ALTER TABLE " + table + " EXECUTE fast_forward('" + from + "', '" + to + "')"); + } + + private void createTag(String table, String tag) + { + BaseTable icebergTable = loadTable(table); + icebergTable.manageSnapshots() + .createTag(tag, icebergTable.currentSnapshot().snapshotId()) + .commit(); + } + + private void assertBranch(String tableName, String... branchNames) + { + Table table = loadTable(tableName); + table.refresh(); + assertThat(table.refs()).containsOnlyKeys(branchNames); + } + + private BaseTable loadTable(String tableName) + { + return IcebergTestUtils.loadTable(tableName, metastore, fileSystemFactory, "iceberg", "tpch"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index 01bcecfe8d50..e55852b6f8a4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -172,6 +172,7 @@ public void testDynamicSplitPruningOnUnpartitionedTable() tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -233,6 +234,7 @@ public void testDynamicSplitPruningOnUnpartitionedTable() tablePath, ImmutableMap.of(), Optional.empty(), + 
Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -344,6 +346,7 @@ public void testDynamicSplitPruningWithExplicitPartitionFilter() tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -506,6 +509,7 @@ public void testDynamicSplitPruningWithExplicitPartitionFilterPartitionEvolution tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index a7fb5fc85859..faed01901ce3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -490,6 +490,7 @@ private static IcebergTableHandle createTableHandle(SchemaTableName schemaTableN nationTable.location(), nationTable.properties(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index 37ebe5d04f8d..5e0f4ea829bd 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -176,6 +176,7 @@ public void testProjectionPushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -261,6 +262,7 @@ public void testPredicatePushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -313,6 +315,7 @@ public void 
testColumnPruningProjectionPushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -375,6 +378,7 @@ public void testPushdownWithDuplicateExpressions() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(),