Add information from written files to Iceberg conflict detection
pajaks committed Dec 23, 2024
1 parent 5ce80be commit 7f3392a
Showing 5 changed files with 224 additions and 2 deletions.
@@ -94,6 +94,7 @@ public class IcebergConfig
private boolean objectStoreLayoutEnabled;
private int metadataParallelism = 8;
private boolean bucketExecutionEnabled = true;
private boolean fileBasedConflictDetectionEnabled;

public CatalogType getCatalogType()
{
@@ -564,4 +565,17 @@ public IcebergConfig setBucketExecutionEnabled(boolean bucketExecutionEnabled)
this.bucketExecutionEnabled = bucketExecutionEnabled;
return this;
}

public boolean isFileBasedConflictDetectionEnabled()
{
return fileBasedConflictDetectionEnabled;
}

@Config("iceberg.file-based-conflict-detection")
@ConfigDescription("Enable file-based conflict detection: use partition information from the written data files as input to conflict detection")
public IcebergConfig setFileBasedConflictDetectionEnabled(boolean fileBasedConflictDetectionEnabled)
{
this.fileBasedConflictDetectionEnabled = fileBasedConflictDetectionEnabled;
return this;
}
}
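
For context, a minimal sketch (not part of the commit) of how the new toggle is exercised: the @Config annotation above binds the catalog property iceberg.file-based-conflict-detection to this setter, and the flag defaults to false, so flipping it programmatically (for example in an embedded setup or a test) would look roughly like this.

IcebergConfig config = new IcebergConfig()
        .setFileBasedConflictDetectionEnabled(true); // bound to iceberg.file-based-conflict-detection
boolean enabled = config.isFileBasedConflictDetectionEnabled(); // true here; false when left unset

The getter is also what IcebergSessionProperties (further down) reads as the default value for the corresponding session property.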
@@ -267,6 +267,7 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isBucketExecutionEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isFileBasedConflictDetectionEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isIncrementalRefreshEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
@@ -298,6 +299,7 @@
import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues;
import static io.trino.plugin.iceberg.IcebergUtil.getProjectedColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
@@ -340,6 +342,7 @@
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
import static io.trino.spi.predicate.TupleDomain.withColumnDomains;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.DateType.DATE;
@@ -2894,6 +2897,12 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
TupleDomain<IcebergColumnHandle> convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertibleToIcebergExpression(domain));
TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(convertibleUnenforcedPredicate);

if (isFileBasedConflictDetectionEnabled(session)) {
TupleDomain<IcebergColumnHandle> commitTasksDomains = extractTupleDomainsFromCommitTasks(table, icebergTable, commitTasks).filter((_, domain) -> isConvertibleToIcebergExpression(domain));
effectivePredicate = effectivePredicate.intersect(commitTasksDomains);
}

if (!effectivePredicate.isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(effectivePredicate));
}
@@ -2958,6 +2967,38 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
commitUpdateAndTransaction(rowDelta, session, transaction, "write");
}

private TupleDomain<IcebergColumnHandle> extractTupleDomainsFromCommitTasks(IcebergTableHandle table, Table icebergTable, List<CommitTaskData> commitTasks)
{
Set<IcebergColumnHandle> partitionColumns = new HashSet<>(getProjectedColumns(icebergTable.schema(), typeManager, identityPartitionColumnsInAllSpecs(icebergTable)));
PartitionSpec partitionSpec = icebergTable.spec();
Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
Map<IcebergColumnHandle, Domain> domainsFromTasks = new HashMap<>();
for (CommitTaskData commitTask : commitTasks) {
PartitionSpec taskPartitionSpec = PartitionSpecParser.fromJson(schema, commitTask.partitionSpecJson());
if (commitTask.partitionDataJson().isEmpty() || taskPartitionSpec.isUnpartitioned() || !taskPartitionSpec.equals(partitionSpec)) {
return TupleDomain.all(); // We should not produce any specific domains if any task has no partition data, is unpartitioned, or its partition spec does not match the table's current spec
}

PartitionData partitionData = PartitionData.fromJson(commitTask.partitionDataJson().get(), partitionColumnTypes);
Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(partitionData, partitionSpec);
Map<ColumnHandle, NullableValue> partitionValues = getPartitionValues(partitionColumns, partitionKeys);

for (Map.Entry<ColumnHandle, NullableValue> entry : partitionValues.entrySet()) {
IcebergColumnHandle columnHandle = (IcebergColumnHandle) entry.getKey();
NullableValue value = entry.getValue();
if (value.isNull()) {
return TupleDomain.all(); // We should not produce any specific domains if any partition value is null
}
Domain newDomain = Domain.singleValue(columnHandle.getType(), value.getValue());
domainsFromTasks.merge(columnHandle, newDomain, Domain::union);
}
}
return withColumnDomains(domainsFromTasks);
}
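
To make the mechanism concrete, here is a minimal sketch (not part of the commit) of how per-file partition values collapse into a conflict detection filter. It assumes a single BIGINT identity-partitioned column named "part" and uses String keys instead of IcebergColumnHandle so it stays self-contained; the method above builds the same shape of TupleDomain keyed by column handles, which finishWrite then intersects with the table's effective predicate.

import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import java.util.Map;
import static io.trino.spi.type.BigintType.BIGINT;

class ConflictFilterSketch
{
    static TupleDomain<String> conflictFilterFor(long... writtenPartitionValues)
    {
        // Each written file contributes a single-value domain for its partition value;
        // Domain::union merges them, so files in part=20 and part=40 yield part IN (20, 40)
        Domain merged = null;
        for (long part : writtenPartitionValues) {
            Domain single = Domain.singleValue(BIGINT, part);
            merged = (merged == null) ? single : merged.union(single);
        }
        // No written files means no narrowing of the conflict detection filter
        return (merged == null) ? TupleDomain.<String>all() : TupleDomain.withColumnDomains(Map.of("part", merged));
    }
}

Under these assumptions, conflictFilterFor(20, 40) corresponds to part IN (20, 40), so a concurrent write whose filter covers only part = 30 does not overlap and both commits can succeed, which is the scenario the concurrency tests below exercise.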

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, Map<String, Object> viewProperties, boolean replace)
{
@@ -110,6 +110,7 @@ public final class IcebergSessionProperties
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled";
public static final String BUCKET_EXECUTION_ENABLED = "bucket_execution_enabled";
public static final String FILE_BASED_CONFLICT_DETECTION_ENABLED = "file_based_conflict_detection_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -398,6 +399,11 @@ public IcebergSessionProperties(
"Enable bucket-aware execution: use physical bucketing information to optimize queries",
icebergConfig.isBucketExecutionEnabled(),
false))
.add(booleanProperty(
FILE_BASED_CONFLICT_DETECTION_ENABLED,
"Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system",
icebergConfig.isFileBasedConflictDetectionEnabled(),
false))
.build();
}

@@ -646,4 +652,9 @@ public static boolean isBucketExecutionEnabled(ConnectorSession session)
{
return session.getProperty(BUCKET_EXECUTION_ENABLED, Boolean.class);
}

public static boolean isFileBasedConflictDetectionEnabled(ConnectorSession session)
{
return session.getProperty(FILE_BASED_CONFLICT_DETECTION_ENABLED, Boolean.class);
}
}
@@ -78,7 +78,8 @@ public void testDefaults()
.setIncrementalRefreshEnabled(true)
.setObjectStoreLayoutEnabled(false)
.setMetadataParallelism(8)
.setBucketExecutionEnabled(true));
.setBucketExecutionEnabled(true)
.setFileBasedConflictDetectionEnabled(false));
}

@Test
@@ -118,6 +119,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.object-store-layout.enabled", "true")
.put("iceberg.metadata.parallelism", "10")
.put("iceberg.bucket-execution", "false")
.put("iceberg.file-based-conflict-detection", "true")
.buildOrThrow();

IcebergConfig expected = new IcebergConfig()
@@ -154,7 +156,8 @@ public void testExplicitPropertyMappings()
.setIncrementalRefreshEnabled(false)
.setObjectStoreLayoutEnabled(true)
.setMetadataParallelism(10)
.setBucketExecutionEnabled(false);
.setBucketExecutionEnabled(false)
.setFileBasedConflictDetectionEnabled(true);

assertFullMapping(properties, expected);
}
@@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.MoreFutures;
import io.trino.Session;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
@@ -33,6 +34,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static io.trino.plugin.iceberg.IcebergSessionProperties.FILE_BASED_CONFLICT_DETECTION_ENABLED;
import static io.trino.testing.QueryAssertions.getTrinoExceptionCause;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
@@ -669,6 +671,72 @@ void testConcurrentUpdateAndInserts()
}
}

// Repeat the test (via @RepeatedTest) for better coverage, since the tested behavior is inherently non-deterministic.
@RepeatedTest(3)
public void testConcurrentMerge()
throws Exception
{
int threads = 3;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
String tableName = "test_concurrent_merges_table_" + randomNameSuffix();
String sourceTableName = "test_concurrent_merges_source_table_" + randomNameSuffix();

// Helper table to simulate longer query time during MERGE
assertUpdate("CREATE TABLE " + sourceTableName + " (a, part, string_rep) AS SELECT *, format('a%spart%s', a, part) FROM " +
"(select * from UNNEST(SEQUENCE(1, 2000)) AS t(a)) CROSS JOIN (select * from UNNEST(SEQUENCE(1, 2000)) AS t(part))", 4000000);

assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioning = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4);
// Add another file to partition part = 30
assertUpdate("INSERT INTO " + tableName + " VALUES (22, 30)", 1);
try {
// Merge data concurrently using non-overlapping partition predicates
executor.invokeAll(ImmutableList.<Callable<Void>>builder()
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(
withFileBasedConflictDetectionSession(),
"""
MERGE INTO %s t USING (select a, part from %s where string_rep LIKE '%%a12part20') AS s
ON (FALSE)
WHEN NOT MATCHED THEN INSERT (a, part) VALUES(s.a, s.part)
""".formatted(tableName, sourceTableName));
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(
withFileBasedConflictDetectionSession(),
"""
MERGE INTO %s t USING (VALUES (21, 30)) AS s(a, part)
ON (t.part = s.part)
WHEN MATCHED THEN DELETE
""".formatted(tableName));
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(
withFileBasedConflictDetectionSession(),
"""
MERGE INTO %s t USING (VALUES (32, 40)) AS s(a, part)
ON (t.part = s.part)
WHEN MATCHED THEN UPDATE SET a = s.a
""".formatted(tableName));
return null;
})
.build())
.forEach(MoreFutures::getDone);

assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (12, 20), (32, 40)");
}
finally {
assertUpdate("DROP TABLE " + tableName);
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}

// Repeat the test (via @RepeatedTest) for better coverage, since the tested behavior is inherently non-deterministic.
@RepeatedTest(3)
void testConcurrentMergeAndInserts()
@@ -795,6 +863,83 @@ void testConcurrentDeleteAndDeletePushdownAndInsert()
}
}

// Repeat the test (via @RepeatedTest) for better coverage, since the tested behavior is inherently non-deterministic.
@RepeatedTest(3)
public void testConcurrentMergeWithTimestampAndDate()
throws Exception
{
int threads = 3;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
String targetTableName = "test_concurrent_merges_target_table_" + randomNameSuffix();
String sourceTableName = "test_concurrent_merges_source_table_" + randomNameSuffix();

assertUpdate("CREATE TABLE " + sourceTableName + " (timestamp_source TIMESTAMP)");
assertUpdate("INSERT INTO " + sourceTableName + " VALUES (TIMESTAMP '2024-01-01 00:00:01'), (TIMESTAMP '2024-01-02 00:00:01'), (TIMESTAMP '2024-01-03 00:00:01')", 3);
assertUpdate("CREATE TABLE " + targetTableName + " (a BIGINT, date_target DATE) WITH (partitioning = ARRAY['date_target'])");
assertUpdate("INSERT INTO " + targetTableName + " VALUES (1, DATE '2024-01-01'), (2, DATE '2024-01-02'), (3, DATE '2024-01-03')", 3);

try {
// Merge data concurrently using non-overlapping partition predicates (different days in the timestamps)
executor.invokeAll(ImmutableList.<Callable<Void>>builder()
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(
withFileBasedConflictDetectionSession(),
"""
MERGE INTO %s t USING (SELECT DATE(timestamp_source) AS date_source from %s
WHERE timestamp_source BETWEEN TIMESTAMP '2024-01-01 00:00:00' AND TIMESTAMP '2024-01-01 23:59:59.999999') AS s
ON (t.date_target = s.date_source)
WHEN MATCHED THEN UPDATE SET
a = a + 1
""".formatted(targetTableName, sourceTableName));
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(
withFileBasedConflictDetectionSession(),
"""
MERGE INTO %s t USING (SELECT DATE(timestamp_source) AS date_source from %s
WHERE timestamp_source BETWEEN TIMESTAMP '2024-01-02 00:00:00' AND TIMESTAMP '2024-01-02 23:59:59.999999') AS s
ON (t.date_target = s.date_source)
WHEN MATCHED THEN UPDATE SET
a = a + 10
""".formatted(targetTableName, sourceTableName));
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(
withFileBasedConflictDetectionSession(),
"""
MERGE INTO %s t USING (SELECT DATE(timestamp_source) AS date_source from %s
WHERE timestamp_source BETWEEN TIMESTAMP '2024-01-03 00:00:00' AND TIMESTAMP '2024-01-03 23:59:59.999999') AS s
ON (t.date_target = s.date_source)
WHEN MATCHED THEN UPDATE SET
a = a + 100
""".formatted(targetTableName, sourceTableName));
return null;
})
.build())
.forEach(MoreFutures::getDone);

assertThat(query("SELECT * FROM " + targetTableName))
.matches("""
VALUES
(CAST(2 AS BIGINT), DATE '2024-01-01'),
(CAST(12 AS BIGINT), DATE '2024-01-02'),
(CAST(103 AS BIGINT), DATE '2024-01-03')
""");
}
finally {
assertUpdate("DROP TABLE " + sourceTableName);
assertUpdate("DROP TABLE " + targetTableName);
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}

// Repeat the test (via @RepeatedTest) for better coverage, since the tested behavior is inherently non-deterministic.
@RepeatedTest(3)
void testConcurrentUpdateWithPartitionTransformation()
@@ -1053,4 +1198,12 @@ private long getCurrentSnapshotId(String tableName)
{
return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");
}

private Session withFileBasedConflictDetectionSession()
{
return Session.builder(getSession())
.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), FILE_BASED_CONFLICT_DETECTION_ENABLED, "true")
.build();
}
}
