From 4bd629ef239f354d7fd28e09f75485767d88bcd2 Mon Sep 17 00:00:00 2001 From: Zikun Ma <55695098+DanielWang2035@users.noreply.github.com> Date: Fri, 18 Oct 2024 20:25:10 +0800 Subject: [PATCH] Load: verify measurement datatype & register table schema in split file (#13827) Co-authored-by: Steve Yurong Su --- .../load/LoadTsFileToTableModelAnalyzer.java | 84 ++++++++++++++++--- .../plan/relational/metadata/TableSchema.java | 34 +++++++- .../scheduler/load/LoadTsFileScheduler.java | 15 +++- .../storageengine/load/LoadTsFileManager.java | 21 +++-- .../load/splitter/AlignedChunkData.java | 2 +- .../load/splitter/ChunkData.java | 4 +- .../load/splitter/DeletionData.java | 6 +- .../load/splitter/NonAlignedChunkData.java | 2 +- .../load/splitter/TsFileData.java | 13 ++- .../load/splitter/TsFileDataType.java | 25 ++++++ 10 files changed, 170 insertions(+), 36 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileDataType.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java index 63aabf41d393..7b50770a7f98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.db.exception.LoadEmptyFileException; import org.apache.iotdb.db.exception.LoadReadOnlyException; @@ -30,8 +31,10 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ITableDeviceSchemaValidation; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -44,17 +47,19 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.commons.io.FileUtils; import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator; +import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -70,6 +75,9 @@ public class LoadTsFileToTableModelAnalyzer extends LoadTsFileAnalyzer { private final Metadata metadata; + // tableName -> Pair + private final Map>> tableIdColumnMapper = new HashMap<>(); + public LoadTsFileToTableModelAnalyzer( LoadTsFileStatement loadTsFileStatement, Metadata metadata, MPPQueryContext context) { super(loadTsFileStatement, context); @@ -137,19 +145,15 @@ protected void analyzeSingleTsFile(final File tsFile) // construct tsfile resource final TsFileResource tsFileResource = constructTsFileResource(reader, tsFile); - for (Map.Entry name2Schema : + for (Map.Entry name2Schema : reader.readFileMetadata().getTableSchemaMap().entrySet()) { + final TableSchema fileSchema = + TableSchema.fromTsFileTableSchema(name2Schema.getKey(), name2Schema.getValue()); + final TableSchema realSchema; // TODO: remove this synchronized block after the metadata is thread-safe synchronized (metadata) { - org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema realSchema = - metadata - .validateTableHeaderSchema( - database, - org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema - .fromTsFileTableSchema(name2Schema.getKey(), name2Schema.getValue()), - context, - true) - .orElse(null); + realSchema = + metadata.validateTableHeaderSchema(database, fileSchema, context, true).orElse(null); if (Objects.isNull(realSchema)) { throw new VerifyMetadataException( String.format( @@ -157,6 +161,8 @@ protected void analyzeSingleTsFile(final File tsFile) name2Schema.getKey(), name2Schema.getValue())); } } + tableIdColumnMapper.clear(); + verifyTableDataTypeAndGenerateIdColumnMapper(fileSchema, realSchema); } long writePointCount = 0; @@ -193,6 +199,42 @@ protected void analyzeSingleTsFile(final File tsFile) } } + private void verifyTableDataTypeAndGenerateIdColumnMapper( + TableSchema fileSchema, TableSchema realSchema) throws VerifyMetadataException { + final int realIdColumnCount = realSchema.getIdColumns().size(); + final List idColumnMapping = + tableIdColumnMapper + .computeIfAbsent( + realSchema.getTableName(), k -> new Pair<>(realIdColumnCount, new ArrayList<>())) + .getRight(); + for (int i = 0; i < fileSchema.getColumns().size(); i++) { + final ColumnSchema fileColumn = fileSchema.getColumns().get(i); + if (fileColumn.getColumnCategory() == TsTableColumnCategory.ID) { + final int realIndex = realSchema.getIndexAmongIdColumns(fileColumn.getName()); + if (realIndex != -1) { + idColumnMapping.add(realIndex); + } else { + throw new VerifyMetadataException( + String.format( + "Id column %s in TsFile is not found in IoTDB table %s", + fileColumn.getName(), realSchema.getTableName())); + } + } else if (fileColumn.getColumnCategory() == TsTableColumnCategory.MEASUREMENT) { + final ColumnSchema realColumn = + realSchema.getColumn(fileColumn.getName(), fileColumn.getColumnCategory()); + if (!fileColumn.getType().equals(realColumn.getType())) { + throw new VerifyMetadataException( + String.format( + "Data type mismatch for column %s in table %s, type in TsFile: %s, type in IoTDB: %s", + realColumn.getName(), + realSchema.getTableName(), + fileColumn.getType(), + realColumn.getType())); + } + } + } + } + private void autoCreateDatabase(final String database) throws VerifyMetadataException { validateDatabaseName(database); final CreateDBTask task = @@ -228,8 +270,24 @@ public String getTableName() { @Override public List getDeviceIdList() { - return Collections.singletonList( - Arrays.copyOfRange(deviceId.getSegments(), 1, deviceId.getSegments().length)); + final Pair> idColumnCountAndMapper = + analyzer.tableIdColumnMapper.get(deviceId.getTableName()); + if (Objects.isNull(idColumnCountAndMapper)) { + // This should not happen + LOGGER.warn( + "Failed to find id column mapping for table {}, deviceId: {}", + deviceId.getTableName(), + deviceId); + return Collections.singletonList( + Arrays.copyOfRange(deviceId.getSegments(), 1, deviceId.getSegments().length)); + } + + final Object[] deviceIdArray = new String[idColumnCountAndMapper.getLeft()]; + for (int i = 0; i < idColumnCountAndMapper.getRight().size(); i++) { + final int j = idColumnCountAndMapper.getRight().get(i); + deviceIdArray[j] = deviceId.getSegments()[i + 1]; + } + return Collections.singletonList(deviceIdArray); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java index 6b24740732d7..af4307dc15d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java @@ -54,6 +54,31 @@ public List getColumns() { return columns; } + /** Get the column with the specified name and category, return null if not found. */ + public ColumnSchema getColumn(String columnName, TsTableColumnCategory columnCategory) { + for (final ColumnSchema column : columns) { + if (column.getName().equals(columnName) && column.getColumnCategory() == columnCategory) { + return column; + } + } + return null; + } + + /** + * Given the name of an ID column, return the index of this column among all ID columns, return -1 + * if not found. + */ + public int getIndexAmongIdColumns(String idColumnName) { + int index = 0; + for (ColumnSchema column : getIdColumns()) { + if (column.getName().equals(idColumnName)) { + return index; + } + index++; + } + return -1; + } + public static TableSchema of(TsTable tsTable) { String tableName = tsTable.getTableName(); List columns = new ArrayList<>(); @@ -112,6 +137,12 @@ public static TableSchema fromTsFileTableSchema( continue; } + // TsFile should not contain attribute columns by design. + final ColumnType columnType = tsFileTableSchema.getColumnTypes().get(i); + if (columnType == ColumnType.ATTRIBUTE) { + continue; + } + final TSDataType dataType = tsFileTableSchema.getColumnSchemas().get(i).getType(); if (dataType == TSDataType.VECTOR) { continue; @@ -122,8 +153,7 @@ public static TableSchema fromTsFileTableSchema( columnName, InternalTypeManager.fromTSDataType(dataType), false, - TsTableColumnCategory.fromTsFileColumnType( - tsFileTableSchema.getColumnTypes().get(i)))); + TsTableColumnCategory.fromTsFileColumnType(columnType))); } return new TableSchema(tableName, columns); } catch (Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index fc116f409dc0..cfa4965fe5d9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -61,6 +61,7 @@ import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet; import org.apache.iotdb.db.storageengine.load.splitter.ChunkData; +import org.apache.iotdb.db.storageengine.load.splitter.DeletionData; import org.apache.iotdb.db.storageengine.load.splitter.TsFileData; import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter; import org.apache.iotdb.metrics.utils.MetricLevel; @@ -525,9 +526,15 @@ public TsFileDataManager( } private boolean addOrSendTsFileData(TsFileData tsFileData) { - return tsFileData.isModification() - ? addOrSendDeletionData(tsFileData) - : addOrSendChunkData((ChunkData) tsFileData); + switch (tsFileData.getType()) { + case CHUNK: + return addOrSendChunkData((ChunkData) tsFileData); + case DELETION: + return addOrSendDeletionData((DeletionData) tsFileData); + default: + throw new UnsupportedOperationException( + String.format("Unsupported TsFileDataType %s.", tsFileData.getType())); + } } private boolean isMemoryEnough() { @@ -605,7 +612,7 @@ private void routeChunkData() { nonDirectionalChunkData.clear(); } - private boolean addOrSendDeletionData(TsFileData deletionData) { + private boolean addOrSendDeletionData(DeletionData deletionData) { routeChunkData(); // ensure chunk data will be added before deletion for (Map.Entry entry : replicaSet2Piece.entrySet()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 4a313a6392ea..75c82a9a1d38 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -220,12 +220,17 @@ public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNo } for (TsFileData tsFileData : pieceNode.getAllTsFileData()) { - if (!tsFileData.isModification()) { - ChunkData chunkData = (ChunkData) tsFileData; - writerManager.write( - new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData); - } else { - writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData); + switch (tsFileData.getType()) { + case CHUNK: + ChunkData chunkData = (ChunkData) tsFileData; + writerManager.write( + new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData); + break; + case DELETION: + writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData); + break; + default: + throw new IOException("Unsupported TsFileData type: " + tsFileData.getType()); } } } finally { @@ -398,7 +403,9 @@ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws return; } - dataPartition2Writer.put(partitionInfo, new TsFileIOWriter(newTsFile)); + final TsFileIOWriter writer = new TsFileIOWriter(newTsFile); + writer.setGenerateTableSchema(true); + dataPartition2Writer.put(partitionInfo, writer); } TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo); if (!chunkData.getDevice().equals(dataPartition2LastDevice.getOrDefault(partitionInfo, ""))) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java index c49a1d38cc35..f20a34cb3cab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java @@ -162,7 +162,7 @@ public void addValueChunk(final ChunkHeader chunkHeader) { @Override public void serialize(final DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(isModification(), stream); + ReadWriteIOUtils.write(getType().ordinal(), stream); ReadWriteIOUtils.write(isAligned(), stream); serializeAttr(stream); byteStream.writeTo(stream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java index 3b16a9d660c4..f64121527aa4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java @@ -50,8 +50,8 @@ public interface ChunkData extends TsFileData { void writeToFileWriter(TsFileIOWriter writer) throws IOException; @Override - default boolean isModification() { - return false; + default TsFileDataType getType() { + return TsFileDataType.CHUNK; } static ChunkData deserialize(InputStream stream) throws PageException, IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java index 186426650fe5..1730df9cf50a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java @@ -49,13 +49,13 @@ public void writeToModificationFile(ModificationFile modificationFile, long file } @Override - public boolean isModification() { - return true; + public TsFileDataType getType() { + return TsFileDataType.DELETION; } @Override public void serialize(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(isModification(), stream); + ReadWriteIOUtils.write(getType().ordinal(), stream); deletion.serializeWithoutFileOffset(stream); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java index 2cd6860b7bfa..230e7c127c5c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java @@ -120,7 +120,7 @@ public void writeToFileWriter(final TsFileIOWriter writer) throws IOException { @Override public void serialize(final DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(isModification(), stream); + ReadWriteIOUtils.write(getType().ordinal(), stream); ReadWriteIOUtils.write(isAligned(), stream); serializeAttr(stream); byteStream.writeTo(stream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java index eee4eac2b3f9..f24eb45c01bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java @@ -31,13 +31,20 @@ public interface TsFileData { long getDataSize(); - boolean isModification(); + TsFileDataType getType(); void serialize(DataOutputStream stream) throws IOException; static TsFileData deserialize(InputStream stream) throws IOException, PageException, IllegalPathException { - boolean isModification = ReadWriteIOUtils.readBool(stream); - return isModification ? DeletionData.deserialize(stream) : ChunkData.deserialize(stream); + final TsFileDataType type = TsFileDataType.values()[ReadWriteIOUtils.readInt(stream)]; + switch (type) { + case CHUNK: + return ChunkData.deserialize(stream); + case DELETION: + return DeletionData.deserialize(stream); + default: + throw new UnsupportedOperationException("Unknown TsFileData type: " + type); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileDataType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileDataType.java new file mode 100644 index 000000000000..9e3f73f724dd --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileDataType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.splitter; + +public enum TsFileDataType { + CHUNK, + DELETION +}