Skip to content

Commit

Permalink
Fixed the bug of the second update of last cache by query
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi authored Oct 18, 2024
1 parent bd5b30b commit 51bce33
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -125,7 +126,13 @@ protected void mayUpdateLastCache(
}

if (seriesScanInfo.left.decrementAndGet() == 0) {
lastCache.updateLastCache(getDatabaseName(), fullPath, seriesScanInfo.right, true);
lastCache.updateLastCacheIfExists(
getDatabaseName(),
fullPath.getIDeviceID(),
new String[] {fullPath.getMeasurement()},
new TimeValuePair[] {seriesScanInfo.right},
fullPath.isUnderAlignedEntity(),
new IMeasurementSchema[] {fullPath.getMeasurementSchema()});
}
} finally {
dataNodeQueryContext.unLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2877,7 +2877,6 @@ private UpdateLastCacheOperator createUpdateLastCacheOperator(
.getDataRegion()
.getDatabaseName(),
fullPath,
null,
false);
}

Expand Down Expand Up @@ -2934,7 +2933,6 @@ private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator(
devicePath.concatNode(unCachedPath.getMeasurementList().get(i)),
unCachedPath.getSchemaList().get(i),
true),
null,
false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.tsfile.write.schema.IMeasurementSchema;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.util.Collections;
Expand Down Expand Up @@ -160,11 +159,11 @@ int invalidateTreeSchema() {

/////////////////////////////// Last Cache ///////////////////////////////

int updateLastCache(
int initOrInvalidateLastCache(
final String database,
final String tableName,
final String[] measurements,
final @Nullable TimeValuePair[] timeValuePairs,
final boolean isInvalidate,
final boolean isTableModel) {
int result =
lastCache.compareAndSet(null, new TableDeviceLastCache())
Expand All @@ -173,7 +172,7 @@ int updateLastCache(
final TableDeviceLastCache cache = lastCache.get();
result +=
Objects.nonNull(cache)
? cache.update(database, tableName, measurements, timeValuePairs, isTableModel)
? cache.initOrInvalidate(database, tableName, measurements, isInvalidate, isTableModel)
: 0;
return Objects.nonNull(lastCache.get()) ? result : 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.List;
import java.util.Map;
Expand All @@ -39,6 +40,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@ThreadSafe
public class TableDeviceLastCache {
static final int INSTANCE_SIZE =
(int) RamUsageEstimator.shallowSizeOfInstance(TableDeviceLastCache.class)
Expand Down Expand Up @@ -87,51 +89,49 @@ public TSDataType getDataType() {
// Time is seen as "" as a measurement
private final Map<String, TimeValuePair> measurement2CachedLastMap = new ConcurrentHashMap<>();

int update(
int initOrInvalidate(
final String database,
final String tableName,
final String[] measurements,
final @Nullable TimeValuePair[] timeValuePairs,
final boolean isInvalidate,
final boolean isTableModel) {
final AtomicInteger diff = new AtomicInteger(0);

for (int i = 0; i < measurements.length; ++i) {
final String measurement =
!measurement2CachedLastMap.containsKey(measurements[i]) && isTableModel
for (final String measurement : measurements) {
final String finalMeasurement =
!measurement2CachedLastMap.containsKey(measurement) && isTableModel
? DataNodeTableCache.getInstance()
.tryGetInternColumnName(database, tableName, measurements[i])
: measurements[i];
.tryGetInternColumnName(database, tableName, measurement)
: measurement;

final TimeValuePair newPair =
Objects.nonNull(timeValuePairs) ? timeValuePairs[i] : PLACEHOLDER_TIME_VALUE_PAIR;
final TimeValuePair newPair = isInvalidate ? null : PLACEHOLDER_TIME_VALUE_PAIR;

measurement2CachedLastMap.compute(
measurement,
finalMeasurement,
(measurementKey, tvPair) -> {
if (Objects.isNull(newPair)) {
diff.addAndGet(
-((isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(finalMeasurement))
+ (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+ ((Objects.isNull(tvPair)
|| tvPair == PLACEHOLDER_TIME_VALUE_PAIR
|| tvPair == EMPTY_TIME_VALUE_PAIR)
? 0
: tvPair.getSize())));
return null;
}
if (Objects.isNull(tvPair)) {
if (newPair == PLACEHOLDER_TIME_VALUE_PAIR) {
diff.addAndGet(
(isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(measurement))
+ (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+ newPair.getSize());
return newPair;
}
} else if (tvPair.getTimestamp() < newPair.getTimestamp()
|| tvPair == PLACEHOLDER_TIME_VALUE_PAIR) {
diff.addAndGet(getDiffSize(tvPair, newPair));
diff.addAndGet(
(isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(finalMeasurement))
+ (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
return newPair;
}

return tvPair;
});
}
return diff.get();
}

@GuardedBy("DataRegionInsertLock#writeLock")
int tryUpdate(
final @Nonnull String[] measurements, final @Nonnull TimeValuePair[] timeValuePairs) {
final AtomicInteger diff = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,11 @@ public void invalidateAttributes(final String database, final IDeviceID deviceId
* consistency. WARNING: The writing may temporarily put a stale value in cache if a stale value
* is written, but it won't affect the eventual consistency.
*
* <p>- Second time put the calculated {@link TimeValuePair}s. The input {@link TimeValuePair}s
* shall never be or contain {@code null}, if a measurement is with all {@code null}s, its {@link
* TimeValuePair} shall be {@link TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. For time column,
* the input measurement shall be "", and the value shall be {@link
* <p>- Second time put the calculated {@link TimeValuePair}s, and use {@link
* #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[])}. The input {@link
* TimeValuePair}s shall never be or contain {@code null}, if a measurement is with all {@code
* null}s, its {@link TimeValuePair} shall be {@link TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}.
* For time column, the input measurement shall be "", and the value shall be {@link
* TableDeviceLastCache#EMPTY_PRIMITIVE_TYPE}. If the time column is not explicitly specified, the
* device's last time won't be updated because we cannot guarantee the completeness of the
* existing measurements in cache.
Expand All @@ -204,34 +205,33 @@ public void invalidateAttributes(final String database, final IDeviceID deviceId
* @param database the device's database, without "root"
* @param deviceId {@link IDeviceID}
* @param measurements the fetched measurements
* @param timeValuePairs {@code null} for the first fetch, the {@link TimeValuePair} with indexes
* corresponding to the measurements for the second fetch, all {@code null}s if the query has
* ended abnormally before the second push and need to invalidate the entry.
* @param isInvalidate whether to init or invalidate the cache
*/
public void updateLastCache(
public void initOrInvalidateLastCache(
final String database,
final IDeviceID deviceId,
final String[] measurements,
final @Nullable TimeValuePair[] timeValuePairs) {
final boolean isInvalidate) {
readWriteLock.readLock().lock();
try {
dualKeyCache.update(
new TableId(database, deviceId.getTableName()),
deviceId,
new TableDeviceCacheEntry(),
entry ->
entry.updateLastCache(
database, deviceId.getTableName(), measurements, timeValuePairs, true),
Objects.isNull(timeValuePairs));
entry.initOrInvalidateLastCache(
database, deviceId.getTableName(), measurements, isInvalidate, true),
!isInvalidate);
} finally {
readWriteLock.readLock().unlock();
}
}

/**
* Update the last cache in writing. If a measurement is with all {@code null}s or is an
* id/attribute column, its timeValuePair shall be {@code null}. For correctness, this will put
* the cache lazily and only update the existing last caches of measurements.
* Update the last cache in writing or the second push of last cache query. If a measurement is
* with all {@code null}s or is an id/attribute column, its {@link TimeValuePair}[] shall be
* {@code null}. For correctness, this will put the cache lazily and only update the existing last
* caches of measurements.
*
* @param database the device's database, without "root"
* @param deviceId {@link IDeviceID}
Expand Down Expand Up @@ -355,25 +355,29 @@ void updateLastCache(
final @Nullable TimeValuePair[] timeValuePairs,
final boolean isAligned,
final IMeasurementSchema[] measurementSchemas,
final boolean isQuery) {
final boolean initOrInvalidate) {
final String previousDatabase = treeModelDatabasePool.putIfAbsent(database, database);
final String database2Use = Objects.nonNull(previousDatabase) ? previousDatabase : database;

dualKeyCache.update(
new TableId(null, deviceID.getTableName()),
deviceID,
new TableDeviceCacheEntry(),
isQuery
initOrInvalidate
? entry ->
entry.setMeasurementSchema(
database2Use, isAligned, measurements, measurementSchemas)
+ entry.updateLastCache(
database, deviceID.getTableName(), measurements, timeValuePairs, false)
+ entry.initOrInvalidateLastCache(
database,
deviceID.getTableName(),
measurements,
Objects.nonNull(timeValuePairs),
false)
: entry ->
entry.setMeasurementSchema(
database2Use, isAligned, measurements, measurementSchemas)
+ entry.tryUpdateLastCache(measurements, timeValuePairs),
isQuery);
Objects.isNull(timeValuePairs));
}

// WARNING: This is not guaranteed to affect table model's cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;

import javax.annotation.Nullable;
import javax.annotation.Nonnull;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -386,9 +386,9 @@ public void updateLastCacheIfExists(
final String database,
final IDeviceID deviceID,
final String[] measurements,
final TimeValuePair[] timeValuePairs,
final @Nonnull TimeValuePair[] timeValuePairs,
final boolean isAligned,
final MeasurementSchema[] measurementSchemas) {
final IMeasurementSchema[] measurementSchemas) {
tableDeviceSchemaCache.updateLastCache(
database, deviceID, measurements, timeValuePairs, isAligned, measurementSchemas, false);
}
Expand All @@ -398,39 +398,33 @@ public void updateLastCacheIfExists(
*
* <p>Note: The query shall put the {@link TableDeviceLastCache} twice:
*
* <p>- First time set the "isCommit" to {@code false} before the query accesses data., which will
* put the {@link TimeValuePair} as {@code null}. It does not indicate that the measurements are
* all {@code null}s, just to allow the writing to update the cache, then avoid that the query put
* a stale value to cache and break the consistency. WARNING: The writing may temporarily put a
* stale value in cache if a stale value is written, but it won't affect the eventual consistency.
* <p>- First time set the "isCommit" to {@code false} before the query accesses data. It is just
* to allow the writing to update the cache, then avoid that the query put a stale value to cache
* and break the consistency. WARNING: The writing may temporarily put a stale value in cache if a
* stale value is written, but it won't affect the eventual consistency.
*
* <p>- Second time put the calculated {@link TimeValuePair}. The input {@link TimeValuePair}
* shall never be or contain {@code null}, if the measurement is with all {@code null}s, its
* {@link TimeValuePair} shall be {@link TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method
* is not supposed to update time column.
* <p>- Second time put the calculated {@link TimeValuePair}, and use {@link
* #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[], boolean,
* IMeasurementSchema[])}. The input {@link TimeValuePair} shall never be or contain {@code null},
* if the measurement is with all {@code null}s, its {@link TimeValuePair} shall be {@link
* TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method is not supposed to update time column.
*
* <p>If the query has ended abnormally, it shall call this to invalidate the entry it has pushed
* in the first time, to avoid the stale writing damaging the eventual consistency. The input
* {@link TimeValuePair} shall be {@code null} in this case and the "isCommit" shall be {@code
* true}.
* in the first time, to avoid the stale writing damaging the eventual consistency. In this case
* and the "isInvalidate" shall be {@code true}.
*
* @param database the device's database, WITH "root"
* @param measurementPath the fetched {@link MeasurementPath}
* @param timeValuePair {@code null} to invalidate the first pushed cache, or the {@link
* TimeValuePair} corresponding to the measurement for the second fetch.
* @param isCommit {@code false} for the first fetch, {@code true} for the second fetch or
* invalidation.
* @param isInvalidate {@code true} if invalidate the first pushed cache, or {@code null} for the
* first fetch.
*/
public void updateLastCache(
final String database,
final MeasurementPath measurementPath,
final @Nullable TimeValuePair timeValuePair,
final boolean isCommit) {
final String database, final MeasurementPath measurementPath, final boolean isInvalidate) {
tableDeviceSchemaCache.updateLastCache(
database,
measurementPath.getIDeviceID(),
new String[] {measurementPath.getMeasurement()},
isCommit ? new TimeValuePair[] {timeValuePair} : null,
isInvalidate ? new TimeValuePair[] {null} : null,
measurementPath.isUnderAlignedEntity(),
new IMeasurementSchema[] {measurementPath.getMeasurementSchema()},
true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ public void testUpdateLastCache() throws IllegalPathException {
final TimeValuePair tv1 = new TimeValuePair(1, new TsPrimitiveType.TsInt(1));

treeDeviceSchemaCacheManager.updateLastCache(
database, new MeasurementPath(device.concatNode("s1"), s1), null, false);
database, new MeasurementPath(device.concatNode("s1"), s1), false);
treeDeviceSchemaCacheManager.updateLastCache(
database, new MeasurementPath(device.concatNode("s3"), s3), null, false);
database, new MeasurementPath(device.concatNode("s3"), s3), false);

// Simulate "s1" revert when the query has failed in calculation
treeDeviceSchemaCacheManager.updateLastCacheIfExists(
Expand All @@ -214,15 +214,26 @@ public void testUpdateLastCache() throws IllegalPathException {
false,
new MeasurementSchema[] {s1});
treeDeviceSchemaCacheManager.updateLastCache(
database, new MeasurementPath(device.concatNode("s1"), s1), null, true);
database, new MeasurementPath(device.concatNode("s1"), s1), true);

// "s2" shall be null since the "null" timeValuePair has not been put
treeDeviceSchemaCacheManager.updateLastCache(
database, new MeasurementPath(device.concatNode("s2"), s1), tv1, true);
treeDeviceSchemaCacheManager.updateLastCacheIfExists(
database,
IDeviceID.Factory.DEFAULT_FACTORY.create(
StringArrayDeviceID.splitDeviceIdString(device.getNodes())),
new String[] {"s2"},
new TimeValuePair[] {tv1},
false,
new MeasurementSchema[] {s2});

// Normal update
treeDeviceSchemaCacheManager.updateLastCache(
database, new MeasurementPath(device.concatNode("s3"), s3), tv1, true);
treeDeviceSchemaCacheManager.updateLastCacheIfExists(
database,
IDeviceID.Factory.DEFAULT_FACTORY.create(
StringArrayDeviceID.splitDeviceIdString(device.getNodes())),
new String[] {"s3"},
new TimeValuePair[] {tv1},
false,
new MeasurementSchema[] {s3});

Assert.assertNull(
treeDeviceSchemaCacheManager.getLastCache(new MeasurementPath("root.db.d.s1")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,8 @@ public void testLastCache() {
cache.getLastEntry(database, convertIdValuesToDeviceID(table1, device0), "s3"));

// Test null hit measurements
cache.updateLastCache(
database,
convertIdValuesToDeviceID(table1, device0),
new String[] {"s4"},
new TimeValuePair[] {TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR});
cache.initOrInvalidateLastCache(
database, convertIdValuesToDeviceID(table1, device0), new String[] {"s4"}, false);

// Miss if the "null" time value pair is not in cache, meaning that the
// entry is evicted
Expand Down Expand Up @@ -458,8 +455,8 @@ private void updateLastCache4Query(
final IDeviceID deviceID,
final String[] measurement,
final TimeValuePair[] data) {
cache.updateLastCache(database, deviceID, measurement, null);
cache.updateLastCache(database, deviceID, measurement, data);
cache.initOrInvalidateLastCache(database, deviceID, measurement, false);
cache.updateLastCacheIfExists(database, deviceID, measurement, data);
}

@Test
Expand Down

0 comments on commit 51bce33

Please sign in to comment.