Skip to content

Commit

Permalink
[Table Model] table model compaction (#12428)
Browse files Browse the repository at this point in the history
* table model compaction

* modify ut

* modify CompactionTableModelTestFileWriter

* modify CompactionTableModelTestFileWriter

* fix map concurrent modification

* add ut

* support ReadPointCompactionPerformer

* fix read point compaction performer ut

* add v3 tsfile in test resource for compaction test

* add ut

* skip collect tree model table schema in compaction

* modify ut

* empty

* fix test

* empty

* Revert "fix test"

This reverts commit b01c233.

* fix v3tsfile resource no such file

* fix ut

* fix ut

* fix ut

* fix ut
  • Loading branch information
shuwenwei authored May 22, 2024
1 parent 6e569b7 commit 181c25f
Show file tree
Hide file tree
Showing 48 changed files with 2,387 additions and 962 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;

/**
 * Thrown during table model compaction when the table schemas collected from different
 * source files cannot be merged, e.g. when id columns at the same position have
 * different names or the table names differ.
 */
public class CompactionTableSchemaNotMatchException extends RuntimeException {

  /**
   * @param msg detail message describing the schema mismatch
   */
  public CompactionTableSchemaNotMatchException(String msg) {
    super(msg);
  }

  /**
   * Backward-compatible overload that preserves the underlying cause for diagnosis.
   *
   * @param msg detail message describing the schema mismatch
   * @param cause the exception that triggered this one; retained in the cause chain
   */
  public CompactionTableSchemaNotMatchException(String msg, Throwable cause) {
    super(msg, cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionPerformerSubTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchemaCollector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
Expand All @@ -46,6 +47,7 @@
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -112,6 +114,9 @@ public void perform() throws Exception {
isCrossCompaction
? new FastCrossCompactionWriter(targetFiles, seqFiles, readerCacheMap)
: new FastInnerCompactionWriter(targetFiles.get(0))) {
List<Schema> schemas =
CompactionTableSchemaCollector.collectSchema(seqFiles, unseqFiles, readerCacheMap);
compactionWriter.setSchemaForAllTargetFile(schemas);
while (deviceIterator.hasNextDevice()) {
checkThreadInterrupted();
Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();
Expand Down Expand Up @@ -141,6 +146,7 @@ public void perform() throws Exception {
subTaskSummary.setTemporalFileSize(compactionWriter.getWriterSize());
sortedSourceFiles.clear();
}
compactionWriter.removeUnusedTableSchema();
compactionWriter.endFile();
CompactionUtils.updatePlanIndexes(targetFiles, seqFiles, unseqFiles);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchemaCollector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.ReadChunkAlignedSeriesCompactionExecutor;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
Expand Down Expand Up @@ -80,6 +81,8 @@ public void perform()
targetResource.getTsFile(),
sizeForFileWriter,
CompactionType.INNER_SEQ_COMPACTION)) {
writer.setSchema(
CompactionTableSchemaCollector.collectSchema(seqFiles, deviceIterator.getReaderMap()));
while (deviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();
IDeviceID device = deviceInfo.left;
Expand All @@ -97,6 +100,7 @@ public void perform()
for (TsFileResource tsFileResource : seqFiles) {
targetResource.updatePlanIndexes(tsFileResource);
}
writer.removeUnusedTableSchema();
writer.endFile();
if (writer.isEmptyTargetFile()) {
targetResource.forceMarkDeleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IUnseqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.ReadPointPerformerSubTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchemaCollector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.IDataBlockReader;
Expand All @@ -48,6 +49,7 @@
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -110,6 +112,10 @@ public void perform() throws Exception {
// Do not close device iterator, because tsfile reader is managed by FileReaderManager.
MultiTsFileDeviceIterator deviceIterator =
new MultiTsFileDeviceIterator(seqFiles, unseqFiles);
List<Schema> schemas =
CompactionTableSchemaCollector.collectSchema(
seqFiles, unseqFiles, deviceIterator.getReaderMap());
compactionWriter.setSchemaForAllTargetFile(schemas);
while (deviceIterator.hasNextDevice()) {
checkThreadInterrupted();
Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();
Expand All @@ -127,6 +133,7 @@ public void perform() throws Exception {
summary.setTemporalFileSize(compactionWriter.getWriterSize());
}

compactionWriter.removeUnusedTableSchema();
compactionWriter.endFile();
CompactionUtils.updatePlanIndexes(targetFiles, seqFiles, unseqFiles);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils;

import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionTableSchemaNotMatchException;

import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.write.record.Tablet.ColumnType;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import java.util.ArrayList;
import java.util.List;

/**
 * A {@link TableSchema} specialization used while merging source-file schemas in
 * compaction. Only id columns are kept here; measurement columns are generated
 * automatically when a chunk group is ended (see the callers in the compaction
 * performers), so they are intentionally filtered out on merge.
 */
public class CompactionTableSchema extends TableSchema {

  public CompactionTableSchema(String tableName) {
    super(tableName);
  }

  /**
   * Merges the leading id columns of another schema of the same table into this one.
   * The id columns already present must match the incoming ones name-for-name over
   * their shared prefix; any additional incoming id columns are appended.
   *
   * @param tableSchema schema collected from a source file; a null argument is ignored
   * @throws CompactionTableSchemaNotMatchException if the table names differ, or an id
   *     column name disagrees at the same position
   */
  public void merge(TableSchema tableSchema) {
    if (tableSchema == null) {
      return;
    }
    if (!tableSchema.getTableName().equals(this.tableName)) {
      throw new CompactionTableSchemaNotMatchException(
          "this.tableName is " + tableName + " merge tableName is " + tableSchema.getTableName());
    }

    // keep only the leading run of id columns from the incoming schema
    List<IMeasurementSchema> incomingColumns = tableSchema.getColumnSchemas();
    List<ColumnType> incomingTypes = tableSchema.getColumnTypes();
    List<IMeasurementSchema> incomingIdColumns = new ArrayList<>();
    for (int i = 0; i < incomingTypes.size() && incomingTypes.get(i) == ColumnType.ID; i++) {
      incomingIdColumns.add(incomingColumns.get(i));
    }

    // the shared id-column prefix must agree position by position
    int sharedPrefixLength = Math.min(this.columnTypes.size(), incomingIdColumns.size());
    for (int i = 0; i < sharedPrefixLength; i++) {
      IMeasurementSchema incoming = incomingIdColumns.get(i);
      IMeasurementSchema existing = columnSchemas.get(i);
      if (!incoming.getMeasurementId().equals(existing.getMeasurementId())) {
        throw new CompactionTableSchemaNotMatchException(
            "current id column name is "
                + existing.getMeasurementId()
                + ", other id column name in same position is "
                + incoming.getMeasurementId());
      }
    }

    // append id columns the incoming schema has beyond our current prefix
    for (int i = sharedPrefixLength; i < incomingIdColumns.size(); i++) {
      columnTypes.add(ColumnType.ID);
      columnSchemas.add(incomingIdColumns.get(i));
    }
  }

  /**
   * @return a copy sharing column schema objects but with independent column lists, so
   *     later merges into one copy do not affect the other
   */
  public CompactionTableSchema copy() {
    CompactionTableSchema duplicate = new CompactionTableSchema(this.tableName);
    duplicate.columnSchemas = new ArrayList<>(this.columnSchemas);
    duplicate.columnTypes = new ArrayList<>(this.columnTypes);
    return duplicate;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils;

import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;

import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.write.schema.Schema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Collects and merges the table schemas of all compaction source files so every target
 * file can be initialized with the id columns it may need to write.
 */
public class CompactionTableSchemaCollector {

  private CompactionTableSchemaCollector() {
    // utility class, no instances
  }

  /**
   * Builds one merged schema per target file of a cross-space compaction. The first
   * entry is the merged schema itself; each further entry is an independent copy so
   * target files do not share mutable schema state.
   *
   * @param seqFiles sequence source files; their count determines the list size
   * @param unseqFiles unsequence source files
   * @param readerMap open readers for all source files
   * @return a list with one schema per seq file (at least one entry)
   * @throws IOException if reading a source file's metadata fails
   */
  public static List<Schema> collectSchema(
      List<TsFileResource> seqFiles,
      List<TsFileResource> unseqFiles,
      Map<TsFileResource, TsFileSequenceReader> readerMap)
      throws IOException {
    List<TsFileResource> allSourceFiles = new ArrayList<>(seqFiles);
    allSourceFiles.addAll(unseqFiles);
    Schema mergedSchema = collectSchema(allSourceFiles, readerMap);

    List<Schema> targetSchemas = new ArrayList<>(seqFiles.size());
    targetSchemas.add(mergedSchema);
    for (int i = 1; i < seqFiles.size(); i++) {
      Schema duplicate = new Schema();
      for (TableSchema tableSchema : mergedSchema.getTableSchemaMap().values()) {
        duplicate.registerTableSchema(((CompactionTableSchema) tableSchema).copy());
      }
      targetSchemas.add(duplicate);
    }
    return targetSchemas;
  }

  /**
   * Merges the table schemas of the given source files into a single {@link Schema}.
   * Files without a table schema map (v3 tsfiles) and tree-model schemas are skipped.
   *
   * @param sourceFiles source files to collect from
   * @param readerMap open readers for the source files
   * @return a schema containing one merged {@link CompactionTableSchema} per table
   * @throws IOException if reading a source file's metadata fails
   */
  public static Schema collectSchema(
      List<TsFileResource> sourceFiles, Map<TsFileResource, TsFileSequenceReader> readerMap)
      throws IOException {
    Map<String, TableSchema> mergedByTableName = new HashMap<>();
    for (TsFileResource resource : sourceFiles) {
      TsFileSequenceReader reader = readerMap.get(resource);
      Map<String, TableSchema> fileTableSchemas = reader.readFileMetadata().getTableSchemaMap();
      if (fileTableSchemas == null) {
        // v3 tsfile
        continue;
      }
      for (Map.Entry<String, TableSchema> entry : fileTableSchemas.entrySet()) {
        TableSchema sourceTableSchema = entry.getValue();
        if (isTreeModel(sourceTableSchema)) {
          continue;
        }
        // merge all id columns, measurement schema will be generated automatically when end chunk
        // group
        CompactionTableSchema mergedTableSchema =
            (CompactionTableSchema)
                mergedByTableName.computeIfAbsent(entry.getKey(), CompactionTableSchema::new);
        mergedTableSchema.merge(sourceTableSchema);
      }
    }
    Schema targetSchema = new Schema();
    mergedByTableName.values().forEach(targetSchema::registerTableSchema);
    return targetSchema;
  }

  /** Tree-model schemas are identified by their path-style table name prefix. */
  private static boolean isTreeModel(TableSchema tableSchema) {
    return tableSchema.getTableName().startsWith("root.");
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,10 @@ private void applyModificationForAlignedChunkMetadataList(
alignedChunkMetadataList, modificationForCurDevice);
}

/**
 * Exposes the readers of the source files handled by this iterator so callers (e.g.
 * schema collection in the compaction performers) can reuse them without reopening files.
 *
 * @return the live internal reader map, keyed by source resource; not a defensive copy,
 *     so callers must not close or remove readers the iterator still needs
 */
public Map<TsFileResource, TsFileSequenceReader> getReaderMap() {
return readerMap;
}

@Override
public void close() throws IOException {
for (TsFileSequenceReader reader : readerMap.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.tsfile.write.chunk.IChunkWriter;
import org.apache.tsfile.write.chunk.ValueChunkWriter;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.Schema;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -132,6 +133,8 @@ public void startMeasurement(List<IMeasurementSchema> measurementSchemaList, int
*/
public abstract void checkAndMayFlushChunkMetadata() throws IOException;

protected abstract List<CompactionTsFileWriter> getAllTargetFileWriter();

protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter) {
if (chunkWriter instanceof ChunkWriterImpl) {
ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
Expand Down Expand Up @@ -316,4 +319,20 @@ protected void checkPreviousTimestamp(long currentWritingTimestamp, int subTaskI
lastTime[subTaskId]);
}
}

/**
 * Assigns one collected {@link Schema} to each target file writer, pairing the i-th
 * schema with the i-th writer. The schemas list must therefore have at least as many
 * entries as there are target file writers.
 *
 * @param schemas merged schemas, in the same order as {@link #getAllTargetFileWriter()}
 */
public void setSchemaForAllTargetFile(List<Schema> schemas) {
  int index = 0;
  for (CompactionTsFileWriter writer : getAllTargetFileWriter()) {
    writer.setSchema(schemas.get(index++));
  }
}

/**
 * Delegates removeUnusedTableSchema to every target file writer — presumably pruning
 * table schemas no data was written for before the file is ended; confirm the exact
 * semantics in CompactionTsFileWriter.
 */
public void removeUnusedTableSchema() {
  getAllTargetFileWriter().forEach(CompactionTsFileWriter::removeUnusedTableSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ protected AbstractCrossCompactionWriter(
this.targetResources = targetResources;
}

/**
 * @return the live list of this cross compaction's target file writers; not a copy
 */
@Override
protected List<CompactionTsFileWriter> getAllTargetFileWriter() {
return targetFileWriters;
}

@Override
public void startChunkGroup(IDeviceID deviceId, boolean isAlign) throws IOException {
this.deviceId = deviceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.tsfile.read.common.block.TsBlock;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWriter {
protected CompactionTsFileWriter fileWriter;
Expand All @@ -56,6 +58,11 @@ protected AbstractInnerCompactionWriter(TsFileResource targetFileResource) throw
isEmptyFile = true;
}

/**
 * @return the single target file writer of this inner compaction, wrapped in an
 *     immutable singleton list
 */
@Override
protected List<CompactionTsFileWriter> getAllTargetFileWriter() {
return Collections.singletonList(fileWriter);
}

@Override
public void startChunkGroup(IDeviceID deviceId, boolean isAlign) throws IOException {
fileWriter.startChunkGroup(deviceId);
Expand Down
Loading

0 comments on commit 181c25f

Please sign in to comment.