diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java index 6ffcfad8cabc..26458513ba88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionLastTimeCheckFailedException.java @@ -19,6 +19,10 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception; +import org.apache.iotdb.commons.conf.IoTDBConstant; + +import org.apache.tsfile.file.metadata.IDeviceID; + public class CompactionLastTimeCheckFailedException extends RuntimeException { public CompactionLastTimeCheckFailedException( @@ -32,6 +36,19 @@ public CompactionLastTimeCheckFailedException( + lastTimestamp); } + public CompactionLastTimeCheckFailedException( + IDeviceID device, String measurement, long currentTimestamp, long lastTimestamp) { + super( + "Timestamp of the current point of " + + device + + IoTDBConstant.PATH_SEPARATOR + + measurement + + " is " + + currentTimestamp + + ", which should be later than the last time " + + lastTimestamp); + } + @Override public Throwable fillInStackTrace() { return this; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index 67b98f175f89..51f6a7db434f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -21,13 +21,11 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; -import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionTargetFileCountExceededException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.ReadChunkAlignedSeriesCompactionExecutor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor; @@ -182,9 +180,8 @@ private void compactNotAlignedSeries( deviceIterator.iterateNotAlignedSeriesAndChunkMetadataListOfCurrentDevice(); while (seriesIterator.hasNextSeries()) { checkThreadInterrupted(); - String series = seriesIterator.nextSeries(); + String measurement = seriesIterator.nextSeries(); // TODO: we can provide a configuration item to enable concurrent between each series - PartialPath path = CompactionPathUtils.getPath(device, series); LinkedList>> readerAndChunkMetadataList = seriesIterator.getMetadataListForCurrentSeries(); // remove the chunk metadata whose data type not match the data type of last chunk @@ -192,7 +189,7 @@ private void compactNotAlignedSeries( filterDataTypeNotMatchedChunkMetadata(readerAndChunkMetadataList); SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries = new SingleSeriesCompactionExecutor( - path, readerAndChunkMetadataList, writer, targetResource, summary); + device, measurement, readerAndChunkMetadataList, writer, targetResource, summary); compactionExecutorOfCurrentTimeSeries.execute(); } writer.endChunkGroup(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java index 20de56d10fd4..2cd4cf722c69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java @@ -35,14 +35,7 @@ private CompactionPathUtils() {} public static PartialPath getPath(IDeviceID device, String measurement) throws IllegalPathException { - PartialPath path; - String plainDeviceId = ((PlainDeviceID) device).toStringID(); - if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) { - path = DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId); - } else { - path = new PartialPath(((PlainDeviceID) device).toStringID().split(PATH_SEPARATER_NO_REGEX)); - } - return path.concatNode(measurement); + return getPath(device).concatNode(measurement); } public static PartialPath getPath(IDeviceID device) throws IllegalPathException { @@ -51,7 +44,7 @@ public static PartialPath getPath(IDeviceID device) throws IllegalPathException if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) { path = DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId); } else { - path = new PartialPath(((PlainDeviceID) device).toStringID().split(PATH_SEPARATER_NO_REGEX)); + path = new PartialPath(plainDeviceId.split(PATH_SEPARATER_NO_REGEX)); } return path; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index aa53aed93cc2..4836119ec05c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -70,6 +70,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { private final Map deviceIteratorMap = new HashMap<>(); private final Map> modificationCache = new HashMap<>(); private Pair currentDevice = null; + private long timeLowerBoundForCurrentDevice; /** * Used for compaction with read chunk performer. @@ -174,7 +175,7 @@ public boolean hasNextDevice() { * @return Pair of device full path and whether this device is aligned */ @SuppressWarnings("squid:S135") - public Pair nextDevice() { + public Pair nextDevice() throws IllegalPathException { List toBeRemovedResources = new LinkedList<>(); Pair minDevice = null; // get the device from source files sorted from the newest to the oldest by version @@ -205,6 +206,11 @@ public Pair nextDevice() { for (TsFileResource resource : toBeRemovedResources) { deviceIteratorMap.remove(resource); } + + timeLowerBoundForCurrentDevice = + CommonDateTimeUtils.currentTime() + - DataNodeTTLCache.getInstance() + .getTTL(((PlainDeviceID) currentDevice.getLeft()).toStringID()); return currentDevice; } @@ -398,59 +404,45 @@ private void applyModificationForAlignedChunkMetadataList( // all the value chunks is empty chunk return; } + IDeviceID device = currentDevice.getLeft(); + Deletion ttlDeletion = null; + if (tsFileResource.getStartTime(device) < timeLowerBoundForCurrentDevice) { + ttlDeletion = + new Deletion( + CompactionPathUtils.getPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD), + Long.MAX_VALUE, + Long.MIN_VALUE, + timeLowerBoundForCurrentDevice); + } List modifications = modificationCache.computeIfAbsent( tsFileResource, - r -> { - List list = - new LinkedList<>(ModificationFile.getNormalMods(r).getModifications()); - // add outdated device mods by ttl - try { - for (IDeviceID device : r.getDevices()) { - // TODO: remove deviceId conversion - long timeLowerBound = - CommonDateTimeUtils.currentTime() - - DataNodeTTLCache.getInstance() - .getTTL(((PlainDeviceID) device).toStringID()); - if (r.getStartTime(device) < timeLowerBound) { - list.add( - new Deletion( - CompactionPathUtils.getPath(device) - .concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD), - Long.MAX_VALUE, - Long.MIN_VALUE, - timeLowerBound)); - } - } - } catch (IllegalPathException e) { - throw new RuntimeException(e); - } - return list; - }); + r -> new LinkedList<>(ModificationFile.getNormalMods(r).getModifications())); // construct the input params List> for QueryUtils.modifyAlignedChunkMetaData AlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0); List valueChunkMetadataList = alignedChunkMetadata.getValueChunkMetadataList(); List> modificationForCurDevice = new ArrayList<>(); - List valueSeriesPaths = new ArrayList<>(valueChunkMetadataList.size()); - for (int i = 0; i < valueChunkMetadataList.size(); ++i) { - modificationForCurDevice.add(new ArrayList<>()); - IChunkMetadata valueChunkMetadata = valueChunkMetadataList.get(i); - valueSeriesPaths.add( - valueChunkMetadata == null - ? null - : CompactionPathUtils.getPath( - currentDevice.left, valueChunkMetadata.getMeasurementUid())); - } - - for (Modification modification : modifications) { - for (int i = 0; i < valueChunkMetadataList.size(); ++i) { - PartialPath path = valueSeriesPaths.get(i); - if (path != null && modification.getPath().matchFullPath(path)) { - modificationForCurDevice.get(i).add(modification); + for (IChunkMetadata valueChunkMetadata : valueChunkMetadataList) { + if (valueChunkMetadata == null) { + modificationForCurDevice.add(Collections.emptyList()); + continue; + } + List modificationList = new ArrayList<>(); + PartialPath path = + CompactionPathUtils.getPath( + currentDevice.getLeft(), valueChunkMetadata.getMeasurementUid()); + for (Modification modification : modifications) { + if (modification.getPath().matchFullPath(path)) { + modificationList.add(modification); } } + if (ttlDeletion != null) { + modificationList.add(ttlDeletion); + } + modificationForCurDevice.add( + modificationList.isEmpty() ? Collections.emptyList() : modificationList); } ModificationUtils.modifyAlignedChunkMetaData( @@ -624,17 +616,27 @@ public String nextSeries() throws IllegalPathException { if (seriesInThisIteration.isEmpty()) { return new LinkedList<>(); } + IDeviceID device = currentDevice.getLeft(); currentCompactingSeries = seriesInThisIteration.removeFirst(); LinkedList>> readerAndChunkMetadataForThisSeries = new LinkedList<>(); - PartialPath path = - CompactionPathUtils.getPath(currentDevice.getLeft(), currentCompactingSeries); + PartialPath path = CompactionPathUtils.getPath(device, currentCompactingSeries); for (TsFileResource resource : tsFileResourcesSortedByAsc) { TsFileSequenceReader reader = readerMap.get(resource); Map> chunkMetadataListMap = chunkMetadataCacheMap.get(reader); + Deletion ttlDeletion = null; + if (resource.getStartTime(device) < timeLowerBoundForCurrentDevice) { + ttlDeletion = + new Deletion( + CompactionPathUtils.getPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD), + Long.MAX_VALUE, + Long.MIN_VALUE, + timeLowerBoundForCurrentDevice); + } + if (chunkMetadataListMap.containsKey(currentCompactingSeries)) { // get the chunk metadata list and modification list of current series in this tsfile List chunkMetadataListInThisResource = @@ -644,32 +646,7 @@ public String nextSeries() throws IllegalPathException { List modificationsInThisResource = modificationCache.computeIfAbsent( resource, - r -> { - List list = - new LinkedList<>(ModificationFile.getNormalMods(r).getModifications()); - // add outdated device mods by ttl - try { - for (IDeviceID device : r.getDevices()) { - // TODO: remove deviceId conversion - long timeLowerBound = - CommonDateTimeUtils.currentTime() - - DataNodeTTLCache.getInstance() - .getTTL(((PlainDeviceID) device).toStringID()); - if (r.getStartTime(device) < timeLowerBound) { - list.add( - new Deletion( - CompactionPathUtils.getPath(device) - .concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD), - Long.MAX_VALUE, - Long.MIN_VALUE, - timeLowerBound)); - } - } - } catch (IllegalPathException e) { - throw new RuntimeException(e); - } - return list; - }); + r -> new LinkedList<>(ModificationFile.getNormalMods(r).getModifications())); LinkedList modificationForCurrentSeries = new LinkedList<>(); // collect the modifications for current series for (Modification modification : modificationsInThisResource) { @@ -677,6 +654,10 @@ public String nextSeries() throws IllegalPathException { modificationForCurrentSeries.add(modification); } } + // add ttl deletion for current series + if (ttlDeletion != null) { + modificationForCurrentSeries.add(ttlDeletion); + } // if there are modifications of current series, apply them to the chunk metadata if (!modificationForCurrentSeries.isEmpty()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java index 59a55e716ae5..7e85af0a9fe3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk; -import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; @@ -29,7 +28,6 @@ import org.apache.tsfile.file.header.ChunkHeader; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.PlainDeviceID; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.Chunk; @@ -49,7 +47,7 @@ @SuppressWarnings("squid:S1319") public class SingleSeriesCompactionExecutor { private IDeviceID device; - private PartialPath series; + private String measurement; private LinkedList>> readerAndChunkMetadataList; private CompactionTsFileWriter fileWriter; private TsFileResource targetResource; @@ -75,13 +73,13 @@ public class SingleSeriesCompactionExecutor { IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction(); public SingleSeriesCompactionExecutor( - PartialPath series, + IDeviceID device, IMeasurementSchema measurementSchema, LinkedList>> readerAndChunkMetadataList, CompactionTsFileWriter fileWriter, TsFileResource targetResource) { - this.device = new PlainDeviceID(series.getDevice()); - this.series = series; + this.device = device; + this.measurement = measurementSchema.getMeasurementId(); this.readerAndChunkMetadataList = readerAndChunkMetadataList; this.fileWriter = fileWriter; this.schema = measurementSchema; @@ -93,13 +91,14 @@ public SingleSeriesCompactionExecutor( } public SingleSeriesCompactionExecutor( - PartialPath series, + IDeviceID device, + String measurement, LinkedList>> readerAndChunkMetadataList, CompactionTsFileWriter fileWriter, TsFileResource targetResource, CompactionTaskSummary summary) { - this.device = new PlainDeviceID(series.getDevice()); - this.series = series; + this.device = device; + this.measurement = measurement; this.readerAndChunkMetadataList = readerAndChunkMetadataList; this.fileWriter = fileWriter; this.schema = null; @@ -167,7 +166,7 @@ private void constructChunkWriterFromReadChunk(Chunk chunk) { ChunkHeader chunkHeader = chunk.getHeader(); this.schema = new MeasurementSchema( - series.getMeasurement(), + measurement, chunkHeader.getDataType(), chunkHeader.getEncodingType(), chunkHeader.getCompressionType()); @@ -371,7 +370,7 @@ private void flushChunkWriter() throws IOException { private void checkAndUpdatePreviousTimestamp(long currentWritingTimestamp) { if (currentWritingTimestamp <= lastWriteTimestamp) { throw new CompactionLastTimeCheckFailedException( - series.getFullPath(), currentWritingTimestamp, lastWriteTimestamp); + device, measurement, currentWritingTimestamp, lastWriteTimestamp); } else { lastWriteTimestamp = currentWritingTimestamp; }