Skip to content

Commit

Permalink
fix compaction task comparator bug
Browse files Browse the repository at this point in the history
  • Loading branch information
shuwenwei committed Oct 21, 2024
1 parent 46a1335 commit 4b2c49b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,15 @@ private void calculateRenamedTargetFiles(boolean needAdjustSourceFilePosition)
TsFileResource resource = filesView.sortedAllSourceFilesInTask.get(i);
File file = resource.getTsFile();
File skippedSourceFile = filesView.skippedSourceFiles.get(i).getTsFile();
TsFileNameGenerator.TsFileName skippedSourceFileName =
TsFileNameGenerator.getTsFileName(skippedSourceFile.getName());
TsFileNameGenerator.TsFileName tsFileName = TsFileNameGenerator.getTsFileName(file.getName());
String newFileName =
String.format(
"%s-%s-%s-%s" + TsFileConstant.TSFILE_SUFFIX,
tsFileName.getTime(),
tsFileName.getVersion(),
tsFileName.getInnerCompactionCnt(),
skippedSourceFileName.getInnerCompactionCnt(),
tsFileName.getCrossCompactionCnt() + 1);
TsFileResource renamedTargetFile =
new TsFileResource(
Expand Down Expand Up @@ -602,6 +604,11 @@ public List<TsFileResource> getSelectedTsFileResourceList() {
return filesView.sourceFilesInCompactionPerformer;
}

public double getAvgCompactionCount() {
return (double) filesView.sumOfCompactionCount
/ filesView.sourceFilesInCompactionPerformer.size();
}

@TestOnly
public void setTargetTsFileResource(TsFileResource targetTsFileResource) {
this.filesView.setTargetFileForRecover(targetTsFileResource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class DefaultCompactionTaskComparatorImpl implements ICompactionTaskCompa
@SuppressWarnings({"squid:S3776", "javabugs:S6320"})
@Override
public int compare(AbstractCompactionTask o1, AbstractCompactionTask o2) {
System.out.println("compare");
if (o1 instanceof InsertionCrossSpaceCompactionTask
&& o2 instanceof InsertionCrossSpaceCompactionTask) {
return o1.getSerialId() < o2.getSerialId() ? -1 : 1;
Expand Down Expand Up @@ -91,10 +92,10 @@ public int compareInnerSpaceCompactionTask(
// if the sum of compaction count of the selected files are different
// we prefer to execute task with smaller compaction count
// this can reduce write amplification
if (((double) o1.getSumOfCompactionCount()) / o1.getSelectedTsFileResourceList().size()
!= ((double) o2.getSumOfCompactionCount()) / o2.getSelectedTsFileResourceList().size()) {
return o1.getSumOfCompactionCount() / o1.getSelectedTsFileResourceList().size()
- o2.getSumOfCompactionCount() / o2.getSelectedTsFileResourceList().size();
double avgCompactionCount1 = o1.getAvgCompactionCount();
double avgCompactionCount2 = o2.getAvgCompactionCount();
if (Math.abs(avgCompactionCount1 - avgCompactionCount2) < 1e-2) {
return Double.compare(avgCompactionCount1, avgCompactionCount2);
}

// if the time partition of o1 and o2 are different
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,12 @@ public static TsFileResource getInnerCompactionTargetFileResource(
public static List<TsFileResource> getNewInnerCompactionTargetFileResources(
List<TsFileResource> tsFileResources, boolean sequence)
throws IOException, DiskSpaceInsufficientException {
long maxInnerCompactionCount = Long.MIN_VALUE;
long maxCrossMergeCount = Long.MIN_VALUE;
int maxTierLevel = 0;
for (TsFileResource resource : tsFileResources) {
TsFileName tsFileName = getTsFileName(resource.getTsFile().getName());
maxInnerCompactionCount = Math.max(tsFileName.innerCompactionCnt, maxInnerCompactionCount);
maxCrossMergeCount = Math.max(tsFileName.crossCompactionCnt, maxCrossMergeCount);
maxTierLevel = Math.max(resource.getTierLevel(), maxTierLevel);
}
Expand All @@ -339,7 +341,7 @@ public static List<TsFileResource> getNewInnerCompactionTargetFileResources(
tsFileResources.get(0).getTimePartition(),
tsFileName.time,
tsFileName.version,
tsFileName.innerCompactionCnt + 1,
maxInnerCompactionCount,
(int) maxCrossMergeCount,
maxTierLevel,
IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX)),
Expand Down

0 comments on commit 4b2c49b

Please sign in to comment.