Skip to content

Commit

Permalink
Load: verify measurement datatype & register table schema in split file (#13827)
Browse files Browse the repository at this point in the history

Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
DanielWang2035 and SteveYurongSu authored Oct 18, 2024
1 parent b3a2142 commit 4bd629e
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.analyze.load;

import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.exception.LoadEmptyFileException;
import org.apache.iotdb.db.exception.LoadReadOnlyException;
Expand All @@ -30,8 +31,10 @@
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ITableDeviceSchemaValidation;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
Expand All @@ -44,17 +47,19 @@
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.io.FileUtils;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -70,6 +75,9 @@ public class LoadTsFileToTableModelAnalyzer extends LoadTsFileAnalyzer {

private final Metadata metadata;

// tableName -> Pair<device column count, device column mapping>
private final Map<String, Pair<Integer, List<Integer>>> tableIdColumnMapper = new HashMap<>();

public LoadTsFileToTableModelAnalyzer(
LoadTsFileStatement loadTsFileStatement, Metadata metadata, MPPQueryContext context) {
super(loadTsFileStatement, context);
Expand Down Expand Up @@ -137,26 +145,24 @@ protected void analyzeSingleTsFile(final File tsFile)
// construct tsfile resource
final TsFileResource tsFileResource = constructTsFileResource(reader, tsFile);

for (Map.Entry<String, TableSchema> name2Schema :
for (Map.Entry<String, org.apache.tsfile.file.metadata.TableSchema> name2Schema :
reader.readFileMetadata().getTableSchemaMap().entrySet()) {
final TableSchema fileSchema =
TableSchema.fromTsFileTableSchema(name2Schema.getKey(), name2Schema.getValue());
final TableSchema realSchema;
// TODO: remove this synchronized block after the metadata is thread-safe
synchronized (metadata) {
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema realSchema =
metadata
.validateTableHeaderSchema(
database,
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema
.fromTsFileTableSchema(name2Schema.getKey(), name2Schema.getValue()),
context,
true)
.orElse(null);
realSchema =
metadata.validateTableHeaderSchema(database, fileSchema, context, true).orElse(null);
if (Objects.isNull(realSchema)) {
throw new VerifyMetadataException(
String.format(
"Failed to validate schema for table {%s, %s}",
name2Schema.getKey(), name2Schema.getValue()));
}
}
tableIdColumnMapper.clear();
verifyTableDataTypeAndGenerateIdColumnMapper(fileSchema, realSchema);
}

long writePointCount = 0;
Expand Down Expand Up @@ -193,6 +199,42 @@ protected void analyzeSingleTsFile(final File tsFile)
}
}

/**
 * Verifies the columns of a TsFile's table schema against the validated IoTDB table schema and
 * records, per table, how the file's ID columns map onto the real table's ID columns.
 *
 * <p>For each ID column in the file schema, its position among the real table's ID columns is
 * appended to the mapping list stored in {@code tableIdColumnMapper}. For each MEASUREMENT
 * column, the data type is checked against the real table's column of the same name.
 *
 * @param fileSchema schema read from the TsFile being loaded
 * @param realSchema schema validated against (and registered in) IoTDB
 * @throws VerifyMetadataException if an ID column is unknown to the real table, a measurement
 *     column is absent from the real table, or a measurement data type mismatches
 */
private void verifyTableDataTypeAndGenerateIdColumnMapper(
    TableSchema fileSchema, TableSchema realSchema) throws VerifyMetadataException {
  final int realIdColumnCount = realSchema.getIdColumns().size();
  // mapping.get(i) = index among the real table's ID columns of the i-th ID column in the file.
  final List<Integer> idColumnMapping =
      tableIdColumnMapper
          .computeIfAbsent(
              realSchema.getTableName(), k -> new Pair<>(realIdColumnCount, new ArrayList<>()))
          .getRight();
  for (final ColumnSchema fileColumn : fileSchema.getColumns()) {
    if (fileColumn.getColumnCategory() == TsTableColumnCategory.ID) {
      final int realIndex = realSchema.getIndexAmongIdColumns(fileColumn.getName());
      if (realIndex == -1) {
        throw new VerifyMetadataException(
            String.format(
                "Id column %s in TsFile is not found in IoTDB table %s",
                fileColumn.getName(), realSchema.getTableName()));
      }
      idColumnMapping.add(realIndex);
    } else if (fileColumn.getColumnCategory() == TsTableColumnCategory.MEASUREMENT) {
      final ColumnSchema realColumn =
          realSchema.getColumn(fileColumn.getName(), fileColumn.getColumnCategory());
      // getColumn returns null when the measurement is absent from the real table schema;
      // report that explicitly instead of dereferencing null below.
      if (Objects.isNull(realColumn)) {
        throw new VerifyMetadataException(
            String.format(
                "Measurement column %s in TsFile is not found in IoTDB table %s",
                fileColumn.getName(), realSchema.getTableName()));
      }
      if (!fileColumn.getType().equals(realColumn.getType())) {
        throw new VerifyMetadataException(
            String.format(
                "Data type mismatch for column %s in table %s, type in TsFile: %s, type in IoTDB: %s",
                realColumn.getName(),
                realSchema.getTableName(),
                fileColumn.getType(),
                realColumn.getType()));
      }
    }
  }
}

private void autoCreateDatabase(final String database) throws VerifyMetadataException {
validateDatabaseName(database);
final CreateDBTask task =
Expand Down Expand Up @@ -228,8 +270,24 @@ public String getTableName() {

@Override
public List<Object[]> getDeviceIdList() {
  // Look up how this table's ID columns in the file map onto the real table's ID columns.
  final Pair<Integer, List<Integer>> countAndMapping =
      analyzer.tableIdColumnMapper.get(deviceId.getTableName());

  if (countAndMapping == null) {
    // This should not happen
    LOGGER.warn(
        "Failed to find id column mapping for table {}, deviceId: {}",
        deviceId.getTableName(),
        deviceId);
    // Fall back to the raw segments, skipping segment 0 (presumably the table name — the
    // original mapping logic also starts reading ID segments at offset 1).
    final Object[] rawSegments =
        Arrays.copyOfRange(deviceId.getSegments(), 1, deviceId.getSegments().length);
    return Collections.singletonList(rawSegments);
  }

  // Scatter the file's ID segments into the positions the real table expects.
  final List<Integer> mapping = countAndMapping.getRight();
  final Object[] reordered = new String[countAndMapping.getLeft()];
  int fileIndex = 0;
  for (final int realIndex : mapping) {
    reordered[realIndex] = deviceId.getSegments()[fileIndex + 1];
    fileIndex++;
  }
  return Collections.singletonList(reordered);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,31 @@ public List<ColumnSchema> getColumns() {
return columns;
}

/**
 * Looks a column up by both name and category.
 *
 * @param columnName name to match exactly
 * @param columnCategory category the column must have
 * @return the first matching column, or {@code null} when none matches
 */
public ColumnSchema getColumn(String columnName, TsTableColumnCategory columnCategory) {
  for (int i = 0; i < columns.size(); i++) {
    final ColumnSchema candidate = columns.get(i);
    final boolean nameMatches = candidate.getName().equals(columnName);
    final boolean categoryMatches = candidate.getColumnCategory() == columnCategory;
    if (nameMatches && categoryMatches) {
      return candidate;
    }
  }
  return null;
}

/**
 * Given the name of an ID column, return the index of this column among all ID columns, return -1
 * if not found.
 */
public int getIndexAmongIdColumns(String idColumnName) {
  final List<ColumnSchema> idColumns = getIdColumns();
  for (int position = 0; position < idColumns.size(); position++) {
    if (idColumns.get(position).getName().equals(idColumnName)) {
      return position;
    }
  }
  return -1;
}

public static TableSchema of(TsTable tsTable) {
String tableName = tsTable.getTableName();
List<ColumnSchema> columns = new ArrayList<>();
Expand Down Expand Up @@ -112,6 +137,12 @@ public static TableSchema fromTsFileTableSchema(
continue;
}

// TsFile should not contain attribute columns by design.
final ColumnType columnType = tsFileTableSchema.getColumnTypes().get(i);
if (columnType == ColumnType.ATTRIBUTE) {
continue;
}

final TSDataType dataType = tsFileTableSchema.getColumnSchemas().get(i).getType();
if (dataType == TSDataType.VECTOR) {
continue;
Expand All @@ -122,8 +153,7 @@ public static TableSchema fromTsFileTableSchema(
columnName,
InternalTypeManager.fromTSDataType(dataType),
false,
TsTableColumnCategory.fromTsFileColumnType(
tsFileTableSchema.getColumnTypes().get(i))));
TsTableColumnCategory.fromTsFileColumnType(columnType)));
}
return new TableSchema(tableName, columns);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter;
import org.apache.iotdb.metrics.utils.MetricLevel;
Expand Down Expand Up @@ -525,9 +526,15 @@ public TsFileDataManager(
}

private boolean addOrSendTsFileData(TsFileData tsFileData) {
return tsFileData.isModification()
? addOrSendDeletionData(tsFileData)
: addOrSendChunkData((ChunkData) tsFileData);
switch (tsFileData.getType()) {
case CHUNK:
return addOrSendChunkData((ChunkData) tsFileData);
case DELETION:
return addOrSendDeletionData((DeletionData) tsFileData);
default:
throw new UnsupportedOperationException(
String.format("Unsupported TsFileDataType %s.", tsFileData.getType()));
}
}

private boolean isMemoryEnough() {
Expand Down Expand Up @@ -605,7 +612,7 @@ private void routeChunkData() {
nonDirectionalChunkData.clear();
}

private boolean addOrSendDeletionData(TsFileData deletionData) {
private boolean addOrSendDeletionData(DeletionData deletionData) {
routeChunkData(); // ensure chunk data will be added before deletion

for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,17 @@ public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNo
}

for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
if (!tsFileData.isModification()) {
ChunkData chunkData = (ChunkData) tsFileData;
writerManager.write(
new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
} else {
writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
switch (tsFileData.getType()) {
case CHUNK:
ChunkData chunkData = (ChunkData) tsFileData;
writerManager.write(
new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
break;
case DELETION:
writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
break;
default:
throw new IOException("Unsupported TsFileData type: " + tsFileData.getType());
}
}
} finally {
Expand Down Expand Up @@ -398,7 +403,9 @@ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws
return;
}

dataPartition2Writer.put(partitionInfo, new TsFileIOWriter(newTsFile));
final TsFileIOWriter writer = new TsFileIOWriter(newTsFile);
writer.setGenerateTableSchema(true);
dataPartition2Writer.put(partitionInfo, writer);
}
TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
if (!chunkData.getDevice().equals(dataPartition2LastDevice.getOrDefault(partitionInfo, ""))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void addValueChunk(final ChunkHeader chunkHeader) {

@Override
public void serialize(final DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(isModification(), stream);
ReadWriteIOUtils.write(getType().ordinal(), stream);
ReadWriteIOUtils.write(isAligned(), stream);
serializeAttr(stream);
byteStream.writeTo(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public interface ChunkData extends TsFileData {
void writeToFileWriter(TsFileIOWriter writer) throws IOException;

@Override
default boolean isModification() {
return false;
default TsFileDataType getType() {
return TsFileDataType.CHUNK;
}

static ChunkData deserialize(InputStream stream) throws PageException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ public void writeToModificationFile(ModificationFile modificationFile, long file
}

@Override
public boolean isModification() {
return true;
public TsFileDataType getType() {
return TsFileDataType.DELETION;
}

@Override
public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(isModification(), stream);
ReadWriteIOUtils.write(getType().ordinal(), stream);
deletion.serializeWithoutFileOffset(stream);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void writeToFileWriter(final TsFileIOWriter writer) throws IOException {

@Override
public void serialize(final DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(isModification(), stream);
ReadWriteIOUtils.write(getType().ordinal(), stream);
ReadWriteIOUtils.write(isAligned(), stream);
serializeAttr(stream);
byteStream.writeTo(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,20 @@
public interface TsFileData {
long getDataSize();

boolean isModification();
TsFileDataType getType();

void serialize(DataOutputStream stream) throws IOException;

static TsFileData deserialize(InputStream stream)
throws IOException, PageException, IllegalPathException {
boolean isModification = ReadWriteIOUtils.readBool(stream);
return isModification ? DeletionData.deserialize(stream) : ChunkData.deserialize(stream);
final TsFileDataType type = TsFileDataType.values()[ReadWriteIOUtils.readInt(stream)];
switch (type) {
case CHUNK:
return ChunkData.deserialize(stream);
case DELETION:
return DeletionData.deserialize(stream);
default:
throw new UnsupportedOperationException("Unknown TsFileData type: " + type);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.load.splitter;

/**
 * Tag distinguishing the kinds of data a TsFile is split into during load: chunk payloads versus
 * deletion (modification) records.
 *
 * <p>NOTE: {@code ordinal()} is written to the wire by the {@code serialize} implementations and
 * read back via {@code TsFileDataType.values()[...]} in {@code TsFileData.deserialize}, so the
 * declaration order of these constants must not change and new constants may only be appended.
 */
public enum TsFileDataType {
  CHUNK,
  DELETION
}

0 comments on commit 4bd629e

Please sign in to comment.