From bd5b30bb50deb8e305d8140dbf2a7f0a94ae4595 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Fri, 18 Oct 2024 09:47:47 +0800 Subject: [PATCH] Pipe: Support syncing table model data between clusters (TsFileInsertionEventTableParser impl & following fixup) (#13806) Co-authored-by: Zhenyu Luo --- .../request/PipeTransferTabletRawReqV2.java | 7 +- ...abletInsertionEventTablePatternParser.java | 1 + .../tsfile/PipeTsFileInsertionEvent.java | 9 +- .../parser/TsFileInsertionEventParser.java | 14 +- .../TsFileInsertionEventParserProvider.java | 31 ++-- .../TsFileInsertionEventQueryParser.java | 14 +- .../scan/TsFileInsertionEventScanParser.java | 8 +- .../TsFileInsertionEventTableParser.java | 147 ++++++++++++++++++ ...sertionEventTableParserTabletIterator.java | 121 ++++++++++++++ ...peHistoricalDataRegionTsFileExtractor.java | 33 ++-- .../PipeRealtimeDataRegionExtractor.java | 4 +- .../assigner/PipeDataRegionAssigner.java | 2 +- .../thrift/IoTDBDataNodeReceiver.java | 62 +++++--- .../plan/relational/sql/ast/InsertRows.java | 19 ++- .../sql/ast/WrappedInsertStatement.java | 11 +- .../PipeDataNodeThriftRequestTest.java | 14 +- .../pipe/event/PipeInsertionEvent.java | 14 -- 17 files changed, 405 insertions(+), 106 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReqV2.java index 47ac23a89cd9..0700acab1992 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReqV2.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; +import java.util.stream.Collectors; import static org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent.isTabletEmpty; @@ -85,8 +86,12 @@ public InsertTabletStatement constructStatement() { } // Table model + request.setWriteToTable(true); + request.columnCategories = + tablet.getColumnTypes().stream() + .map(t -> (byte) t.ordinal()) + .collect(Collectors.toList()); final InsertTabletStatement statement = StatementGenerator.createStatement(request); - statement.setWriteToTable(true); statement.setDatabaseName(dataBaseName); return statement; } catch (final MetadataException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java index 3ea43e35e43d..091a7c7486e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java @@ -131,6 +131,7 @@ public Tablet convertToTablet() { newTablet.bitMaps = nullValueColumnBitmaps; newTablet.values = valueColumns; newTablet.rowSize = rowCount; + newTablet.setColumnTypes(Arrays.asList(valueColumnTypes)); tablet = newTablet; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 84ce58c916f8..c9252e02c04d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -424,11 +424,6 @@ public boolean mayEventPathsOverlappedWithPattern() { /////////////////////////// PipeInsertionEvent /////////////////////////// - /** - * Judge whether the TsFile is table model or tree model. If the TsFile is table model, the method - * will return true. If the TsFile is tree model, the method will return false. If the TsFile is - * mixed model, the method will return true and mark the event as table model event. - */ @Override public boolean isTableModelEvent() { if (getRawIsTableModelEvent() == null) { @@ -450,8 +445,8 @@ public boolean isTableModelEvent() { markAsTreeModelEvent(); } else { markAsTableModelEvent(); - break; } + break; } } catch (final Exception e) { throw new PipeException( @@ -502,7 +497,7 @@ private TsFileInsertionEventParser initEventParser() { if (eventParser == null) { eventParser = new TsFileInsertionEventParserProvider( - tsFile, treePattern, startTime, endTime, pipeTaskMeta, this) + tsFile, treePattern, tablePattern, startTime, endTime, pipeTaskMeta, this) .provide(); } return eventParser; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index 2ae5d5a2ba5b..3a7c5db38888 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -38,8 +39,11 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionEventParser.class); - protected final TreePattern pattern; // used to filter data + protected final TreePattern treePattern; // used to filter data + protected final TablePattern tablePattern; // used to filter data protected final GlobalTimeExpression timeFilterExpression; // used to filter data + protected final long startTime; // used to filter data + protected final long endTime; // used to filter data protected final PipeTaskMeta pipeTaskMeta; // used to report progress protected final PipeInsertionEvent sourceEvent; // used to report progress @@ -49,16 +53,20 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { protected TsFileSequenceReader tsFileSequenceReader; protected TsFileInsertionEventParser( - final TreePattern pattern, + final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, final PipeInsertionEvent sourceEvent) { - this.pattern = pattern; + this.treePattern = treePattern; + this.tablePattern = tablePattern; timeFilterExpression = (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE) ? null : new GlobalTimeExpression(TimeFilterApi.between(startTime, endTime)); + this.startTime = startTime; + this.endTime = endTime; this.pipeTaskMeta = pipeTaskMeta; this.sourceEvent = sourceEvent; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java index 1bc74c86738e..eaf2eceb2dcd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java @@ -22,10 +22,12 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.tsfile.file.metadata.IDeviceID; @@ -39,7 +41,8 @@ public class TsFileInsertionEventParserProvider { private final File tsFile; - private final TreePattern pattern; + private final TreePattern treePattern; + private final TablePattern tablePattern; private final long startTime; private final long endTime; @@ -49,12 +52,14 @@ public class TsFileInsertionEventParserProvider { public TsFileInsertionEventParserProvider( final File tsFile, final TreePattern treePattern, + final TablePattern tablePattern, final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, final PipeTsFileInsertionEvent sourceEvent) { this.tsFile = tsFile; - this.pattern = treePattern; + this.treePattern = treePattern; + this.tablePattern = tablePattern; this.startTime = startTime; this.endTime = endTime; this.pipeTaskMeta = pipeTaskMeta; @@ -62,10 +67,15 @@ public TsFileInsertionEventParserProvider( } public TsFileInsertionEventParser provide() throws IOException { + if (sourceEvent.isTableModelEvent()) { + return new TsFileInsertionEventTableParser( + tsFile, tablePattern, startTime, endTime, pipeTaskMeta, sourceEvent); + } + if (startTime != Long.MIN_VALUE || endTime != Long.MAX_VALUE - || pattern instanceof IoTDBTreePattern - && !((IoTDBTreePattern) pattern).mayMatchMultipleTimeSeriesInOneDevice()) { + || treePattern instanceof IoTDBTreePattern + && !((IoTDBTreePattern) treePattern).mayMatchMultipleTimeSeriesInOneDevice()) { // 1. If time filter exists, use query here because the scan container may filter it // row by row in single page chunk. // 2. If the pattern matches only one time series in one device, use query container here @@ -75,7 +85,7 @@ public TsFileInsertionEventParser provide() throws IOException { // hard to know whether it only matches one timeseries, while matching multiple is often the // case. return new TsFileInsertionEventQueryParser( - tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent); + tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent); } final Map deviceIsAlignedMap = @@ -84,7 +94,7 @@ public TsFileInsertionEventParser provide() throws IOException { // If we failed to get from cache, it indicates that the memory usage is high. // We use scan data container because it requires less memory. return new TsFileInsertionEventScanParser( - tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent); + tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent); } final int originalSize = deviceIsAlignedMap.size(); @@ -94,10 +104,10 @@ public TsFileInsertionEventParser provide() throws IOException { return (double) filteredDeviceIsAlignedMap.size() / originalSize > PipeConfig.getInstance().getPipeTsFileScanParsingThreshold() ? new TsFileInsertionEventScanParser( - tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent) + tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent) : new TsFileInsertionEventQueryParser( tsFile, - pattern, + treePattern, startTime, endTime, pipeTaskMeta, @@ -107,7 +117,7 @@ public TsFileInsertionEventParser provide() throws IOException { private Map filterDeviceIsAlignedMapByPattern( final Map deviceIsAlignedMap) { - if (Objects.isNull(pattern) || pattern.isRoot()) { + if (Objects.isNull(treePattern) || treePattern.isRoot()) { return deviceIsAlignedMap; } @@ -115,7 +125,8 @@ private Map filterDeviceIsAlignedMapByPattern( .filter( entry -> { final IDeviceID deviceId = entry.getKey(); - return pattern.coversDevice(deviceId) || pattern.mayOverlapWithDevice(deviceId); + return treePattern.coversDevice(deviceId) + || treePattern.mayOverlapWithDevice(deviceId); }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java index b98c6b33ecd1..6caa83cabdc3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java @@ -98,7 +98,7 @@ public TsFileInsertionEventQueryParser( final PipeInsertionEvent sourceEvent, final Map deviceIsAlignedMap) throws IOException { - super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent); + super(pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent); try { final PipeTsFileResourceManager tsFileResourceManager = PipeDataNodeResourceManager.tsfile(); @@ -165,7 +165,9 @@ private Map> filterDeviceMeasurementsMapByPattern( // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c // in this case, all data can be matched without checking the measurements - if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) { + if (Objects.isNull(treePattern) + || treePattern.isRoot() + || treePattern.coversDevice(deviceId)) { if (!entry.getValue().isEmpty()) { filteredDeviceMeasurementsMap.put(deviceId, entry.getValue()); } @@ -173,11 +175,11 @@ private Map> filterDeviceMeasurementsMapByPattern( // case 2: for example, pattern is root.a.b.c and device is root.a.b // in this case, we need to check the full path - else if (pattern.mayOverlapWithDevice(deviceId)) { + else if (treePattern.mayOverlapWithDevice(deviceId)) { final List filteredMeasurements = new ArrayList<>(); for (final String measurement : entry.getValue()) { - if (pattern.matchesMeasurement(deviceId, measurement)) { + if (treePattern.matchesMeasurement(deviceId, measurement)) { filteredMeasurements.add(measurement); } } @@ -203,13 +205,13 @@ private Map readDeviceIsAlignedMap() throws IOException { } private Set filterDevicesByPattern(final Set devices) { - if (Objects.isNull(pattern) || pattern.isRoot()) { + if (Objects.isNull(treePattern) || treePattern.isRoot()) { return devices; } final Set filteredDevices = new HashSet<>(); for (final IDeviceID device : devices) { - if (pattern.coversDevice(device) || pattern.mayOverlapWithDevice(device)) { + if (treePattern.coversDevice(device) || treePattern.mayOverlapWithDevice(device)) { filteredDevices.add(device); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index ca662d7d9acd..afe7067fa773 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -96,7 +96,7 @@ public TsFileInsertionEventScanParser( final PipeTaskMeta pipeTaskMeta, final PipeInsertionEvent sourceEvent) throws IOException { - super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent); + super(pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent); this.startTime = startTime; this.endTime = endTime; @@ -381,7 +381,7 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { break; } - if (!pattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { + if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { tsFileSequenceReader.position( tsFileSequenceReader.position() + chunkHeader.getDataSize()); break; @@ -408,7 +408,7 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { chunkHeader = tsFileSequenceReader.readChunkHeader(marker); if (Objects.isNull(currentDevice) - || !pattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { + || !treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { tsFileSequenceReader.position( tsFileSequenceReader.position() + chunkHeader.getDataSize()); break; @@ -456,7 +456,7 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { isMultiPageList.clear(); measurementIndexMap.clear(); final IDeviceID deviceID = tsFileSequenceReader.readChunkGroupHeader().getDeviceID(); - currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID : null; + currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? deviceID : null; break; case MetaMarker.OPERATION_INDEX_RANGE: tsFileSequenceReader.readPlanIndex(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java new file mode 100644 index 000000000000..3447a17305fb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table; + +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; +import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.controller.CachedChunkLoaderImpl; +import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl; +import org.apache.tsfile.read.query.executor.TableQueryExecutor; +import org.apache.tsfile.write.record.Tablet; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; + +public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser { + + private final TableQueryExecutor tableQueryExecutor; + + private final Iterator> filteredTableSchemaIterator; + + public TsFileInsertionEventTableParser( + final File tsFile, + final TablePattern pattern, + final long startTime, + final long endTime, + final PipeTaskMeta pipeTaskMeta, + final PipeInsertionEvent sourceEvent) + throws IOException { + super(null, pattern, startTime, endTime, pipeTaskMeta, sourceEvent); + + try { + tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, true); + filteredTableSchemaIterator = + tsFileSequenceReader.readFileMetadata().getTableSchemaMap().entrySet().stream() + .filter(entry -> Objects.isNull(pattern) || pattern.matchesTable(entry.getKey())) + .iterator(); + tableQueryExecutor = + new TableQueryExecutor( + new MetadataQuerierByFileImpl(tsFileSequenceReader), + new CachedChunkLoaderImpl(tsFileSequenceReader), + TableQueryExecutor.TableQueryOrdering.DEVICE); + } catch (final Exception e) { + close(); + throw e; + } + } + + @Override + public Iterable toTabletInsertionEvents() { + return () -> + new Iterator() { + + private TsFileInsertionEventTableParserTabletIterator tabletIterator = null; + + @Override + public boolean hasNext() { + while (tabletIterator == null || !tabletIterator.hasNext()) { + if (!filteredTableSchemaIterator.hasNext()) { + close(); + return false; + } + + final Map.Entry entry = filteredTableSchemaIterator.next(); + + try { + tabletIterator = + new TsFileInsertionEventTableParserTabletIterator( + tableQueryExecutor, entry.getKey(), entry.getValue(), startTime, endTime); + } catch (final Exception e) { + close(); + throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e); + } + } + + return true; + } + + @Override + public TabletInsertionEvent next() { + if (!hasNext()) { + close(); + throw new NoSuchElementException(); + } + + final Tablet tablet = tabletIterator.next(); + + final TabletInsertionEvent next; + if (!hasNext()) { + next = + new PipeRawTabletInsertionEvent( + Boolean.TRUE, + sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null, + tablet, + true, + sourceEvent != null ? sourceEvent.getPipeName() : null, + sourceEvent != null ? sourceEvent.getCreationTime() : 0, + pipeTaskMeta, + sourceEvent, + true); + close(); + } else { + next = + new PipeRawTabletInsertionEvent( + Boolean.TRUE, + sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null, + tablet, + true, + sourceEvent != null ? sourceEvent.getPipeName() : null, + sourceEvent != null ? sourceEvent.getCreationTime() : 0, + pipeTaskMeta, + sourceEvent, + false); + } + return next; + } + }; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java new file mode 100644 index 000000000000..4f54ec26f033 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table; + +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.tsfile.exception.read.ReadProcessException; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.query.executor.TableQueryExecutor; +import org.apache.tsfile.read.reader.block.TsBlockReader; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; + +public class TsFileInsertionEventTableParserTabletIterator implements Iterator { + + private final String tableName; + + private final long startTime; + private final long endTime; + + private final List columnSchemas; + private final List columnNames; + private final TsBlockReader tsBlockReader; + + public TsFileInsertionEventTableParserTabletIterator( + final TableQueryExecutor tableQueryExecutor, + final String tableName, + final TableSchema tableSchema, + final long startTime, + final long endTime) { + this.tableName = tableName; + this.startTime = startTime; + this.endTime = endTime; + + try { + columnSchemas = + tableSchema.getColumnSchemas().stream() + // time column in aligned time-series should not be a query column + .filter( + schema -> + schema.getMeasurementId() != null && !schema.getMeasurementId().isEmpty()) + .collect(Collectors.toList()); + columnNames = + columnSchemas.stream() + .map(IMeasurementSchema::getMeasurementId) + .collect(Collectors.toList()); + tsBlockReader = tableQueryExecutor.query(tableName, columnNames, null, null, null); + } catch (final ReadProcessException e) { + throw new PipeException("Failed to build query data set", e); + } + } + + @Override + public boolean hasNext() { + return tsBlockReader.hasNext(); + } + + @Override + public Tablet next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + try { + return buildNextTablet(); + } catch (final IOException e) { + throw new PipeException("Failed to build tablet", e); + } + } + + // TODO: memory control + private Tablet buildNextTablet() throws IOException { + final TsBlock tsBlock = tsBlockReader.next(); + + final Tablet tablet = new Tablet(tableName, columnSchemas, tsBlock.getPositionCount()); + tablet.initBitMaps(); + + final TsBlock.TsBlockRowIterator rowIterator = tsBlock.getTsBlockRowIterator(); + while (rowIterator.hasNext()) { + final Object[] row = rowIterator.next(); + + final long timestamp = (Long) row[row.length - 1]; + if (timestamp < startTime || timestamp > endTime) { + continue; + } + + final int rowIndex = tablet.rowSize; + tablet.addTimestamp(rowIndex, timestamp); + for (int i = 0, fieldSize = row.length - 1; i < fieldSize; i++) { + tablet.addValue(columnNames.get(i), rowIndex, row[i]); + } + tablet.rowSize++; + } + + return tablet; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index d7c68f6e35f8..3acc589ca313 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -287,7 +287,9 @@ public void customize( final String databaseName = dataRegion.getDatabaseName(); if (Objects.nonNull(databaseName)) { isDbNameCoveredByPattern = - treePattern.coversDb(databaseName) && tablePattern.coversDb(databaseName); + treePattern.coversDb(databaseName) + // The database name is prefixed with "root." + && tablePattern.coversDb(databaseName.substring(5)); } } @@ -561,35 +563,18 @@ private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource reso // In case of tree model deviceID if (treePattern.isTreeModelDataAllowedToBeCaptured() && treePattern.mayOverlapWithDevice(deviceID)) { - tsfile2IsTableModelMap.compute( - resource, - (tsFileResource, isTableModel) -> { - if (Objects.isNull(isTableModel) || !isTableModel) { - return Boolean.FALSE; - } - throw new IllegalStateException( - String.format( - "Pipe %s@%s: TsFile %s contains both table model and tree model data", - pipeName, dataRegionId, resource.getTsFilePath())); - }); + tsfile2IsTableModelMap.computeIfAbsent( + resource, (tsFileResource) -> Boolean.FALSE); return true; } } else { // In case of table model deviceID if (tablePattern.isTableModelDataAllowedToBeCaptured() - && tablePattern.matchesDatabase(resource.getDatabaseName()) + // The database name in resource is prefixed with "root." + && tablePattern.matchesDatabase(resource.getDatabaseName().substring(5)) && tablePattern.matchesTable(deviceID.getTableName())) { - tsfile2IsTableModelMap.compute( - resource, - (tsFileResource, isTableModel) -> { - if (Objects.isNull(isTableModel) || isTableModel) { - return Boolean.TRUE; - } - throw new IllegalStateException( - String.format( - "Pipe %s@%s: TsFile %s contains both table model and tree model data", - pipeName, dataRegionId, resource.getTsFilePath())); - }); + tsfile2IsTableModelMap.computeIfAbsent( + resource, (tsFileResource) -> Boolean.TRUE); return true; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index c4872b3ac956..d992d87b3d78 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -218,7 +218,9 @@ public void customize( final String databaseName = dataRegion.getDatabaseName(); if (databaseName != null) { isDbNameCoveredByPattern = - treePattern.coversDb(databaseName) && tablePattern.coversDb(databaseName); + treePattern.coversDb(databaseName) + // The database name is prefixed with "root." + && tablePattern.coversDb(databaseName.substring(5)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 2a8eeda9e0a2..9cfdc249df11 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -165,7 +165,7 @@ private void bindOrUpdateProgressIndexForTsFileInsertionEvent( .isProgressIndexAfterOrEquals( dataRegionId, event.getTimePartitionId(), event.getProgressIndex())) { event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.get()); - LOGGER.warn( + LOGGER.info( "Data region {} bind {} to event {} because it was flushed prematurely.", dataRegionId, maxProgressIndexForTsFileInsertionEvent, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 1772124f728d..5a974d1e35f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -122,6 +122,7 @@ import java.util.stream.Stream; import static org.apache.iotdb.db.exception.metadata.DatabaseNotSetException.DATABASE_NOT_SET; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause; import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR_CHAR; @@ -143,7 +144,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { new PipeStatementPatternParseVisitor(); private final PipeStatementDataTypeConvertExecutionVisitor statementDataTypeConvertExecutionVisitor = - new PipeStatementDataTypeConvertExecutionVisitor(this::executeStatement); + new PipeStatementDataTypeConvertExecutionVisitor(this::executeStatementForTreeModel); private final PipeStatementToBatchVisitor batchVisitor = new PipeStatementToBatchVisitor(); // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster @@ -702,31 +703,43 @@ private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement sta TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null statement."); } - final TSStatus status = executeStatement(statement); - return shouldConvertDataTypeOnTypeMismatch - && ((statement instanceof InsertBaseStatement - && ((InsertBaseStatement) statement).hasFailedMeasurements()) - || status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) - ? statement.accept(statementDataTypeConvertExecutionVisitor, status).orElse(status) - : status; - } - - private TSStatus executeStatement(Statement statement) { + // Judge which model the statement belongs to + final boolean isTableModelStatement; + final String dataBaseName; if (statement instanceof LoadTsFileStatement && ((LoadTsFileStatement) statement) .getModel() .equals(LoadTsFileConfigurator.MODEL_TABLE_VALUE)) { - return executeStatementForTableModel( - statement, ((LoadTsFileStatement) statement).getDatabase()); - } - - if (statement instanceof InsertBaseStatement + isTableModelStatement = true; + dataBaseName = ((LoadTsFileStatement) statement).getDatabase(); + } else if (statement instanceof InsertBaseStatement && ((InsertBaseStatement) statement).isWriteToTable()) { - return executeStatementForTableModel( - statement, ((InsertBaseStatement) statement).getDatabaseName().get()); + isTableModelStatement = true; + dataBaseName = + ((InsertBaseStatement) statement).getDatabaseName().isPresent() + ? ((InsertBaseStatement) statement).getDatabaseName().get() + : null; + } else { + isTableModelStatement = false; + dataBaseName = null; } - return executeStatementForTreeModel(statement); + final TSStatus status = + isTableModelStatement + ? executeStatementForTableModel(statement, dataBaseName) + : executeStatementForTreeModel(statement); + // Data type conversion is not supported for table model statements + if (isTableModelStatement) { + return status; + } + // Try to convert data type if the statement is a tree model statement + // and the status code is not success + return shouldConvertDataTypeOnTypeMismatch + && ((statement instanceof InsertBaseStatement + && ((InsertBaseStatement) statement).hasFailedMeasurements()) + || status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) + ? statement.accept(statementDataTypeConvertExecutionVisitor, status).orElse(status) + : status; } private TSStatus executeStatementForTableModel(Statement statement, String dataBaseName) { @@ -747,9 +760,14 @@ private TSStatus executeStatementForTableModel(Statement statement, String dataB IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) .status; } catch (final Exception e) { - if (e.getMessage() != null - && e.getMessage().contains(DATABASE_NOT_SET.toLowerCase(Locale.ROOT))) { - ALREADY_CREATED_DATABASES.remove(dataBaseName); + ALREADY_CREATED_DATABASES.remove(dataBaseName); + + final Throwable rootCause = getRootCause(e); + if (rootCause.getMessage() != null + && rootCause + .getMessage() + .toLowerCase(Locale.ENGLISH) + .contains(DATABASE_NOT_SET.toLowerCase(Locale.ENGLISH))) { autoCreateDatabaseIfNecessary(dataBaseName); // Retry after creating the database diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java index e47912e6bc1f..8650c0fc18ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java @@ -84,14 +84,17 @@ public void validateTableSchema(Metadata metadata, MPPQueryContext context) { for (InsertRowStatement insertRowStatement : getInnerTreeStatement().getInsertRowStatementList()) { final TableSchema incomingTableSchema = toTableSchema(insertRowStatement); - final TableSchema realSchema = - metadata - .validateTableHeaderSchema( - AnalyzeUtils.getDatabaseName(insertRowStatement, context), - incomingTableSchema, - context, - false) - .orElse(null); + final TableSchema realSchema; + synchronized (metadata) { + realSchema = + metadata + .validateTableHeaderSchema( + AnalyzeUtils.getDatabaseName(insertRowStatement, context), + incomingTableSchema, + context, + false) + .orElse(null); + } if (realSchema == null) { throw new SemanticException( "Schema validation failed, table cannot be created: " + incomingTableSchema); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java index b951c02cc49c..b50cae15f847 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java @@ -101,10 +101,13 @@ protected TableSchema toTableSchema(InsertBaseStatement insertBaseStatement) { public void validateTableSchema(Metadata metadata, MPPQueryContext context) { String databaseName = getDatabase(); final TableSchema incomingSchema = getTableSchema(); - final TableSchema realSchema = - metadata - .validateTableHeaderSchema(databaseName, incomingSchema, context, true) - .orElse(null); + final TableSchema realSchema; + synchronized (metadata) { + realSchema = + metadata + .validateTableHeaderSchema(databaseName, incomingSchema, context, true) + .orElse(null); + } if (realSchema == null) { throw new SemanticException( "Schema validation failed, table cannot be created: " + incomingSchema); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java index 5c3a70e149c6..47065ef8eaa9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java @@ -266,7 +266,18 @@ public void testPipeTransferTabletReqV2() { schemaList.add(new MeasurementSchema("s8", TSDataType.DATE)); schemaList.add(new MeasurementSchema("s9", TSDataType.BLOB)); schemaList.add(new MeasurementSchema("s10", TSDataType.STRING)); - final Tablet t = new Tablet("root.sg.d", schemaList, 1024); + List columnTypes = new ArrayList<>(); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + final Tablet t = new Tablet("root.sg.d", schemaList, columnTypes, 1024); t.rowSize = 2; t.addTimestamp(0, 2000); t.addTimestamp(1, 1000); @@ -307,6 +318,7 @@ public void testPipeTransferTabletReqV2() { Assert.assertTrue(statement.isWriteToTable()); Assert.assertTrue(statement.getDatabaseName().isPresent()); Assert.assertEquals(statement.getDatabaseName().get(), "test"); + Assert.assertEquals(t.getColumnTypes(), deserializeReq.getTablet().getColumnTypes()); } catch (final IOException e) { Assert.fail(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeInsertionEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeInsertionEvent.java index 58c9a5c0daa8..29e0fae356da 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeInsertionEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeInsertionEvent.java @@ -25,11 +25,6 @@ public abstract class PipeInsertionEvent extends EnrichedEvent { - /** - * If the event is marked as a table model event, it will be treated as a table model event in the - * following process. If the event is marked as a tree model event, and then mark as a table model - * event, it will be treated as a table model event. - */ private Boolean isTableModelEvent; // lazy initialization private final String treeModelDatabaseName; @@ -79,16 +74,7 @@ public void markAsTableModelEvent() { isTableModelEvent = Boolean.TRUE; } - /** - * If the event is marked as a table model event, it will be treated as a table model event in the - * following process. If the event is marked as a tree model event, and then mark as a table model - * event, it will be treated as a table model event. - */ public void markAsTreeModelEvent() { - if (isTableModelEvent != null && isTableModelEvent) { - return; - } - isTableModelEvent = Boolean.FALSE; }