Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[to rc/1.3.3] Enhance repair compaction & exception handle with full disks #13613

Merged
merged 7 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,9 @@ public class IoTDBConfig {
/** Compact the unsequence files into the overlapped sequence files */
private volatile boolean enableCrossSpaceCompaction = true;

/** Enable auto repair compaction */
private volatile boolean enableAutoRepairCompaction = true;

/** The buffer for sort operation */
private long sortBufferSize = 1024 * 1024L;

Expand Down Expand Up @@ -2863,6 +2866,14 @@ public void setEnableCrossSpaceCompaction(boolean enableCrossSpaceCompaction) {
this.enableCrossSpaceCompaction = enableCrossSpaceCompaction;
}

/**
 * Reports the current value of the auto repair compaction switch.
 *
 * @return the value held in the {@code enableAutoRepairCompaction} field
 */
public boolean isEnableAutoRepairCompaction() {
  return this.enableAutoRepairCompaction;
}

/**
 * Updates the auto repair compaction switch.
 *
 * @param enableAutoRepairCompaction the new value stored in the
 *     {@code enableAutoRepairCompaction} field
 */
public void setEnableAutoRepairCompaction(boolean enableAutoRepairCompaction) {
this.enableAutoRepairCompaction = enableAutoRepairCompaction;
}

/**
 * Returns the configured selector for inner sequence compaction.
 *
 * @return the value held in the {@code innerSequenceCompactionSelector} field
 */
public InnerSequenceCompactionSelector getInnerSequenceCompactionSelector() {
  return this.innerSequenceCompactionSelector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
"compaction_schedule_interval_in_ms",
Long.toString(conf.getCompactionScheduleIntervalInMs()))));

conf.setEnableAutoRepairCompaction(
Boolean.parseBoolean(
properties.getProperty(
"enable_auto_repair_compaction",
Boolean.toString(conf.isEnableAutoRepairCompaction()))));

conf.setEnableCrossSpaceCompaction(
Boolean.parseBoolean(
properties.getProperty(
Expand Down Expand Up @@ -1209,6 +1215,12 @@ private void loadCompactionHotModifiedProps(Properties properties)
.setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec());
CompactionTaskManager.getInstance()
.setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec());

conf.setEnableAutoRepairCompaction(
Boolean.parseBoolean(
properties.getProperty(
"enable_auto_repair_compaction",
Boolean.toString(conf.isEnableAutoRepairCompaction()))));
}

private boolean loadCompactionTaskHotModifiedProps(Properties properties) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1647,9 +1647,12 @@ public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor
|| tsFileProcessor.alreadyMarkedClosing()) {
return CompletableFuture.completedFuture(null);
}
TsFileResource resource = tsFileProcessor.getTsFileResource();
logger.info(
"Async close tsfile: {}",
tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
"Async close tsfile: {}, file start time: {}, file end time: {}",
resource.getTsFile().getAbsolutePath(),
resource.getFileStartTime(),
resource.getFileEndTime());
Future<?> future;
if (sequence) {
closingSequenceTsFileProcessor.add(tsFileProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.RepairUnsortedFileCompactionWriter;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
Expand All @@ -34,11 +35,8 @@
/** Used for fixing files which contain internal unsorted data */
public class RepairUnsortedFileCompactionPerformer extends ReadPointCompactionPerformer {

private final boolean rewriteFile;

public RepairUnsortedFileCompactionPerformer(boolean rewriteFile) {
public RepairUnsortedFileCompactionPerformer() {
super();
this.rewriteFile = rewriteFile;
}

@Override
Expand All @@ -52,7 +50,8 @@ protected AbstractCompactionWriter getCompactionWriter(

@Override
public void perform() throws Exception {
if (rewriteFile) {
TsFileResource resource = !seqFiles.isEmpty() ? seqFiles.get(0) : unseqFiles.get(0);
if (resource.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE) {
super.perform();
} else {
prepareTargetFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected void handleException(Logger logger, Exception e) {
// these exceptions are generally caused by unsorted data, mark all source files as NEED_TO_CHECK
for (TsFileResource resource : unsortedTsFileResources) {
if (resource.getTsFileRepairStatus() != TsFileRepairStatus.CAN_NOT_REPAIR) {
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR);
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK);
}
}
} else if (e instanceof InterruptedException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -301,6 +302,8 @@ private boolean shouldRollback() {

private void rollback() throws IOException {
// if the task has started,
targetTsfileResourceList =
targetTsfileResourceList == null ? Collections.emptyList() : targetTsfileResourceList;
if (recoverMemoryStatus) {
replaceTsFileInMemory(
targetTsfileResourceList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.RepairUnsortedFileCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
import org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairDataFileScanUtil;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.RepairUnsortedFileCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus;
Expand Down Expand Up @@ -59,7 +60,6 @@ public static long getInitialAllocatedFileTimestamp() {
}

private final TsFileResource sourceFile;
private final boolean rewriteFile;
private CountDownLatch latch;

public RepairUnsortedFileCompactionTask(
Expand All @@ -73,74 +73,33 @@ public RepairUnsortedFileCompactionTask(
tsFileManager,
Collections.singletonList(sourceFile),
sequence,
new RepairUnsortedFileCompactionPerformer(true),
new RepairUnsortedFileCompactionPerformer(),
serialId);
this.sourceFile = sourceFile;
this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator();
this.rewriteFile = false;
}

public RepairUnsortedFileCompactionTask(
long timePartition,
TsFileManager tsFileManager,
TsFileResource sourceFile,
boolean sequence,
boolean rewriteFile,
long serialId) {
super(
timePartition,
tsFileManager,
Collections.singletonList(sourceFile),
sequence,
new RepairUnsortedFileCompactionPerformer(rewriteFile),
serialId);
this.sourceFile = sourceFile;
if (rewriteFile) {
if (this.sourceFile.getTsFileRepairStatus() != TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) {
this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator();
}
this.rewriteFile = rewriteFile;
}

public RepairUnsortedFileCompactionTask(
long timePartition,
TsFileManager tsFileManager,
TsFileResource sourceFile,
CountDownLatch latch,
boolean sequence,
long serialId) {
super(
timePartition,
tsFileManager,
Collections.singletonList(sourceFile),
sequence,
new RepairUnsortedFileCompactionPerformer(true),
serialId);
this.sourceFile = sourceFile;
this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator();
this.latch = latch;
this.rewriteFile = false;
}

// used for 'start repair data'
public RepairUnsortedFileCompactionTask(
long timePartition,
TsFileManager tsFileManager,
TsFileResource sourceFile,
CountDownLatch latch,
boolean sequence,
boolean rewriteFile,
long serialId) {
super(
timePartition,
tsFileManager,
Collections.singletonList(sourceFile),
sequence,
new RepairUnsortedFileCompactionPerformer(rewriteFile),
new RepairUnsortedFileCompactionPerformer(),
serialId);
this.sourceFile = sourceFile;
if (rewriteFile) {
if (this.sourceFile.getTsFileRepairStatus() != TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) {
this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator();
}
this.rewriteFile = rewriteFile;
this.latch = latch;
}

Expand Down Expand Up @@ -205,7 +164,7 @@ protected void prepareTargetFiles() throws IOException {
storageGroupName,
dataRegionId);

if (rewriteFile) {
if (sourceFile.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE) {
CompactionUtils.combineModsInInnerCompaction(
filesView.sourceFilesInCompactionPerformer, filesView.targetFilesInPerformer);
} else {
Expand All @@ -219,6 +178,10 @@ protected void prepareTargetFiles() throws IOException {

@Override
protected boolean doCompaction() {
calculateRepairMethod();
if (!sourceFile.getTsFileRepairStatus().isRepairCompactionCandidate()) {
return true;
}
boolean isSuccess = super.doCompaction();
if (!isSuccess) {
LOGGER.info("Failed to repair file {}", sourceFile.getTsFile().getAbsolutePath());
Expand All @@ -227,6 +190,27 @@ protected boolean doCompaction() {
return isSuccess;
}

/**
 * Resolves a source file whose repair status is {@code NEED_TO_CHECK} into a concrete status by
 * scanning the file. A source file in any other status is left untouched.
 *
 * <p>After the scan the status becomes, in order of precedence:
 *
 * <ul>
 *   <li>{@code CAN_NOT_REPAIR} if the scan reports a broken file;
 *   <li>{@code NEED_TO_REPAIR_BY_REWRITE} if the scan reports unsorted data;
 *   <li>{@code NEED_TO_REPAIR_BY_MOVE} if the source file is a sequence file;
 *   <li>{@code NORMAL} otherwise.
 * </ul>
 */
private void calculateRepairMethod() {
  if (sourceFile.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_CHECK) {
    // the second argument turns on the scanner's log output
    RepairDataFileScanUtil scanner = new RepairDataFileScanUtil(sourceFile, true);
    scanner.scanTsFile();

    // the checks are evaluated lazily and in this exact order
    TsFileRepairStatus resolvedStatus;
    if (scanner.isBrokenFile()) {
      resolvedStatus = TsFileRepairStatus.CAN_NOT_REPAIR;
    } else if (scanner.hasUnsortedData()) {
      resolvedStatus = TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE;
    } else if (sourceFile.isSeq()) {
      resolvedStatus = TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE;
    } else {
      resolvedStatus = TsFileRepairStatus.NORMAL;
    }
    sourceFile.setTsFileRepairStatus(resolvedStatus);
  }
}

@Override
public long getEstimatedMemoryCost() {
if (innerSpaceEstimator != null && memoryCost == 0L) {
Expand All @@ -252,7 +236,7 @@ public long getEstimatedMemoryCost() {

@Override
public boolean isDiskSpaceCheckPassed() {
if (!rewriteFile) {
if (sourceFile.getTsFileRepairStatus() == TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) {
return true;
}
return super.isDiskSpaceCheckPassed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,25 @@ protected boolean doCompaction() {

double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
if (isSuccess) {
LOGGER.info(
"{}-{} [Compaction] SettleCompaction task finishes successfully, time cost is {} s, compaction speed is {} MB/s."
+ "Fully_dirty files num is {} and partially_dirty files num is {}.",
storageGroupName,
dataRegionId,
String.format("%.2f", costTime),
String.format(
"%.2f",
(fullyDirtyFileSize + partiallyDirtyFileSize) / 1024.0d / 1024.0d / costTime),
fullyDirtyFiles.size(),
filesView.sourceFilesInCompactionPerformer.size());
if (partiallyDirtyFileSize == 0) {
LOGGER.info(
"{}-{} [Compaction] SettleCompaction task finishes successfully, time cost is {} s."
+ "Fully_dirty files num is {}.",
storageGroupName,
dataRegionId,
String.format("%.2f", costTime),
fullyDirtyFiles.size());
} else {
LOGGER.info(
"{}-{} [Compaction] SettleCompaction task finishes successfully, time cost is {} s, compaction speed is {} MB/s."
+ "Fully_dirty files num is {} and partially_dirty files num is {}.",
storageGroupName,
dataRegionId,
String.format("%.2f", costTime),
String.format("%.2f", (partiallyDirtyFileSize) / 1024.0d / 1024.0d / costTime),
fullyDirtyFiles.size(),
filesView.sourceFilesInCompactionPerformer.size());
}
} else {
LOGGER.info(
"{}-{} [Compaction] SettleCompaction task finishes with some error, time cost is {} s."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ public class RepairDataFileScanUtil {
private boolean hasUnsortedData;
private boolean isBrokenFile;
private long previousTime;
private boolean printLog;

/**
 * Creates a scan util for the given resource with log printing disabled.
 *
 * @param resource the TsFile resource kept by this instance
 */
public RepairDataFileScanUtil(TsFileResource resource) {
// delegate to the two-argument constructor with printLog = false
this(resource, false);
}

/**
 * Creates a scan util for the given resource.
 *
 * @param resource the TsFile resource kept by this instance
 * @param printLog value stored in the {@code printLog} field; when {@code true}, the scan logs an
 *     error if the file turns out to contain unsorted data
 */
public RepairDataFileScanUtil(TsFileResource resource, boolean printLog) {
  this.printLog = printLog;
  this.resource = resource;
  // initial scan state: smallest possible previous time, no unsorted data seen yet
  this.previousTime = Long.MIN_VALUE;
  this.hasUnsortedData = false;
}

public void scanTsFile() {
Expand All @@ -99,6 +105,12 @@ public void scanTsFile() {
}
} catch (CompactionLastTimeCheckFailedException lastTimeCheckFailedException) {
this.hasUnsortedData = true;
if (printLog) {
logger.error(
"File {} has unsorted data: ",
resource.getTsFile().getPath(),
lastTimeCheckFailedException);
}
} catch (Exception e) {
// ignore the exception caused by thread interrupt
if (Thread.currentThread().isInterrupted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ private void checkInternalUnsortedFileAndRepair(RepairTimePartition timePartitio
} finally {
sourceFile.readUnlock();
}
sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
LOGGER.info(
"[RepairScheduler] {} need to repair because it has internal unsorted data", sourceFile);
TsFileManager tsFileManager = timePartition.getTsFileManager();
Expand Down Expand Up @@ -152,14 +153,14 @@ private void checkOverlapInSequenceSpaceAndRepair(RepairTimePartition timePartit
}
checkTaskStatusAndMayStop();
CountDownLatch latch = new CountDownLatch(1);
overlapFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE);
RepairUnsortedFileCompactionTask task =
new RepairUnsortedFileCompactionTask(
timePartition.getTimePartitionId(),
timePartition.getTsFileManager(),
overlapFile,
latch,
true,
false,
tsFileManager.getNextCompactionTaskId());
LOGGER.info(
"[RepairScheduler] {} need to repair because it is overlapped with other files",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private void recoverRepairProgress(RepairTaskRecoverLogParser recoverLogParser)
continue;
}
if (cannotRepairFiles.contains(resource.getTsFile().getName())) {
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR);
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ICompactionSelector;
Expand Down Expand Up @@ -327,6 +328,9 @@ private boolean canSubmitCrossTask(
/**
 * Selects cross space compaction tasks from the given sequence and unsequence files.
 *
 * <p>Delegates to the three-argument overload with {@code false} as the last argument when
 * {@code CompactionUtils.isDiskHasSpace()} returns {@code true}; otherwise nothing is selected.
 *
 * @param sequenceFileList candidate sequence files
 * @param unsequenceFilelist candidate unsequence files
 * @return the selected task resources, or an empty list when the disk space check fails
 */
@Override
public List<CrossCompactionTaskResource> selectCrossSpaceTask(
    List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFilelist) {
  if (CompactionUtils.isDiskHasSpace()) {
    return selectCrossSpaceTask(sequenceFileList, unsequenceFilelist, false);
  }
  return Collections.emptyList();
}

Expand Down
Loading
Loading