Skip to content

Commit

Permalink
[AMORO-3228] Rewrite pos delete files not written by optimizing proce…
Browse files Browse the repository at this point in the history
…ss (#3229)

Rewrite pos delete files not written by optimizing
  • Loading branch information
zhoujinsong authored Oct 9, 2024
1 parent fba4ce9 commit a181281
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.utils.TableFileUtil;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
Expand Down Expand Up @@ -226,8 +227,12 @@ public boolean fileShouldRewrite(DataFile dataFile, List<ContentFile<?>> deletes

public boolean segmentShouldRewritePos(DataFile dataFile, List<ContentFile<?>> deletes) {
Preconditions.checkArgument(!isFragmentFile(dataFile), "Unsupported fragment file.");
if (deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count()
>= 2) {
long posDeleteFileCount =
deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count();
if (posDeleteFileCount == 1) {
return !TableFileUtil.isOptimizingPosDeleteFile(
dataFile.path().toString(), deletes.get(0).path().toString());
} else if (posDeleteFileCount > 1) {
combinePosSegmentFileCount++;
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ private void flushDeletes() {
String fileDir = TableFileUtil.getFileDir(filePath.get().toString());
String deleteFilePath =
format.addExtension(
String.format("%s/%s-delete-%s", fileDir, fileName, fileNameSuffix));
String.format(
"%s/%s",
fileDir,
TableFileUtil.optimizingPosDeleteFileName(fileName, fileNameSuffix)));
EncryptedOutputFile outputFile =
encryptionManager.encrypt(fileIO.newOutputFile(deleteFilePath));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

public class TableFileUtil {
private static final Logger LOG = LoggerFactory.getLogger(TableFileUtil.class);
private static final String POS_DELETE_FILE_IDENTIFIER = "delete";

/**
* Parse file name from file path
Expand Down Expand Up @@ -192,4 +193,13 @@ public static String getParent(String path) {
Path p = new Path(path);
return p.getParent().toString();
}

/**
 * Builds the name of a position-delete file for the given data file name.
 *
 * <p>The result is the data file name, the position-delete identifier and the suffix, joined by
 * single dashes.
 *
 * @param dataFileName name of the data file that the delete file refers to
 * @param suffix trailing part appended after the position-delete identifier
 * @return the composed file name in the form {@code <dataFileName>-<identifier>-<suffix>}
 */
public static String optimizingPosDeleteFileName(String dataFileName, String suffix) {
  return String.join("-", dataFileName, POS_DELETE_FILE_IDENTIFIER, suffix);
}

/**
 * Checks whether a position-delete file name follows the naming produced by {@link
 * #optimizingPosDeleteFileName(String, String)} for the given data file.
 *
 * <p>Only the file names (not the directories) are compared: the position-delete file name must
 * start with {@code <dataFileName>-<identifier>}.
 *
 * @param dataFilePath path of the data file
 * @param posDeleteFilePath path of the position-delete file to test
 * @return {@code true} if the position-delete file name starts with the expected prefix
 */
public static boolean isOptimizingPosDeleteFile(String dataFilePath, String posDeleteFilePath) {
  String posDeleteFileName = getFileName(posDeleteFilePath);
  String expectedPrefix = getFileName(dataFilePath) + "-" + POS_DELETE_FILE_IDENTIFIER;
  return posDeleteFileName.startsWith(expectedPrefix);
}
}

0 comments on commit a181281

Please sign in to comment.