Skip to content

Commit

Permalink
Pipe: Fix TsFileInsertionDataContainer may generate ClassCastException (
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi authored Sep 5, 2024
1 parent 27f9065 commit beabc19
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private Tablet buildNextTablet() throws IOException {
tablet.addValue(
measurements.get(i),
rowIndex,
field == null ? null : field.getObjectValue(field.getDataType()));
field == null ? null : field.getObjectValue(schemas.get(i).getType()));
}

tablet.rowSize++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.tsfile.read.reader.chunk.AlignedChunkReader;
import org.apache.tsfile.read.reader.chunk.ChunkReader;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsPrimitiveType;
Expand Down Expand Up @@ -206,7 +205,7 @@ private Tablet getNextTablet() {
final int rowIndex = tablet.rowSize;

tablet.addTimestamp(rowIndex, data.currentTime());
putValueToColumns(data, tablet.values, tablet.bitMaps, rowIndex);
putValueToColumns(data, tablet, rowIndex);

tablet.rowSize++;
}
Expand Down Expand Up @@ -256,17 +255,17 @@ private void prepareData() throws IOException {
} while (!data.hasCurrent());
}

private void putValueToColumns(
final BatchData data, final Object[] columns, final BitMap[] bitMaps, final int rowIndex) {
final TSDataType type = data.getDataType();
if (type == TSDataType.VECTOR) {
private void putValueToColumns(final BatchData data, final Tablet tablet, final int rowIndex) {
final Object[] columns = tablet.values;

if (data.getDataType() == TSDataType.VECTOR) {
for (int i = 0; i < columns.length; ++i) {
final TsPrimitiveType primitiveType = data.getVector()[i];
if (Objects.isNull(primitiveType)) {
bitMaps[i].mark(rowIndex);
tablet.bitMaps[i].mark(rowIndex);
continue;
}
switch (primitiveType.getDataType()) {
switch (tablet.getSchemas().get(i).getType()) {
case BOOLEAN:
((boolean[]) columns[i])[rowIndex] = primitiveType.getBoolean();
break;
Expand Down Expand Up @@ -297,7 +296,7 @@ private void putValueToColumns(
}
}
} else {
switch (type) {
switch (tablet.getSchemas().get(0).getType()) {
case BOOLEAN:
((boolean[]) columns[0])[rowIndex] = data.getBoolean();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.pipe.api.access.Row;

import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.write.TsFileWriter;
Expand All @@ -54,6 +56,7 @@

import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -525,24 +528,27 @@ private void testPartialNullValue(final boolean isQuery)
alignedTsFile = new File("0-0-2-0.tsfile");

final List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT32));
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
schemaList.add(new MeasurementSchema("s1", TSDataType.TIMESTAMP));
schemaList.add(new MeasurementSchema("s2", TSDataType.STRING));
schemaList.add(new MeasurementSchema("s3", TSDataType.DATE));

final Tablet t = new Tablet("root.sg.d", schemaList, 1024);
t.rowSize = 2;
t.addTimestamp(0, 1000);
t.addTimestamp(1, 2000);
t.addValue("s1", 0, 2);
t.addValue("s1", 0, 2L);
t.addValue("s2", 0, null);
t.addValue("s3", 0, LocalDate.of(2020, 8, 1));
t.addValue("s1", 1, null);
t.addValue("s2", 1, 2L);
t.addValue("s2", 1, new Binary("test", TSFileConfig.STRING_CHARSET));
t.addValue("s3", 1, LocalDate.of(2024, 8, 1));

try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList);
writer.writeAligned(t);
}
testTsFilePointNum(
alignedTsFile, new PrefixPipePattern("root"), Long.MIN_VALUE, Long.MAX_VALUE, isQuery, 2);
alignedTsFile, new PrefixPipePattern("root"), Long.MIN_VALUE, Long.MAX_VALUE, isQuery, 4);
}

private void testTsFilePointNum(
Expand Down

0 comments on commit beabc19

Please sign in to comment.