Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: data-expire by partition info #3273

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
Expand All @@ -63,6 +65,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableFunction;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -75,8 +78,11 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -696,26 +702,152 @@ CloseableIterable<FileEntry> fileScan(

protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, expirationConfig)) {
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
if (mayExpired(e, partitionFreshness, expireTimestamp)) {
fileEntries.add(e);
}
});
fileEntries
.parallelStream()
.filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
.forEach(expiredFiles::addFile);
boolean expireByPartitionSuccess = false;
if (expirationConfig
.getExpirationLevel()
.equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
expireByPartitionSuccess =
tryExpireByPartition(entries, expirationConfig, expireTimestamp, expiredFiles);
}
if (!expireByPartitionSuccess) {
expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, expiredFiles);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return expiredFiles;
}

private boolean tryExpireByPartition(
CloseableIterable<FileEntry> entries,
DataExpirationConfig expirationConfig,
long expireTimestamp,
ExpireFiles expiredFiles) {
Types.NestedField expirationField =
table.schema().findField(expirationConfig.getExpirationField());
Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
buildExpirePartitionFieldsMap(expirationField);
// All historical specs have expirationField as the partition field.
boolean allSpecsMatch = expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
if (allSpecsMatch) {
Comparable<?> expirePartitionValue;
try {
expirePartitionValue =
getPartitionUpperBound(expirationConfig, expirationField, expireTimestamp);
} catch (IllegalArgumentException e) {
LOG.error("Failed to get partition upper bound", e);
return false;
}

Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValueMap =
getExpirePartitionValueMap(
expirePartitionFieldsMap, expirationField, expirePartitionValue);
entries.forEach(
fileEntry -> {
List<Boolean> expiredList = new ArrayList<>();
ContentFile<?> contentFile = fileEntry.getFile();
int fileSpecId = contentFile.specId();
for (Map.Entry<Integer, Comparable<?>> entry :
expirePartitionValueMap.get(fileSpecId).entrySet()) {
Comparable<Object> partitionValue =
contentFile.partition().get(entry.getKey(), entry.getValue().getClass());
boolean expired = partitionValue.compareTo(entry.getValue()) < 0;
expiredList.add(expired);
}
if (!expiredList.isEmpty() && expiredList.stream().allMatch(Boolean::booleanValue)) {
expiredFiles.addFile(fileEntry);
}
});
return true;
}
return false;
}

private void expireByMetricsUpperBound(
CloseableIterable<FileEntry> entries,
DataExpirationConfig expirationConfig,
long expireTimestamp,
ExpireFiles expiredFiles) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
if (mayExpired(e, partitionFreshness, expireTimestamp)) {
fileEntries.add(e);
}
});
fileEntries
.parallelStream()
.filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
.forEach(expiredFiles::addFile);
}

private Map<Integer, Map<Integer, PartitionField>> buildExpirePartitionFieldsMap(
Types.NestedField expireField) {
Map<Integer, Map<Integer, PartitionField>> partitionFieldsMap = new HashMap<>();
for (Map.Entry<Integer, PartitionSpec> entry : table.specs().entrySet()) {
int pos = 0;
Map<Integer, PartitionField> posToField = new HashMap<>();
for (PartitionField field : entry.getValue().fields()) {
if (field.sourceId() == expireField.fieldId()) {
posToField.put(pos, field);
}
pos++;
}
partitionFieldsMap.put(entry.getKey(), posToField);
}

return partitionFieldsMap;
}

private Map<Integer, Map<Integer, Comparable<?>>> getExpirePartitionValueMap(
Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap,
Types.NestedField field,
Comparable<?> expireValue) {
Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValue = new HashMap<>();
for (Map.Entry<Integer, Map<Integer, PartitionField>> entry :
expirePartitionFieldsMap.entrySet()) {
Map<Integer, Comparable<?>> posToValue = new HashMap<>();
for (Map.Entry<Integer, PartitionField> posToField : entry.getValue().entrySet()) {
posToValue.put(
posToField.getKey(),
((SerializableFunction<Comparable<?>, Comparable<?>>)
posToField.getValue().transform().bind(field.type()))
.apply(expireValue));
}
expirePartitionValue.put(entry.getKey(), posToValue);
}
return expirePartitionValue;
}

private Comparable<?> getPartitionUpperBound(
DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) {
switch (field.type().typeId()) {
case TIMESTAMP:
return expireTimestamp * 1000;
case LONG:
if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_MS)) {
return expireTimestamp;
} else if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_S)) {
return expireTimestamp / 1000;
} else {
throw new IllegalArgumentException(
"Number dateformat: " + expirationConfig.getNumberDateFormat());
}
case STRING:
return LocalDateTime.ofInstant(
Instant.ofEpochMilli(expireTimestamp), getDefaultZoneId(field))
.format(
DateTimeFormatter.ofPattern(
expirationConfig.getDateTimePattern(), Locale.getDefault()));
default:
throw new IllegalArgumentException(
"Unsupported expiration field type: " + field.type().typeId());
}
}

/**
* Create a filter expression for expired files for the `FILE` level. For the `PARTITION` level,
* we need to collect the oldest files to determine if the partition is obsolete, so we will not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,11 @@ private void testUnKeyedPartitionLevel() {

List<Record> expected;
if (tableTestHelper().partitionSpec().isPartitioned()) {
if (expireByStringDate()) {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"));
} else {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"),
createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"),
createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00"));
}
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"),
createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"),
createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00"));
} else {
expected =
Lists.newArrayList(
Expand Down
Loading