Skip to content

Commit

Permalink
Add information from written files to Iceberg conflict detection
Browse files Browse the repository at this point in the history
  • Loading branch information
pajaks committed Feb 7, 2025
1 parent ea69280 commit 4db2b70
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class IcebergConfig
private boolean objectStoreLayoutEnabled;
private int metadataParallelism = 8;
private boolean bucketExecutionEnabled = true;
private boolean fileBasedConflictDetectionEnabled = true;

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -580,4 +581,17 @@ public IcebergConfig setBucketExecutionEnabled(boolean bucketExecutionEnabled)
this.bucketExecutionEnabled = bucketExecutionEnabled;
return this;
}

public boolean isFileBasedConflictDetectionEnabled()
{
return fileBasedConflictDetectionEnabled;
}

@Config("iceberg.file-based-conflict-detection")
@ConfigDescription("Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system")
public IcebergConfig setFileBasedConflictDetectionEnabled(boolean fileBasedConflictDetectionEnabled)
{
this.fileBasedConflictDetectionEnabled = fileBasedConflictDetectionEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isBucketExecutionEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isFileBasedConflictDetectionEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isIncrementalRefreshEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
Expand Down Expand Up @@ -307,6 +308,7 @@
import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues;
import static io.trino.plugin.iceberg.IcebergUtil.getProjectedColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
Expand Down Expand Up @@ -353,6 +355,7 @@
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
import static io.trino.spi.predicate.TupleDomain.withColumnDomains;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.DateType.DATE;
Expand Down Expand Up @@ -3033,8 +3036,13 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
RowDelta rowDelta = transaction.newRowDelta();
table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
TupleDomain<IcebergColumnHandle> convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertibleToIcebergExpression(domain));
TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(convertibleUnenforcedPredicate);
TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(table.getUnenforcedPredicate());
if (isFileBasedConflictDetectionEnabled(session)) {
effectivePredicate = effectivePredicate.intersect(extractTupleDomainsFromCommitTasks(table, icebergTable, commitTasks, typeManager));
}

effectivePredicate = effectivePredicate.filter((_, domain) -> isConvertibleToIcebergExpression(domain));

if (!effectivePredicate.isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(effectivePredicate));
}
Expand Down Expand Up @@ -3099,6 +3107,40 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
commitUpdateAndTransaction(rowDelta, session, transaction, "write");
}

static TupleDomain<IcebergColumnHandle> extractTupleDomainsFromCommitTasks(IcebergTableHandle table, Table icebergTable, List<CommitTaskData> commitTasks, TypeManager typeManager)
{
Set<IcebergColumnHandle> partitionColumns = new HashSet<>(getProjectedColumns(icebergTable.schema(), typeManager, identityPartitionColumnsInAllSpecs(icebergTable)));
PartitionSpec partitionSpec = icebergTable.spec();
Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
Map<IcebergColumnHandle, List<Domain>> domainsFromTasks = new HashMap<>();
for (CommitTaskData commitTask : commitTasks) {
PartitionSpec taskPartitionSpec = PartitionSpecParser.fromJson(schema, commitTask.partitionSpecJson());
if (commitTask.partitionDataJson().isEmpty() || taskPartitionSpec.isUnpartitioned() || !taskPartitionSpec.equals(partitionSpec)) {
// We should not produce any specific domains if there are no partitions or current partitions does not match task partitions for any of tasks
// As each partition value narrows down conflict scope we should produce values from all commit tasks or not at all, to avoid partial information
return TupleDomain.all();
}

PartitionData partitionData = PartitionData.fromJson(commitTask.partitionDataJson().get(), partitionColumnTypes);
Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(partitionData, partitionSpec);
Map<ColumnHandle, NullableValue> partitionValues = getPartitionValues(partitionColumns, partitionKeys);

for (Map.Entry<ColumnHandle, NullableValue> entry : partitionValues.entrySet()) {
IcebergColumnHandle columnHandle = (IcebergColumnHandle) entry.getKey();
NullableValue value = entry.getValue();
Domain newDomain = value.isNull() ? Domain.onlyNull(columnHandle.getType()) : Domain.singleValue(columnHandle.getType(), value.getValue());
domainsFromTasks.computeIfAbsent(columnHandle, _ -> new ArrayList<>()).add(newDomain);
}
}
return withColumnDomains(domainsFromTasks.entrySet().stream()
.collect(toImmutableMap(
Map.Entry::getKey,
entry -> Domain.union(entry.getValue()))));
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, Map<String, Object> viewProperties, boolean replace)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public final class IcebergSessionProperties
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled";
public static final String BUCKET_EXECUTION_ENABLED = "bucket_execution_enabled";
public static final String FILE_BASED_CONFLICT_DETECTION_ENABLED = "file_based_conflict_detection_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -398,6 +399,11 @@ public IcebergSessionProperties(
"Enable bucket-aware execution: use physical bucketing information to optimize queries",
icebergConfig.isBucketExecutionEnabled(),
false))
.add(booleanProperty(
FILE_BASED_CONFLICT_DETECTION_ENABLED,
"Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system",
icebergConfig.isFileBasedConflictDetectionEnabled(),
false))
.build();
}

Expand Down Expand Up @@ -646,4 +652,9 @@ public static boolean isBucketExecutionEnabled(ConnectorSession session)
{
return session.getProperty(BUCKET_EXECUTION_ENABLED, Boolean.class);
}

public static boolean isFileBasedConflictDetectionEnabled(ConnectorSession session)
{
return session.getProperty(FILE_BASED_CONFLICT_DETECTION_ENABLED, Boolean.class);
}
}
Loading

0 comments on commit 4db2b70

Please sign in to comment.