From 055888575e3d2385335d9bf7131e49b367565106 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 24 Sep 2024 19:00:39 +0800 Subject: [PATCH 1/6] enhance repair compaction --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++ .../apache/iotdb/db/conf/IoTDBDescriptor.java | 12 +++ .../storageengine/dataregion/DataRegion.java | 7 +- ...RepairUnsortedFileCompactionPerformer.java | 9 +- .../execute/task/AbstractCompactionTask.java | 2 +- .../task/CrossSpaceCompactionTask.java | 3 + .../RepairUnsortedFileCompactionTask.java | 82 ++++++++----------- .../execute/task/SettleCompactionTask.java | 8 ++ .../repair/RepairDataFileScanUtil.java | 12 +++ .../repair/RepairTimePartitionScanTask.java | 3 +- .../UnsortedFileRepairTaskScheduler.java | 2 +- .../impl/SizeTieredCompactionSelector.java | 6 +- .../dataregion/tsfile/TsFileRepairStatus.java | 16 +++- .../RepairUnsortedFileCompactionTest.java | 35 ++++++-- .../RepairUnsortedFileSchedulerTest.java | 2 +- .../conf/iotdb-system.properties.template | 5 ++ 16 files changed, 140 insertions(+), 75 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 07efa4d97c73..8e9f75cb1877 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -439,6 +439,9 @@ public class IoTDBConfig { /** Compact the unsequence files into the overlapped sequence files */ private volatile boolean enableCrossSpaceCompaction = true; + /** Enable auto repair compaction */ + private volatile boolean enableAutoRepairCompaction = true; + /** Enable the service for AINode */ private boolean enableAINodeService = false; @@ -2869,6 +2872,14 @@ public void setEnableCrossSpaceCompaction(boolean enableCrossSpaceCompaction) { this.enableCrossSpaceCompaction = enableCrossSpaceCompaction; } + public boolean isEnableAutoRepairCompaction() { + return enableAutoRepairCompaction; + } + + public void setEnableAutoRepairCompaction(boolean enableAutoRepairCompaction) { + this.enableAutoRepairCompaction = enableAutoRepairCompaction; + } + public boolean isEnableAINodeService() { return enableAINodeService; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 442cdbb1c4e7..4a887e077aae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -444,6 +444,12 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO "compaction_schedule_interval_in_ms", Long.toString(conf.getCompactionScheduleIntervalInMs())))); + conf.setEnableAutoRepairCompaction( + Boolean.parseBoolean( + properties.getProperty( + "enable_auto_repair_compaction", + Boolean.toString(conf.isEnableAutoRepairCompaction())))); + conf.setEnableCrossSpaceCompaction( Boolean.parseBoolean( properties.getProperty( @@ -1222,6 +1228,12 @@ private void loadCompactionHotModifiedProps(Properties properties) .setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec()); CompactionTaskManager.getInstance() .setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec()); + + conf.setEnableAutoRepairCompaction( + Boolean.parseBoolean( + properties.getProperty( + "enable_auto_repair_compaction", + Boolean.toString(conf.isEnableAutoRepairCompaction())))); } private boolean loadCompactionTaskHotModifiedProps(Properties properties) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index fd2d8c63fd72..5b45eb7a853e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1647,9 +1647,12 @@ public Future asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor || tsFileProcessor.alreadyMarkedClosing()) { return CompletableFuture.completedFuture(null); } + TsFileResource resource = tsFileProcessor.getTsFileResource(); logger.info( - "Async close tsfile: {}", - tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath()); + "Async close tsfile: {}, file start time: {}, file end time: {}", + resource.getTsFile().getAbsolutePath(), + resource.getFileStartTime(), + resource.getFileEndTime()); Future future; if (sequence) { closingSequenceTsFileProcessor.add(tsFileProcessor); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java index 3485feb45347..79a3b9fb157a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.RepairUnsortedFileCompactionWriter; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; @@ -34,11 +35,8 @@ /** Used for fixing files which contains internal unsorted data */ public class RepairUnsortedFileCompactionPerformer extends ReadPointCompactionPerformer { - private final boolean rewriteFile; - - public RepairUnsortedFileCompactionPerformer(boolean rewriteFile) { + public RepairUnsortedFileCompactionPerformer() { super(); - this.rewriteFile = rewriteFile; } @Override @@ -52,7 +50,8 @@ protected AbstractCompactionWriter getCompactionWriter( @Override public void perform() throws Exception { - if (rewriteFile) { + TsFileResource resource = !seqFiles.isEmpty() ? seqFiles.get(0) : unseqFiles.get(0); + if (resource.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE) { super.perform(); } else { prepareTargetFile(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index 7173167541f7..48706fcb4c83 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -158,7 +158,7 @@ protected void handleException(Logger logger, Exception e) { // these exceptions generally caused by unsorted data, mark all source files as NEED_TO_REPAIR for (TsFileResource resource : unsortedTsFileResources) { if (resource.getTsFileRepairStatus() != TsFileRepairStatus.CAN_NOT_REPAIR) { - resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR); + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK); } } } else if (e instanceof InterruptedException diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java index b80f66737a01..7dcd312b1bc6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java @@ -47,6 +47,7 @@ import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -301,6 +302,8 @@ private boolean shouldRollback() { private void rollback() throws IOException { // if the task has started, + targetTsfileResourceList = + targetTsfileResourceList == null ? Collections.emptyList() : targetTsfileResourceList; if (recoverMemoryStatus) { replaceTsFileInMemory( targetTsfileResourceList, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java index 7f1e23c956a6..c1fec56e0338 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.RepairUnsortedFileCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; +import org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairDataFileScanUtil; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.RepairUnsortedFileCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus; @@ -59,7 +60,6 @@ public static long getInitialAllocatedFileTimestamp() { } private final TsFileResource sourceFile; - private final boolean rewriteFile; private CountDownLatch latch; public RepairUnsortedFileCompactionTask( @@ -73,74 +73,33 @@ public RepairUnsortedFileCompactionTask( tsFileManager, Collections.singletonList(sourceFile), sequence, - new RepairUnsortedFileCompactionPerformer(true), + new RepairUnsortedFileCompactionPerformer(), serialId); this.sourceFile = sourceFile; - this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator(); - this.rewriteFile = false; - } - - public RepairUnsortedFileCompactionTask( - long timePartition, - TsFileManager tsFileManager, - TsFileResource sourceFile, - boolean sequence, - boolean rewriteFile, - long serialId) { - super( - timePartition, - tsFileManager, - Collections.singletonList(sourceFile), - sequence, - new RepairUnsortedFileCompactionPerformer(rewriteFile), - serialId); - this.sourceFile = sourceFile; - if (rewriteFile) { + if (this.sourceFile.getTsFileRepairStatus() != TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) { this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator(); } - this.rewriteFile = rewriteFile; - } - - public RepairUnsortedFileCompactionTask( - long timePartition, - TsFileManager tsFileManager, - TsFileResource sourceFile, - CountDownLatch latch, - boolean sequence, - long serialId) { - super( - timePartition, - tsFileManager, - Collections.singletonList(sourceFile), - sequence, - new RepairUnsortedFileCompactionPerformer(true), - serialId); - this.sourceFile = sourceFile; - this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator(); - this.latch = latch; - this.rewriteFile = false; } + // used for 'start repair data' public RepairUnsortedFileCompactionTask( long timePartition, TsFileManager tsFileManager, TsFileResource sourceFile, CountDownLatch latch, boolean sequence, - boolean rewriteFile, long serialId) { super( timePartition, tsFileManager, Collections.singletonList(sourceFile), sequence, - new RepairUnsortedFileCompactionPerformer(rewriteFile), + new RepairUnsortedFileCompactionPerformer(), serialId); this.sourceFile = sourceFile; - if (rewriteFile) { + if (this.sourceFile.getTsFileRepairStatus() != TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) { this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator(); } - this.rewriteFile = rewriteFile; this.latch = latch; } @@ -205,7 +164,7 @@ protected void prepareTargetFiles() throws IOException { storageGroupName, dataRegionId); - if (rewriteFile) { + if (sourceFile.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE) { CompactionUtils.combineModsInInnerCompaction( filesView.sourceFilesInCompactionPerformer, filesView.targetFilesInPerformer); } else { @@ -219,6 +178,10 @@ protected void prepareTargetFiles() throws IOException { @Override protected boolean doCompaction() { + calculateRepairMethod(); + if (!sourceFile.getTsFileRepairStatus().isRepairCompactionCandidate()) { + return true; + } boolean isSuccess = super.doCompaction(); if (!isSuccess) { LOGGER.info("Failed to repair file {}", sourceFile.getTsFile().getAbsolutePath()); @@ -227,6 +190,27 @@ protected boolean doCompaction() { return isSuccess; } + private void calculateRepairMethod() { + if (this.sourceFile.getTsFileRepairStatus() != TsFileRepairStatus.NEED_TO_CHECK) { + return; + } + RepairDataFileScanUtil repairDataFileScanUtil = new RepairDataFileScanUtil(sourceFile); + repairDataFileScanUtil.scanTsFile(); + if (repairDataFileScanUtil.isBrokenFile()) { + sourceFile.setTsFileRepairStatus(TsFileRepairStatus.CAN_NOT_REPAIR); + return; + } + if (repairDataFileScanUtil.hasUnsortedData()) { + sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); + return; + } + if (sourceFile.isSeq()) { + sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE); + return; + } + sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NORMAL); + } + @Override public long getEstimatedMemoryCost() { if (innerSpaceEstimator != null && memoryCost == 0L) { @@ -252,7 +236,7 @@ public long getEstimatedMemoryCost() { @Override public boolean isDiskSpaceCheckPassed() { - if (!rewriteFile) { + if (sourceFile.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) { return true; } return super.isDiskSpaceCheckPassed(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java index 2a6d6e6099b1..bd944f7e3641 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java @@ -409,4 +409,12 @@ public boolean equalsOtherTask(AbstractCompactionTask otherTask) { public int hashCode() { return Objects.hash(fullyDirtyFiles, filesView.sourceFilesInCompactionPerformer, performer); } + + @Override + public boolean isDiskSpaceCheckPassed() { + if (this.partiallyDirtyFileSize == 0) { + return true; + } + return super.isDiskSpaceCheckPassed(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java index 8c224cb18f27..84a598d02fa3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java @@ -69,11 +69,17 @@ public class RepairDataFileScanUtil { private boolean hasUnsortedData; private boolean isBrokenFile; private long previousTime; + private boolean printLog; public RepairDataFileScanUtil(TsFileResource resource) { this.resource = resource; this.hasUnsortedData = false; this.previousTime = Long.MIN_VALUE; + this.printLog = false; + } + + public void setPrintLog(boolean printLog) { + this.printLog = printLog; } public void scanTsFile() { @@ -99,6 +105,12 @@ public void scanTsFile() { } } catch (CompactionLastTimeCheckFailedException lastTimeCheckFailedException) { this.hasUnsortedData = true; + if (printLog) { + logger.error( + "File {} has unsorted data: ", + resource.getTsFile().getPath(), + lastTimeCheckFailedException); + } } catch (Exception e) { // ignored the exception caused by thread interrupt if (Thread.currentThread().isInterrupted()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java index 0a263ca6ce3b..d044824a3f28 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java @@ -118,6 +118,7 @@ private void checkInternalUnsortedFileAndRepair(RepairTimePartition timePartitio } finally { sourceFile.readUnlock(); } + sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); LOGGER.info( "[RepairScheduler] {} need to repair because it has internal unsorted data", sourceFile); TsFileManager tsFileManager = timePartition.getTsFileManager(); @@ -152,6 +153,7 @@ private void checkOverlapInSequenceSpaceAndRepair(RepairTimePartition timePartit } checkTaskStatusAndMayStop(); CountDownLatch latch = new CountDownLatch(1); + overlapFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE); RepairUnsortedFileCompactionTask task = new RepairUnsortedFileCompactionTask( timePartition.getTimePartitionId(), @@ -159,7 +161,6 @@ private void checkOverlapInSequenceSpaceAndRepair(RepairTimePartition timePartit overlapFile, latch, true, - false, tsFileManager.getNextCompactionTaskId()); LOGGER.info( "[RepairScheduler] {} need to repair because it is overlapped with other files", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/UnsortedFileRepairTaskScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/UnsortedFileRepairTaskScheduler.java index 5cdf9493581e..5993f8bc9b37 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/UnsortedFileRepairTaskScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/UnsortedFileRepairTaskScheduler.java @@ -148,7 +148,7 @@ private void recoverRepairProgress(RepairTaskRecoverLogParser recoverLogParser) continue; } if (cannotRepairFiles.contains(resource.getTsFile().getName())) { - resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR); + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java index 65401dcef256..244246e66191 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java @@ -31,7 +31,6 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.IInnerSeqSpaceSelector; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.IInnerUnseqSpaceSelector; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; @@ -163,8 +162,7 @@ private List> selectTsFileResourcesByLevel(int level) throw private boolean cannotSelectCurrentFileToNormalCompaction(TsFileResource resource) { return resource.getStatus() != TsFileResourceStatus.NORMAL - || resource.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR - || resource.getTsFileRepairStatus() == TsFileRepairStatus.CAN_NOT_REPAIR; + || !resource.getTsFileRepairStatus().isNormalCompactionCandidate(); } /** @@ -212,7 +210,7 @@ private List selectFileNeedToRepair() { List taskList = new ArrayList<>(); for (TsFileResource resource : tsFileResources) { if (resource.getStatus() == TsFileResourceStatus.NORMAL - && resource.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR) { + && resource.getTsFileRepairStatus().isRepairCompactionCandidate()) { taskList.add( new RepairUnsortedFileCompactionTask( timePartition, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileRepairStatus.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileRepairStatus.java index 933551d227b9..72ef7f816fed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileRepairStatus.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileRepairStatus.java @@ -21,6 +21,18 @@ public enum TsFileRepairStatus { NORMAL, - NEED_TO_REPAIR, - CAN_NOT_REPAIR + NEED_TO_CHECK, + NEED_TO_REPAIR_BY_REWRITE, + NEED_TO_REPAIR_BY_MOVE, + CAN_NOT_REPAIR; + + public boolean isNormalCompactionCandidate() { + return this == NORMAL; + } + + public boolean isRepairCompactionCandidate() { + return this == NEED_TO_CHECK + || this == NEED_TO_REPAIR_BY_REWRITE + || this == NEED_TO_REPAIR_BY_MOVE; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java index b4275aa61cb0..1fbd2c8ee717 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java @@ -112,6 +112,7 @@ public void testRepairUnsortedDataBetweenPageWithNonAlignedSeries() throws IOExc writer.endFile(); } Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); RepairUnsortedFileCompactionTask task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); task.start(); @@ -136,6 +137,7 @@ public void testRepairUnsortedDataBetweenPageWithAlignedSeries() throws IOExcept writer.endFile(); } Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); RepairUnsortedFileCompactionTask task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); task.start(); @@ -164,6 +166,7 @@ public void testRepairUnsortedDataInOnePageWithNonAlignedSeries() throws IOExcep writer.endFile(); } Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); RepairUnsortedFileCompactionTask task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); task.start(); @@ -196,6 +199,7 @@ public void testRepairUnsortedDataInOnePageWithMultiNonAlignedSeries() throws IO writer.endFile(); } Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); RepairUnsortedFileCompactionTask task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); task.start(); @@ -224,6 +228,7 @@ public void testRepairUnsortedDataInOnePageWithUnseqFile() throws IOException { writer.endFile(); } Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); RepairUnsortedFileCompactionTask task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); task.start(); @@ -252,6 +257,7 @@ public void testRepairUnsortedDataInOnePageWithAlignedSeries() throws IOExceptio writer.endFile(); } Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource)); + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); RepairUnsortedFileCompactionTask task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, resource.isSeq(), 0); task.start(); @@ -307,7 +313,7 @@ public void testMarkFileAndRepairWithInnerSeqSpaceCompactionTask() Assert.assertFalse(task.start()); for (TsFileResource resource : tsFileManager.getTsFileList(true)) { - Assert.assertEquals(resource.getTsFileRepairStatus(), TsFileRepairStatus.NEED_TO_REPAIR); + Assert.assertTrue(resource.getTsFileRepairStatus().isRepairCompactionCandidate()); } long initialFinishedCompactionTaskNum = @@ -382,7 +388,7 @@ public void testMarkFileAndRepairWithInnerUnSeqSpaceCompactionTask() Assert.assertFalse(task.start()); for (TsFileResource resource : tsFileManager.getTsFileList(true)) { - Assert.assertEquals(resource.getTsFileRepairStatus(), TsFileRepairStatus.NEED_TO_REPAIR); + Assert.assertTrue(resource.getTsFileRepairStatus().isRepairCompactionCandidate()); } long initialFinishedCompactionTaskNum = @@ -458,7 +464,7 @@ public void testMarkFileAndRepairWithCrossSpaceCompactionTask() Assert.assertFalse(task.start()); for (TsFileResource resource : tsFileManager.getTsFileList(true)) { - Assert.assertEquals(resource.getTsFileRepairStatus(), TsFileRepairStatus.NEED_TO_REPAIR); + Assert.assertTrue(resource.getTsFileRepairStatus().isRepairCompactionCandidate()); } long initialFinishedCompactionTaskNum = @@ -521,8 +527,9 @@ public void testRepairOverlapBetweenFile() throws IOException { tsFileManager.addAll(seqResources, true); Assert.assertFalse(TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(seqResources)); + seqResource2.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE); RepairUnsortedFileCompactionTask task = - new RepairUnsortedFileCompactionTask(0, tsFileManager, seqResource2, true, false, 0); + new RepairUnsortedFileCompactionTask(0, tsFileManager, seqResource2, true, 0); Assert.assertTrue(task.start()); Assert.assertEquals(1, tsFileManager.getTsFileList(true).size()); Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); @@ -571,8 +578,9 @@ public void testRepairOverlapBetweenFileWithModFile() throws IOException, Illega tsFileManager.addAll(seqResources, true); Assert.assertFalse(TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(seqResources)); + seqResource2.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE); RepairUnsortedFileCompactionTask task = - new RepairUnsortedFileCompactionTask(0, tsFileManager, seqResource2, true, false, 0); + new RepairUnsortedFileCompactionTask(0, tsFileManager, seqResource2, true, 0); Assert.assertTrue(task.start()); Assert.assertEquals(1, tsFileManager.getTsFileList(true).size()); Assert.assertEquals(1, tsFileManager.getTsFileList(false).size()); @@ -630,10 +638,17 @@ public void testEstimateRepairCompactionMemory() throws IOException { writer.endChunkGroup(); writer.endFile(); } + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); RepairUnsortedFileCompactionTask task = - new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, true, 0); + new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0); Assert.assertTrue(task.getEstimatedMemoryCost() > 0); - task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, false, 0); + + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK); + task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, false, 0); + Assert.assertTrue(task.getEstimatedMemoryCost() > 0); + + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE); + task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, false, 0); Assert.assertEquals(0, task.getEstimatedMemoryCost()); } @@ -657,8 +672,9 @@ public void testMergeAlignedSeriesPointWithSameTimestamp() throws IOException { writer.endChunkGroup(); writer.endFile(); } + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); RepairUnsortedFileCompactionTask task = - new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, true, 0); + new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0); Assert.assertTrue(task.start()); TsFileResource target = tsFileManager.getTsFileList(false).get(0); try (TsFileSequenceReader reader = new TsFileSequenceReader(target.getTsFilePath())) { @@ -704,8 +720,9 @@ public void testSplitChunk() throws IOException { writer.endChunkGroup(); writer.endFile(); } + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); RepairUnsortedFileCompactionTask task = - new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, true, 0); + new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0); Assert.assertTrue(task.start()); TsFileResource target = tsFileManager.getTsFileList(false).get(0); try (TsFileSequenceReader reader = new TsFileSequenceReader(target.getTsFilePath())) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java index 646626fb8c4f..92337fe151cd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java @@ -198,7 +198,7 @@ public void testRecoverRepairScheduleSkipRepairedTimePartitionAndMarkFile() thro scheduler.run(); Assert.assertEquals(3, tsFileManager.getTsFileList(true).size()); // check whether the repair status is marked correctly - Assert.assertEquals(TsFileRepairStatus.NEED_TO_REPAIR, seqResource3.getTsFileRepairStatus()); + Assert.assertTrue(seqResource3.getTsFileRepairStatus().isRepairCompactionCandidate()); } @Test diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 945199534fc9..aab967e054f5 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1170,6 +1170,11 @@ enable_unseq_space_compaction=true # Datatype: boolean enable_cross_space_compaction=true +# enable auto repair unsorted file by compaction +# effectiveMode: hot_reload +# Datatype: boolean +enable_auto_repair_compaction=true + # the selector of cross space compaction task # effectiveMode: restart # Options: rewrite From bca16b46c0720e51e02929df16b2f73ba774af42 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 25 Sep 2024 09:41:35 +0800 Subject: [PATCH 2/6] add config --- .../execute/task/RepairUnsortedFileCompactionTask.java | 2 +- .../compaction/repair/RepairDataFileScanUtil.java | 8 ++++---- .../selector/impl/SizeTieredCompactionSelector.java | 3 +++ 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java index c1fec56e0338..52bee85c50db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java @@ -194,7 +194,7 @@ private void calculateRepairMethod() { if (this.sourceFile.getTsFileRepairStatus() != TsFileRepairStatus.NEED_TO_CHECK) { return; } - RepairDataFileScanUtil repairDataFileScanUtil = new RepairDataFileScanUtil(sourceFile); + RepairDataFileScanUtil repairDataFileScanUtil = new RepairDataFileScanUtil(sourceFile, true); repairDataFileScanUtil.scanTsFile(); if (repairDataFileScanUtil.isBrokenFile()) { sourceFile.setTsFileRepairStatus(TsFileRepairStatus.CAN_NOT_REPAIR); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java index 84a598d02fa3..9b9f55af9fab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java @@ -72,13 +72,13 @@ public class RepairDataFileScanUtil { private boolean printLog; public RepairDataFileScanUtil(TsFileResource resource) { + this(resource, false); + } + + public RepairDataFileScanUtil(TsFileResource resource, boolean printLog) { this.resource = resource; this.hasUnsortedData = false; this.previousTime = Long.MIN_VALUE; - this.printLog = false; - } - - public void setPrintLog(boolean printLog) { this.printLog = printLog; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java index 244246e66191..41324f4fafc7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java @@ -207,6 +207,9 @@ protected List selectTaskBaseOnLevel() } private List selectFileNeedToRepair() { + if (!config.isEnableAutoRepairCompaction()) { + return Collections.emptyList(); + } List taskList = new ArrayList<>(); for (TsFileResource resource : tsFileResources) { if (resource.getStatus() == TsFileResourceStatus.NORMAL From 6bc094114b34aeb79aa02347b0619e6a4e574965 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 25 Sep 2024 09:56:35 +0800 Subject: [PATCH 3/6] delete duplicate check --- .../compaction/execute/task/SettleCompactionTask.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java index bd944f7e3641..2a6d6e6099b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java @@ -409,12 +409,4 @@ public boolean equalsOtherTask(AbstractCompactionTask otherTask) { public int hashCode() { return Objects.hash(fullyDirtyFiles, filesView.sourceFilesInCompactionPerformer, performer); } - - @Override - public boolean isDiskSpaceCheckPassed() { - if (this.partiallyDirtyFileSize == 0) { - return true; - } - return super.isDiskSpaceCheckPassed(); - } } From abdbfc45940f1121c426eea20f9670e53091fb23 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 25 Sep 2024 10:16:57 +0800 Subject: [PATCH 4/6] modify log --- .../execute/task/SettleCompactionTask.java | 30 ++++++++++++------- .../RewriteCrossSpaceCompactionSelector.java | 4 +++ .../impl/SizeTieredCompactionSelector.java | 3 +- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java index 2a6d6e6099b1..c3641b415b9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java @@ -156,17 +156,25 @@ protected boolean doCompaction() { double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; if (isSuccess) { - LOGGER.info( - "{}-{} [Compaction] SettleCompaction task finishes successfully, time cost is {} s, compaction speed is {} MB/s." - + "Fully_dirty files num is {} and partially_dirty files num is {}.", - storageGroupName, - dataRegionId, - String.format("%.2f", costTime), - String.format( - "%.2f", - (fullyDirtyFileSize + partiallyDirtyFileSize) / 1024.0d / 1024.0d / costTime), - fullyDirtyFiles.size(), - filesView.sourceFilesInCompactionPerformer.size()); + if (partiallyDirtyFileSize == 0) { + LOGGER.info( + "{}-{} [Compaction] SettleCompaction task finishes successfully, time cost is {} s." + + "Fully_dirty files num is {}.", + storageGroupName, + dataRegionId, + String.format("%.2f", costTime), + fullyDirtyFiles.size()); + } else { + LOGGER.info( + "{}-{} [Compaction] SettleCompaction task finishes successfully, time cost is {} s, compaction speed is {} MB/s." + + "Fully_dirty files num is {} and partially_dirty files num is {}.", + storageGroupName, + dataRegionId, + String.format("%.2f", costTime), + String.format("%.2f", (partiallyDirtyFileSize) / 1024.0d / 1024.0d / costTime), + fullyDirtyFiles.size(), + filesView.sourceFilesInCompactionPerformer.size()); + } } else { LOGGER.info( "{}-{} [Compaction] SettleCompaction task finishes with some error, time cost is {} s." diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java index dc1b541158f2..31c4823f784c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.MergeException; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ICompactionSelector; @@ -336,6 +337,9 @@ private boolean canSubmitCrossTask( @Override public List selectCrossSpaceTask( List sequenceFileList, List unsequenceFilelist) { + if (!CompactionUtils.isDiskHasSpace()) { + return Collections.emptyList(); + } return selectCrossSpaceTask(sequenceFileList, unsequenceFilelist, false); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java index 41324f4fafc7..022b528ba5af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.RepairUnsortedFileCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.comparator.ICompactionTaskComparator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.IInnerSeqSpaceSelector; @@ -182,7 +183,7 @@ public List selectInnerSpaceTask(List try { // 1. select compaction task based on file which need to repair List taskList = selectFileNeedToRepair(); - if (!taskList.isEmpty()) { + if (!taskList.isEmpty() || !CompactionUtils.isDiskHasSpace()) { return taskList; } // 2. if a suitable compaction task is not selected in the first step, select the compaction From 163233e4919dda5049f7ae61c0acb264c36899e0 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 25 Sep 2024 10:25:25 +0800 Subject: [PATCH 5/6] add ut --- .../RepairUnsortedFileCompactionTest.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java index 1fbd2c8ee717..53039f4d1faa 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java @@ -706,6 +706,71 @@ public void testMergeAlignedSeriesPointWithSameTimestamp() throws IOException { } } + @Test + public void testRepairFilesWithCheck1() throws IOException { + TsFileResource resource = createEmptyFileAndResource(true); + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { + writer.startChunkGroup("d1"); + writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( + Arrays.asList("s1", "s2", "s3"), + new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20)}}, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(true, false, false)); + writer.endChunkGroup(); + writer.endFile(); + } + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK); + RepairUnsortedFileCompactionTask task = + new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0, true); + Assert.assertTrue(task.start()); + Assert.assertEquals( + resource.getTsFileRepairStatus(), TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE); + } + + @Test + public void testRepairFilesWithCheck2() throws IOException { + TsFileResource resource = createEmptyFileAndResource(true); + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { + writer.startChunkGroup("d1"); + writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( + Arrays.asList("s1", "s2", "s3"), + new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20), new TimeRange(1, 10)}}, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(true, false, false)); + writer.endChunkGroup(); + writer.endFile(); + } + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK); + RepairUnsortedFileCompactionTask task = + new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0, true); + Assert.assertTrue(task.start()); + Assert.assertEquals( + resource.getTsFileRepairStatus(), TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); + } + + @Test + public void testRepairFilesWithCheck3() throws IOException { + TsFileResource resource = createEmptyFileAndResource(false); + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { + writer.startChunkGroup("d1"); + writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( + Arrays.asList("s1", "s2", "s3"), + new TimeRange[][] {new TimeRange[] {new TimeRange(10, 20)}}, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(true, false, false)); + writer.endChunkGroup(); + writer.endFile(); + } + resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK); + RepairUnsortedFileCompactionTask task = + new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0, true); + Assert.assertTrue(task.start()); + Assert.assertEquals(resource.getTsFileRepairStatus(), TsFileRepairStatus.NORMAL); + } + @Test public void testSplitChunk() throws IOException { TsFileResource resource = createEmptyFileAndResource(true); From 4cb5325cb2a5c027ffb3058dfd6b8a784eb39c14 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 25 Sep 2024 12:08:09 +0800 Subject: [PATCH 6/6] fix compile --- .../compaction/repair/RepairUnsortedFileCompactionTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java index 53039f4d1faa..3122aaed50af 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java @@ -722,7 +722,7 @@ public void testRepairFilesWithCheck1() throws IOException { } resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK); RepairUnsortedFileCompactionTask task = - new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0, true); + new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0); Assert.assertTrue(task.start()); Assert.assertEquals( resource.getTsFileRepairStatus(), TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE); @@ -744,7 +744,7 @@ public void testRepairFilesWithCheck2() throws IOException { } resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK); RepairUnsortedFileCompactionTask task = - new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0, true); + new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0); Assert.assertTrue(task.start()); Assert.assertEquals( resource.getTsFileRepairStatus(), TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE); @@ -766,7 +766,7 @@ public void testRepairFilesWithCheck3() throws IOException { } resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK); RepairUnsortedFileCompactionTask task = - new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0, true); + new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 0); Assert.assertTrue(task.start()); Assert.assertEquals(resource.getTsFileRepairStatus(), TsFileRepairStatus.NORMAL); }