diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java index 113defcb47c1..d1146cefa82e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java @@ -36,6 +36,7 @@ import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.schema.IMeasurementSchema; import javax.annotation.Nullable; @@ -125,7 +126,13 @@ protected void mayUpdateLastCache( } if (seriesScanInfo.left.decrementAndGet() == 0) { - lastCache.updateLastCache(getDatabaseName(), fullPath, seriesScanInfo.right, true); + lastCache.updateLastCacheIfExists( + getDatabaseName(), + fullPath.getIDeviceID(), + new String[] {fullPath.getMeasurement()}, + new TimeValuePair[] {seriesScanInfo.right}, + fullPath.isUnderAlignedEntity(), + new IMeasurementSchema[] {fullPath.getMeasurementSchema()}); } } finally { dataNodeQueryContext.unLock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 7df63fc808f1..5dd0c61bd751 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -2877,7 +2877,6 @@ private UpdateLastCacheOperator createUpdateLastCacheOperator( .getDataRegion() .getDatabaseName(), fullPath, - null, false); } @@ -2934,7 +2933,6 @@ private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator( devicePath.concatNode(unCachedPath.getMeasurementList().get(i)), unCachedPath.getSchemaList().get(i), true), - null, false); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java index 20cfff5e73bc..2bb7161be060 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java @@ -28,7 +28,6 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import java.util.Collections; @@ -160,11 +159,11 @@ int invalidateTreeSchema() { /////////////////////////////// Last Cache /////////////////////////////// - int updateLastCache( + int initOrInvalidateLastCache( final String database, final String tableName, final String[] measurements, - final @Nullable TimeValuePair[] timeValuePairs, + final boolean isInvalidate, final boolean isTableModel) { int result = lastCache.compareAndSet(null, new TableDeviceLastCache()) @@ -173,7 +172,7 @@ int updateLastCache( final TableDeviceLastCache cache = lastCache.get(); result += Objects.nonNull(cache) - ? cache.update(database, tableName, measurements, timeValuePairs, isTableModel) + ? cache.initOrInvalidate(database, tableName, measurements, isInvalidate, isTableModel) : 0; return Objects.nonNull(lastCache.get()) ? result : 0; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java index bae8ff5a5ec3..f43cf139e983 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java @@ -30,6 +30,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; import java.util.List; import java.util.Map; @@ -39,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +@ThreadSafe public class TableDeviceLastCache { static final int INSTANCE_SIZE = (int) RamUsageEstimator.shallowSizeOfInstance(TableDeviceLastCache.class) @@ -87,51 +89,49 @@ public TSDataType getDataType() { // Time is seen as "" as a measurement private final Map measurement2CachedLastMap = new ConcurrentHashMap<>(); - int update( + int initOrInvalidate( final String database, final String tableName, final String[] measurements, - final @Nullable TimeValuePair[] timeValuePairs, + final boolean isInvalidate, final boolean isTableModel) { final AtomicInteger diff = new AtomicInteger(0); - for (int i = 0; i < measurements.length; ++i) { - final String measurement = - !measurement2CachedLastMap.containsKey(measurements[i]) && isTableModel + for (final String measurement : measurements) { + final String finalMeasurement = + !measurement2CachedLastMap.containsKey(measurement) && isTableModel ? DataNodeTableCache.getInstance() - .tryGetInternColumnName(database, tableName, measurements[i]) - : measurements[i]; + .tryGetInternColumnName(database, tableName, measurement) + : measurement; - final TimeValuePair newPair = - Objects.nonNull(timeValuePairs) ? timeValuePairs[i] : PLACEHOLDER_TIME_VALUE_PAIR; + final TimeValuePair newPair = isInvalidate ? null : PLACEHOLDER_TIME_VALUE_PAIR; measurement2CachedLastMap.compute( - measurement, + finalMeasurement, (measurementKey, tvPair) -> { if (Objects.isNull(newPair)) { + diff.addAndGet( + -((isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(finalMeasurement)) + + (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + + ((Objects.isNull(tvPair) + || tvPair == PLACEHOLDER_TIME_VALUE_PAIR + || tvPair == EMPTY_TIME_VALUE_PAIR) + ? 0 + : tvPair.getSize()))); return null; } if (Objects.isNull(tvPair)) { - if (newPair == PLACEHOLDER_TIME_VALUE_PAIR) { - diff.addAndGet( - (isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(measurement)) - + (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY - + newPair.getSize()); - return newPair; - } - } else if (tvPair.getTimestamp() < newPair.getTimestamp() - || tvPair == PLACEHOLDER_TIME_VALUE_PAIR) { - diff.addAndGet(getDiffSize(tvPair, newPair)); + diff.addAndGet( + (isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(finalMeasurement)) + + (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY); return newPair; } - return tvPair; }); } return diff.get(); } - @GuardedBy("DataRegionInsertLock#writeLock") int tryUpdate( final @Nonnull String[] measurements, final @Nonnull TimeValuePair[] timeValuePairs) { final AtomicInteger diff = new AtomicInteger(0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java index 7acd9dcb3b37..3cb1964a6a93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java @@ -189,10 +189,11 @@ public void invalidateAttributes(final String database, final IDeviceID deviceId * consistency. WARNING: The writing may temporarily put a stale value in cache if a stale value * is written, but it won't affect the eventual consistency. * - *

- Second time put the calculated {@link TimeValuePair}s. The input {@link TimeValuePair}s - * shall never be or contain {@code null}, if a measurement is with all {@code null}s, its {@link - * TimeValuePair} shall be {@link TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. For time column, - * the input measurement shall be "", and the value shall be {@link + *

- Second time put the calculated {@link TimeValuePair}s, and use {@link + * #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[])}. The input {@link + * TimeValuePair}s shall never be or contain {@code null}, if a measurement is with all {@code + * null}s, its {@link TimeValuePair} shall be {@link TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. + * For time column, the input measurement shall be "", and the value shall be {@link * TableDeviceLastCache#EMPTY_PRIMITIVE_TYPE}. If the time column is not explicitly specified, the * device's last time won't be updated because we cannot guarantee the completeness of the * existing measurements in cache. @@ -204,15 +205,13 @@ public void invalidateAttributes(final String database, final IDeviceID deviceId * @param database the device's database, without "root" * @param deviceId {@link IDeviceID} * @param measurements the fetched measurements - * @param timeValuePairs {@code null} for the first fetch, the {@link TimeValuePair} with indexes - * corresponding to the measurements for the second fetch, all {@code null}s if the query has - * ended abnormally before the second push and need to invalidate the entry. + * @param isInvalidate whether to init or invalidate the cache */ - public void updateLastCache( + public void initOrInvalidateLastCache( final String database, final IDeviceID deviceId, final String[] measurements, - final @Nullable TimeValuePair[] timeValuePairs) { + final boolean isInvalidate) { readWriteLock.readLock().lock(); try { dualKeyCache.update( @@ -220,18 +219,19 @@ public void updateLastCache( deviceId, new TableDeviceCacheEntry(), entry -> - entry.updateLastCache( - database, deviceId.getTableName(), measurements, timeValuePairs, true), - Objects.isNull(timeValuePairs)); + entry.initOrInvalidateLastCache( + database, deviceId.getTableName(), measurements, isInvalidate, true), + !isInvalidate); } finally { readWriteLock.readLock().unlock(); } } /** - * Update the last cache in writing. If a measurement is with all {@code null}s or is an - * id/attribute column, its timeValuePair shall be {@code null}. For correctness, this will put - * the cache lazily and only update the existing last caches of measurements. + * Update the last cache in writing or the second push of last cache query. If a measurement is + * with all {@code null}s or is an id/attribute column, its {@link TimeValuePair}[] shall be + * {@code null}. For correctness, this will put the cache lazily and only update the existing last + * caches of measurements. * * @param database the device's database, without "root" * @param deviceId {@link IDeviceID} @@ -355,7 +355,7 @@ void updateLastCache( final @Nullable TimeValuePair[] timeValuePairs, final boolean isAligned, final IMeasurementSchema[] measurementSchemas, - final boolean isQuery) { + final boolean initOrInvalidate) { final String previousDatabase = treeModelDatabasePool.putIfAbsent(database, database); final String database2Use = Objects.nonNull(previousDatabase) ? previousDatabase : database; @@ -363,17 +363,21 @@ void updateLastCache( new TableId(null, deviceID.getTableName()), deviceID, new TableDeviceCacheEntry(), - isQuery + initOrInvalidate ? entry -> entry.setMeasurementSchema( database2Use, isAligned, measurements, measurementSchemas) - + entry.updateLastCache( - database, deviceID.getTableName(), measurements, timeValuePairs, false) + + entry.initOrInvalidateLastCache( + database, + deviceID.getTableName(), + measurements, + Objects.nonNull(timeValuePairs), + false) : entry -> entry.setMeasurementSchema( database2Use, isAligned, measurements, measurementSchemas) + entry.tryUpdateLastCache(measurements, timeValuePairs), - isQuery); + Objects.isNull(timeValuePairs)); } // WARNING: This is not guaranteed to affect table model's cache diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java index 7393b99029a8..46fbca6c53fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java @@ -37,7 +37,7 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; -import javax.annotation.Nullable; +import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.Arrays; @@ -386,9 +386,9 @@ public void updateLastCacheIfExists( final String database, final IDeviceID deviceID, final String[] measurements, - final TimeValuePair[] timeValuePairs, + final @Nonnull TimeValuePair[] timeValuePairs, final boolean isAligned, - final MeasurementSchema[] measurementSchemas) { + final IMeasurementSchema[] measurementSchemas) { tableDeviceSchemaCache.updateLastCache( database, deviceID, measurements, timeValuePairs, isAligned, measurementSchemas, false); } @@ -398,39 +398,33 @@ public void updateLastCacheIfExists( * *

Note: The query shall put the {@link TableDeviceLastCache} twice: * - *

- First time set the "isCommit" to {@code false} before the query accesses data., which will - * put the {@link TimeValuePair} as {@code null}. It does not indicate that the measurements are - * all {@code null}s, just to allow the writing to update the cache, then avoid that the query put - * a stale value to cache and break the consistency. WARNING: The writing may temporarily put a - * stale value in cache if a stale value is written, but it won't affect the eventual consistency. + *

- First time set the "isCommit" to {@code false} before the query accesses data. It is just + * to allow the writing to update the cache, then avoid that the query put a stale value to cache + * and break the consistency. WARNING: The writing may temporarily put a stale value in cache if a + * stale value is written, but it won't affect the eventual consistency. * - *

- Second time put the calculated {@link TimeValuePair}. The input {@link TimeValuePair} - * shall never be or contain {@code null}, if the measurement is with all {@code null}s, its - * {@link TimeValuePair} shall be {@link TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method - * is not supposed to update time column. + *

- Second time put the calculated {@link TimeValuePair}, and use {@link + * #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[], boolean, + * IMeasurementSchema[])}. The input {@link TimeValuePair} shall never be or contain {@code null}, + * if the measurement is with all {@code null}s, its {@link TimeValuePair} shall be {@link + * TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method is not supposed to update time column. * *

If the query has ended abnormally, it shall call this to invalidate the entry it has pushed - * in the first time, to avoid the stale writing damaging the eventual consistency. The input - * {@link TimeValuePair} shall be {@code null} in this case and the "isCommit" shall be {@code - * true}. + * in the first time, to avoid the stale writing damaging the eventual consistency. In this case + * and the "isInvalidate" shall be {@code true}. * * @param database the device's database, WITH "root" * @param measurementPath the fetched {@link MeasurementPath} - * @param timeValuePair {@code null} to invalidate the first pushed cache, or the {@link - * TimeValuePair} corresponding to the measurement for the second fetch. - * @param isCommit {@code false} for the first fetch, {@code true} for the second fetch or - * invalidation. + * @param isInvalidate {@code true} if invalidate the first pushed cache, or {@code null} for the + * first fetch. */ public void updateLastCache( - final String database, - final MeasurementPath measurementPath, - final @Nullable TimeValuePair timeValuePair, - final boolean isCommit) { + final String database, final MeasurementPath measurementPath, final boolean isInvalidate) { tableDeviceSchemaCache.updateLastCache( database, measurementPath.getIDeviceID(), new String[] {measurementPath.getMeasurement()}, - isCommit ? new TimeValuePair[] {timeValuePair} : null, + isInvalidate ? new TimeValuePair[] {null} : null, measurementPath.isUnderAlignedEntity(), new IMeasurementSchema[] {measurementPath.getMeasurementSchema()}, true); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java index 05b3515f4184..446d0cd65874 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java @@ -198,9 +198,9 @@ public void testUpdateLastCache() throws IllegalPathException { final TimeValuePair tv1 = new TimeValuePair(1, new TsPrimitiveType.TsInt(1)); treeDeviceSchemaCacheManager.updateLastCache( - database, new MeasurementPath(device.concatNode("s1"), s1), null, false); + database, new MeasurementPath(device.concatNode("s1"), s1), false); treeDeviceSchemaCacheManager.updateLastCache( - database, new MeasurementPath(device.concatNode("s3"), s3), null, false); + database, new MeasurementPath(device.concatNode("s3"), s3), false); // Simulate "s1" revert when the query has failed in calculation treeDeviceSchemaCacheManager.updateLastCacheIfExists( @@ -214,15 +214,26 @@ public void testUpdateLastCache() throws IllegalPathException { false, new MeasurementSchema[] {s1}); treeDeviceSchemaCacheManager.updateLastCache( - database, new MeasurementPath(device.concatNode("s1"), s1), null, true); + database, new MeasurementPath(device.concatNode("s1"), s1), true); // "s2" shall be null since the "null" timeValuePair has not been put - treeDeviceSchemaCacheManager.updateLastCache( - database, new MeasurementPath(device.concatNode("s2"), s1), tv1, true); + treeDeviceSchemaCacheManager.updateLastCacheIfExists( + database, + IDeviceID.Factory.DEFAULT_FACTORY.create( + StringArrayDeviceID.splitDeviceIdString(device.getNodes())), + new String[] {"s2"}, + new TimeValuePair[] {tv1}, + false, + new MeasurementSchema[] {s2}); - // Normal update - treeDeviceSchemaCacheManager.updateLastCache( - database, new MeasurementPath(device.concatNode("s3"), s3), tv1, true); + treeDeviceSchemaCacheManager.updateLastCacheIfExists( + database, + IDeviceID.Factory.DEFAULT_FACTORY.create( + StringArrayDeviceID.splitDeviceIdString(device.getNodes())), + new String[] {"s3"}, + new TimeValuePair[] {tv1}, + false, + new MeasurementSchema[] {s3}); Assert.assertNull( treeDeviceSchemaCacheManager.getLastCache(new MeasurementPath("root.db.d.s1"))); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java index d739d9493bce..ebcbddda0bbd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java @@ -218,11 +218,8 @@ public void testLastCache() { cache.getLastEntry(database, convertIdValuesToDeviceID(table1, device0), "s3")); // Test null hit measurements - cache.updateLastCache( - database, - convertIdValuesToDeviceID(table1, device0), - new String[] {"s4"}, - new TimeValuePair[] {TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR}); + cache.initOrInvalidateLastCache( + database, convertIdValuesToDeviceID(table1, device0), new String[] {"s4"}, false); // Miss if the "null" time value pair is not in cache, meaning that the // entry is evicted @@ -458,8 +455,8 @@ private void updateLastCache4Query( final IDeviceID deviceID, final String[] measurement, final TimeValuePair[] data) { - cache.updateLastCache(database, deviceID, measurement, null); - cache.updateLastCache(database, deviceID, measurement, data); + cache.initOrInvalidateLastCache(database, deviceID, measurement, false); + cache.updateLastCacheIfExists(database, deviceID, measurement, data); } @Test