Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix insert/load with timestamp of Long.MIN_VALUE and LONG.MAX_VALUE #14491

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.TSRecord;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
Expand All @@ -55,6 +60,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -72,6 +79,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@SuppressWarnings("ThrowFromFinallyBlock")
@RunWith(IoTDBTestRunner.class)
public class IoTDBSessionSimpleIT {

Expand Down Expand Up @@ -336,7 +344,7 @@ public void insertByObjAndNotInferTypeTest() {
expected.add(TSDataType.TEXT.name());

Set<String> actual = new HashSet<>();
SessionDataSet dataSet = session.executeQueryStatement("show timeseries root.**");
SessionDataSet dataSet = session.executeQueryStatement("show timeseries root.sg1.**");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This avoids conflicts with other tests.

while (dataSet.hasNext()) {
actual.add(dataSet.next().getFields().get(3).getStringValue());
}
Expand Down Expand Up @@ -1859,4 +1867,112 @@ public void convertRecordsToTabletsTest() {
e.printStackTrace();
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertMinMaxTimeTest() throws IoTDBConnectionException, StatementExecutionException {
try {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"false\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}

session.executeNonQueryStatement(
String.format(
"INSERT INTO root.testInsertMinMax.d1(timestamp, s1) VALUES (%d, 1)",
Long.MIN_VALUE));
session.executeNonQueryStatement(
String.format(
"INSERT INTO root.testInsertMinMax.d1(timestamp, s1) VALUES (%d, 1)",
Long.MAX_VALUE));

SessionDataSet dataSet =
session.executeQueryStatement("SELECT * FROM root.testInsertMinMax.d1");
RowRecord record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());

session.executeNonQueryStatement("FLUSH");
dataSet = session.executeQueryStatement("SELECT * FROM root.testInsertMinMax.d1");
record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());
}
} finally {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"true\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
}
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void loadMinMaxTimeTest()
throws IoTDBConnectionException,
StatementExecutionException,
IOException,
WriteProcessException {
File file = new File("target", "test.tsfile");
try (TsFileWriter writer = new TsFileWriter(file)) {
IDeviceID deviceID = Factory.DEFAULT_FACTORY.create("root.testLoadMinMax.d1");
writer.registerTimeseries(deviceID, new MeasurementSchema("s1", TSDataType.INT32));
TSRecord record = new TSRecord(deviceID, Long.MIN_VALUE);
record.addPoint("s1", 1);
writer.writeRecord(record);
record.setTime(Long.MAX_VALUE);
writer.writeRecord(record);
}

try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"false\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
session.executeNonQueryStatement("LOAD \"" + file.getAbsolutePath() + "\"");

SessionDataSet dataSet =
session.executeQueryStatement("SELECT * FROM root.testLoadMinMax.d1");
RowRecord record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());
} finally {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"true\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
}
file.delete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2886,6 +2886,15 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
BinaryAllocator.getInstance().close(true);
}

commonDescriptor
.getConfig()
.setTimestampPrecisionCheckEnabled(
Boolean.parseBoolean(
properties.getProperty(
"timestamp_precision_check_enabled",
ConfigurationFileUtils.getConfigurationDefaultValue(
"timestamp_precision_check_enabled"))));

conf.setEnablePartialInsert(
Boolean.parseBoolean(
Optional.ofNullable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2301,7 +2301,11 @@ public static Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> getTimePart
result.add(timePartitionSlot);
// next init
timePartitionSlot = new TTimePartitionSlot(endTime);
endTime = endTime + TimePartitionUtils.getTimePartitionInterval();
// beware of overflow
endTime =
endTime + TimePartitionUtils.getTimePartitionInterval() > endTime
? endTime + TimePartitionUtils.getTimePartitionInterval()
: Long.MAX_VALUE;
} else {
index++;
if (index < size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void setDatabase(String database) {

public void autoCreateAndVerify(IDeviceID device) {
try {
if (isDeviceDeletedByMods(device)) {
if (ModificationUtils.isDeviceDeletedByMods(currentModifications, currentTimeIndex, device)) {
return;
}
} catch (IllegalPathException e) {
Expand All @@ -132,15 +132,6 @@ public void autoCreateAndVerify(IDeviceID device) {
}
}

private boolean isDeviceDeletedByMods(IDeviceID device) throws IllegalPathException {
return currentTimeIndex != null
&& ModificationUtils.isAllDeletedByMods(
currentModifications,
device,
currentTimeIndex.getStartTime(device),
currentTimeIndex.getEndTime(device));
}

private void addDevice(final IDeviceID device) {
final String tableName = device.getTableName();
long memoryUsageSizeInBytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,7 @@ public void setCurrentModificationsAndTimeIndex(
}

public boolean isDeviceDeletedByMods(IDeviceID device) throws IllegalPathException {
return currentTimeIndex != null
&& ModificationUtils.isAllDeletedByMods(
currentModifications,
device,
currentTimeIndex.getStartTime(device),
currentTimeIndex.getEndTime(device));
return ModificationUtils.isDeviceDeletedByMods(currentModifications, currentTimeIndex, device);
}

public boolean isTimeseriesDeletedByMods(IDeviceID device, TimeseriesMetadata timeseriesMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1995,15 +1995,18 @@ private long parseTimeValue(ConstantContext constant) {
if (constant.INTEGER_LITERAL() != null) {
try {
if (constant.MINUS() != null) {
return -Long.parseLong(constant.INTEGER_LITERAL().getText());
return Long.parseLong("-" + constant.INTEGER_LITERAL().getText());
}
return Long.parseLong(constant.INTEGER_LITERAL().getText());
} catch (NumberFormatException e) {
throw new SemanticException(
String.format(
"Current system timestamp precision is %s, "
"Failed to parse the timestamp: "
+ e.getMessage()
+ "Current system timestamp precision is %s, "
+ "please check whether the timestamp %s is correct.",
TIMESTAMP_PRECISION, constant.INTEGER_LITERAL().getText()));
TIMESTAMP_PRECISION,
constant.INTEGER_LITERAL().getText()));
}
} else if (constant.dateExpression() != null) {
return parseDateExpression(constant.dateExpression(), CommonDateTimeUtils.currentTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,22 @@ public boolean isTsFileEmpty() {
return resource.getDevices().isEmpty();
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
public boolean needDecodeTsFile(
Function<List<Pair<IDeviceID, TTimePartitionSlot>>, List<TRegionReplicaSet>> partitionFetcher)
throws IOException {
Function<List<Pair<IDeviceID, TTimePartitionSlot>>, List<TRegionReplicaSet>>
partitionFetcher) {
List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>();
resource
.getDevices()
.forEach(
o -> {
// iterating the index, must present
slotList.add(
new Pair<>(o, TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o))));
new Pair<>(
o, TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o).get())));
slotList.add(
new Pair<>(o, TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o))));
new Pair<>(
o, TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o).get())));
});

if (slotList.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,8 @@ protected void updateDeviceLastFlushTime(TsFileResource resource) {
long timePartitionId = resource.getTimePartition();
Map<IDeviceID, Long> endTimeMap = new HashMap<>();
for (IDeviceID deviceId : resource.getDevices()) {
long endTime = resource.getEndTime(deviceId);
@SuppressWarnings("OptionalGetWithoutIsPresent") // checked above
long endTime = resource.getEndTime(deviceId).get();
endTimeMap.put(deviceId, endTime);
}
if (config.isEnableSeparateData()) {
Expand All @@ -682,7 +683,9 @@ protected void upgradeAndUpdateDeviceLastFlushTime(
Map<IDeviceID, Long> endTimeMap = new HashMap<>();
for (TsFileResource resource : resources) {
for (IDeviceID deviceId : resource.getDevices()) {
long endTime = resource.getEndTime(deviceId);
// checked above
//noinspection OptionalGetWithoutIsPresent
long endTime = resource.getEndTime(deviceId).get();
endTimeMap.put(deviceId, endTime);
}
}
Expand Down Expand Up @@ -2429,6 +2432,7 @@ public void insertSeparatorToWAL() {
new ContinuousSameSearchIndexSeparatorNode()));
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
private boolean canSkipDelete(TsFileResource tsFileResource, ModEntry deletion) {
long fileStartTime = tsFileResource.getTimeIndex().getMinStartTime();
long fileEndTime =
Expand All @@ -2446,10 +2450,11 @@ private boolean canSkipDelete(TsFileResource tsFileResource, ModEntry deletion)
}

for (IDeviceID device : tsFileResource.getDevices()) {
long startTime = tsFileResource.getTimeIndex().getStartTime(device);
// we are iterating the time index so the times are definitely present
long startTime = tsFileResource.getTimeIndex().getStartTime(device).get();
long endTime =
tsFileResource.isClosed()
? tsFileResource.getTimeIndex().getEndTime(device)
? tsFileResource.getTimeIndex().getEndTime(device).get()
: Long.MAX_VALUE;
if (deletion.affects(device, startTime, endTime)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ public void perform() throws Exception {
long ttl = deviceIterator.getTTLForCurrentDevice();
sortedSourceFiles.removeIf(
x -> x.definitelyNotContains(device) || !x.isDeviceAlive(device, ttl));
sortedSourceFiles.sort(Comparator.comparingLong(x -> x.getStartTime(device)));
// checked above
//noinspection OptionalGetWithoutIsPresent
sortedSourceFiles.sort(Comparator.comparingLong(x -> x.getStartTime(device).get()));
if (ttl != Long.MAX_VALUE) {
ModEntry ttlDeletion =
CompactionUtils.convertTtlToDeletion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -425,7 +426,8 @@ private void applyModificationForAlignedChunkMetadataList(
}
IDeviceID device = currentDevice.getLeft();
ModEntry ttlDeletion = null;
if (tsFileResource.getStartTime(device) < timeLowerBoundForCurrentDevice) {
Optional<Long> startTime = tsFileResource.getStartTime(device);
if (startTime.isPresent() && startTime.get() < timeLowerBoundForCurrentDevice) {
ttlDeletion = CompactionUtils.convertTtlToDeletion(device, timeLowerBoundForCurrentDevice);
}

Expand Down Expand Up @@ -662,7 +664,8 @@ public String nextSeries() throws IllegalPathException {
Map<String, List<ChunkMetadata>> chunkMetadataListMap = chunkMetadataCacheMap.get(reader);

ModEntry ttlDeletion = null;
if (resource.getStartTime(device) < timeLowerBoundForCurrentDevice) {
Optional<Long> startTime = resource.getStartTime(device);
if (startTime.isPresent() && startTime.get() < timeLowerBoundForCurrentDevice) {
ttlDeletion =
new TreeDeletionEntry(
new MeasurementPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;

public abstract class SeriesCompactionExecutor {
Expand Down Expand Up @@ -349,14 +350,19 @@ private void checkAndCompactOverlapPage(PageElement nextPageElement, TimeValuePa
* list is ordered according to the startTime of the current device in the file from small to
* large, so that each file can be compacted in order.
*/
protected List<FileElement> findOverlapFiles(FileElement file) {
protected List<FileElement> findOverlapFiles(FileElement fileToCheck) {
List<FileElement> overlappedFiles = new ArrayList<>();
long endTime = file.resource.getEndTime(deviceId);
for (FileElement fileElement : fileList) {
if (fileElement.resource.getStartTime(deviceId) <= endTime) {
if (!fileElement.isSelected) {
overlappedFiles.add(fileElement);
fileElement.isSelected = true;
Optional<Long> endTimeInCheckingFile = fileToCheck.resource.getEndTime(deviceId);
for (FileElement otherFile : fileList) {
if (!endTimeInCheckingFile.isPresent()) {
continue;
}
Optional<Long> startTimeInOtherFile = otherFile.resource.getStartTime(deviceId);
if (startTimeInOtherFile.isPresent()
&& startTimeInOtherFile.get() <= endTimeInCheckingFile.get()) {
if (!otherFile.isSelected) {
overlappedFiles.add(otherFile);
otherFile.isSelected = true;
}
} else {
break;
Expand Down
Loading
Loading