diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 11711fa1d034..c4fd1a133c2a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -322,10 +322,8 @@ public boolean shouldParsePattern() { ? ((InsertRowsNode) node) .getInsertRowNodeList().stream() .anyMatch( - insertRowNode -> - !pipePattern.coversDevice( - insertRowNode.getDevicePath().getFullPath())) - : !pipePattern.coversDevice(node.getDevicePath().getFullPath()))); + insertRowNode -> !pipePattern.coversDevice(insertRowNode.getDeviceID())) + : !pipePattern.coversDevice(node.getDeviceID()))); } public List toRawTabletInsertionEvents() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index ae17883a487b..c7a563bcb056 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -33,6 +33,8 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.UnSupportedDataTypeException; @@ -62,7 +64,9 @@ public class TabletInsertionDataContainer { private final EnrichedEvent sourceEvent; // used to report progress and filter value columns by time range - private String deviceId; + // Used to preserve performance + private String deviceStr; + private IDeviceID deviceId; private boolean isAligned; private IMeasurementSchema[] measurementSchemaList; private String[] columnNameStringList; @@ -138,7 +142,9 @@ private void parse(final InsertRowNode insertRowNode, final PipePattern pattern) final int originColumnSize = insertRowNode.getMeasurements().length; final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; - this.deviceId = insertRowNode.getDevicePath().getFullPath(); + // The full path is always cached when device path is deserialized + this.deviceStr = insertRowNode.getDevicePath().getFullPath(); + this.deviceId = insertRowNode.getDeviceID(); this.isAligned = insertRowNode.isAligned(); final long[] originTimestampColumn = new long[] {insertRowNode.getTime()}; @@ -205,7 +211,9 @@ private void parse(final InsertTabletNode insertTabletNode, final PipePattern pa final int originColumnSize = insertTabletNode.getMeasurements().length; final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; - this.deviceId = insertTabletNode.getDevicePath().getFullPath(); + // The full path is always cached when device path is deserialized + this.deviceStr = insertTabletNode.getDevicePath().getFullPath(); + this.deviceId = insertTabletNode.getDeviceID(); this.isAligned = insertTabletNode.isAligned(); final long[] originTimestampColumn = insertTabletNode.getTimes(); @@ -288,7 +296,9 @@ private void parse(final Tablet tablet, final boolean isAligned, final PipePatte final int originColumnSize = tablet.getSchemas().size(); final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; - this.deviceId = tablet.getDeviceId(); + // Only support tree-model tablet + this.deviceStr = tablet.getDeviceId(); + this.deviceId = new StringArrayDeviceID(tablet.getDeviceId()); this.isAligned = isAligned; final long[] originTimestampColumn = @@ -567,9 +577,10 @@ public List processRowByRow(BiConsumer final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, sourceEvent); for (int i = 0; i < rowCount; i++) { consumer.accept( + // Used for tree model new PipeRow( i, - deviceId, + getDeviceStr(), isAligned, measurementSchemaList, timestampColumn, @@ -595,7 +606,8 @@ public Tablet convertToTablet() { return tablet; } - final Tablet newTablet = new Tablet(deviceId, Arrays.asList(measurementSchemaList), rowCount); + final Tablet newTablet = + new Tablet(getDeviceStr(), Arrays.asList(measurementSchemaList), rowCount); newTablet.timestamps = timestampColumn; newTablet.bitMaps = nullValueColumnBitmaps; newTablet.values = valueColumns; @@ -605,4 +617,8 @@ public Tablet convertToTablet() { return tablet; } + + private String getDeviceStr() { + return Objects.nonNull(deviceStr) ? deviceStr : deviceId.toString(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index 1eccb7c03be9..ee8c4e62bb81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -146,11 +146,10 @@ private Map> filterDeviceMeasurementsMapByPattern( final Map> filteredDeviceMeasurementsMap = new HashMap<>(); for (Map.Entry> entry : originalDeviceMeasurementsMap.entrySet()) { final IDeviceID deviceId = entry.getKey(); - String deviceStr = deviceId.toString(); // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c // in this case, all data can be matched without checking the measurements - if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceStr)) { + if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) { if (!entry.getValue().isEmpty()) { filteredDeviceMeasurementsMap.put(deviceId, entry.getValue()); } @@ -158,11 +157,11 @@ private Map> filterDeviceMeasurementsMapByPattern( // case 2: for example, pattern is root.a.b.c and device is root.a.b // in this case, we need to check the full path - else if (pattern.mayOverlapWithDevice(deviceStr)) { + else if (pattern.mayOverlapWithDevice(deviceId)) { final List filteredMeasurements = new ArrayList<>(); for (final String measurement : entry.getValue()) { - if (pattern.matchesMeasurement(deviceStr, measurement)) { + if (pattern.matchesMeasurement(deviceId, measurement)) { filteredMeasurements.add(measurement); } else { // Parse pattern iff there are measurements filtered out @@ -221,7 +220,7 @@ public boolean hasNext() { new TsFileInsertionDataTabletIterator( tsFileReader, measurementDataTypeMap, - entry.getKey().toString(), + entry.getKey(), entry.getValue(), timeFilterExpression); } catch (final IOException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java index f64b1fb353a4..05b34ad026f1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java @@ -24,6 +24,7 @@ import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.TsFileReader; import org.apache.tsfile.read.common.Field; import org.apache.tsfile.read.common.Path; @@ -48,7 +49,7 @@ public class TsFileInsertionDataTabletIterator implements Iterator { private final TsFileReader tsFileReader; private final Map measurementDataTypeMap; - private final String deviceId; + private final IDeviceID deviceId; private final List measurements; private final IExpression timeFilterExpression; @@ -58,7 +59,7 @@ public class TsFileInsertionDataTabletIterator implements Iterator { public TsFileInsertionDataTabletIterator( TsFileReader tsFileReader, Map measurementDataTypeMap, - String deviceId, + IDeviceID deviceId, List measurements, IExpression timeFilterExpression) throws IOException { @@ -118,7 +119,11 @@ private Tablet buildNextTablet() throws IOException { schemas.add(new MeasurementSchema(measurement, dataType)); } final Tablet tablet = - new Tablet(deviceId, schemas, PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); + new Tablet( + // Used for tree model + deviceId.toString(), + schemas, + PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); tablet.initBitMaps(); while (queryDataSet.hasNext()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java index f76c220dd2b0..4f61e82824c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern; import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.PlainDeviceID; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.read.TsFileSequenceReader; import org.slf4j.Logger; @@ -83,14 +82,13 @@ private Map> filterDeviceMeasurementsMapByPattern() throw for (final Map.Entry> entry : originalDeviceMeasurementsMap.entrySet()) { - final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID(); + final IDeviceID deviceId = entry.getKey(); // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c // in this case, all data can be matched without checking the measurements if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) { if (!entry.getValue().isEmpty()) { - filteredDeviceMeasurementsMap.put( - new PlainDeviceID(deviceId), new HashSet<>(entry.getValue())); + filteredDeviceMeasurementsMap.put(deviceId, new HashSet<>(entry.getValue())); } } @@ -109,7 +107,7 @@ else if (pattern.mayOverlapWithDevice(deviceId)) { } if (!filteredMeasurements.isEmpty()) { - filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId), filteredMeasurements); + filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java index 77df1f0292be..0717dc4d5c3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java @@ -25,6 +25,8 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.util.Map; /** @@ -37,12 +39,12 @@ public class PipeRealtimeEvent extends EnrichedEvent { private final EnrichedEvent event; private final TsFileEpoch tsFileEpoch; - private Map device2Measurements; + private Map device2Measurements; public PipeRealtimeEvent( final EnrichedEvent event, final TsFileEpoch tsFileEpoch, - final Map device2Measurements, + final Map device2Measurements, final PipePattern pattern) { this(event, tsFileEpoch, device2Measurements, null, pattern, Long.MIN_VALUE, Long.MAX_VALUE); } @@ -50,7 +52,7 @@ public PipeRealtimeEvent( public PipeRealtimeEvent( final EnrichedEvent event, final TsFileEpoch tsFileEpoch, - final Map device2Measurements, + final Map device2Measurements, final PipeTaskMeta pipeTaskMeta, final PipePattern pattern, final long startTime, @@ -73,7 +75,7 @@ public TsFileEpoch getTsFileEpoch() { return tsFileEpoch; } - public Map getSchemaInfo() { + public Map getSchemaInfo() { return device2Measurements; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java index e0bfabb0c632..e4244a1335e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java @@ -26,6 +26,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import com.google.common.base.Functions; +import org.apache.tsfile.file.metadata.IDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +70,7 @@ public PipeRealtimeEvent bindPipeTsFileInsertionEvent( event, epoch, resource.getDevices().stream() - .collect(Collectors.toMap(Object::toString, device -> EMPTY_MEASUREMENT_ARRAY)), + .collect(Collectors.toMap(Functions.identity(), device -> EMPTY_MEASUREMENT_ARRAY)), event.getPipePattern()); } @@ -82,16 +84,16 @@ public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent( epoch, node instanceof InsertRowsNode ? getDevice2MeasurementsMapFromInsertRowsNode((InsertRowsNode) node) - : Collections.singletonMap(node.getDevicePath().getFullPath(), node.getMeasurements()), + : Collections.singletonMap(node.getDeviceID(), node.getMeasurements()), event.getPipePattern()); } - private Map getDevice2MeasurementsMapFromInsertRowsNode( + private Map getDevice2MeasurementsMapFromInsertRowsNode( InsertRowsNode insertRowsNode) { return insertRowsNode.getInsertRowNodeList().stream() .collect( Collectors.toMap( - insertRowNode -> insertRowNode.getDevicePath().getFullPath(), + InsertNode::getDeviceID, InsertNode::getMeasurements, (oldMeasurements, newMeasurements) -> Stream.of(Arrays.asList(oldMeasurements), Arrays.asList(newMeasurements)) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java index 79dbf04ba422..472b1de10739 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java @@ -39,6 +39,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.utils.Pair; import java.util.Arrays; @@ -74,7 +75,7 @@ public Optional visitPlan(final PlanNode node, final IoTDBPipePattern public Optional visitCreateTimeSeries( final CreateTimeSeriesNode node, final IoTDBPipePattern pattern) { return pattern.matchesMeasurement( - node.getPath().getDeviceString(), node.getPath().getMeasurement()) + node.getPath().getIDeviceID(), node.getPath().getMeasurement()) ? Optional.of(node) : Optional.empty(); } @@ -87,7 +88,8 @@ public Optional visitCreateAlignedTimeSeries( .filter( index -> pattern.matchesMeasurement( - node.getDevicePath().getFullPath(), node.getMeasurements().get(index))) + node.getDevicePath().getIDeviceIDAsFullDevice(), + node.getMeasurements().get(index))) .toArray(); return filteredIndexes.length > 0 ? Optional.of( @@ -115,7 +117,7 @@ public Optional visitCreateMultiTimeSeries( new Pair<>( entry.getKey(), trimMeasurementGroup( - entry.getKey().getFullPath(), entry.getValue(), pattern))) + entry.getKey().getIDeviceIDAsFullDevice(), entry.getValue(), pattern))) .filter(pair -> Objects.nonNull(pair.getRight())) .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); return !filteredMeasurementGroupMap.isEmpty() @@ -125,7 +127,7 @@ public Optional visitCreateMultiTimeSeries( } private static MeasurementGroup trimMeasurementGroup( - final String device, final MeasurementGroup group, final IoTDBPipePattern pattern) { + final IDeviceID device, final MeasurementGroup group, final IoTDBPipePattern pattern) { final int[] filteredIndexes = IntStream.range(0, group.size()) .filter(index -> pattern.matchesMeasurement(device, group.getMeasurements().get(index))) @@ -154,7 +156,7 @@ private static MeasurementGroup trimMeasurementGroup( public Optional visitAlterTimeSeries( final AlterTimeSeriesNode node, final IoTDBPipePattern pattern) { return pattern.matchesMeasurement( - node.getPath().getDeviceString(), node.getPath().getMeasurement()) + node.getPath().getIDeviceID(), node.getPath().getMeasurement()) ? Optional.of(node) : Optional.empty(); } @@ -165,7 +167,9 @@ public Optional visitInternalCreateTimeSeries( final MeasurementGroup group = pattern.matchPrefixPath(node.getDevicePath().getFullPath()) ? trimMeasurementGroup( - node.getDevicePath().getFullPath(), node.getMeasurementGroup(), pattern) + node.getDevicePath().getIDeviceIDAsFullDevice(), + node.getMeasurementGroup(), + pattern) : null; return Objects.nonNull(group) ? Optional.of( @@ -209,7 +213,7 @@ public Optional visitInternalCreateMultiTimeSeries( new Pair<>( entry.getValue().getLeft(), trimMeasurementGroup( - entry.getKey().getFullPath(), + entry.getKey().getIDeviceIDAsFullDevice(), entry.getValue().getRight(), pattern)))) .filter(pair -> Objects.nonNull(pair.getRight().getRight())) @@ -241,7 +245,7 @@ public Optional visitCreateLogicalView( .filter( entry -> pattern.matchesMeasurement( - entry.getKey().getDeviceString(), entry.getKey().getMeasurement())) + entry.getKey().getIDeviceID(), entry.getKey().getMeasurement())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); return !filteredViewPathToSourceMap.isEmpty() ? Optional.of(new CreateLogicalViewNode(node.getPlanNodeId(), filteredViewPathToSourceMap)) @@ -256,7 +260,7 @@ public Optional visitAlterLogicalView( .filter( entry -> pattern.matchesMeasurement( - entry.getKey().getDeviceString(), entry.getKey().getMeasurement())) + entry.getKey().getIDeviceID(), entry.getKey().getMeasurement())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); return !filteredViewPathToSourceMap.isEmpty() ? Optional.of(new AlterLogicalViewNode(node.getPlanNodeId(), filteredViewPathToSourceMap)) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java index 4be44a0fecc4..546d8185c3a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java @@ -28,6 +28,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.tsfile.file.metadata.IDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { protected final ReentrantReadWriteLock lock; protected final Set extractors; - protected final Cache> deviceToExtractorsCache; + protected final Cache> deviceToExtractorsCache; public CachedSchemaPatternMatcher() { this.lock = new ReentrantReadWriteLock(); @@ -114,8 +115,8 @@ public Set match(final PipeRealtimeEvent event) .collect(Collectors.toSet()); } - for (final Map.Entry entry : event.getSchemaInfo().entrySet()) { - final String device = entry.getKey(); + for (final Map.Entry entry : event.getSchemaInfo().entrySet()) { + final IDeviceID device = entry.getKey(); final String[] measurements = entry.getValue(); // 1. try to get matched extractors from cache, if not success, match them by device @@ -176,7 +177,7 @@ public Set match(final PipeRealtimeEvent event) return matchedExtractors; } - protected Set filterExtractorsByDevice(final String device) { + protected Set filterExtractorsByDevice(final IDeviceID device) { final Set filteredExtractors = new HashSet<>(); for (final PipeRealtimeDataRegionExtractor extractor : extractors) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java index 6535e6acfe80..26c22f89b3af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java @@ -59,7 +59,7 @@ public Optional visitNode( public Optional visitCreateTimeseries( final CreateTimeSeriesStatement statement, final IoTDBPipePattern pattern) { return pattern.matchesMeasurement( - statement.getPath().getDeviceString(), statement.getPath().getMeasurement()) + statement.getPath().getIDeviceID(), statement.getPath().getMeasurement()) ? Optional.of(statement) : Optional.empty(); } @@ -72,7 +72,7 @@ public Optional visitCreateAlignedTimeseries( .filter( index -> pattern.matchesMeasurement( - statement.getDevicePath().getFullPath(), + statement.getDevicePath().getIDeviceIDAsFullDevice(), statement.getMeasurements().get(index))) .toArray(); if (filteredIndexes.length == 0) { @@ -118,7 +118,10 @@ public Optional visitCreateLogicalView( .filter( index -> pattern.matchesMeasurement( - createLogicalViewStatement.getTargetPathList().get(index).getDeviceString(), + createLogicalViewStatement + .getTargetPathList() + .get(index) + .getIDeviceIDAsFullDevice(), createLogicalViewStatement.getTargetPathList().get(index).getMeasurement())) .toArray(); if (filteredIndexes.length == 0) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java index 1e7587153fb8..07855ebe829f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java @@ -29,6 +29,8 @@ import org.apache.iotdb.pipe.api.event.Event; import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -85,7 +87,7 @@ public void testCachedMatcher() throws Exception { new PipeParameters( new HashMap() { { - put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root." + finalI1); + put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root.db" + finalI1); } }), new PipeTaskRuntimeConfiguration( @@ -101,7 +103,7 @@ public void testCachedMatcher() throws Exception { { put( PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, - "root." + finalI + "." + finalJ); + "root.db" + finalI + ".s" + finalJ); } }), new PipeTaskRuntimeConfiguration( @@ -116,18 +118,22 @@ public void testCachedMatcher() throws Exception { int epochNum = 10000; int deviceNum = 1000; int seriesNum = 100; - Map deviceMap = + Map deviceMap = IntStream.range(0, deviceNum) .mapToObj(String::valueOf) - .collect(Collectors.toMap(s -> "root." + s, s -> new String[0])); + .collect( + Collectors.toMap(s -> new StringArrayDeviceID("root.db" + s), s -> new String[0])); String[] measurements = - IntStream.range(0, seriesNum).mapToObj(String::valueOf).toArray(String[]::new); + IntStream.range(0, seriesNum).mapToObj(num -> "s" + num).toArray(String[]::new); long totalTime = 0; for (int i = 0; i < epochNum; i++) { for (int j = 0; j < deviceNum; j++) { PipeRealtimeEvent event = new PipeRealtimeEvent( - null, null, Collections.singletonMap("root." + i, measurements), null); + null, + null, + Collections.singletonMap(new StringArrayDeviceID("root.db" + i), measurements), + null); long startTime = System.currentTimeMillis(); matcher.match(event).forEach(extractor -> extractor.extract(event)); totalTime += (System.currentTimeMillis() - startTime); @@ -174,7 +180,8 @@ protected void doExtract(PipeRealtimeEvent event) { } else { match[0] = match[0] - || (getPatternString().startsWith(k) || k.startsWith(getPatternString())); + || (getPatternString().startsWith(k.toString()) + || k.toString().startsWith(getPatternString())); } }); Assert.assertTrue(match[0]); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java index db42d713012d..d443577df26a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java @@ -22,6 +22,8 @@ import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.junit.Assert; import org.junit.Test; @@ -62,7 +64,7 @@ public void testIotdbPipePattern() { Assert.assertFalse(new IoTDBPipePattern(t).coversDb(db)); } - final String device = "root.db.d1"; + final IDeviceID device = new StringArrayDeviceID("root.db.d1"); // Test pattern cover device final String[] patternsCoverDevice = { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java index d81c50a57ce0..6008303b3daf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java @@ -21,6 +21,8 @@ import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.junit.Assert; import org.junit.Test; @@ -57,7 +59,7 @@ public void testPrefixPipePattern() { Assert.assertFalse(new PrefixPipePattern(t).coversDb(db)); } - final String device = "root.db.d1"; + final IDeviceID device = new StringArrayDeviceID("root.db.d1"); // Test pattern cover device final String[] patternsCoverDevice = { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java index 6c82c7e3d9cc..b270a2b07001 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java @@ -28,6 +28,8 @@ import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -83,7 +85,7 @@ public boolean coversDb(final String db) { } @Override - public boolean coversDevice(final String device) { + public boolean coversDevice(final IDeviceID device) { try { return patternPartialPath.include( new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)); @@ -93,7 +95,7 @@ public boolean coversDevice(final String device) { } @Override - public boolean mayOverlapWithDevice(final String device) { + public boolean mayOverlapWithDevice(final IDeviceID device) { try { // Another way is to use patternPath.overlapWith("device.*"), // there will be no false positives but time cost may be higher. @@ -104,7 +106,7 @@ public boolean mayOverlapWithDevice(final String device) { } @Override - public boolean matchesMeasurement(final String device, final String measurement) { + public boolean matchesMeasurement(final IDeviceID device, final String measurement) { // For aligned timeseries, empty measurement is an alias of the time column. if (Objects.isNull(measurement) || measurement.isEmpty()) { return false; @@ -118,14 +120,17 @@ public boolean matchesMeasurement(final String device, final String measurement) } /** - * Check if the {@link PipePattern} matches the given prefix path. - * - *

NOTE: In schema transmission, {@link #mayOverlapWithDevice(String)} can be used to detect - * whether the given path can act as a parent path of the {@link PipePattern}, and to transmit - * possibly used schemas like database creation and template setting. + * Check if the {@link PipePattern} matches the given prefix path. In schema transmission, this + * can be used to detect whether the given path can act as a parent path of the {@link + * PipePattern}, and to transmit possibly used schemas like database creation and template + * setting. */ public boolean matchPrefixPath(final String path) { - return mayOverlapWithDevice(path); + try { + return patternPartialPath.matchPrefixPath(new PartialPath(path)); + } catch (final IllegalPathException e) { + return false; + } } /** diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java index 803c24621cf4..510cd02b8ed4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java @@ -21,6 +21,7 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.tsfile.file.metadata.IDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,7 +109,7 @@ public static PipePattern parsePipePatternFromSourceParameters( public abstract boolean coversDb(final String db); /** Check if a device's all measurements are covered by this pattern. */ - public abstract boolean coversDevice(final String device); + public abstract boolean coversDevice(final IDeviceID device); /** * Check if a device may have some measurements matched by the pattern. @@ -118,14 +119,14 @@ public static PipePattern parsePipePatternFromSourceParameters( *

NOTE2: this is just a loose check and may have false positives. To further check if a * measurement matches the pattern, please use {@link PipePattern#matchesMeasurement} after this. */ - public abstract boolean mayOverlapWithDevice(final String device); + public abstract boolean mayOverlapWithDevice(final IDeviceID device); /** * Check if a full path with device and measurement can be matched by pattern. * - *

NOTE: this is only called when {@link PipePattern#mayOverlapWithDevice(String)} is true. + *

NOTE: this is only called when {@link PipePattern#mayOverlapWithDevice} is true. */ - public abstract boolean matchesMeasurement(final String device, final String measurement); + public abstract boolean matchesMeasurement(final IDeviceID device, final String measurement); @Override public String toString() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java index 52f70057052f..50e3c463a725 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.file.metadata.IDeviceID; import java.util.Arrays; @@ -78,28 +79,31 @@ public boolean coversDb(final String db) { } @Override - public boolean coversDevice(final String device) { + public boolean coversDevice(final IDeviceID device) { + final String deviceStr = device.toString(); // for example, pattern is root.a.b and device is root.a.b.c // in this case, the extractor can be matched without checking the measurements - return pattern.length() <= device.length() && device.startsWith(pattern); + return pattern.length() <= deviceStr.length() && deviceStr.startsWith(pattern); } @Override - public boolean mayOverlapWithDevice(final String device) { + public boolean mayOverlapWithDevice(final IDeviceID device) { + final String deviceStr = device.toString(); return ( // for example, pattern is root.a.b and device is root.a.b.c // in this case, the extractor can be matched without checking the measurements - pattern.length() <= device.length() && device.startsWith(pattern)) + pattern.length() <= deviceStr.length() && deviceStr.startsWith(pattern)) // for example, pattern is root.a.b.c and device is root.a.b // in this case, the extractor can be selected as candidate, but the measurements should // be checked further - || (pattern.length() > device.length() && pattern.startsWith(device)); + || (pattern.length() > deviceStr.length() && pattern.startsWith(deviceStr)); } @Override - public boolean matchesMeasurement(final String device, final String measurement) { + public boolean matchesMeasurement(final IDeviceID device, String measurement) { + final String deviceStr = device.toString(); // We assume that the device is already matched. - if (pattern.length() <= device.length()) { + if (pattern.length() <= deviceStr.length()) { return true; } @@ -109,9 +113,9 @@ public boolean matchesMeasurement(final String device, final String measurement) final String dotAndMeasurement = TsFileConstant.PATH_SEPARATOR + measurement; return // low cost check comes first - pattern.length() <= device.length() + dotAndMeasurement.length() + pattern.length() <= deviceStr.length() + dotAndMeasurement.length() // high cost check comes later - && dotAndMeasurement.startsWith(pattern.substring(device.length())); + && dotAndMeasurement.startsWith(pattern.substring(deviceStr.length())); } @Override