diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 09aa2f9854cc..c9252e02c04d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -497,7 +497,7 @@ private TsFileInsertionEventParser initEventParser() { if (eventParser == null) { eventParser = new TsFileInsertionEventParserProvider( - tsFile, treePattern, startTime, endTime, pipeTaskMeta, this) + tsFile, treePattern, tablePattern, startTime, endTime, pipeTaskMeta, this) .provide(); } return eventParser; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index d9f8ac3bb283..3a7c5db38888 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -42,6 +42,8 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { protected final TreePattern treePattern; // used to filter data protected final TablePattern tablePattern; // used to filter data protected final GlobalTimeExpression timeFilterExpression; // used to filter data + protected final long startTime; // used to filter data + protected final long endTime; // used to filter data protected final PipeTaskMeta pipeTaskMeta; // used to report progress protected final PipeInsertionEvent sourceEvent; // used to report progress @@ -63,6 +65,8 @@ protected TsFileInsertionEventParser( (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE) ? null : new GlobalTimeExpression(TimeFilterApi.between(startTime, endTime)); + this.startTime = startTime; + this.endTime = endTime; this.pipeTaskMeta = pipeTaskMeta; this.sourceEvent = sourceEvent; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java index 8b561a6c6b26..fbc1e7fa3ba0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; -import org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParserTabletIterator; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; @@ -33,11 +32,13 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.TsFileDeviceIterator; -import org.apache.tsfile.read.TsFileReader; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.controller.CachedChunkLoaderImpl; +import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl; +import org.apache.tsfile.read.query.executor.TableQueryExecutor; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; @@ -61,11 +62,12 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser LoggerFactory.getLogger(TsFileInsertionEventTableParser.class); private final PipeMemoryBlock allocatedMemoryBlock; - private final TsFileReader tsFileReader; + private final TableQueryExecutor tableQueryExecutor; + + private final Map tableSchemaMap; private final Iterator>> deviceMeasurementsMapIterator; private final Map deviceIsAlignedMap; - private final Map measurementDataTypeMap; public TsFileInsertionEventTableParser( final File tsFile, @@ -78,7 +80,7 @@ public TsFileInsertionEventTableParser( this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null); } - public TsFileInsertionEventTableParser( + private TsFileInsertionEventTableParser( final File tsFile, final TablePattern pattern, final long startTime, @@ -97,7 +99,14 @@ public TsFileInsertionEventTableParser( long memoryRequiredInBytes = PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes(); tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, true); - tsFileReader = new TsFileReader(tsFileSequenceReader); + + tableSchemaMap = tsFileSequenceReader.readFileMetadata().getTableSchemaMap(); + + tableQueryExecutor = + new TableQueryExecutor( + new MetadataQuerierByFileImpl(tsFileSequenceReader), + new CachedChunkLoaderImpl(tsFileSequenceReader), + TableQueryExecutor.TableQueryOrdering.DEVICE); if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) { // These read-only objects can be found in cache. @@ -105,7 +114,6 @@ public TsFileInsertionEventTableParser( Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap : tsFileResourceManager.getDeviceIsAlignedMapFromCache(tsFile, true); - measurementDataTypeMap = tsFileResourceManager.getMeasurementDataTypeMapFromCache(tsFile); deviceMeasurementsMap = tsFileResourceManager.getDeviceMeasurementsMapFromCache(tsFile); } else { // We need to create these objects here and remove them later. @@ -123,10 +131,6 @@ public TsFileInsertionEventTableParser( devices = deviceIsAlignedMap.keySet(); } - measurementDataTypeMap = readFilteredFullPathDataTypeMap(devices); - memoryRequiredInBytes += - PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap); - deviceMeasurementsMap = readFilteredDeviceMeasurementsMap(devices); memoryRequiredInBytes += PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap); @@ -146,42 +150,6 @@ public TsFileInsertionEventTableParser( } } - private Map> filterDeviceMeasurementsMapByPattern( - final Map> originalDeviceMeasurementsMap) { - final Map> filteredDeviceMeasurementsMap = new HashMap<>(); - for (Map.Entry> entry : originalDeviceMeasurementsMap.entrySet()) { - final IDeviceID deviceId = entry.getKey(); - - // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c - // in this case, all data can be matched without checking the measurements - if (Objects.isNull(treePattern) - || treePattern.isRoot() - || treePattern.coversDevice(deviceId)) { - if (!entry.getValue().isEmpty()) { - filteredDeviceMeasurementsMap.put(deviceId, entry.getValue()); - } - } - - // case 2: for example, pattern is root.a.b.c and device is root.a.b - // in this case, we need to check the full path - else if (treePattern.mayOverlapWithDevice(deviceId)) { - final List filteredMeasurements = new ArrayList<>(); - - for (final String measurement : entry.getValue()) { - if (treePattern.matchesMeasurement(deviceId, measurement)) { - filteredMeasurements.add(measurement); - } - } - - if (!filteredMeasurements.isEmpty()) { - filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements); - } - } - } - - return filteredDeviceMeasurementsMap; - } - private Map readDeviceIsAlignedMap() throws IOException { final Map deviceIsAlignedResultMap = new HashMap<>(); final TsFileDeviceIterator deviceIsAlignedIterator = @@ -194,45 +162,20 @@ private Map readDeviceIsAlignedMap() throws IOException { } private Set filterDevicesByPattern(final Set devices) { - if (Objects.isNull(treePattern) || treePattern.isRoot()) { + if (Objects.isNull(tablePattern) + || !tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern()) { return devices; } final Set filteredDevices = new HashSet<>(); for (final IDeviceID device : devices) { - if (treePattern.coversDevice(device) || treePattern.mayOverlapWithDevice(device)) { + if (tablePattern.matchesTable(device.getTableName())) { filteredDevices.add(device); } } return filteredDevices; } - /** - * This method is similar to {@link TsFileSequenceReader#getFullPathDataTypeMap()}, but only reads - * the given devices. - */ - private Map readFilteredFullPathDataTypeMap(final Set devices) - throws IOException { - final Map result = new HashMap<>(); - - for (final IDeviceID device : devices) { - tsFileSequenceReader - .readDeviceMetadata(device) - .values() - .forEach( - timeseriesMetadata -> - result.put( - device.toString() + "." + timeseriesMetadata.getMeasurementId(), - timeseriesMetadata.getTsDataType())); - } - - return result; - } - - /** - * This method is similar to {@link TsFileSequenceReader#getDeviceMeasurementsMap()}, but only - * reads the given devices. - */ private Map> readFilteredDeviceMeasurementsMap( final Set devices) throws IOException { final Map> result = new HashMap<>(); @@ -251,12 +194,24 @@ private Map> readFilteredDeviceMeasurementsMap( return result; } + private Map> filterDeviceMeasurementsMapByPattern( + final Map> originalDeviceMeasurementsMap) { + final Map> filteredDeviceMeasurementsMap = new HashMap<>(); + for (Map.Entry> entry : originalDeviceMeasurementsMap.entrySet()) { + final IDeviceID deviceId = entry.getKey(); + if (Objects.isNull(tablePattern) || tablePattern.matchesTable(deviceId.getTableName())) { + filteredDeviceMeasurementsMap.put(deviceId, entry.getValue()); + } + } + return filteredDeviceMeasurementsMap; + } + @Override public Iterable toTabletInsertionEvents() { return () -> new Iterator() { - private TsFileInsertionEventQueryParserTabletIterator tabletIterator = null; + private TsFileInsertionEventTableParserTabletIterator tabletIterator = null; @Override public boolean hasNext() { @@ -269,7 +224,16 @@ public boolean hasNext() { final Map.Entry> entry = deviceMeasurementsMapIterator.next(); try { - tabletIterator = null; + tabletIterator = + new TsFileInsertionEventTableParserTabletIterator( + tableQueryExecutor, + entry.getKey(), + entry.getValue(), + tableSchemaMap, + timeFilterExpression, + startTime, + endTime, + allocatedMemoryBlockForTablet); } catch (final Exception e) { close(); throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e); @@ -287,18 +251,15 @@ public TabletInsertionEvent next() { } final Tablet tablet = tabletIterator.next(); - final boolean isAligned = - deviceIsAlignedMap.getOrDefault( - IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false); final TabletInsertionEvent next; if (!hasNext()) { next = new PipeRawTabletInsertionEvent( - sourceEvent != null ? sourceEvent.isTableModelEvent() : null, + Boolean.TRUE, sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null, tablet, - isAligned, + true, sourceEvent != null ? sourceEvent.getPipeName() : null, sourceEvent != null ? sourceEvent.getCreationTime() : 0, pipeTaskMeta, @@ -308,10 +269,10 @@ public TabletInsertionEvent next() { } else { next = new PipeRawTabletInsertionEvent( - sourceEvent != null ? sourceEvent.isTableModelEvent() : null, + Boolean.TRUE, sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null, tablet, - isAligned, + true, sourceEvent != null ? sourceEvent.getPipeName() : null, sourceEvent != null ? sourceEvent.getCreationTime() : 0, pipeTaskMeta, @@ -325,14 +286,6 @@ public TabletInsertionEvent next() { @Override public void close() { - try { - if (tsFileReader != null) { - tsFileReader.close(); - } - } catch (final IOException e) { - LOGGER.warn("Failed to close TsFileReader", e); - } - super.close(); if (allocatedMemoryBlock != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java new file mode 100644 index 000000000000..0f859ec72e85 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table; + +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.read.ReadProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.common.Field; +import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.expression.IExpression; +import org.apache.tsfile.read.query.executor.TableQueryExecutor; +import org.apache.tsfile.read.reader.block.TsBlockReader; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.stream.Collectors; + +public class TsFileInsertionEventTableParserTabletIterator implements Iterator { + + private final TableQueryExecutor tableQueryExecutor; + + private final IDeviceID deviceId; + private final List measurements; + private final Map tableSchemaMap; + + private final IExpression timeFilterExpression; + private final long startTime; + private final long endTime; + + private final TsBlockReader tsBlockReader; + + // TODO: memory control + private final PipeMemoryBlock allocatedBlockForTablet; + + TsFileInsertionEventTableParserTabletIterator( + final TableQueryExecutor tableQueryExecutor, + final IDeviceID deviceId, + final List measurements, + final Map tableSchemaMap, + final IExpression timeFilterExpression, + final long startTime, + final long endTime, + final PipeMemoryBlock allocatedBlockForTablet) + throws ReadProcessException { + this.tableQueryExecutor = tableQueryExecutor; + + this.deviceId = deviceId; + this.measurements = + measurements.stream() + .filter( + measurement -> + // time column in aligned time-series should not be a query column + measurement != null && !measurement.isEmpty()) + .sorted() + .collect(Collectors.toList()); + this.tableSchemaMap = tableSchemaMap; + + this.timeFilterExpression = timeFilterExpression; + this.startTime = startTime; + this.endTime = endTime; + + this.tsBlockReader = buildQueryDataSet(); + + this.allocatedBlockForTablet = Objects.requireNonNull(allocatedBlockForTablet); + } + + private TsBlockReader buildQueryDataSet() throws ReadProcessException { + return tableQueryExecutor.query( + deviceId.getTableName(), + tableSchemaMap.get(deviceId.getTableName()).getColumnSchemas().stream() + .map(IMeasurementSchema::getMeasurementId) + .filter( + measurement -> + // time column in aligned time-series should not be a query column + measurement != null && !measurement.isEmpty()) + .sorted() + .collect(Collectors.toList()), + // TODO: time filter + null, + null, + null); + } + + @Override + public boolean hasNext() { + return tsBlockReader.hasNext(); + } + + @Override + public Tablet next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + try { + return buildNextTablet(); + } catch (final IOException e) { + throw new PipeException("Failed to build tablet", e); + } + } + + // TODO: memory control + private Tablet buildNextTablet() throws IOException { + final TsBlock tsBlock = tsBlockReader.next(); + + final List schemas = new ArrayList<>(); + for (int i = 0, size = measurements.size(); i < size; i++) { + schemas.add( + new MeasurementSchema(measurements.get(i), tsBlock.getAllColumns()[i].getDataType())); + } + + final Tablet tablet = + new Tablet( + deviceId.getTableName(), + tableSchemaMap.get(deviceId.getTableName()).getColumnSchemas().stream() + .filter(schema -> measurements.contains(schema.getMeasurementId())) + .collect(Collectors.toList()), + tsBlock.getPositionCount()); + tablet.initBitMaps(); + + final TsBlock.TsBlockRowIterator rowIterator = tsBlock.getTsBlockRowIterator(); + while (rowIterator.hasNext()) { + final Object[] row = rowIterator.next(); + + final long timestamp = (Long) row[row.length - 1]; + if (timestamp < startTime || timestamp > endTime) { + continue; + } + + final List fields = new ArrayList<>(); + for (int i = 0; i < row.length - 1; ++i) { + final TSDataType dataType = schemas.get(i).getType(); + if (dataType == null) { + fields.add(null); + continue; + } + if (row[i] == null) { + fields.add(null); + continue; + } + final Field field = new Field(dataType); + fields.add(field); + switch (dataType) { + case BOOLEAN: + field.setBoolV((Boolean) row[i]); + break; + case INT32: + case DATE: + field.setIntV((Integer) row[i]); + break; + case INT64: + case TIMESTAMP: + field.setLongV((Long) row[i]); + break; + case FLOAT: + field.setFloatV((Float) row[i]); + break; + case DOUBLE: + field.setDoubleV((Double) row[i]); + break; + case STRING: + case BLOB: + case TEXT: + field.setBinaryV((Binary) row[i]); + break; + default: + throw new UnsupportedOperationException("Unsupported data type: " + dataType); + } + } + final RowRecord rowRecord = new RowRecord(timestamp, fields); + + final int rowIndex = tablet.rowSize; + tablet.addTimestamp(rowIndex, rowRecord.getTimestamp()); + final int fieldSize = fields.size(); + for (int i = 0; i < fieldSize; i++) { + final Field field = fields.get(i); + tablet.addValue( + measurements.get(i), + rowIndex, + field == null ? null : field.getObjectValue(schemas.get(i).getType())); + } + tablet.rowSize++; + if (tablet.rowSize == tablet.getMaxRowNumber()) { + break; + } + } + + return tablet; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index cc6397bbe968..291147a94ddc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -122,6 +122,7 @@ import java.util.stream.Stream; import static org.apache.iotdb.db.exception.metadata.DatabaseNotSetException.DATABASE_NOT_SET; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause; import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR_CHAR; @@ -759,9 +760,11 @@ private TSStatus executeStatementForTableModel(Statement statement, String dataB IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) .status; } catch (final Exception e) { - if (e.getMessage() != null - && e.getMessage().contains(DATABASE_NOT_SET.toLowerCase(Locale.ROOT))) { - ALREADY_CREATED_DATABASES.remove(dataBaseName); + ALREADY_CREATED_DATABASES.remove(dataBaseName); + + final Throwable rootCause = getRootCause(e); + if (rootCause.getMessage() != null + && rootCause.getMessage().contains(DATABASE_NOT_SET.toLowerCase(Locale.ROOT))) { autoCreateDatabaseIfNecessary(dataBaseName); // Retry after creating the database