From f9a18b461a05098e8f3f57fffc2cf9c595924b10 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 18 Sep 2024 17:16:45 +0900 Subject: [PATCH 1/3] Add support for DML with properties in engine --- .../antlr4/io/trino/grammar/sql/SqlBase.g4 | 10 ++++--- .../trino/sql/analyzer/StatementAnalyzer.java | 22 ++++++++++---- .../trino/metadata/AbstractMockMetadata.java | 5 +++- .../main/java/io/trino/sql/SqlFormatter.java | 14 +++++++-- .../java/io/trino/sql/parser/AstBuilder.java | 29 +++++++++++++++---- .../main/java/io/trino/sql/tree/Table.java | 28 +++++++++++++----- .../io/trino/sql/parser/TestSqlParser.java | 4 +-- 7 files changed, 86 insertions(+), 26 deletions(-) diff --git a/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 b/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 index c11f6b901dfd..f47878e31114 100644 --- a/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 +++ b/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 @@ -71,8 +71,10 @@ statement (COMMENT string)? (WITH properties)? #createTable | DROP TABLE (IF EXISTS)? qualifiedName #dropTable - | INSERT INTO qualifiedName columnAliases? rootQuery #insertInto - | DELETE FROM qualifiedName (WHERE booleanExpression)? #delete + | INSERT INTO qualifiedName ('@' branch=string)? + columnAliases? rootQuery #insertInto + | DELETE FROM qualifiedName ('@' branch=string)? + (WHERE booleanExpression)? #delete | TRUNCATE TABLE qualifiedName #truncateTable | COMMENT ON TABLE qualifiedName IS (string | NULL) #commentTable | COMMENT ON VIEW qualifiedName IS (string | NULL) #commentView @@ -192,10 +194,10 @@ statement | DESCRIBE OUTPUT identifier #describeOutput | SET PATH pathSpecification #setPath | SET TIME ZONE (LOCAL | expression) #setTimeZone - | UPDATE qualifiedName + | UPDATE qualifiedName ('@' branch=string)? SET updateAssignment (',' updateAssignment)* (WHERE where=booleanExpression)? #update - | MERGE INTO qualifiedName (AS? identifier)? + | MERGE INTO qualifiedName (AS? identifier)? ('@' branch=string)? USING relation ON expression mergeCase+ #merge ; diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 7f119f553f5e..026d41653507 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -301,6 +301,7 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getLast; import static com.google.common.collect.Iterables.getOnlyElement; +import static io.airlift.slice.Slices.utf8Slice; import static io.trino.SystemSessionProperties.getMaxGroupingSets; import static io.trino.metadata.FunctionResolver.toPath; import static io.trino.metadata.GlobalFunctionCatalog.isBuiltinFunctionName; @@ -363,6 +364,7 @@ import static io.trino.spi.StandardErrorCode.VIEW_IS_RECURSIVE; import static io.trino.spi.StandardErrorCode.VIEW_IS_STALE; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH; +import static io.trino.spi.connector.PointerType.TARGET_ID; import static io.trino.spi.connector.StandardWarningCode.REDUNDANT_ORDER_BY; import static io.trino.spi.function.FunctionKind.AGGREGATE; import static io.trino.spi.function.FunctionKind.WINDOW; @@ -580,7 +582,8 @@ protected Scope visitInsert(Insert insert, Optional scope) Scope queryScope = analyze(insert.getQuery(), Optional.empty(), false); // verify the insert destination columns match the query - RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, targetTable); + Optional endVersion = insert.getTable().getBranch().map(branch -> new TableVersion(TARGET_ID, VARCHAR, utf8Slice(branch))); + RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, targetTable, Optional.empty(), endVersion); Optional targetTableHandle = redirection.tableHandle(); targetTable = redirection.redirectedTableName().orElse(targetTable); if (targetTableHandle.isEmpty()) { @@ -821,7 +824,8 @@ protected Scope visitDelete(Delete node, Optional scope) throw semanticException(NOT_SUPPORTED, node, "Deleting from views is not supported"); } - RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName); + Optional endVersion = node.getTable().getBranch().map(branch -> new TableVersion(TARGET_ID, VARCHAR, utf8Slice(branch))); + RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName, Optional.empty(), endVersion); QualifiedObjectName tableName = redirection.redirectedTableName().orElse(originalName); TableHandle handle = redirection.tableHandle() .orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName)); @@ -3368,7 +3372,8 @@ protected Scope visitUpdate(Update update, Optional scope) analysis.setUpdateType("UPDATE"); - RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName); + Optional endVersion = update.getTable().getBranch().map(branch -> new TableVersion(TARGET_ID, VARCHAR, utf8Slice(branch))); + RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalName, Optional.empty(), endVersion); QualifiedObjectName tableName = redirection.redirectedTableName().orElse(originalName); TableHandle handle = redirection.tableHandle() .orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName)); @@ -3498,7 +3503,8 @@ protected Scope visitMerge(Merge merge, Optional scope) analysis.setUpdateType("MERGE"); - RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalTableName); + Optional endVersion = merge.getTargetTable().getBranch().map(branch -> new TableVersion(TARGET_ID, VARCHAR, utf8Slice(branch))); + RedirectionAwareTableHandle redirection = metadata.getRedirectionAwareTableHandle(session, originalTableName, Optional.empty(), endVersion); QualifiedObjectName tableName = redirection.redirectedTableName().orElse(originalTableName); TableHandle targetTableHandle = redirection.tableHandle() .orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName)); @@ -5854,11 +5860,17 @@ private OutputColumn createOutputColumn(Field field) private RedirectionAwareTableHandle getTableHandle(Table table, QualifiedObjectName name, Optional scope) { if (table.getQueryPeriod().isPresent()) { + verify(table.getBranch().isEmpty(), "branch must be empty"); Optional startVersion = extractTableVersion(table, table.getQueryPeriod().get().getStart(), scope); Optional endVersion = extractTableVersion(table, table.getQueryPeriod().get().getEnd(), scope); return metadata.getRedirectionAwareTableHandle(session, name, startVersion, endVersion); } - return metadata.getRedirectionAwareTableHandle(session, name); + if (table.getBranch().isPresent()) { + verify(table.getQueryPeriod().isEmpty(), "query period must be empty"); + Optional endVersion = table.getBranch().map(branch -> new TableVersion(TARGET_ID, VARCHAR, utf8Slice(branch))); + return metadata.getRedirectionAwareTableHandle(session, name, Optional.empty(), endVersion); + } + return metadata.getRedirectionAwareTableHandle(session, name, Optional.empty(), Optional.empty()); } /** diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index 283e73dd6da8..f9c552b9aa49 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -1006,7 +1006,10 @@ public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session sessio @Override public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion) { - throw new UnsupportedOperationException(); + if (startVersion.isPresent() || endVersion.isPresent()) { + throw new UnsupportedOperationException(); + } + return noRedirection(getTableHandle(session, tableName)); } @Override diff --git a/core/trino-parser/src/main/java/io/trino/sql/SqlFormatter.java b/core/trino-parser/src/main/java/io/trino/sql/SqlFormatter.java index 4b860d5eb9c2..893abf435cf4 100644 --- a/core/trino-parser/src/main/java/io/trino/sql/SqlFormatter.java +++ b/core/trino-parser/src/main/java/io/trino/sql/SqlFormatter.java @@ -1080,6 +1080,8 @@ protected Void visitMerge(Merge node, Integer indent) builder.append("MERGE INTO ") .append(formatName(node.getTargetTable().getName())); + node.getTargetTable().getBranch().ifPresent(branch -> builder.append("@").append(branch)); + node.getTargetAlias().ifPresent(value -> builder .append(' ') .append(formatName(value))); @@ -1464,6 +1466,8 @@ protected Void visitDelete(Delete node, Integer indent) builder.append("DELETE FROM ") .append(formatName(node.getTable().getName())); + node.getTable().getBranch().ifPresent(branch -> builder.append("@").append(branch)); + node.getWhere().ifPresent(where -> builder .append(" WHERE ") .append(formatExpression(where))); @@ -1898,6 +1902,8 @@ protected Void visitInsert(Insert node, Integer indent) builder.append("INSERT INTO ") .append(formatName(node.getTarget())); + node.getTable().getBranch().ifPresent(branch -> builder.append("@").append(branch)); + node.getColumns().ifPresent(columns -> builder .append(" (") .append(Joiner.on(", ").join(columns)) @@ -1914,8 +1920,12 @@ protected Void visitInsert(Insert node, Integer indent) protected Void visitUpdate(Update node, Integer indent) { builder.append("UPDATE ") - .append(formatName(node.getTable().getName())) - .append(" SET"); + .append(formatName(node.getTable().getName())); + + node.getTable().getBranch().ifPresent(branch -> builder.append("@").append(branch)); + + builder.append(" SET"); + int setCounter = node.getAssignments().size() - 1; for (UpdateAssignment assignment : node.getAssignments()) { builder.append("\n") diff --git a/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java b/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java index 15f8d6dfa1ea..f154124649f4 100644 --- a/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java +++ b/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java @@ -654,6 +654,11 @@ public Node visitDropView(SqlBaseParser.DropViewContext context) @Override public Node visitInsertInto(SqlBaseParser.InsertIntoContext context) { + Optional branch = Optional.empty(); + if (context.branch != null) { + branch = Optional.of(visitString(context.branch).getValue()); + } + Optional> columnAliases = Optional.empty(); if (context.columnAliases() != null) { columnAliases = Optional.of(visit(context.columnAliases().identifier(), Identifier.class)); @@ -661,7 +666,7 @@ public Node visitInsertInto(SqlBaseParser.InsertIntoContext context) return new Insert( getLocation(context), - new Table(getQualifiedName(context.qualifiedName())), + new Table(getLocation(context), getQualifiedName(context.qualifiedName()), branch), columnAliases, (Query) visit(context.rootQuery())); } @@ -669,18 +674,27 @@ public Node visitInsertInto(SqlBaseParser.InsertIntoContext context) @Override public Node visitDelete(SqlBaseParser.DeleteContext context) { + Optional branch = Optional.empty(); + if (context.branch != null) { + branch = Optional.of(visitString(context.branch).getValue()); + } + return new Delete( getLocation(context), - new Table(getLocation(context), getQualifiedName(context.qualifiedName())), + new Table(getLocation(context), getQualifiedName(context.qualifiedName()), branch), visitIfPresent(context.booleanExpression(), Expression.class)); } @Override public Node visitUpdate(SqlBaseParser.UpdateContext context) { + Optional branch = Optional.empty(); + if (context.branch != null) { + branch = Optional.of(visitString(context.branch).getValue()); + } return new Update( getLocation(context), - new Table(getLocation(context), getQualifiedName(context.qualifiedName())), + new Table(getLocation(context), getQualifiedName(context.qualifiedName()), branch), visit(context.updateAssignment(), UpdateAssignment.class), visitIfPresent(context.booleanExpression(), Expression.class)); } @@ -700,7 +714,12 @@ public Node visitTruncateTable(SqlBaseParser.TruncateTableContext context) @Override public Node visitMerge(SqlBaseParser.MergeContext context) { - Table table = new Table(getLocation(context), getQualifiedName(context.qualifiedName())); + Optional branch = Optional.empty(); + if (context.branch != null) { + branch = Optional.of(visitString(context.branch).getValue()); + } + + Table table = new Table(getLocation(context), getQualifiedName(context.qualifiedName()), branch); Relation targetRelation = table; if (context.identifier() != null) { targetRelation = new AliasedRelation(table, (Identifier) visit(context.identifier()), null); @@ -2011,7 +2030,7 @@ public Node visitAliasedRelation(SqlBaseParser.AliasedRelationContext context) public Node visitTableName(SqlBaseParser.TableNameContext context) { if (context.queryPeriod() != null) { - return new Table(getLocation(context), getQualifiedName(context.qualifiedName()), (QueryPeriod) visit(context.queryPeriod())); + return new Table(getLocation(context), getQualifiedName(context.qualifiedName()), (QueryPeriod) visit(context.queryPeriod()), Optional.empty()); } return new Table(getLocation(context), getQualifiedName(context.qualifiedName())); } diff --git a/core/trino-parser/src/main/java/io/trino/sql/tree/Table.java b/core/trino-parser/src/main/java/io/trino/sql/tree/Table.java index 5afb5a438da2..50888229bdbb 100644 --- a/core/trino-parser/src/main/java/io/trino/sql/tree/Table.java +++ b/core/trino-parser/src/main/java/io/trino/sql/tree/Table.java @@ -26,27 +26,34 @@ public class Table { private final QualifiedName name; private final Optional queryPeriod; + private final Optional branch; public Table(QualifiedName name) { - this(Optional.empty(), name, Optional.empty()); + this(Optional.empty(), name, Optional.empty(), Optional.empty()); } public Table(NodeLocation location, QualifiedName name) { - this(Optional.of(location), name, Optional.empty()); + this(Optional.of(location), name, Optional.empty(), Optional.empty()); } - public Table(NodeLocation location, QualifiedName name, QueryPeriod queryPeriod) + public Table(NodeLocation location, QualifiedName name, Optional properties) { - this(Optional.of(location), name, Optional.of(queryPeriod)); + this(Optional.of(location), name, Optional.empty(), properties); } - private Table(Optional location, QualifiedName name, Optional queryPeriod) + public Table(NodeLocation location, QualifiedName name, QueryPeriod queryPeriod, Optional branch) + { + this(Optional.of(location), name, Optional.of(queryPeriod), branch); + } + + private Table(Optional location, QualifiedName name, Optional queryPeriod, Optional branch) { super(location); this.name = name; this.queryPeriod = queryPeriod; + this.branch = branch; } public QualifiedName getName() @@ -75,6 +82,7 @@ public String toString() return toStringHelper(this) .addValue(name) .addValue(queryPeriod) + .addValue(branch) .toString(); } @@ -90,13 +98,14 @@ public boolean equals(Object o) Table table = (Table) o; return Objects.equals(name, table.name) && - Objects.equals(queryPeriod, table.getQueryPeriod()); + Objects.equals(queryPeriod, table.getQueryPeriod()) && + Objects.equals(branch, table.branch); } @Override public int hashCode() { - return Objects.hash(name, queryPeriod); + return Objects.hash(name, queryPeriod, branch); } @Override @@ -114,4 +123,9 @@ public Optional getQueryPeriod() { return queryPeriod; } + + public Optional getBranch() + { + return branch; + } } diff --git a/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParser.java b/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParser.java index b3c35ef28ab0..5210b4264a5d 100644 --- a/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParser.java +++ b/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParser.java @@ -5978,7 +5978,7 @@ public void testQueryPeriod() { Expression rangeValue = new GenericLiteral(location(1, 37), "TIMESTAMP", "2021-03-01 00:00:01"); QueryPeriod queryPeriod = new QueryPeriod(location(1, 17), QueryPeriod.RangeType.TIMESTAMP, rangeValue); - Table table = new Table(location(1, 15), qualifiedName(location(1, 15), "t"), queryPeriod); + Table table = new Table(location(1, 15), qualifiedName(location(1, 15), "t"), queryPeriod, Optional.empty()); assertThat(statement("SELECT * FROM t FOR TIMESTAMP AS OF TIMESTAMP '2021-03-01 00:00:01'")) .isEqualTo( new Query( @@ -6010,7 +6010,7 @@ public void testQueryPeriod() rangeValue = new StringLiteral(location(1, 35), "version1"); queryPeriod = new QueryPeriod(new NodeLocation(1, 17), QueryPeriod.RangeType.VERSION, rangeValue); - table = new Table(location(1, 15), qualifiedName(location(1, 15), "t"), queryPeriod); + table = new Table(location(1, 15), qualifiedName(location(1, 15), "t"), queryPeriod, Optional.empty()); assertThat(statement("SELECT * FROM t FOR VERSION AS OF 'version1'")) .isEqualTo( new Query( From d9b9694db9be55e0c920e9c0129e1dacb818920d Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Sat, 21 Dec 2024 21:54:47 +0900 Subject: [PATCH 2/3] Prepare access control for creating and dropping branch --- .../java/io/trino/security/AccessControl.java | 22 ++++++++++++++++ .../trino/security/AccessControlManager.java | 26 +++++++++++++++++++ .../trino/security/AllowAllAccessControl.java | 6 +++++ .../trino/security/DenyAllAccessControl.java | 14 ++++++++++ .../security/ForwardingAccessControl.java | 12 +++++++++ .../InjectedConnectorAccessControl.java | 14 ++++++++++ .../trino/tracing/TracingAccessControl.java | 18 +++++++++++++ .../spi/connector/ConnectorAccessControl.java | 22 ++++++++++++++++ .../spi/security/AccessDeniedException.java | 20 ++++++++++++++ .../spi/security/SystemAccessControl.java | 22 ++++++++++++++++ ...ClassLoaderSafeConnectorAccessControl.java | 16 ++++++++++++ .../base/security/AllowAllAccessControl.java | 6 +++++ .../security/AllowAllSystemAccessControl.java | 6 +++++ .../base/security/FileBasedAccessControl.java | 18 +++++++++++++ .../FileBasedSystemAccessControl.java | 12 +++++++++ .../ForwardingConnectorAccessControl.java | 12 +++++++++ .../ForwardingSystemAccessControl.java | 12 +++++++++ .../security/SqlStandardAccessControl.java | 18 +++++++++++++ 18 files changed, 276 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/security/AccessControl.java b/core/trino-main/src/main/java/io/trino/security/AccessControl.java index 475bfad96c47..34552d5d9c69 100644 --- a/core/trino-main/src/main/java/io/trino/security/AccessControl.java +++ b/core/trino-main/src/main/java/io/trino/security/AccessControl.java @@ -37,6 +37,8 @@ import java.util.Optional; import java.util.Set; +import static io.trino.spi.security.AccessDeniedException.denyCreateBranchAndTag; +import static io.trino.spi.security.AccessDeniedException.denyDropBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denySetViewAuthorization; public interface AccessControl @@ -620,4 +622,24 @@ default Map getColumnMasks(SecurityContext context { return ImmutableMap.of(); } + + /** + * Check if identity is allowed to create the specified branch or tag. + * + * @throws AccessDeniedException if not allowed + */ + default void checkCanCreateBranchAndTag(SecurityContext context, QualifiedObjectName tableName, String name) + { + denyCreateBranchAndTag(tableName.toString(), name); + } + + /** + * Check if identity is allowed to drop the specified branch or tag. + * + * @throws AccessDeniedException if not allowed + */ + default void canCanDropBranchAndTag(SecurityContext context, QualifiedObjectName tableName, String name) + { + denyDropBranchAndTag(tableName.toString(), name); + } } diff --git a/core/trino-main/src/main/java/io/trino/security/AccessControlManager.java b/core/trino-main/src/main/java/io/trino/security/AccessControlManager.java index 03f448f263fe..776a91189db8 100644 --- a/core/trino-main/src/main/java/io/trino/security/AccessControlManager.java +++ b/core/trino-main/src/main/java/io/trino/security/AccessControlManager.java @@ -1456,6 +1456,32 @@ private ConnectorAccessControl getConnectorAccessControl(TransactionId transacti return connectorAccessControl; } + @Override + public void checkCanCreateBranchAndTag(SecurityContext securityContext, QualifiedObjectName tableName, String name) + { + requireNonNull(securityContext, "securityContext is null"); + requireNonNull(tableName, "tableName is null"); + + checkCanAccessCatalog(securityContext, tableName.catalogName()); + + systemAuthorizationCheck(control -> control.checkCanCreateBranchAndTag(securityContext.toSystemSecurityContext(), tableName.asCatalogSchemaTableName(), name)); + + catalogAuthorizationCheck(tableName.catalogName(), securityContext, (control, context) -> control.checkCanCreateBranchAndTag(context, tableName.asSchemaTableName(), name)); + } + + @Override + public void canCanDropBranchAndTag(SecurityContext securityContext, QualifiedObjectName tableName, String name) + { + requireNonNull(securityContext, "securityContext is null"); + requireNonNull(tableName, "tableName is null"); + + checkCanAccessCatalog(securityContext, tableName.catalogName()); + + systemAuthorizationCheck(control -> control.checkCanDropBranchAndTag(securityContext.toSystemSecurityContext(), tableName.asCatalogSchemaTableName(), name)); + + catalogAuthorizationCheck(tableName.catalogName(), securityContext, (control, context) -> control.checkCanDropBranchAndTag(context, tableName.asSchemaTableName(), name)); + } + @Managed @Nested public CounterStat getAuthorizationSuccess() diff --git a/core/trino-main/src/main/java/io/trino/security/AllowAllAccessControl.java b/core/trino-main/src/main/java/io/trino/security/AllowAllAccessControl.java index a63bd24d4cdc..9cb491cc9c59 100644 --- a/core/trino-main/src/main/java/io/trino/security/AllowAllAccessControl.java +++ b/core/trino-main/src/main/java/io/trino/security/AllowAllAccessControl.java @@ -291,4 +291,10 @@ public void checkCanDropFunction(SecurityContext context, QualifiedObjectName fu @Override public void checkCanShowCreateFunction(SecurityContext context, QualifiedObjectName functionName) {} + + @Override + public void checkCanCreateBranchAndTag(SecurityContext context, QualifiedObjectName tableName, String name) {} + + @Override + public void canCanDropBranchAndTag(SecurityContext context, QualifiedObjectName tableName, String name) {} } diff --git a/core/trino-main/src/main/java/io/trino/security/DenyAllAccessControl.java b/core/trino-main/src/main/java/io/trino/security/DenyAllAccessControl.java index b9816afeab12..516c715a8f23 100644 --- a/core/trino-main/src/main/java/io/trino/security/DenyAllAccessControl.java +++ b/core/trino-main/src/main/java/io/trino/security/DenyAllAccessControl.java @@ -39,6 +39,7 @@ import static io.trino.spi.security.AccessDeniedException.denyCommentColumn; import static io.trino.spi.security.AccessDeniedException.denyCommentTable; import static io.trino.spi.security.AccessDeniedException.denyCommentView; +import static io.trino.spi.security.AccessDeniedException.denyCreateBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denyCreateCatalog; import static io.trino.spi.security.AccessDeniedException.denyCreateFunction; import static io.trino.spi.security.AccessDeniedException.denyCreateMaterializedView; @@ -51,6 +52,7 @@ import static io.trino.spi.security.AccessDeniedException.denyDenyEntityPrivilege; import static io.trino.spi.security.AccessDeniedException.denyDenySchemaPrivilege; import static io.trino.spi.security.AccessDeniedException.denyDenyTablePrivilege; +import static io.trino.spi.security.AccessDeniedException.denyDropBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denyDropCatalog; import static io.trino.spi.security.AccessDeniedException.denyDropColumn; import static io.trino.spi.security.AccessDeniedException.denyDropFunction; @@ -575,4 +577,16 @@ public void checkCanShowCreateFunction(SecurityContext context, QualifiedObjectN { denyShowCreateFunction(functionName.toString()); } + + @Override + public void checkCanCreateBranchAndTag(SecurityContext context, QualifiedObjectName tableName, String name) + { + denyCreateBranchAndTag(tableName.toString(), name); + } + + @Override + public void canCanDropBranchAndTag(SecurityContext context, QualifiedObjectName tableName, String name) + { + denyDropBranchAndTag(tableName.toString(), name); + } } diff --git a/core/trino-main/src/main/java/io/trino/security/ForwardingAccessControl.java b/core/trino-main/src/main/java/io/trino/security/ForwardingAccessControl.java index 42c89e5d0f43..8f6750bb192c 100644 --- a/core/trino-main/src/main/java/io/trino/security/ForwardingAccessControl.java +++ b/core/trino-main/src/main/java/io/trino/security/ForwardingAccessControl.java @@ -535,4 +535,16 @@ public Map getColumnMasks(SecurityContext context, { return delegate().getColumnMasks(context, tableName, columns); } + + @Override + public void checkCanCreateBranchAndTag(SecurityContext context, QualifiedObjectName tableName, String name) + { + delegate().checkCanCreateBranchAndTag(context, tableName, name); + } + + @Override + public void canCanDropBranchAndTag(SecurityContext context, QualifiedObjectName tableName, String name) + { + delegate().canCanDropBranchAndTag(context, tableName, name); + } } diff --git a/core/trino-main/src/main/java/io/trino/security/InjectedConnectorAccessControl.java b/core/trino-main/src/main/java/io/trino/security/InjectedConnectorAccessControl.java index 9969848871f0..67c56c42f7ee 100644 --- a/core/trino-main/src/main/java/io/trino/security/InjectedConnectorAccessControl.java +++ b/core/trino-main/src/main/java/io/trino/security/InjectedConnectorAccessControl.java @@ -534,6 +534,20 @@ public Map getColumnMasks(ConnectorSecurityContext throw new TrinoException(NOT_SUPPORTED, "Column masking not supported"); } + @Override + public void checkCanCreateBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + checkArgument(context == null, "context must be null"); + accessControl.checkCanCreateBranchAndTag(securityContext, new QualifiedObjectName(catalogName, tableName.getSchemaName(), tableName.getTableName()), name); + } + + @Override + public void checkCanDropBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + checkArgument(context == null, "context must be null"); + accessControl.canCanDropBranchAndTag(securityContext, new QualifiedObjectName(catalogName, tableName.getSchemaName(), tableName.getTableName()), name); + } + private QualifiedObjectName getQualifiedObjectName(SchemaTableName schemaTableName) { return new QualifiedObjectName(catalogName, schemaTableName.getSchemaName(), schemaTableName.getTableName()); diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingAccessControl.java b/core/trino-main/src/main/java/io/trino/tracing/TracingAccessControl.java index 50fb5343ebe9..a2f114883f39 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingAccessControl.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingAccessControl.java @@ -782,6 +782,24 @@ public Map getColumnMasks(SecurityContext context, } } + @Override + public void checkCanCreateBranchAndTag(SecurityContext context, QualifiedObjectName tableName, String name) + { + Span span = startSpan("checkCanCreateBranchAndTag"); + try (var _ = scopedSpan(span)) { + delegate.checkCanCreateBranchAndTag(context, tableName, name); + } + } + + @Override + public void canCanDropBranchAndTag(SecurityContext context, QualifiedObjectName tableName, String name) + { + Span span = startSpan("canCanDropBranchAndTag"); + try (var _ = scopedSpan(span)) { + delegate.canCanDropBranchAndTag(context, tableName, name); + } + } + private Span startSpan(String methodName) { return tracer.spanBuilder("AccessControl." + methodName) diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorAccessControl.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorAccessControl.java index 0a80c5e35f59..f020a19fd721 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorAccessControl.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorAccessControl.java @@ -31,6 +31,7 @@ import static io.trino.spi.security.AccessDeniedException.denyCommentColumn; import static io.trino.spi.security.AccessDeniedException.denyCommentTable; import static io.trino.spi.security.AccessDeniedException.denyCommentView; +import static io.trino.spi.security.AccessDeniedException.denyCreateBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denyCreateFunction; import static io.trino.spi.security.AccessDeniedException.denyCreateMaterializedView; import static io.trino.spi.security.AccessDeniedException.denyCreateRole; @@ -41,6 +42,7 @@ import static io.trino.spi.security.AccessDeniedException.denyDeleteTable; import static io.trino.spi.security.AccessDeniedException.denyDenySchemaPrivilege; import static io.trino.spi.security.AccessDeniedException.denyDenyTablePrivilege; +import static io.trino.spi.security.AccessDeniedException.denyDropBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denyDropColumn; import static io.trino.spi.security.AccessDeniedException.denyDropFunction; import static io.trino.spi.security.AccessDeniedException.denyDropMaterializedView; @@ -725,4 +727,24 @@ default Map getColumnMasks(ConnectorSecurityContex .filter(entry -> entry.getValue().isPresent()) .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get())); } + + /** + * Check if identity is allowed to create the specified branch and tag. + * + * @throws AccessDeniedException if not allowed + */ + default void checkCanCreateBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + denyCreateBranchAndTag(tableName.toString(), name); + } + + /** + * Check if identity is allowed to drop the specified branch and tag. + * + * @throws AccessDeniedException if not allowed + */ + default void checkCanDropBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + denyDropBranchAndTag(tableName.toString(), name); + } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/security/AccessDeniedException.java b/core/trino-spi/src/main/java/io/trino/spi/security/AccessDeniedException.java index 49a9b1f45544..0f32badcf3af 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/security/AccessDeniedException.java +++ b/core/trino-spi/src/main/java/io/trino/spi/security/AccessDeniedException.java @@ -746,6 +746,26 @@ public static void denyShowCreateFunction(String functionName, String extraInfo) throw new AccessDeniedException(format("Cannot show create function for %s%s", functionName, formatExtraInfo(extraInfo))); } + public static void denyCreateBranchAndTag(String tableName, String name) + { + denyCreateBranchAndTag(tableName, name, null); + } + + public static void denyCreateBranchAndTag(String tableName, String name, String extraInfo) + { + throw new AccessDeniedException(format("Cannot create branch and tag %s@%s%s", tableName, name, formatExtraInfo(extraInfo))); + } + + public static void denyDropBranchAndTag(String tableName, String name) + { + denyDropBranchAndTag(tableName, name, null); + } + + public static void denyDropBranchAndTag(String tableName, String name, String extraInfo) + { + throw new AccessDeniedException(format("Cannot drop branch and tag %s@%s%s", tableName, name, formatExtraInfo(extraInfo))); + } + private static Object formatExtraInfo(String extraInfo) { if (extraInfo == null || extraInfo.isEmpty()) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/security/SystemAccessControl.java b/core/trino-spi/src/main/java/io/trino/spi/security/SystemAccessControl.java index eb4a2f078411..65fd39ebfada 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/security/SystemAccessControl.java +++ b/core/trino-spi/src/main/java/io/trino/spi/security/SystemAccessControl.java @@ -38,6 +38,7 @@ import static io.trino.spi.security.AccessDeniedException.denyCommentColumn; import static io.trino.spi.security.AccessDeniedException.denyCommentTable; import static io.trino.spi.security.AccessDeniedException.denyCommentView; +import static io.trino.spi.security.AccessDeniedException.denyCreateBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denyCreateCatalog; import static io.trino.spi.security.AccessDeniedException.denyCreateFunction; import static io.trino.spi.security.AccessDeniedException.denyCreateMaterializedView; @@ -50,6 +51,7 @@ import static io.trino.spi.security.AccessDeniedException.denyDenyEntityPrivilege; import static io.trino.spi.security.AccessDeniedException.denyDenySchemaPrivilege; import static io.trino.spi.security.AccessDeniedException.denyDenyTablePrivilege; +import static io.trino.spi.security.AccessDeniedException.denyDropBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denyDropCatalog; import static io.trino.spi.security.AccessDeniedException.denyDropColumn; import static io.trino.spi.security.AccessDeniedException.denyDropFunction; @@ -948,6 +950,26 @@ default Map getColumnMasks(SystemSecurityContext c .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get())); } + /** + * Check if identity is allowed to create the specified branch and tag in the table. + * + * @throws AccessDeniedException if not allowed + */ + default void checkCanCreateBranchAndTag(SystemSecurityContext systemSecurityContext, CatalogSchemaTableName tableName, String name) + { + denyCreateBranchAndTag(tableName.toString(), name); + } + + /** + * Check if identity is allowed to drop the specified branch and tag in the table. + * + * @throws AccessDeniedException if not allowed + */ + default void checkCanDropBranchAndTag(SystemSecurityContext systemSecurityContext, CatalogSchemaTableName tableName, String name) + { + denyDropBranchAndTag(tableName.toString(), name); + } + /** * @return the event listeners provided by this system access control */ diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorAccessControl.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorAccessControl.java index 7a5458f2773d..0556f350223b 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorAccessControl.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorAccessControl.java @@ -581,4 +581,20 @@ public Map getColumnMasks(ConnectorSecurityContext return delegate.getColumnMasks(context, tableName, columns); } } + + @Override + public void checkCanCreateBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) { + delegate.checkCanCreateBranchAndTag(context, tableName, name); + } + } + + @Override + public void checkCanDropBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) { + delegate.checkCanDropBranchAndTag(context, tableName, name); + } + } } diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/AllowAllAccessControl.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/AllowAllAccessControl.java index 984695893588..00adfd7cb179 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/AllowAllAccessControl.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/AllowAllAccessControl.java @@ -258,4 +258,10 @@ public Map getColumnMasks(ConnectorSecurityContext { return ImmutableMap.of(); } + + @Override + public void checkCanCreateBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) {} + + @Override + public void checkCanDropBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) {} } diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/AllowAllSystemAccessControl.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/AllowAllSystemAccessControl.java index 9d062c183382..9681012b6d46 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/AllowAllSystemAccessControl.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/AllowAllSystemAccessControl.java @@ -359,6 +359,12 @@ public Map getColumnMasks(SystemSecurityContext co return ImmutableMap.of(); } + @Override + public void checkCanCreateBranchAndTag(SystemSecurityContext systemSecurityContext, CatalogSchemaTableName tableName, String name) {} + + @Override + public void checkCanDropBranchAndTag(SystemSecurityContext systemSecurityContext, CatalogSchemaTableName tableName, String name) {} + @Override public void shutdown() {} } diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/FileBasedAccessControl.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/FileBasedAccessControl.java index ea43e5de875a..42ec79e955f9 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/FileBasedAccessControl.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/FileBasedAccessControl.java @@ -54,6 +54,7 @@ import static io.trino.spi.security.AccessDeniedException.denyCommentColumn; import static io.trino.spi.security.AccessDeniedException.denyCommentTable; import static io.trino.spi.security.AccessDeniedException.denyCommentView; +import static io.trino.spi.security.AccessDeniedException.denyCreateBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denyCreateFunction; import static io.trino.spi.security.AccessDeniedException.denyCreateMaterializedView; import static io.trino.spi.security.AccessDeniedException.denyCreateRole; @@ -64,6 +65,7 @@ import static io.trino.spi.security.AccessDeniedException.denyDeleteTable; import static io.trino.spi.security.AccessDeniedException.denyDenySchemaPrivilege; import static io.trino.spi.security.AccessDeniedException.denyDenyTablePrivilege; +import static io.trino.spi.security.AccessDeniedException.denyDropBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denyDropColumn; import static io.trino.spi.security.AccessDeniedException.denyDropFunction; import static io.trino.spi.security.AccessDeniedException.denyDropMaterializedView; @@ -769,6 +771,22 @@ public Map getColumnMasks(ConnectorSecurityContext } } + @Override + public void checkCanCreateBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + if (!checkTablePermission(context, tableName, OWNERSHIP)) { + denyCreateBranchAndTag(tableName.toString(), name); + } + } + + @Override + public void checkCanDropBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + if (!checkTablePermission(context, tableName, OWNERSHIP)) { + denyDropBranchAndTag(tableName.toString(), name); + } + } + private boolean canSetSessionProperty(ConnectorSecurityContext context, String property) { ConnectorIdentity identity = context.getIdentity(); diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/FileBasedSystemAccessControl.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/FileBasedSystemAccessControl.java index cb982a5350f8..981c75022bda 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/FileBasedSystemAccessControl.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/FileBasedSystemAccessControl.java @@ -1103,6 +1103,18 @@ public Map getColumnMasks(SystemSecurityContext co } } + @Override + public void checkCanCreateBranchAndTag(SystemSecurityContext systemSecurityContext, CatalogSchemaTableName tableName, String name) + { + throw new UnsupportedOperationException(); + } + + @Override + public void checkCanDropBranchAndTag(SystemSecurityContext systemSecurityContext, CatalogSchemaTableName tableName, String name) + { + throw new UnsupportedOperationException(); + } + private boolean checkAnyCatalogAccess(SystemSecurityContext context, String catalogName) { if (canAccessCatalog(context, catalogName, OWNER)) { diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/ForwardingConnectorAccessControl.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/ForwardingConnectorAccessControl.java index a51be521f26e..2664beea413e 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/ForwardingConnectorAccessControl.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/ForwardingConnectorAccessControl.java @@ -453,4 +453,16 @@ public Map getColumnMasks(ConnectorSecurityContext { return delegate().getColumnMasks(context, tableName, columns); } + + @Override + public void checkCanCreateBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + delegate().checkCanCreateBranchAndTag(context, tableName, name); + } + + @Override + public void checkCanDropBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + delegate().checkCanDropBranchAndTag(context, tableName, name); + } } diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/ForwardingSystemAccessControl.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/ForwardingSystemAccessControl.java index 8370b3b49ca9..c1d5fc9e7634 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/ForwardingSystemAccessControl.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/security/ForwardingSystemAccessControl.java @@ -557,6 +557,18 @@ public Map getColumnMasks(SystemSecurityContext co return delegate().getColumnMasks(context, tableName, columns); } + @Override + public void checkCanCreateBranchAndTag(SystemSecurityContext systemSecurityContext, CatalogSchemaTableName tableName, String name) + { + delegate().checkCanCreateBranchAndTag(systemSecurityContext, tableName, name); + } + + @Override + public void checkCanDropBranchAndTag(SystemSecurityContext systemSecurityContext, CatalogSchemaTableName tableName, String name) + { + delegate().checkCanDropBranchAndTag(systemSecurityContext, tableName, name); + } + @Override public void shutdown() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/security/SqlStandardAccessControl.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/security/SqlStandardAccessControl.java index a8cf4eb942c5..27e1975ca375 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/security/SqlStandardAccessControl.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/security/SqlStandardAccessControl.java @@ -64,6 +64,7 @@ import static io.trino.spi.security.AccessDeniedException.denyCommentColumn; import static io.trino.spi.security.AccessDeniedException.denyCommentTable; import static io.trino.spi.security.AccessDeniedException.denyCommentView; +import static io.trino.spi.security.AccessDeniedException.denyCreateBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denyCreateFunction; import static io.trino.spi.security.AccessDeniedException.denyCreateMaterializedView; import static io.trino.spi.security.AccessDeniedException.denyCreateRole; @@ -72,6 +73,7 @@ import static io.trino.spi.security.AccessDeniedException.denyCreateView; import static io.trino.spi.security.AccessDeniedException.denyCreateViewWithSelect; import static io.trino.spi.security.AccessDeniedException.denyDeleteTable; +import static io.trino.spi.security.AccessDeniedException.denyDropBranchAndTag; import static io.trino.spi.security.AccessDeniedException.denyDropColumn; import static io.trino.spi.security.AccessDeniedException.denyDropFunction; import static io.trino.spi.security.AccessDeniedException.denyDropMaterializedView; @@ -646,6 +648,22 @@ public Map getColumnMasks(ConnectorSecurityContext return ImmutableMap.of(); } + @Override + public void checkCanCreateBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + if (!isTableOwner(context, tableName)) { + denyCreateBranchAndTag(tableName.toString(), name); + } + } + + @Override + public void checkCanDropBranchAndTag(ConnectorSecurityContext context, SchemaTableName tableName, String name) + { + if (!isTableOwner(context, tableName)) { + denyDropBranchAndTag(tableName.toString(), name); + } + } + private boolean isAdmin(ConnectorSecurityContext context) { return isRoleEnabled(context.getIdentity(), hivePrincipal -> metastore.listRoleGrants(context, hivePrincipal), ADMIN_ROLE_NAME); From 732ac7e32049871c9240a105c0e9d6969787a1ba Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 18 Sep 2024 16:48:36 +0900 Subject: [PATCH 3/3] Add support for branching in Iceberg --- .../trino/plugin/iceberg/IcebergMetadata.java | 237 +++++++++-- .../trino/plugin/iceberg/IcebergModule.java | 6 + .../iceberg/IcebergPageSinkProvider.java | 3 + .../plugin/iceberg/IcebergTableHandle.java | 19 +- .../iceberg/IcebergTableProperties.java | 12 + .../iceberg/IcebergWritableTableHandle.java | 5 +- .../procedure/CreateBranchProcedure.java | 42 ++ .../procedure/DropBranchProcedure.java | 42 ++ .../procedure/FastForwardProcedure.java | 47 +++ .../procedure/IcebergCreateBranchHandle.java | 25 ++ .../procedure/IcebergDropBranchHandle.java | 25 ++ .../procedure/IcebergFastForwardHandle.java | 26 ++ .../procedure/IcebergProcedureHandle.java | 3 + .../procedure/IcebergTableProcedureId.java | 3 + .../plugin/iceberg/TestIcebergBranching.java | 382 ++++++++++++++++++ ...stIcebergNodeLocalDynamicSplitPruning.java | 4 + .../iceberg/TestIcebergSplitSource.java | 1 + ...TestConnectorPushdownRulesWithIceberg.java | 4 + 18 files changed, 861 insertions(+), 25 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 316b793c24b6..d7cc5f0e17bd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -49,8 +49,11 @@ import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle; import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle; +import io.trino.plugin.iceberg.procedure.IcebergCreateBranchHandle; +import io.trino.plugin.iceberg.procedure.IcebergDropBranchHandle; import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle; import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle; +import io.trino.plugin.iceberg.procedure.IcebergFastForwardHandle; import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle; import io.trino.plugin.iceberg.procedure.IcebergOptimizeManifestsHandle; import io.trino.plugin.iceberg.procedure.IcebergRemoveOrphanFilesHandle; @@ -237,6 +240,7 @@ import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns; import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables; import static io.trino.plugin.base.util.ExecutorUtil.processWithAdditionalThreads; +import static io.trino.plugin.base.util.Functions.checkFunctionArgument; import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; import static io.trino.plugin.hive.HiveMetadata.TRANSACTIONAL; import static io.trino.plugin.hive.HiveTimestampPrecision.DEFAULT_PRECISION; @@ -329,19 +333,24 @@ import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.TRINO_QUERY_START_TIME; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES_FROM_TABLE; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.CREATE_BRANCH; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_BRANCH; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_EXTENDED_STATS; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.FAST_FORWARD; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE_MANIFESTS; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ROLLBACK_TO_SNAPSHOT; import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFiles; import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFilesFromTable; +import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.COLUMN_ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.COLUMN_NOT_FOUND; import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY; import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; +import static io.trino.spi.StandardErrorCode.NOT_FOUND; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED; import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; @@ -375,6 +384,7 @@ import static org.apache.iceberg.MetadataTableType.ENTRIES; import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations; import static org.apache.iceberg.ReachableFileUtil.statisticsFilesLocations; +import static org.apache.iceberg.SnapshotRef.MAIN_BRANCH; import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP; @@ -517,7 +527,7 @@ public ConnectorTableHandle getTableHandle( BaseTable storageTable = catalog.getMaterializedViewStorageTable(session, materializedViewName) .orElseThrow(() -> new TrinoException(TABLE_NOT_FOUND, "Storage table metadata not found for materialized view " + tableName)); - return tableHandleForCurrentSnapshot(session, tableName, storageTable); + return tableHandleForCurrentSnapshot(session, tableName, storageTable, Optional.empty()); } if (!isDataTable(tableName.getTableName())) { @@ -541,20 +551,29 @@ public ConnectorTableHandle getTableHandle( throw e; } + Optional branch = Optional.empty(); if (endVersion.isPresent()) { - long snapshotId = getSnapshotIdFromVersion(session, table, endVersion.get()); + ConnectorTableVersion version = endVersion.get(); + long snapshotId = getSnapshotIdFromVersion(session, table, version); + Optional partitionSpec = Optional.empty(); + branch = getBranch(version); + if (branch.isPresent()) { + int schemaId = table.snapshot(snapshotId).schemaId(); + partitionSpec = Optional.ofNullable(table.specs().get(schemaId)); + } return tableHandleForSnapshot( session, tableName, table, Optional.of(snapshotId), schemaFor(table, snapshotId), - Optional.empty()); + partitionSpec, + branch); } - return tableHandleForCurrentSnapshot(session, tableName, table); + return tableHandleForCurrentSnapshot(session, tableName, table, branch); } - private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession session, SchemaTableName tableName, BaseTable table) + private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession session, SchemaTableName tableName, BaseTable table, Optional branch) { return tableHandleForSnapshot( session, @@ -562,7 +581,8 @@ private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession sessio table, Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId), table.schema(), - Optional.of(table.spec())); + Optional.of(table.spec()), + branch); } private IcebergTableHandle tableHandleForSnapshot( @@ -571,7 +591,8 @@ private IcebergTableHandle tableHandleForSnapshot( BaseTable table, Optional tableSnapshotId, Schema tableSchema, - Optional partitionSpec) + Optional partitionSpec, + Optional branch) { Map tableProperties = table.properties(); return new IcebergTableHandle( @@ -591,6 +612,7 @@ private IcebergTableHandle tableHandleForSnapshot( table.location(), table.properties(), getTablePartitioning(session, table), + branch, false, Optional.empty(), ImmutableSet.of(), @@ -629,7 +651,7 @@ private Optional getTablePartitioning(ConnectorSession IntStream.range(0, partitioningHandle.partitionFunctions().size()).boxed().collect(toImmutableList()))); } - private static long getSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version) + public static long getSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version) { io.trino.spi.type.Type versionType = version.getVersionType(); return switch (version.getPointerType()) { @@ -638,6 +660,21 @@ private static long getSnapshotIdFromVersion(ConnectorSession session, Table tab }; } + public static Optional getBranch(ConnectorTableVersion version) + { + io.trino.spi.type.Type versionType = version.getVersionType(); + return switch (version.getPointerType()) { + case TEMPORAL -> Optional.empty(); + case TARGET_ID -> { + if (versionType instanceof VarcharType) { + String refName = ((Slice) version.getVersion()).toStringUtf8(); + yield Optional.of(refName); + } + yield Optional.empty(); + } + }; + } + private static long getTargetSnapshotIdFromVersion(Table table, ConnectorTableVersion version, io.trino.spi.type.Type versionType) { long snapshotId; @@ -1235,7 +1272,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con "Cannot create a table on a non-empty location: %s, set 'iceberg.unique-table-location=true' in your Iceberg catalog properties " + "to use unique table locations for every table.", location)); } - return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), retryMode); + return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), SchemaParser.toJson(transaction.table().schema()), Optional.empty(), retryMode); } catch (IOException e) { throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking new table's location: " + location, e); @@ -1337,12 +1374,15 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto { IcebergTableHandle table = (IcebergTableHandle) tableHandle; Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + Optional branch = table.getBranch(); - validateNotModifyingOldSnapshot(table, icebergTable); + branch.ifPresentOrElse( + name -> checkBranch(icebergTable, name), + () -> validateNotModifyingOldSnapshot(table, icebergTable)); beginTransaction(icebergTable); - return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + return newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), branch, retryMode); } private List getChildNamespaces(ConnectorSession session, String parentNamespace) @@ -1358,20 +1398,22 @@ private List getChildNamespaces(ConnectorSession session, String parentN .collect(toImmutableList()); } - private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, Table table, RetryMode retryMode) + private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName schemaTableName, Table table, String schemaAsJson, Optional branch, RetryMode retryMode) { + Schema schema = SchemaParser.fromJson(schemaAsJson); return new IcebergWritableTableHandle( - name, - SchemaParser.toJson(table.schema()), + schemaTableName, + schemaAsJson, transformValues(table.specs(), PartitionSpecParser::toJson), table.spec().specId(), - getSupportedSortFields(table.schema(), table.sortOrder()), - getProjectedColumns(table.schema(), typeManager), + getSupportedSortFields(schema, table.sortOrder()), + getProjectedColumns(schema, typeManager), table.location(), getFileFormat(table), table.properties(), retryMode, - table.io().properties()); + table.io().properties(), + branch); } private static List getSupportedSortFields(Schema schema, SortOrder sortOrder) @@ -1442,6 +1484,7 @@ public Optional finishInsert( } appendFiles.appendFile(builder.build()); + table.branch().ifPresent(appendFiles::toBranch); writtenFiles.add(task.path()); } @@ -1452,7 +1495,7 @@ public Optional finishInsert( commitUpdateAndTransaction(appendFiles, session, transaction, "insert"); // TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer - long newSnapshotId = transaction.table().currentSnapshot().snapshotId(); + long newSnapshotId = table.branch().isEmpty() ? transaction.table().currentSnapshot().snapshotId() : transaction.table().refs().get(table.branch().get()).snapshotId(); transaction = null; // TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats atomically @@ -1604,6 +1647,9 @@ public Optional getTableHandleForExecute( case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties); case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties); case ADD_FILES_FROM_TABLE -> getTableHandleForAddFilesFromTable(session, accessControl, tableHandle, executeProperties); + case CREATE_BRANCH -> getTableHandleForCreateBranch(session, accessControl, tableHandle, executeProperties); + case DROP_BRANCH -> getTableHandleForDropBranch(session, accessControl, tableHandle, executeProperties); + case FAST_FORWARD -> getTableHandleForFastForward(session, tableHandle, executeProperties); }; } @@ -1684,6 +1730,51 @@ private Optional getTableHandleForRemoveOrphanFiles icebergTable.io().properties())); } + private Optional getTableHandleForCreateBranch(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map executeProperties) + { + String name = (String) executeProperties.get("name"); + accessControl.checkCanCreateBranchAndTag(null, tableHandle.getSchemaTableName(), name); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + CREATE_BRANCH, + new IcebergCreateBranchHandle(name), + icebergTable.location(), + icebergTable.io().properties())); + } + + private Optional getTableHandleForDropBranch(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map executeProperties) + { + String name = (String) executeProperties.get("name"); + accessControl.checkCanDropBranchAndTag(null, tableHandle.getSchemaTableName(), name); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + DROP_BRANCH, + new IcebergDropBranchHandle(name), + icebergTable.location(), + icebergTable.io().properties())); + } + + private Optional getTableHandleForFastForward(ConnectorSession session, IcebergTableHandle tableHandle, Map executeProperties) + { + String from = (String) executeProperties.get("from"); + String to = (String) executeProperties.get("to"); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + if (!icebergTable.refs().containsKey(from)) { + throw new TrinoException(ALREADY_EXISTS, "Branch '%s' does not exit".formatted(from)); + } + if (!icebergTable.refs().containsKey(to)) { + throw new TrinoException(ALREADY_EXISTS, "Branch '%s' does not exit".formatted(to)); + } + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + FAST_FORWARD, + new IcebergFastForwardHandle(from, to), + icebergTable.location(), + icebergTable.io().properties())); + } + private Optional getTableHandleForAddFiles(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map executeProperties) { if (!addFilesProcedureEnabled) { @@ -1816,6 +1907,9 @@ public Optional getLayoutForTableExecute(ConnectorSession case REMOVE_ORPHAN_FILES: case ADD_FILES: case ADD_FILES_FROM_TABLE: + case CREATE_BRANCH: + case DROP_BRANCH: + case FAST_FORWARD: // handled via executeTableExecute } throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'"); @@ -1847,6 +1941,9 @@ public BeginTableExecuteResult> readerForManifest(Table table, ManifestFile manifest) { return switch (manifest.content()) { @@ -2982,11 +3149,15 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT verifyTableVersionForUpdate(table); Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); - validateNotModifyingOldSnapshot(table, icebergTable); + Optional branch = table.getBranch(); + + branch.ifPresentOrElse( + name -> checkBranch(icebergTable, name), + () -> validateNotModifyingOldSnapshot(table, icebergTable)); beginTransaction(icebergTable); - IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), branch, retryMode); return new IcebergMergeTableHandle(table, insertHandle); } @@ -2995,8 +3166,9 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg { IcebergMergeTableHandle mergeHandle = (IcebergMergeTableHandle) mergeTableHandle; IcebergTableHandle handle = mergeHandle.getTableHandle(); + Optional branch = mergeHandle.getInsertTableHandle().branch(); RetryMode retryMode = mergeHandle.getInsertTableHandle().retryMode(); - finishWrite(session, handle, fragments, retryMode); + finishWrite(session, handle, branch, fragments, retryMode); } private static void verifyTableVersionForUpdate(IcebergTableHandle table) @@ -3013,7 +3185,7 @@ private static void validateNotModifyingOldSnapshot(IcebergTableHandle table, Ta } } - private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection fragments, RetryMode retryMode) + private void finishWrite(ConnectorSession session, IcebergTableHandle table, Optional branch, Collection fragments, RetryMode retryMode) { Table icebergTable = transaction.table(); @@ -3031,6 +3203,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col Schema schema = SchemaParser.fromJson(table.getTableSchemaJson()); RowDelta rowDelta = transaction.newRowDelta(); + branch.ifPresent(rowDelta::toBranch); table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId())); TupleDomain dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId())); TupleDomain convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertibleToIcebergExpression(domain)); @@ -3165,6 +3338,10 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle DeleteFiles deleteFiles = icebergTable.newDelete() .deleteFromRowFilter(toIcebergExpression(handle.getEnforcedPredicate())); + handle.getBranch().ifPresent(branch -> { + checkBranch(icebergTable, branch); + deleteFiles.toBranch(branch); + }); commitUpdate(deleteFiles, session, "delete"); Map summary = icebergTable.currentSnapshot().summary(); @@ -3223,6 +3400,7 @@ public Optional> applyLimit(Connect table.getTableLocation(), table.getStorageProperties(), table.getTablePartitioning(), + table.getBranch(), table.isRecordScannedFiles(), table.getMaxScannedFileSize(), table.getConstraintColumns(), @@ -3332,6 +3510,7 @@ else if (isMetadataColumnId(columnHandle.getId())) { table.getTableLocation(), table.getStorageProperties(), table.getTablePartitioning(), + table.getBranch(), table.isRecordScannedFiles(), table.getMaxScannedFileSize(), newConstraintColumns, @@ -3483,6 +3662,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab originalHandle.getTableLocation(), originalHandle.getStorageProperties(), Optional.empty(), // requiredTablePartitioning does not affect stats + originalHandle.getBranch(), false, // recordScannedFiles does not affect stats originalHandle.getMaxScannedFileSize(), ImmutableSet.of(), // constraintColumns do not affect stats @@ -3578,7 +3758,7 @@ && getOnlyElement(sourceTableHandles) instanceof IcebergTableHandle handle fromSnapshotForRefresh = Optional.of(Long.parseLong(sourceTable.getValue())); } - return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + return newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), Optional.empty(), retryMode); } @Override @@ -3994,4 +4174,15 @@ private static TableStatistics mergeColumnStatistics(TableStatistics currentStat newStats.getColumnStatistics().forEach(statisticsBuilder::setColumnStatistics); return statisticsBuilder.build(); } + + private static void checkBranch(Table table, String branch) + { + SnapshotRef ref = table.refs().get(branch); + if (ref == null) { + throw new TrinoException(NOT_FOUND, "Branch does not exist: '%s'".formatted(branch)); + } + if (ref.isTag()) { + throw new TrinoException(NOT_SUPPORTED, "Branch '%s' does not exist, but a tag with that name exists".formatted(branch)); + } + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index ea88078bf1d8..4067e1437e90 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ -44,8 +44,11 @@ import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider; import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure; import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure; +import io.trino.plugin.iceberg.procedure.CreateBranchProcedure; +import io.trino.plugin.iceberg.procedure.DropBranchProcedure; import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure; import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure; +import io.trino.plugin.iceberg.procedure.FastForwardProcedure; import io.trino.plugin.iceberg.procedure.OptimizeManifestsTableProcedure; import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure; import io.trino.plugin.iceberg.procedure.RegisterTableProcedure; @@ -138,6 +141,9 @@ public void configure(Binder binder) tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON); tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON); tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(CreateBranchProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(DropBranchProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(FastForwardProcedure.class).in(Scopes.SINGLETON); newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON); binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java index bc4d694483f7..afdd7ec943a2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java @@ -153,6 +153,9 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa case REMOVE_ORPHAN_FILES: case ADD_FILES: case ADD_FILES_FROM_TABLE: + case CREATE_BRANCH: + case DROP_BRANCH: + case FAST_FORWARD: // handled via ConnectorMetadata.executeTableExecute } throw new IllegalArgumentException("Unknown procedure: " + executeHandle.procedureId()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index 21e1e0535e1e..c4b41761fce7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -49,6 +49,7 @@ public class IcebergTableHandle private final int formatVersion; private final String tableLocation; private final Map storageProperties; + private final Optional branch; // Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector private final TupleDomain unenforcedPredicate; @@ -92,7 +93,8 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( @JsonProperty("projectedColumns") Set projectedColumns, @JsonProperty("nameMappingJson") Optional nameMappingJson, @JsonProperty("tableLocation") String tableLocation, - @JsonProperty("storageProperties") Map storageProperties) + @JsonProperty("storageProperties") Map storageProperties, + @JsonProperty("branch") Optional branch) { return new IcebergTableHandle( catalog, @@ -111,6 +113,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( tableLocation, storageProperties, Optional.empty(), + branch, false, Optional.empty(), ImmutableSet.of(), @@ -134,6 +137,7 @@ public IcebergTableHandle( String tableLocation, Map storageProperties, Optional tablePartitioning, + Optional branch, boolean recordScannedFiles, Optional maxScannedFileSize, Set constraintColumns, @@ -155,6 +159,7 @@ public IcebergTableHandle( this.tableLocation = requireNonNull(tableLocation, "tableLocation is null"); this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null")); this.tablePartitioning = requireNonNull(tablePartitioning, "tablePartitioning is null"); + this.branch = requireNonNull(branch, "branch is null"); this.recordScannedFiles = recordScannedFiles; this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null"); this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null")); @@ -261,6 +266,12 @@ public Optional getTablePartitioning() return tablePartitioning; } + @JsonProperty + public Optional getBranch() + { + return branch; + } + @JsonIgnore public boolean isRecordScannedFiles() { @@ -314,6 +325,7 @@ public IcebergTableHandle withProjectedColumns(Set projecte tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, maxScannedFileSize, constraintColumns, @@ -339,6 +351,7 @@ public IcebergTableHandle forAnalyze() tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, maxScannedFileSize, constraintColumns, @@ -364,6 +377,7 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, Optional.of(maxScannedFileSize), constraintColumns, @@ -389,6 +403,7 @@ public IcebergTableHandle withTablePartitioning(Optional SUPPORTED_PROPERTIES = ImmutableSet.builder() @@ -76,6 +77,7 @@ public class IcebergTableProperties .add(ORC_BLOOM_FILTER_FPP_PROPERTY) .add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY) .add(DATA_LOCATION_PROPERTY) + .add(TARGET_BRANCH_PROPERTY) .add(EXTRA_PROPERTIES_PROPERTY) .add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY) .build(); @@ -203,6 +205,11 @@ public IcebergTableProperties( "File system location URI for the table's data files", null, false)) + .add(stringProperty( + TARGET_BRANCH_PROPERTY, + "Target branch name", + null, + true)) .build(); checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream() @@ -292,6 +299,11 @@ public static Optional getDataLocation(Map tableProperti return Optional.ofNullable((String) tableProperties.get(DATA_LOCATION_PROPERTY)); } + public static Optional getTargetBranch(Map tableProperties) + { + return Optional.ofNullable((String) tableProperties.get(TARGET_BRANCH_PROPERTY)); + } + public static Optional> getExtraProperties(Map tableProperties) { return Optional.ofNullable((Map) tableProperties.get(EXTRA_PROPERTIES_PROPERTY)); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java index 7f347564c3b7..831deb830771 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -37,7 +38,8 @@ public record IcebergWritableTableHandle( IcebergFileFormat fileFormat, Map storageProperties, RetryMode retryMode, - Map fileIoProperties) + Map fileIoProperties, + Optional branch) implements ConnectorInsertTableHandle, ConnectorOutputTableHandle { public IcebergWritableTableHandle @@ -53,6 +55,7 @@ public record IcebergWritableTableHandle( requireNonNull(retryMode, "retryMode is null"); checkArgument(partitionsSpecsAsJson.containsKey(partitionSpecId), "partitionSpecId missing from partitionSpecs"); fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null")); + requireNonNull(branch, "branch is null"); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java new file mode 100644 index 000000000000..81cdf9f830b6 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.CREATE_BRANCH; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class CreateBranchProcedure + implements Provider +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + CREATE_BRANCH.name(), + coordinatorOnly(), + ImmutableList.>builder() + .add(stringProperty( + "name", + "Branch name", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java new file mode 100644 index 000000000000..123dbe678b90 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_BRANCH; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class DropBranchProcedure + implements Provider +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + DROP_BRANCH.name(), + coordinatorOnly(), + ImmutableList.>builder() + .add(stringProperty( + "name", + "Branch name", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java new file mode 100644 index 000000000000..e3ddd8af7f39 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.FAST_FORWARD; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class FastForwardProcedure + implements Provider +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + FAST_FORWARD.name(), + coordinatorOnly(), + ImmutableList.>builder() + .add(stringProperty( + "from", + "Branch to fast-forward", + null, + false)) + .add(stringProperty( + "to", + "Ref for the from branch to be fast forwarded to", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java new file mode 100644 index 000000000000..3cd9a0986bfd --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergCreateBranchHandle(String name) + implements IcebergProcedureHandle +{ + public IcebergCreateBranchHandle + { + requireNonNull(name, "name is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java new file mode 100644 index 000000000000..27de2deb8385 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergDropBranchHandle(String name) + implements IcebergProcedureHandle +{ + public IcebergDropBranchHandle + { + requireNonNull(name, "name is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java new file mode 100644 index 000000000000..23dc9daf0e45 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergFastForwardHandle(String from, String to) + implements IcebergProcedureHandle +{ + public IcebergFastForwardHandle + { + requireNonNull(from, "from is null"); + requireNonNull(to, "to is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java index bd2c64e0778c..22f970bc9a63 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java @@ -28,5 +28,8 @@ @JsonSubTypes.Type(value = IcebergRemoveOrphanFilesHandle.class, name = "remove_orphan_files"), @JsonSubTypes.Type(value = IcebergAddFilesHandle.class, name = "add_files"), @JsonSubTypes.Type(value = IcebergAddFilesFromTableHandle.class, name = "add_files_from_table"), + @JsonSubTypes.Type(value = IcebergCreateBranchHandle.class, name = "create_branch"), + @JsonSubTypes.Type(value = IcebergDropBranchHandle.class, name = "drop_branch"), + @JsonSubTypes.Type(value = IcebergFastForwardHandle.class, name = "fast_forward"), }) public interface IcebergProcedureHandle {} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java index 6230f8b779b6..b30777caf2ed 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java @@ -23,4 +23,7 @@ public enum IcebergTableProcedureId REMOVE_ORPHAN_FILES, ADD_FILES, ADD_FILES_FROM_TABLE, + CREATE_BRANCH, + DROP_BRANCH, + FAST_FORWARD, } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java new file mode 100644 index 000000000000..98fccc939007 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java @@ -0,0 +1,382 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.metastore.HiveMetastore; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import io.trino.testing.sql.TestTable; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.Table; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory; +import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +final class TestIcebergBranching + extends AbstractTestQueryFramework +{ + private HiveMetastore metastore; + private TrinoFileSystemFactory fileSystemFactory; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + QueryRunner queryRunner = IcebergQueryRunner.builder().build(); + metastore = getHiveMetastore(queryRunner); + fileSystemFactory = getFileSystemFactory(queryRunner); + return queryRunner; + } + + @Test + void testCreateBranch() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_branch", "(x int)")) { + assertBranch(table.getName(), "main"); + + createBranch(table.getName(), "test-branch"); + assertBranch(table.getName(), "main", "test-branch"); + + createBranch(table.getName(), "TEST-BRANCH"); + assertBranch(table.getName(), "main", "test-branch", "TEST-BRANCH"); + + assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE create_branch('test-branch')", "Branch 'test-branch' already exists"); + } + } + + @Test + void testDropBranch() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_drop_branch", "(x int)")) { + assertBranch(table.getName(), "main"); + + createBranch(table.getName(), "test-branch"); + createBranch(table.getName(), "TEST-BRANCH"); + assertBranch(table.getName(), "main", "test-branch", "TEST-BRANCH"); + + dropBranch(table.getName(), "test-branch"); + dropBranch(table.getName(), "TEST-BRANCH"); + assertBranch(table.getName(), "main"); + + assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE drop_branch('test-branch')", "Branch 'test-branch' does not exit"); + assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE drop_branch('main')", "Cannot drop 'main' branch"); + } + } + + @Test + void testFastForward() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_fast_forward", "(x int)")) { + assertBranch(table.getName(), "main"); + + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES 1, 2, 3", 3); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(0L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + fastForward(table.getName(), "main", "test-branch"); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(3L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + assertQueryFails( + "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('non-existing-branch', 'main')", + "Branch 'non-existing-branch' does not exit"); + assertQueryFails( + "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('main', 'non-existing-branch')", + "Branch 'non-existing-branch' does not exit"); + } + } + + @Test + void testFastForwardNotAncestor() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_fast_forward", "(x int)")) { + createBranch(table.getName(), "test-branch"); + + assertUpdate("INSERT INTO " + table.getName() + " VALUES 1", 1); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES 1, 2, 3", 3); + + assertQueryFails( + "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('main', 'test-branch')", + "Branch 'main' is not an ancestor of 'test-branch'"); + } + } + + @Test + void testInsert() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert_into_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + + // insert into main (default) branch + assertUpdate("INSERT INTO " + table.getName() + " @ 'main' VALUES (1, 2)", 1); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(1L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(0L); + + // insert into another branch + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (10, 20), (30, 40)", 2); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(1L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(2L); + + // insert into another branch with a partial column + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' (x) VALUES 50", 1); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(1L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + assertQueryFails( + "INSERT INTO " + table.getName() + " @ 'non-existing' VALUES (1, 2, 3)", + "Cannot find snapshot with reference name: non-existing"); + } + } + + @Test + void testInsertAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert_into_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 2)", 1); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN z int"); + + assertQueryFails( + "INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 2, 3)", + "\\Qline 1:1: Insert query has mismatched column types: Table: [integer, integer], Query: [integer, integer, integer]"); + + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' SELECT x + 10, y + 10 FROM " + table.getName(), 1); + assertThat(query("SELECT * FROM " + table.getName())) + .matches("VALUES (1, 2, CAST(NULL AS integer))"); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES (11, 12, CAST(NULL AS integer))"); + } + } + + @Test + void testInsertIntoTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "INSERT INTO " + table.getName() + " @ 'test-tag' VALUES (1, 2)", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + @Test + void testDelete() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_from_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 10), (2, 20), (3, 30)", 3); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(0L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + assertUpdate("DELETE FROM " + table.getName() + " @ 'test-branch'"); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(0L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(0L); + + assertQueryFails( + "DELETE FROM " + table.getName() + " @ 'non-existing'", + "Cannot find snapshot with reference name: non-existing"); + } + } + + @Test + void testDeleteAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_from_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 10), (2, 20), (3, 30)", 3); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y"); + + // TODO This should be fixed after once https://github.com/trinodb/trino/issues/23601 is resolved + assertThat(query("DELETE FROM " + table.getName() + " @ 'test-branch' WHERE y = 30")).nonTrinoExceptionFailure() + .hasMessageContaining("Invalid metadata file") + .hasStackTraceContaining("Cannot find field 'y'"); + + // branch returns the latest schema once a new snapshot is created + assertUpdate("DELETE FROM " + table.getName() + " @ 'test-branch' WHERE x = 1", 1); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 2, 3"); + } + } + + @Test + void testDeleteFromTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "DELETE FROM " + table.getName() + " @ 'test-tag'", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + @Test + void testUpdate() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_branch", "(x int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES 1, 2, 3", 3); + + assertUpdate("UPDATE " + table.getName() + " @ 'test-branch' SET x = x * 2", 3); + assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName()); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 2, 4, 6"); + + assertQueryFails( + "UPDATE " + table.getName() + " @ 'non-existing' SET x = x * 2", + "Cannot find snapshot with reference name: non-existing"); + } + } + + @Test + void testUpdateAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 10), (2, 20), (3, 30)", 3); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y"); + assertUpdate("UPDATE " + table.getName() + " @ 'test-branch' SET y = 10", 3); + + // branch returns the latest schema once a new snapshot is created + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 1, 2, 3"); + } + } + + @Test + void testUpdateTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "UPDATE " + table.getName() + " @ 'test-tag' SET x = 2", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + @Test + void testMerge() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_merge_branch", "(x int)")) { + createBranch(table.getName(), "test-branch"); + + assertUpdate("MERGE INTO " + table.getName() + " @ 'test-branch' USING (VALUES 42) t(dummy) ON false " + + " WHEN NOT MATCHED THEN INSERT VALUES (1)", 1); + assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName()); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 1"); + + assertUpdate("MERGE INTO " + table.getName() + " @ 'test-branch' USING (VALUES 42) t(dummy) ON true " + + " WHEN MATCHED THEN UPDATE SET x = 10", 1); + assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName()); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 10"); + + assertQueryFails( + "MERGE INTO " + table.getName() + " @ 'not-existing' USING (VALUES 42) t(dummy) ON false " + + " WHEN NOT MATCHED THEN INSERT VALUES (1)", + "Cannot find snapshot with reference name: not-existing"); + } + } + + @Test + void testMergeAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_merge_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y"); + assertUpdate("MERGE INTO " + table.getName() + " @ 'test-branch' USING (VALUES 42) t(dummy) ON false " + + " WHEN NOT MATCHED THEN INSERT VALUES (1, 2)", 1); + + // branch returns the latest schema once a new snapshot is created + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 1"); + } + } + + @Test + void testMergeIntoTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "MERGE INTO " + table.getName() + " @ 'test-tag' USING (VALUES 42) t(dummy) ON false WHEN NOT MATCHED THEN INSERT VALUES (1, 2)", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + private void createBranch(String table, String branch) + { + assertUpdate("ALTER TABLE " + table + " EXECUTE create_branch('" + branch + "')"); + } + + private void dropBranch(String table, String branch) + { + assertUpdate("ALTER TABLE " + table + " EXECUTE drop_branch('" + branch + "')"); + } + + private void fastForward(String table, String from, String to) + { + assertUpdate("ALTER TABLE " + table + " EXECUTE fast_forward('" + from + "', '" + to + "')"); + } + + private void createTag(String table, String tag) + { + BaseTable icebergTable = loadTable(table); + icebergTable.manageSnapshots() + .createTag(tag, icebergTable.currentSnapshot().snapshotId()) + .commit(); + } + + private void assertBranch(String tableName, String... branchNames) + { + Table table = loadTable(tableName); + table.refresh(); + assertThat(table.refs()).containsOnlyKeys(branchNames); + } + + private BaseTable loadTable(String tableName) + { + return IcebergTestUtils.loadTable(tableName, metastore, fileSystemFactory, "iceberg", "tpch"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index 01bcecfe8d50..e55852b6f8a4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -172,6 +172,7 @@ public void testDynamicSplitPruningOnUnpartitionedTable() tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -233,6 +234,7 @@ public void testDynamicSplitPruningOnUnpartitionedTable() tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -344,6 +346,7 @@ public void testDynamicSplitPruningWithExplicitPartitionFilter() tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -506,6 +509,7 @@ public void testDynamicSplitPruningWithExplicitPartitionFilterPartitionEvolution tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index a7fb5fc85859..faed01901ce3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -490,6 +490,7 @@ private static IcebergTableHandle createTableHandle(SchemaTableName schemaTableN nationTable.location(), nationTable.properties(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index 37ebe5d04f8d..5e0f4ea829bd 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -176,6 +176,7 @@ public void testProjectionPushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -261,6 +262,7 @@ public void testPredicatePushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -313,6 +315,7 @@ public void testColumnPruningProjectionPushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -375,6 +378,7 @@ public void testPushdownWithDuplicateExpressions() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(),