Skip to content

Commit

Permalink
Decrease TTL Deletion in compaction modification cache (#12687)
Browse files Browse the repository at this point in the history
* decrease TTL Deletion in compaction modification cache

* Update iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java

Co-authored-by: Jiang Tian <[email protected]>

* modify MultiTsFileDeviceIterator

* fix spotless

---------

Co-authored-by: Jiang Tian <[email protected]>
  • Loading branch information
shuwenwei and jt2594838 authored Jun 14, 2024
1 parent 3168bf6 commit 9c085d1
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;

import org.apache.iotdb.commons.conf.IoTDBConstant;

import org.apache.tsfile.file.metadata.IDeviceID;

public class CompactionLastTimeCheckFailedException extends RuntimeException {

public CompactionLastTimeCheckFailedException(
Expand All @@ -32,6 +36,19 @@ public CompactionLastTimeCheckFailedException(
+ lastTimestamp);
}

public CompactionLastTimeCheckFailedException(
IDeviceID device, String measurement, long currentTimestamp, long lastTimestamp) {
super(
"Timestamp of the current point of "
+ device
+ IoTDBConstant.PATH_SEPARATOR
+ measurement
+ " is "
+ currentTimestamp
+ ", which should be later than the last time "
+ lastTimestamp);
}

@Override
public Throwable fillInStackTrace() {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@

import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionTargetFileCountExceededException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.ReadChunkAlignedSeriesCompactionExecutor;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
Expand Down Expand Up @@ -182,17 +180,16 @@ private void compactNotAlignedSeries(
deviceIterator.iterateNotAlignedSeriesAndChunkMetadataListOfCurrentDevice();
while (seriesIterator.hasNextSeries()) {
checkThreadInterrupted();
String series = seriesIterator.nextSeries();
String measurement = seriesIterator.nextSeries();
// TODO: we can provide a configuration item to enable concurrent between each series
PartialPath path = CompactionPathUtils.getPath(device, series);
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
seriesIterator.getMetadataListForCurrentSeries();
// remove the chunk metadata whose data type not match the data type of last chunk
readerAndChunkMetadataList =
filterDataTypeNotMatchedChunkMetadata(readerAndChunkMetadataList);
SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
new SingleSeriesCompactionExecutor(
path, readerAndChunkMetadataList, writer, targetResource, summary);
device, measurement, readerAndChunkMetadataList, writer, targetResource, summary);
compactionExecutorOfCurrentTimeSeries.execute();
}
writer.endChunkGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,7 @@ private CompactionPathUtils() {}

public static PartialPath getPath(IDeviceID device, String measurement)
throws IllegalPathException {
PartialPath path;
String plainDeviceId = ((PlainDeviceID) device).toStringID();
if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) {
path = DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId);
} else {
path = new PartialPath(((PlainDeviceID) device).toStringID().split(PATH_SEPARATER_NO_REGEX));
}
return path.concatNode(measurement);
return getPath(device).concatNode(measurement);
}

public static PartialPath getPath(IDeviceID device) throws IllegalPathException {
Expand All @@ -51,7 +44,7 @@ public static PartialPath getPath(IDeviceID device) throws IllegalPathException
if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) {
path = DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId);
} else {
path = new PartialPath(((PlainDeviceID) device).toStringID().split(PATH_SEPARATER_NO_REGEX));
path = new PartialPath(plainDeviceId.split(PATH_SEPARATER_NO_REGEX));
}
return path;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>();
private final Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
private Pair<IDeviceID, Boolean> currentDevice = null;
private long timeLowerBoundForCurrentDevice;

/**
* Used for compaction with read chunk performer.
Expand Down Expand Up @@ -174,7 +175,7 @@ public boolean hasNextDevice() {
* @return Pair of device full path and whether this device is aligned
*/
@SuppressWarnings("squid:S135")
public Pair<IDeviceID, Boolean> nextDevice() {
public Pair<IDeviceID, Boolean> nextDevice() throws IllegalPathException {
List<TsFileResource> toBeRemovedResources = new LinkedList<>();
Pair<IDeviceID, Boolean> minDevice = null;
// get the device from source files sorted from the newest to the oldest by version
Expand Down Expand Up @@ -205,6 +206,11 @@ public Pair<IDeviceID, Boolean> nextDevice() {
for (TsFileResource resource : toBeRemovedResources) {
deviceIteratorMap.remove(resource);
}

timeLowerBoundForCurrentDevice =
CommonDateTimeUtils.currentTime()
- DataNodeTTLCache.getInstance()
.getTTL(((PlainDeviceID) currentDevice.getLeft()).toStringID());
return currentDevice;
}

Expand Down Expand Up @@ -398,59 +404,45 @@ private void applyModificationForAlignedChunkMetadataList(
// all the value chunks is empty chunk
return;
}
IDeviceID device = currentDevice.getLeft();
Deletion ttlDeletion = null;
if (tsFileResource.getStartTime(device) < timeLowerBoundForCurrentDevice) {
ttlDeletion =
new Deletion(
CompactionPathUtils.getPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
Long.MAX_VALUE,
Long.MIN_VALUE,
timeLowerBoundForCurrentDevice);
}

List<Modification> modifications =
modificationCache.computeIfAbsent(
tsFileResource,
r -> {
List<Modification> list =
new LinkedList<>(ModificationFile.getNormalMods(r).getModifications());
// add outdated device mods by ttl
try {
for (IDeviceID device : r.getDevices()) {
// TODO: remove deviceId conversion
long timeLowerBound =
CommonDateTimeUtils.currentTime()
- DataNodeTTLCache.getInstance()
.getTTL(((PlainDeviceID) device).toStringID());
if (r.getStartTime(device) < timeLowerBound) {
list.add(
new Deletion(
CompactionPathUtils.getPath(device)
.concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
Long.MAX_VALUE,
Long.MIN_VALUE,
timeLowerBound));
}
}
} catch (IllegalPathException e) {
throw new RuntimeException(e);
}
return list;
});
r -> new LinkedList<>(ModificationFile.getNormalMods(r).getModifications()));

// construct the input params List<List<Modification>> for QueryUtils.modifyAlignedChunkMetaData
AlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0);
List<IChunkMetadata> valueChunkMetadataList = alignedChunkMetadata.getValueChunkMetadataList();
List<List<Modification>> modificationForCurDevice = new ArrayList<>();
List<PartialPath> valueSeriesPaths = new ArrayList<>(valueChunkMetadataList.size());
for (int i = 0; i < valueChunkMetadataList.size(); ++i) {
modificationForCurDevice.add(new ArrayList<>());
IChunkMetadata valueChunkMetadata = valueChunkMetadataList.get(i);
valueSeriesPaths.add(
valueChunkMetadata == null
? null
: CompactionPathUtils.getPath(
currentDevice.left, valueChunkMetadata.getMeasurementUid()));
}

for (Modification modification : modifications) {
for (int i = 0; i < valueChunkMetadataList.size(); ++i) {
PartialPath path = valueSeriesPaths.get(i);
if (path != null && modification.getPath().matchFullPath(path)) {
modificationForCurDevice.get(i).add(modification);
for (IChunkMetadata valueChunkMetadata : valueChunkMetadataList) {
if (valueChunkMetadata == null) {
modificationForCurDevice.add(Collections.emptyList());
continue;
}
List<Modification> modificationList = new ArrayList<>();
PartialPath path =
CompactionPathUtils.getPath(
currentDevice.getLeft(), valueChunkMetadata.getMeasurementUid());
for (Modification modification : modifications) {
if (modification.getPath().matchFullPath(path)) {
modificationList.add(modification);
}
}
if (ttlDeletion != null) {
modificationList.add(ttlDeletion);
}
modificationForCurDevice.add(
modificationList.isEmpty() ? Collections.emptyList() : modificationList);
}

ModificationUtils.modifyAlignedChunkMetaData(
Expand Down Expand Up @@ -624,17 +616,27 @@ public String nextSeries() throws IllegalPathException {
if (seriesInThisIteration.isEmpty()) {
return new LinkedList<>();
}
IDeviceID device = currentDevice.getLeft();
currentCompactingSeries = seriesInThisIteration.removeFirst();

LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataForThisSeries = new LinkedList<>();
PartialPath path =
CompactionPathUtils.getPath(currentDevice.getLeft(), currentCompactingSeries);
PartialPath path = CompactionPathUtils.getPath(device, currentCompactingSeries);

for (TsFileResource resource : tsFileResourcesSortedByAsc) {
TsFileSequenceReader reader = readerMap.get(resource);
Map<String, List<ChunkMetadata>> chunkMetadataListMap = chunkMetadataCacheMap.get(reader);

Deletion ttlDeletion = null;
if (resource.getStartTime(device) < timeLowerBoundForCurrentDevice) {
ttlDeletion =
new Deletion(
CompactionPathUtils.getPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
Long.MAX_VALUE,
Long.MIN_VALUE,
timeLowerBoundForCurrentDevice);
}

if (chunkMetadataListMap.containsKey(currentCompactingSeries)) {
// get the chunk metadata list and modification list of current series in this tsfile
List<ChunkMetadata> chunkMetadataListInThisResource =
Expand All @@ -644,39 +646,18 @@ public String nextSeries() throws IllegalPathException {
List<Modification> modificationsInThisResource =
modificationCache.computeIfAbsent(
resource,
r -> {
List<Modification> list =
new LinkedList<>(ModificationFile.getNormalMods(r).getModifications());
// add outdated device mods by ttl
try {
for (IDeviceID device : r.getDevices()) {
// TODO: remove deviceId conversion
long timeLowerBound =
CommonDateTimeUtils.currentTime()
- DataNodeTTLCache.getInstance()
.getTTL(((PlainDeviceID) device).toStringID());
if (r.getStartTime(device) < timeLowerBound) {
list.add(
new Deletion(
CompactionPathUtils.getPath(device)
.concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
Long.MAX_VALUE,
Long.MIN_VALUE,
timeLowerBound));
}
}
} catch (IllegalPathException e) {
throw new RuntimeException(e);
}
return list;
});
r -> new LinkedList<>(ModificationFile.getNormalMods(r).getModifications()));
LinkedList<Modification> modificationForCurrentSeries = new LinkedList<>();
// collect the modifications for current series
for (Modification modification : modificationsInThisResource) {
if (modification.getPath().matchFullPath(path)) {
modificationForCurrentSeries.add(modification);
}
}
// add ttl deletion for current series
if (ttlDeletion != null) {
modificationForCurrentSeries.add(ttlDeletion);
}

// if there are modifications of current series, apply them to the chunk metadata
if (!modificationForCurrentSeries.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk;

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
Expand All @@ -29,7 +28,6 @@
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Chunk;
Expand All @@ -49,7 +47,7 @@
@SuppressWarnings("squid:S1319")
public class SingleSeriesCompactionExecutor {
private IDeviceID device;
private PartialPath series;
private String measurement;
private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList;
private CompactionTsFileWriter fileWriter;
private TsFileResource targetResource;
Expand All @@ -75,13 +73,13 @@ public class SingleSeriesCompactionExecutor {
IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();

public SingleSeriesCompactionExecutor(
PartialPath series,
IDeviceID device,
IMeasurementSchema measurementSchema,
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList,
CompactionTsFileWriter fileWriter,
TsFileResource targetResource) {
this.device = new PlainDeviceID(series.getDevice());
this.series = series;
this.device = device;
this.measurement = measurementSchema.getMeasurementId();
this.readerAndChunkMetadataList = readerAndChunkMetadataList;
this.fileWriter = fileWriter;
this.schema = measurementSchema;
Expand All @@ -93,13 +91,14 @@ public SingleSeriesCompactionExecutor(
}

public SingleSeriesCompactionExecutor(
PartialPath series,
IDeviceID device,
String measurement,
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList,
CompactionTsFileWriter fileWriter,
TsFileResource targetResource,
CompactionTaskSummary summary) {
this.device = new PlainDeviceID(series.getDevice());
this.series = series;
this.device = device;
this.measurement = measurement;
this.readerAndChunkMetadataList = readerAndChunkMetadataList;
this.fileWriter = fileWriter;
this.schema = null;
Expand Down Expand Up @@ -167,7 +166,7 @@ private void constructChunkWriterFromReadChunk(Chunk chunk) {
ChunkHeader chunkHeader = chunk.getHeader();
this.schema =
new MeasurementSchema(
series.getMeasurement(),
measurement,
chunkHeader.getDataType(),
chunkHeader.getEncodingType(),
chunkHeader.getCompressionType());
Expand Down Expand Up @@ -371,7 +370,7 @@ private void flushChunkWriter() throws IOException {
private void checkAndUpdatePreviousTimestamp(long currentWritingTimestamp) {
if (currentWritingTimestamp <= lastWriteTimestamp) {
throw new CompactionLastTimeCheckFailedException(
series.getFullPath(), currentWritingTimestamp, lastWriteTimestamp);
device, measurement, currentWritingTimestamp, lastWriteTimestamp);
} else {
lastWriteTimestamp = currentWritingTimestamp;
}
Expand Down

0 comments on commit 9c085d1

Please sign in to comment.