Skip to content

Commit

Permalink
[to rc/1.3.3] Enhance repair compaction & exception handle with full …
Browse files Browse the repository at this point in the history
…disks (#13613)

* enhance repair compaction

* add config

* delete duplicate check

* modify log

* add ut

* fix compile
  • Loading branch information
shuwenwei authored Sep 27, 2024
1 parent 547f3eb commit a3c4628
Show file tree
Hide file tree
Showing 17 changed files with 225 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,9 @@ public class IoTDBConfig {
/** Compact the unsequence files into the overlapped sequence files */
private volatile boolean enableCrossSpaceCompaction = true;

/** Enable auto repair compaction */
private volatile boolean enableAutoRepairCompaction = true;

/** The buffer for sort operation */
private long sortBufferSize = 1024 * 1024L;

Expand Down Expand Up @@ -2863,6 +2866,14 @@ public void setEnableCrossSpaceCompaction(boolean enableCrossSpaceCompaction) {
this.enableCrossSpaceCompaction = enableCrossSpaceCompaction;
}

public boolean isEnableAutoRepairCompaction() {
return enableAutoRepairCompaction;
}

public void setEnableAutoRepairCompaction(boolean enableAutoRepairCompaction) {
this.enableAutoRepairCompaction = enableAutoRepairCompaction;
}

public InnerSequenceCompactionSelector getInnerSequenceCompactionSelector() {
return innerSequenceCompactionSelector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
"compaction_schedule_interval_in_ms",
Long.toString(conf.getCompactionScheduleIntervalInMs()))));

conf.setEnableAutoRepairCompaction(
Boolean.parseBoolean(
properties.getProperty(
"enable_auto_repair_compaction",
Boolean.toString(conf.isEnableAutoRepairCompaction()))));

conf.setEnableCrossSpaceCompaction(
Boolean.parseBoolean(
properties.getProperty(
Expand Down Expand Up @@ -1209,6 +1215,12 @@ private void loadCompactionHotModifiedProps(Properties properties)
.setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec());
CompactionTaskManager.getInstance()
.setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec());

conf.setEnableAutoRepairCompaction(
Boolean.parseBoolean(
properties.getProperty(
"enable_auto_repair_compaction",
Boolean.toString(conf.isEnableAutoRepairCompaction()))));
}

private boolean loadCompactionTaskHotModifiedProps(Properties properties) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1647,9 +1647,12 @@ public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor
|| tsFileProcessor.alreadyMarkedClosing()) {
return CompletableFuture.completedFuture(null);
}
TsFileResource resource = tsFileProcessor.getTsFileResource();
logger.info(
"Async close tsfile: {}",
tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
"Async close tsfile: {}, file start time: {}, file end time: {}",
resource.getTsFile().getAbsolutePath(),
resource.getFileStartTime(),
resource.getFileEndTime());
Future<?> future;
if (sequence) {
closingSequenceTsFileProcessor.add(tsFileProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.RepairUnsortedFileCompactionWriter;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
Expand All @@ -34,11 +35,8 @@
/** Used for fixing files which contains internal unsorted data */
public class RepairUnsortedFileCompactionPerformer extends ReadPointCompactionPerformer {

private final boolean rewriteFile;

public RepairUnsortedFileCompactionPerformer(boolean rewriteFile) {
public RepairUnsortedFileCompactionPerformer() {
super();
this.rewriteFile = rewriteFile;
}

@Override
Expand All @@ -52,7 +50,8 @@ protected AbstractCompactionWriter getCompactionWriter(

@Override
public void perform() throws Exception {
if (rewriteFile) {
TsFileResource resource = !seqFiles.isEmpty() ? seqFiles.get(0) : unseqFiles.get(0);
if (resource.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE) {
super.perform();
} else {
prepareTargetFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected void handleException(Logger logger, Exception e) {
// these exceptions generally caused by unsorted data, mark all source files as NEED_TO_REPAIR
for (TsFileResource resource : unsortedTsFileResources) {
if (resource.getTsFileRepairStatus() != TsFileRepairStatus.CAN_NOT_REPAIR) {
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR);
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK);
}
}
} else if (e instanceof InterruptedException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -301,6 +302,8 @@ private boolean shouldRollback() {

private void rollback() throws IOException {
// if the task has started,
targetTsfileResourceList =
targetTsfileResourceList == null ? Collections.emptyList() : targetTsfileResourceList;
if (recoverMemoryStatus) {
replaceTsFileInMemory(
targetTsfileResourceList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.RepairUnsortedFileCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
import org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairDataFileScanUtil;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.RepairUnsortedFileCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus;
Expand Down Expand Up @@ -59,7 +60,6 @@ public static long getInitialAllocatedFileTimestamp() {
}

private final TsFileResource sourceFile;
private final boolean rewriteFile;
private CountDownLatch latch;

public RepairUnsortedFileCompactionTask(
Expand All @@ -73,74 +73,33 @@ public RepairUnsortedFileCompactionTask(
tsFileManager,
Collections.singletonList(sourceFile),
sequence,
new RepairUnsortedFileCompactionPerformer(true),
new RepairUnsortedFileCompactionPerformer(),
serialId);
this.sourceFile = sourceFile;
this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator();
this.rewriteFile = false;
}

public RepairUnsortedFileCompactionTask(
long timePartition,
TsFileManager tsFileManager,
TsFileResource sourceFile,
boolean sequence,
boolean rewriteFile,
long serialId) {
super(
timePartition,
tsFileManager,
Collections.singletonList(sourceFile),
sequence,
new RepairUnsortedFileCompactionPerformer(rewriteFile),
serialId);
this.sourceFile = sourceFile;
if (rewriteFile) {
if (this.sourceFile.getTsFileRepairStatus() != TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) {
this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator();
}
this.rewriteFile = rewriteFile;
}

public RepairUnsortedFileCompactionTask(
long timePartition,
TsFileManager tsFileManager,
TsFileResource sourceFile,
CountDownLatch latch,
boolean sequence,
long serialId) {
super(
timePartition,
tsFileManager,
Collections.singletonList(sourceFile),
sequence,
new RepairUnsortedFileCompactionPerformer(true),
serialId);
this.sourceFile = sourceFile;
this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator();
this.latch = latch;
this.rewriteFile = false;
}

// used for 'start repair data'
public RepairUnsortedFileCompactionTask(
long timePartition,
TsFileManager tsFileManager,
TsFileResource sourceFile,
CountDownLatch latch,
boolean sequence,
boolean rewriteFile,
long serialId) {
super(
timePartition,
tsFileManager,
Collections.singletonList(sourceFile),
sequence,
new RepairUnsortedFileCompactionPerformer(rewriteFile),
new RepairUnsortedFileCompactionPerformer(),
serialId);
this.sourceFile = sourceFile;
if (rewriteFile) {
if (this.sourceFile.getTsFileRepairStatus() != TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) {
this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator();
}
this.rewriteFile = rewriteFile;
this.latch = latch;
}

Expand Down Expand Up @@ -205,7 +164,7 @@ protected void prepareTargetFiles() throws IOException {
storageGroupName,
dataRegionId);

if (rewriteFile) {
if (sourceFile.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE) {
CompactionUtils.combineModsInInnerCompaction(
filesView.sourceFilesInCompactionPerformer, filesView.targetFilesInPerformer);
} else {
Expand All @@ -219,6 +178,10 @@ protected void prepareTargetFiles() throws IOException {

@Override
protected boolean doCompaction() {
calculateRepairMethod();
if (!sourceFile.getTsFileRepairStatus().isRepairCompactionCandidate()) {
return true;
}
boolean isSuccess = super.doCompaction();
if (!isSuccess) {
LOGGER.info("Failed to repair file {}", sourceFile.getTsFile().getAbsolutePath());
Expand All @@ -227,6 +190,27 @@ protected boolean doCompaction() {
return isSuccess;
}

private void calculateRepairMethod() {
if (this.sourceFile.getTsFileRepairStatus() != TsFileRepairStatus.NEED_TO_CHECK) {
return;
}
RepairDataFileScanUtil repairDataFileScanUtil = new RepairDataFileScanUtil(sourceFile, true);
repairDataFileScanUtil.scanTsFile();
if (repairDataFileScanUtil.isBrokenFile()) {
sourceFile.setTsFileRepairStatus(TsFileRepairStatus.CAN_NOT_REPAIR);
return;
}
if (repairDataFileScanUtil.hasUnsortedData()) {
sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
return;
}
if (sourceFile.isSeq()) {
sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE);
return;
}
sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NORMAL);
}

@Override
public long getEstimatedMemoryCost() {
if (innerSpaceEstimator != null && memoryCost == 0L) {
Expand All @@ -252,7 +236,7 @@ public long getEstimatedMemoryCost() {

@Override
public boolean isDiskSpaceCheckPassed() {
if (!rewriteFile) {
if (sourceFile.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) {
return true;
}
return super.isDiskSpaceCheckPassed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,25 @@ protected boolean doCompaction() {

double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
if (isSuccess) {
LOGGER.info(
"{}-{} [Compaction] SettleCompaction task finishes successfully, time cost is {} s, compaction speed is {} MB/s."
+ "Fully_dirty files num is {} and partially_dirty files num is {}.",
storageGroupName,
dataRegionId,
String.format("%.2f", costTime),
String.format(
"%.2f",
(fullyDirtyFileSize + partiallyDirtyFileSize) / 1024.0d / 1024.0d / costTime),
fullyDirtyFiles.size(),
filesView.sourceFilesInCompactionPerformer.size());
if (partiallyDirtyFileSize == 0) {
LOGGER.info(
"{}-{} [Compaction] SettleCompaction task finishes successfully, time cost is {} s."
+ "Fully_dirty files num is {}.",
storageGroupName,
dataRegionId,
String.format("%.2f", costTime),
fullyDirtyFiles.size());
} else {
LOGGER.info(
"{}-{} [Compaction] SettleCompaction task finishes successfully, time cost is {} s, compaction speed is {} MB/s."
+ "Fully_dirty files num is {} and partially_dirty files num is {}.",
storageGroupName,
dataRegionId,
String.format("%.2f", costTime),
String.format("%.2f", (partiallyDirtyFileSize) / 1024.0d / 1024.0d / costTime),
fullyDirtyFiles.size(),
filesView.sourceFilesInCompactionPerformer.size());
}
} else {
LOGGER.info(
"{}-{} [Compaction] SettleCompaction task finishes with some error, time cost is {} s."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ public class RepairDataFileScanUtil {
private boolean hasUnsortedData;
private boolean isBrokenFile;
private long previousTime;
private boolean printLog;

public RepairDataFileScanUtil(TsFileResource resource) {
this(resource, false);
}

public RepairDataFileScanUtil(TsFileResource resource, boolean printLog) {
this.resource = resource;
this.hasUnsortedData = false;
this.previousTime = Long.MIN_VALUE;
this.printLog = printLog;
}

public void scanTsFile() {
Expand All @@ -99,6 +105,12 @@ public void scanTsFile() {
}
} catch (CompactionLastTimeCheckFailedException lastTimeCheckFailedException) {
this.hasUnsortedData = true;
if (printLog) {
logger.error(
"File {} has unsorted data: ",
resource.getTsFile().getPath(),
lastTimeCheckFailedException);
}
} catch (Exception e) {
// ignored the exception caused by thread interrupt
if (Thread.currentThread().isInterrupted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ private void checkInternalUnsortedFileAndRepair(RepairTimePartition timePartitio
} finally {
sourceFile.readUnlock();
}
sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
LOGGER.info(
"[RepairScheduler] {} need to repair because it has internal unsorted data", sourceFile);
TsFileManager tsFileManager = timePartition.getTsFileManager();
Expand Down Expand Up @@ -152,14 +153,14 @@ private void checkOverlapInSequenceSpaceAndRepair(RepairTimePartition timePartit
}
checkTaskStatusAndMayStop();
CountDownLatch latch = new CountDownLatch(1);
overlapFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE);
RepairUnsortedFileCompactionTask task =
new RepairUnsortedFileCompactionTask(
timePartition.getTimePartitionId(),
timePartition.getTsFileManager(),
overlapFile,
latch,
true,
false,
tsFileManager.getNextCompactionTaskId());
LOGGER.info(
"[RepairScheduler] {} need to repair because it is overlapped with other files",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private void recoverRepairProgress(RepairTaskRecoverLogParser recoverLogParser)
continue;
}
if (cannotRepairFiles.contains(resource.getTsFile().getName())) {
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR);
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ICompactionSelector;
Expand Down Expand Up @@ -327,6 +328,9 @@ private boolean canSubmitCrossTask(
@Override
public List<CrossCompactionTaskResource> selectCrossSpaceTask(
List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFilelist) {
if (!CompactionUtils.isDiskHasSpace()) {
return Collections.emptyList();
}
return selectCrossSpaceTask(sequenceFileList, unsequenceFilelist, false);
}

Expand Down
Loading

0 comments on commit a3c4628

Please sign in to comment.