From 67522aec840e915565d5d2496a205d5ed20e97d5 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 20 May 2024 10:49:18 +0800 Subject: [PATCH 01/10] Replaced IDeviceID in pipe module --- .../PipeInsertNodeTabletInsertionEvent.java | 58 +++++----- .../tablet/TabletInsertionDataContainer.java | 102 +++++++++--------- .../tsfile/TsFileInsertionDataContainer.java | 27 ++--- .../tsfile/TsFileInsertionPointCounter.java | 16 +-- .../event/realtime/PipeRealtimeEvent.java | 10 +- .../realtime/epoch/TsFileEpochManager.java | 16 ++- .../pattern/CachedSchemaPatternMatcher.java | 17 +-- .../db/pipe/pattern/IoTDBPipePatternTest.java | 48 +++++---- .../pipe/pattern/PrefixPipePatternTest.java | 48 +++++---- .../pipe/pattern/IoTDBPipePattern.java | 8 +- .../commons/pipe/pattern/PipePattern.java | 7 +- .../pipe/pattern/PrefixPipePattern.java | 22 ++-- 12 files changed, 199 insertions(+), 180 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index dad53f2b69de..02eada5b671f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -64,10 +64,10 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent private ProgressIndex progressIndex; public PipeInsertNodeTabletInsertionEvent( - WALEntryHandler walEntryHandler, - ProgressIndex progressIndex, - boolean isAligned, - boolean isGeneratedByPipe) { + final WALEntryHandler walEntryHandler, + final ProgressIndex progressIndex, + final boolean isAligned, + final boolean isGeneratedByPipe) { this( walEntryHandler, progressIndex, @@ -81,15 +81,15 @@ public PipeInsertNodeTabletInsertionEvent( } private PipeInsertNodeTabletInsertionEvent( - WALEntryHandler walEntryHandler, - ProgressIndex progressIndex, - boolean isAligned, - boolean isGeneratedByPipe, - String pipeName, - PipeTaskMeta pipeTaskMeta, - PipePattern pattern, - long startTime, - long endTime) { + final WALEntryHandler walEntryHandler, + final ProgressIndex progressIndex, + final boolean isAligned, + final boolean isGeneratedByPipe, + final String pipeName, + final PipeTaskMeta pipeTaskMeta, + final PipePattern pattern, + final long startTime, + final long endTime) { super(pipeName, pipeTaskMeta, pattern, startTime, endTime); this.walEntryHandler = walEntryHandler; this.progressIndex = progressIndex; @@ -116,11 +116,11 @@ public InsertNode getInsertNodeViaCacheIfPossible() { /////////////////////////// EnrichedEvent /////////////////////////// @Override - public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { + public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { try { PipeResourceManager.wal().pin(walEntryHandler); return true; - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn( String.format( "Increase reference count for memtable %d error. Holder Message: %s", @@ -131,7 +131,7 @@ public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { } @Override - public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { + public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { try { PipeResourceManager.wal().unpin(walEntryHandler); // Release the containers' memory. @@ -140,7 +140,7 @@ public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { dataContainers = null; } return true; - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn( String.format( "Decrease reference count for memtable %d error. Holder Message: %s", @@ -151,7 +151,7 @@ public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { } @Override - public void bindProgressIndex(ProgressIndex progressIndex) { + public void bindProgressIndex(final ProgressIndex progressIndex) { this.progressIndex = progressIndex; } @@ -162,11 +162,11 @@ public ProgressIndex getProgressIndex() { @Override public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( - String pipeName, - PipeTaskMeta pipeTaskMeta, - PipePattern pattern, - long startTime, - long endTime) { + final String pipeName, + final PipeTaskMeta pipeTaskMeta, + final PipePattern pattern, + final long startTime, + final long endTime) { return new PipeInsertNodeTabletInsertionEvent( walEntryHandler, progressIndex, @@ -210,7 +210,7 @@ public boolean mayEventTimeOverlappedWithTimeRange() { throw new UnSupportedDataTypeException( String.format("InsertNode type %s is not supported.", insertNode.getClass().getName())); } - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn( "Exception occurred when determining the event time of PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}]. Returning true to ensure data integrity.", this, @@ -224,7 +224,8 @@ public boolean mayEventTimeOverlappedWithTimeRange() { /////////////////////////// TabletInsertionEvent /////////////////////////// @Override - public Iterable processRowByRow(BiConsumer consumer) { + public Iterable processRowByRow( + final BiConsumer consumer) { return initDataContainers().stream() .map(tabletInsertionDataContainer -> tabletInsertionDataContainer.processRowByRow(consumer)) .flatMap(Collection::stream) @@ -232,7 +233,8 @@ public Iterable processRowByRow(BiConsumer processTablet(BiConsumer consumer) { + public Iterable processTablet( + final BiConsumer consumer) { return initDataContainers().stream() .map(tabletInsertionDataContainer -> tabletInsertionDataContainer.processTablet(consumer)) .flatMap(Collection::stream) @@ -283,7 +285,7 @@ private List initDataContainers() { } return dataContainers; - } catch (Exception e) { + } catch (final Exception e) { throw new PipeException("Initialize data container error.", e); } } @@ -303,7 +305,7 @@ public boolean shouldParsePattern() { final InsertNode node = getInsertNodeViaCacheIfPossible(); return super.shouldParsePattern() && Objects.nonNull(pipePattern) - && (Objects.isNull(node) || !pipePattern.coversDevice(node.getDevicePath().getFullPath())); + && (Objects.isNull(node) || !pipePattern.coversDevice(node.getDeviceID())); } public List toRawTabletInsertionEvents() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index e95bc5373b16..662f71af2953 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -33,6 +33,8 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.UnSupportedDataTypeException; @@ -61,7 +63,7 @@ public class TabletInsertionDataContainer { private final EnrichedEvent sourceEvent; // used to report progress and filter value columns by time range - private String deviceId; + private IDeviceID deviceId; private boolean isAligned; private MeasurementSchema[] measurementSchemaList; private String[] columnNameStringList; @@ -89,10 +91,10 @@ public class TabletInsertionDataContainer { } public TabletInsertionDataContainer( - PipeTaskMeta pipeTaskMeta, - EnrichedEvent sourceEvent, - InsertNode insertNode, - PipePattern pattern) { + final PipeTaskMeta pipeTaskMeta, + final EnrichedEvent sourceEvent, + final InsertNode insertNode, + final PipePattern pattern) { this.pipeTaskMeta = pipeTaskMeta; this.sourceEvent = sourceEvent; @@ -107,11 +109,11 @@ public TabletInsertionDataContainer( } public TabletInsertionDataContainer( - PipeTaskMeta pipeTaskMeta, - EnrichedEvent sourceEvent, - Tablet tablet, - boolean isAligned, - PipePattern pattern) { + final PipeTaskMeta pipeTaskMeta, + final EnrichedEvent sourceEvent, + final Tablet tablet, + final boolean isAligned, + final PipePattern pattern) { this.pipeTaskMeta = pipeTaskMeta; this.sourceEvent = sourceEvent; @@ -119,7 +121,7 @@ public TabletInsertionDataContainer( } @TestOnly - public TabletInsertionDataContainer(InsertNode insertNode, PipePattern pattern) { + public TabletInsertionDataContainer(final InsertNode insertNode, final PipePattern pattern) { this(null, null, insertNode, pattern); } @@ -133,15 +135,15 @@ public void markAsNeedToReport() { //////////////////////////// parse //////////////////////////// - private void parse(InsertRowNode insertRowNode, PipePattern pattern) { + private void parse(final InsertRowNode insertRowNode, final PipePattern pattern) { final int originColumnSize = insertRowNode.getMeasurements().length; final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; - this.deviceId = insertRowNode.getDevicePath().getFullPath(); + this.deviceId = insertRowNode.getDeviceID(); this.isAligned = insertRowNode.isAligned(); final long[] originTimestampColumn = new long[] {insertRowNode.getTime()}; - List rowIndexList = generateRowIndexList(originTimestampColumn); + final List rowIndexList = generateRowIndexList(originTimestampColumn); this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); generateColumnIndexMapper( @@ -170,7 +172,7 @@ private void parse(InsertRowNode insertRowNode, PipePattern pattern) { this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; - BitMap bitMap = new BitMap(this.timestampColumn.length); + final BitMap bitMap = new BitMap(this.timestampColumn.length); if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) { this.valueColumns[filteredColumnIndex] = null; bitMap.markAll(); @@ -200,16 +202,16 @@ private void parse(InsertRowNode insertRowNode, PipePattern pattern) { } } - private void parse(InsertTabletNode insertTabletNode, PipePattern pattern) { + private void parse(final InsertTabletNode insertTabletNode, final PipePattern pattern) { final int originColumnSize = insertTabletNode.getMeasurements().length; final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; - this.deviceId = insertTabletNode.getDevicePath().getFullPath(); + this.deviceId = insertTabletNode.getDeviceID(); this.isAligned = insertTabletNode.isAligned(); final long[] originTimestampColumn = insertTabletNode.getTimes(); final int originRowSize = originTimestampColumn.length; - List rowIndexList = generateRowIndexList(originTimestampColumn); + final List rowIndexList = generateRowIndexList(originTimestampColumn); this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); generateColumnIndexMapper( @@ -253,7 +255,7 @@ private void parse(InsertTabletNode insertTabletNode, PipePattern pattern) { this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; - BitMap bitMap = new BitMap(this.timestampColumn.length); + final BitMap bitMap = new BitMap(this.timestampColumn.length); if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) { this.valueColumns[filteredColumnIndex] = null; bitMap.markAll(); @@ -283,11 +285,12 @@ private void parse(InsertTabletNode insertTabletNode, PipePattern pattern) { } } - private void parse(Tablet tablet, boolean isAligned, PipePattern pattern) { + private void parse(final Tablet tablet, final boolean isAligned, final PipePattern pattern) { final int originColumnSize = tablet.getSchemas().size(); final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; - this.deviceId = tablet.deviceId; + // Only support tree-model tablet + this.deviceId = new StringArrayDeviceID(tablet.getDeviceId()); this.isAligned = isAligned; final long[] originTimestampColumn = @@ -344,7 +347,7 @@ private void parse(Tablet tablet, boolean isAligned, PipePattern pattern) { this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList.get(i); this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; - BitMap bitMap = new BitMap(this.timestampColumn.length); + final BitMap bitMap = new BitMap(this.timestampColumn.length); if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) { this.valueColumns[filteredColumnIndex] = null; bitMap.markAll(); @@ -375,9 +378,9 @@ private void parse(Tablet tablet, boolean isAligned, PipePattern pattern) { } private void generateColumnIndexMapper( - String[] originMeasurementList, - PipePattern pattern, - Integer[] originColumnIndex2FilteredColumnIndexMapperList) { + final String[] originMeasurementList, + final PipePattern pattern, + final Integer[] originColumnIndex2FilteredColumnIndexMapperList) { final int originColumnSize = originMeasurementList.length; // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c @@ -414,7 +417,7 @@ private List generateRowIndexList(final long[] originTimestampColumn) { return generateFullRowIndexList(rowCount); } - List rowIndexList = new ArrayList<>(); + final List rowIndexList = new ArrayList<>(); // We assume that `originTimestampColumn` is ordered. if (originTimestampColumn[originTimestampColumn.length - 1] < sourceEvent.getStartTime() || originTimestampColumn[0] > sourceEvent.getEndTime()) { @@ -431,7 +434,7 @@ private List generateRowIndexList(final long[] originTimestampColumn) { return rowIndexList; } - private static List generateFullRowIndexList(int rowCount) { + private static List generateFullRowIndexList(final int rowCount) { if (rowCount <= CACHED_FULL_ROW_INDEX_LIST_ROW_COUNT_UPPER) { return cachedFullRowIndexList.get(rowCount); } @@ -439,20 +442,20 @@ private static List generateFullRowIndexList(int rowCount) { } private static Object filterValueColumnsByRowIndexList( - @NonNull TSDataType type, - @NonNull Object originValueColumn, - @NonNull List rowIndexList, - boolean isSingleOriginValueColumn, - @NonNull BitMap originNullValueColumnBitmap, - @NonNull BitMap nullValueColumnBitmap /* output parameters */) { + @NonNull final TSDataType type, + @NonNull final Object originValueColumn, + @NonNull final List rowIndexList, + final boolean isSingleOriginValueColumn, + @NonNull final BitMap originNullValueColumnBitmap, + @NonNull final BitMap nullValueColumnBitmap /* output parameters */) { switch (type) { case INT32: { - int[] intValueColumns = + final int[] intValueColumns = isSingleOriginValueColumn ? new int[] {(int) originValueColumn} : (int[]) originValueColumn; - int[] valueColumns = new int[rowIndexList.size()]; + final int[] valueColumns = new int[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { valueColumns[i] = 0; @@ -465,11 +468,11 @@ private static Object filterValueColumnsByRowIndexList( } case INT64: { - long[] longValueColumns = + final long[] longValueColumns = isSingleOriginValueColumn ? new long[] {(long) originValueColumn} : (long[]) originValueColumn; - long[] valueColumns = new long[rowIndexList.size()]; + final long[] valueColumns = new long[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { valueColumns[i] = 0L; @@ -482,11 +485,11 @@ private static Object filterValueColumnsByRowIndexList( } case FLOAT: { - float[] floatValueColumns = + final float[] floatValueColumns = isSingleOriginValueColumn ? new float[] {(float) originValueColumn} : (float[]) originValueColumn; - float[] valueColumns = new float[rowIndexList.size()]; + final float[] valueColumns = new float[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { valueColumns[i] = 0F; @@ -499,11 +502,11 @@ private static Object filterValueColumnsByRowIndexList( } case DOUBLE: { - double[] doubleValueColumns = + final double[] doubleValueColumns = isSingleOriginValueColumn ? new double[] {(double) originValueColumn} : (double[]) originValueColumn; - double[] valueColumns = new double[rowIndexList.size()]; + final double[] valueColumns = new double[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { valueColumns[i] = 0D; @@ -516,11 +519,11 @@ private static Object filterValueColumnsByRowIndexList( } case BOOLEAN: { - boolean[] booleanValueColumns = + final boolean[] booleanValueColumns = isSingleOriginValueColumn ? new boolean[] {(boolean) originValueColumn} : (boolean[]) originValueColumn; - boolean[] valueColumns = new boolean[rowIndexList.size()]; + final boolean[] valueColumns = new boolean[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { valueColumns[i] = false; @@ -533,11 +536,11 @@ private static Object filterValueColumnsByRowIndexList( } case TEXT: { - Binary[] binaryValueColumns = + final Binary[] binaryValueColumns = isSingleOriginValueColumn ? new Binary[] {(Binary) originValueColumn} : (Binary[]) originValueColumn; - Binary[] valueColumns = new Binary[rowIndexList.size()]; + final Binary[] valueColumns = new Binary[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (Objects.isNull(binaryValueColumns[rowIndexList.get(i)]) || Objects.isNull(binaryValueColumns[rowIndexList.get(i)].getValues()) @@ -558,7 +561,7 @@ private static Object filterValueColumnsByRowIndexList( //////////////////////////// process //////////////////////////// - public List processRowByRow(BiConsumer consumer) { + public List processRowByRow(final BiConsumer consumer) { if (valueColumns.length == 0 || timestampColumn.length == 0) { return Collections.emptyList(); } @@ -568,7 +571,7 @@ public List processRowByRow(BiConsumer consumer.accept( new PipeRow( i, - deviceId, + deviceId.toString(), isAligned, measurementSchemaList, timestampColumn, @@ -581,7 +584,7 @@ public List processRowByRow(BiConsumer return rowCollector.convertToTabletInsertionEvents(shouldReport); } - public List processTablet(BiConsumer consumer) { + public List processTablet(final BiConsumer consumer) { final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, sourceEvent); consumer.accept(convertToTablet(), rowCollector); return rowCollector.convertToTabletInsertionEvents(shouldReport); @@ -594,7 +597,8 @@ public Tablet convertToTablet() { return tablet; } - final Tablet newTablet = new Tablet(deviceId, Arrays.asList(measurementSchemaList), rowCount); + final Tablet newTablet = + new Tablet(deviceId.toString(), Arrays.asList(measurementSchemaList), rowCount); newTablet.timestamps = timestampColumn; newTablet.bitMaps = nullValueColumnBitmaps; newTablet.values = valueColumns; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index 1c1f0d02dee2..3bb289d97db6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -78,17 +78,18 @@ public class TsFileInsertionDataContainer implements AutoCloseable { private boolean shouldParsePattern = false; public TsFileInsertionDataContainer( - File tsFile, PipePattern pattern, long startTime, long endTime) throws IOException { + final File tsFile, final PipePattern pattern, final long startTime, final long endTime) + throws IOException { this(tsFile, pattern, startTime, endTime, null, null); } public TsFileInsertionDataContainer( - File tsFile, - PipePattern pattern, - long startTime, - long endTime, - PipeTaskMeta pipeTaskMeta, - EnrichedEvent sourceEvent) + final File tsFile, + final PipePattern pattern, + final long startTime, + final long endTime, + final PipeTaskMeta pipeTaskMeta, + final EnrichedEvent sourceEvent) throws IOException { this.pattern = pattern; timeFilterExpression = @@ -135,17 +136,17 @@ public TsFileInsertionDataContainer( // No longer need this. Help GC. tsFileSequenceReader.clearCachedDeviceMetadata(); - } catch (Exception e) { + } catch (final Exception e) { close(); throw e; } } private Map> filterDeviceMeasurementsMapByPattern( - Map> originalDeviceMeasurementsMap) { + final Map> originalDeviceMeasurementsMap) { final Map> filteredDeviceMeasurementsMap = new HashMap<>(); for (Map.Entry> entry : originalDeviceMeasurementsMap.entrySet()) { - final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID(); + final IDeviceID deviceId = entry.getKey(); // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c // in this case, all data can be matched without checking the measurements @@ -221,7 +222,7 @@ public boolean hasNext() { ((PlainDeviceID) entry.getKey()).toStringID(), entry.getValue(), timeFilterExpression); - } catch (IOException e) { + } catch (final IOException e) { close(); throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e); } @@ -277,7 +278,7 @@ public void close() { if (tsFileReader != null) { tsFileReader.close(); } - } catch (IOException e) { + } catch (final IOException e) { LOGGER.warn("Failed to close TsFileReader", e); } @@ -285,7 +286,7 @@ public void close() { if (tsFileSequenceReader != null) { tsFileSequenceReader.close(); } - } catch (IOException e) { + } catch (final IOException e) { LOGGER.warn("Failed to close TsFileSequenceReader", e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java index 086b03c45583..386acf1758ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern; import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.PlainDeviceID; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.read.TsFileSequenceReader; import org.slf4j.Logger; @@ -52,7 +51,8 @@ public class TsFileInsertionPointCounter implements AutoCloseable { private long count = 0; - public TsFileInsertionPointCounter(File tsFile, PipePattern pattern) throws IOException { + public TsFileInsertionPointCounter(final File tsFile, final PipePattern pattern) + throws IOException { this.pattern = pattern; try { @@ -80,15 +80,15 @@ private Map> filterDeviceMeasurementsMapByPattern() throw tsFileSequenceReader.getDeviceMeasurementsMap(); final Map> filteredDeviceMeasurementsMap = new HashMap<>(); - for (Map.Entry> entry : originalDeviceMeasurementsMap.entrySet()) { - final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID(); + for (final Map.Entry> entry : + originalDeviceMeasurementsMap.entrySet()) { + final IDeviceID deviceId = entry.getKey(); // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c // in this case, all data can be matched without checking the measurements if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) { if (!entry.getValue().isEmpty()) { - filteredDeviceMeasurementsMap.put( - new PlainDeviceID(deviceId), new HashSet<>(entry.getValue())); + filteredDeviceMeasurementsMap.put(deviceId, new HashSet<>(entry.getValue())); } } @@ -107,7 +107,7 @@ else if (pattern.mayOverlapWithDevice(deviceId)) { } if (!filteredMeasurements.isEmpty()) { - filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId), filteredMeasurements); + filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements); } } @@ -162,7 +162,7 @@ public void close() { if (tsFileSequenceReader != null) { tsFileSequenceReader.close(); } - } catch (IOException e) { + } catch (final IOException e) { LOGGER.warn("Failed to close TsFileSequenceReader", e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java index 77df1f0292be..0717dc4d5c3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java @@ -25,6 +25,8 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.util.Map; /** @@ -37,12 +39,12 @@ public class PipeRealtimeEvent extends EnrichedEvent { private final EnrichedEvent event; private final TsFileEpoch tsFileEpoch; - private Map device2Measurements; + private Map device2Measurements; public PipeRealtimeEvent( final EnrichedEvent event, final TsFileEpoch tsFileEpoch, - final Map device2Measurements, + final Map device2Measurements, final PipePattern pattern) { this(event, tsFileEpoch, device2Measurements, null, pattern, Long.MIN_VALUE, Long.MAX_VALUE); } @@ -50,7 +52,7 @@ public PipeRealtimeEvent( public PipeRealtimeEvent( final EnrichedEvent event, final TsFileEpoch tsFileEpoch, - final Map device2Measurements, + final Map device2Measurements, final PipeTaskMeta pipeTaskMeta, final PipePattern pattern, final long startTime, @@ -73,7 +75,7 @@ public TsFileEpoch getTsFileEpoch() { return tsFileEpoch; } - public Map getSchemaInfo() { + public Map getSchemaInfo() { return device2Measurements; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java index 7c4e5d17eebd..94b6b8c085eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java @@ -19,13 +19,12 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch; +import com.google.common.base.Functions; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; - -import org.apache.tsfile.file.metadata.PlainDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +42,7 @@ public class TsFileEpochManager { private final ConcurrentMap filePath2Epoch = new ConcurrentHashMap<>(); public PipeRealtimeEvent bindPipeTsFileInsertionEvent( - PipeTsFileInsertionEvent event, TsFileResource resource) { + final PipeTsFileInsertionEvent event, final TsFileResource resource) { final String filePath = resource.getTsFilePath(); // This would not happen, but just in case @@ -64,22 +63,21 @@ public PipeRealtimeEvent bindPipeTsFileInsertionEvent( event, epoch, resource.getDevices().stream() - .collect( - Collectors.toMap( - device -> ((PlainDeviceID) device).toStringID(), - device -> EMPTY_MEASUREMENT_ARRAY)), + .collect(Collectors.toMap(Functions.identity(), device -> EMPTY_MEASUREMENT_ARRAY)), event.getPipePattern()); } public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent( - PipeInsertNodeTabletInsertionEvent event, InsertNode node, TsFileResource resource) { + final PipeInsertNodeTabletInsertionEvent event, + final InsertNode node, + final TsFileResource resource) { final TsFileEpoch epoch = filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new); epoch.updateInsertNodeMinTime(node.getMinTime()); return new PipeRealtimeEvent( event, epoch, - Collections.singletonMap(node.getDevicePath().getFullPath(), node.getMeasurements()), + Collections.singletonMap(node.getDeviceID(), node.getMeasurements()), event.getPipePattern()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java index 1de43b7042e1..546d8185c3a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java @@ -28,6 +28,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.tsfile.file.metadata.IDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { protected final ReentrantReadWriteLock lock; protected final Set extractors; - protected final Cache> deviceToExtractorsCache; + protected final Cache> deviceToExtractorsCache; public CachedSchemaPatternMatcher() { this.lock = new ReentrantReadWriteLock(); @@ -61,7 +62,7 @@ public CachedSchemaPatternMatcher() { } @Override - public void register(PipeRealtimeDataRegionExtractor extractor) { + public void register(final PipeRealtimeDataRegionExtractor extractor) { lock.writeLock().lock(); try { extractors.add(extractor); @@ -72,7 +73,7 @@ public void register(PipeRealtimeDataRegionExtractor extractor) { } @Override - public void deregister(PipeRealtimeDataRegionExtractor extractor) { + public void deregister(final PipeRealtimeDataRegionExtractor extractor) { lock.writeLock().lock(); try { extractors.remove(extractor); @@ -93,7 +94,7 @@ public int getRegisterCount() { } @Override - public Set match(PipeRealtimeEvent event) { + public Set match(final PipeRealtimeEvent event) { final Set matchedExtractors = new HashSet<>(); lock.readLock().lock(); @@ -114,8 +115,8 @@ public Set match(PipeRealtimeEvent event) { .collect(Collectors.toSet()); } - for (final Map.Entry entry : event.getSchemaInfo().entrySet()) { - final String device = entry.getKey(); + for (final Map.Entry entry : event.getSchemaInfo().entrySet()) { + final IDeviceID device = entry.getKey(); final String[] measurements = entry.getValue(); // 1. try to get matched extractors from cache, if not success, match them by device @@ -176,10 +177,10 @@ public Set match(PipeRealtimeEvent event) { return matchedExtractors; } - protected Set filterExtractorsByDevice(String device) { + protected Set filterExtractorsByDevice(final IDeviceID device) { final Set filteredExtractors = new HashSet<>(); - for (PipeRealtimeDataRegionExtractor extractor : extractors) { + for (final PipeRealtimeDataRegionExtractor extractor : extractors) { // Return if the extractor only extract deletion if (!extractor.shouldExtractInsertion()) { continue; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java index 5c74f3ea21c1..d443577df26a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java @@ -22,6 +22,8 @@ import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.junit.Assert; import org.junit.Test; @@ -30,48 +32,48 @@ public class IoTDBPipePatternTest { @Test public void testIotdbPipePattern() { // Test legal and illegal pattern - String[] legalPatterns = { + final String[] legalPatterns = { "root", "root.db", "root.db.d1.s", "root.db.`1`", "root.*.d.*s.s", }; - String[] illegalPatterns = { + final String[] illegalPatterns = { "root.", "roo", "", "root..", "root./", }; - for (String s : legalPatterns) { + for (final String s : legalPatterns) { Assert.assertTrue(new IoTDBPipePattern(s).isLegal()); } - for (String t : illegalPatterns) { + for (final String t : illegalPatterns) { try { Assert.assertFalse(new IoTDBPipePattern(t).isLegal()); - } catch (Exception e) { + } catch (final Exception e) { Assert.assertTrue(e instanceof PipeException); } } // Test pattern cover db - String db = "root.db"; - String[] patternsCoverDb = { + final String db = "root.db"; + final String[] patternsCoverDb = { "root.**", "root.db.**", "root.*db*.**", }; - String[] patternsNotCoverDb = { + final String[] patternsNotCoverDb = { "root.db", "root.*", "root.*.*", "root.db.*.**", "root.db.d1", "root.**.db.**", }; - for (String s : patternsCoverDb) { + for (final String s : patternsCoverDb) { Assert.assertTrue(new IoTDBPipePattern(s).coversDb(db)); } - for (String t : patternsNotCoverDb) { + for (final String t : patternsNotCoverDb) { Assert.assertFalse(new IoTDBPipePattern(t).coversDb(db)); } - String device = "root.db.d1"; + final IDeviceID device = new StringArrayDeviceID("root.db.d1"); // Test pattern cover device - String[] patternsCoverDevice = { + final String[] patternsCoverDevice = { "root.**", "root.db.**", "root.*.*.*", "root.db.d1.*", "root.*db*.*d*.*", "root.**.*1.*", }; - String[] patternsNotCoverDevice = { + final String[] patternsNotCoverDevice = { "root.*", "root.*.*", "root.db.d1", "root.db.d2.*", "root.**.d2.**", }; - for (String s : patternsCoverDevice) { + for (final String s : patternsCoverDevice) { Assert.assertTrue(new IoTDBPipePattern(s).coversDevice(device)); } for (String t : patternsNotCoverDevice) { @@ -79,31 +81,31 @@ public void testIotdbPipePattern() { } // Test pattern may overlap with device - String[] patternsOverlapWithDevice = { + final String[] patternsOverlapWithDevice = { "root.db.**", "root.db.d1", "root.db.d1.*", "root.db.d1.s1", "root.**.d2.**", "root.*.d*.**", }; - String[] patternsNotOverlapWithDevice = { + final String[] patternsNotOverlapWithDevice = { "root.db.d2.**", "root.db2.d1.**", "root.db.db.d1.**", }; - for (String s : patternsOverlapWithDevice) { + for (final String s : patternsOverlapWithDevice) { Assert.assertTrue(new IoTDBPipePattern(s).mayOverlapWithDevice(device)); } - for (String t : patternsNotOverlapWithDevice) { + for (final String t : patternsNotOverlapWithDevice) { Assert.assertFalse(new IoTDBPipePattern(t).mayOverlapWithDevice(device)); } // Test pattern match measurement - String measurement = "s1"; - String[] patternsMatchMeasurement = { + final String measurement = "s1"; + final String[] patternsMatchMeasurement = { "root.db.d1.s1", "root.db.d1.*", }; - String[] patternsNotMatchMeasurement = { + final String[] patternsNotMatchMeasurement = { "root.db.d1", "root.db.d1", "root.db.d1.*.*", }; - for (String s : patternsMatchMeasurement) { + for (final String s : patternsMatchMeasurement) { Assert.assertTrue(new IoTDBPipePattern(s).matchesMeasurement(device, measurement)); } - for (String t : patternsNotMatchMeasurement) { + for (final String t : patternsNotMatchMeasurement) { Assert.assertFalse(new IoTDBPipePattern(t).matchesMeasurement(device, measurement)); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java index cb327d2dac05..6008303b3daf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java @@ -21,6 +21,8 @@ import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.junit.Assert; import org.junit.Test; @@ -29,76 +31,76 @@ public class PrefixPipePatternTest { @Test public void testPrefixPipePattern() { // Test legal and illegal pattern - String[] legalPatterns = { + final String[] legalPatterns = { "root", "root.", "root.db", "root.db.d1.s", "root.db.`1`", }; - String[] illegalPatterns = { + final String[] illegalPatterns = { "roo", "", "root..", "root./", }; - for (String s : legalPatterns) { + for (final String s : legalPatterns) { Assert.assertTrue(new PrefixPipePattern(s).isLegal()); } - for (String t : illegalPatterns) { + for (final String t : illegalPatterns) { Assert.assertFalse(new PrefixPipePattern(t).isLegal()); } // Test pattern cover db - String db = "root.db"; - String[] patternsCoverDb = { + final String db = "root.db"; + final String[] patternsCoverDb = { "root", "root.", "root.d", "root.db", }; - String[] patternsNotCoverDb = { + final String[] patternsNotCoverDb = { "root.**", "root.db.", }; - for (String s : patternsCoverDb) { + for (final String s : patternsCoverDb) { Assert.assertTrue(new PrefixPipePattern(s).coversDb(db)); } - for (String t : patternsNotCoverDb) { + for (final String t : patternsNotCoverDb) { Assert.assertFalse(new PrefixPipePattern(t).coversDb(db)); } - String device = "root.db.d1"; + final IDeviceID device = new StringArrayDeviceID("root.db.d1"); // Test pattern cover device - String[] patternsCoverDevice = { + final String[] patternsCoverDevice = { "root", "root.", "root.d", "root.db", "root.db.", "root.db.d", "root.db.d1", }; - String[] patternsNotCoverDevice = { + final String[] patternsNotCoverDevice = { "root.db.d1.", "root.db.d1.s1", "root.**", "root.db.d2", }; - for (String s : patternsCoverDevice) { + for (final String s : patternsCoverDevice) { Assert.assertTrue(new PrefixPipePattern(s).coversDevice(device)); } - for (String t : patternsNotCoverDevice) { + for (final String t : patternsNotCoverDevice) { Assert.assertFalse(new PrefixPipePattern(t).coversDevice(device)); } // Test pattern may overlap with device - String[] patternsOverlapWithDevice = { + final String[] patternsOverlapWithDevice = { "root", "root.db.d1", "root.db.d1.", "root.db.d1.s1", }; - String[] patternsNotOverlapWithDevice = { + final String[] patternsNotOverlapWithDevice = { "root.db.d2", "root.**", }; - for (String s : patternsOverlapWithDevice) { + for (final String s : patternsOverlapWithDevice) { Assert.assertTrue(new PrefixPipePattern(s).mayOverlapWithDevice(device)); } - for (String t : patternsNotOverlapWithDevice) { + for (final String t : patternsNotOverlapWithDevice) { Assert.assertFalse(new PrefixPipePattern(t).mayOverlapWithDevice(device)); } // Test pattern match measurement - String measurement = "s1"; - String[] patternsMatchMeasurement = { + final String measurement = "s1"; + final String[] patternsMatchMeasurement = { "root.db.d1", "root.db.d1.", "root.db.d1.s", "root.db.d1.s1", }; - String[] patternsNotMatchMeasurement = { + final String[] patternsNotMatchMeasurement = { "root.db.d1.s11", "root.db.d1.s2", }; - for (String s : patternsMatchMeasurement) { + for (final String s : patternsMatchMeasurement) { Assert.assertTrue(new PrefixPipePattern(s).matchesMeasurement(device, measurement)); } - for (String t : patternsNotMatchMeasurement) { + for (final String t : patternsNotMatchMeasurement) { Assert.assertFalse(new PrefixPipePattern(t).matchesMeasurement(device, measurement)); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java index ec4192adff37..bd3a1698f17f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java @@ -26,6 +26,8 @@ import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.util.Objects; public class IoTDBPipePattern extends PipePattern { @@ -72,7 +74,7 @@ public boolean coversDb(String db) { } @Override - public boolean coversDevice(String device) { + public boolean coversDevice(IDeviceID device) { try { return patternPartialPath.include( new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)); @@ -82,7 +84,7 @@ public boolean coversDevice(String device) { } @Override - public boolean mayOverlapWithDevice(String device) { + public boolean mayOverlapWithDevice(IDeviceID device) { try { // Another way is to use patternPath.overlapWith("device.*"), // there will be no false positives but time cost may be higher. @@ -93,7 +95,7 @@ public boolean mayOverlapWithDevice(String device) { } @Override - public boolean matchesMeasurement(String device, String measurement) { + public boolean matchesMeasurement(IDeviceID device, String measurement) { // For aligned timeseries, empty measurement is an alias of the time column. if (Objects.isNull(measurement) || measurement.isEmpty()) { return false; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java index 04be541f6252..76bbfd09f475 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java @@ -21,6 +21,7 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.tsfile.file.metadata.IDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,7 +108,7 @@ public static PipePattern parsePipePatternFromSourceParameters(PipeParameters so public abstract boolean coversDb(String db); /** Check if a device's all measurements are covered by this pattern. */ - public abstract boolean coversDevice(String device); + public abstract boolean coversDevice(IDeviceID device); /** * Check if a device may have some measurements matched by the pattern. @@ -117,14 +118,14 @@ public static PipePattern parsePipePatternFromSourceParameters(PipeParameters so *

NOTE2: this is just a loose check and may have false positives. To further check if a * measurement matches the pattern, please use {@link PipePattern#matchesMeasurement} after this. */ - public abstract boolean mayOverlapWithDevice(String device); + public abstract boolean mayOverlapWithDevice(IDeviceID device); /** * Check if a full path with device and measurement can be matched by pattern. * *

NOTE: this is only called when {@link PipePattern#mayOverlapWithDevice} is true. */ - public abstract boolean matchesMeasurement(String device, String measurement); + public abstract boolean matchesMeasurement(IDeviceID device, String measurement); @Override public String toString() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java index 5e9c3a5aa441..577755cda84a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.file.metadata.IDeviceID; import java.util.Arrays; @@ -78,28 +79,31 @@ public boolean coversDb(String db) { } @Override - public boolean coversDevice(String device) { + public boolean coversDevice(final IDeviceID device) { + final String deviceStr = device.toString(); // for example, pattern is root.a.b and device is root.a.b.c // in this case, the extractor can be matched without checking the measurements - return pattern.length() <= device.length() && device.startsWith(pattern); + return pattern.length() <= deviceStr.length() && deviceStr.startsWith(pattern); } @Override - public boolean mayOverlapWithDevice(String device) { + public boolean mayOverlapWithDevice(final IDeviceID device) { + final String deviceStr = device.toString(); return ( // for example, pattern is root.a.b and device is root.a.b.c // in this case, the extractor can be matched without checking the measurements - pattern.length() <= device.length() && device.startsWith(pattern)) + pattern.length() <= deviceStr.length() && deviceStr.startsWith(pattern)) // for example, pattern is root.a.b.c and device is root.a.b // in this case, the extractor can be selected as candidate, but the measurements should // be checked further - || (pattern.length() > device.length() && pattern.startsWith(device)); + || (pattern.length() > deviceStr.length() && pattern.startsWith(deviceStr)); } @Override - public boolean matchesMeasurement(String device, String measurement) { + public boolean matchesMeasurement(final IDeviceID device, String measurement) { + final String deviceStr = device.toString(); // We assume that the device is already matched. - if (pattern.length() <= device.length()) { + if (pattern.length() <= deviceStr.length()) { return true; } @@ -109,9 +113,9 @@ public boolean matchesMeasurement(String device, String measurement) { String dotAndMeasurement = TsFileConstant.PATH_SEPARATOR + measurement; return // low cost check comes first - pattern.length() <= device.length() + dotAndMeasurement.length() + pattern.length() <= deviceStr.length() + dotAndMeasurement.length() // high cost check comes later - && dotAndMeasurement.startsWith(pattern.substring(device.length())); + && dotAndMeasurement.startsWith(pattern.substring(deviceStr.length())); } @Override From 7cbc16afccbe38b735756c5ba3827ad470089908 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 20 May 2024 13:25:13 +0800 Subject: [PATCH 02/10] Reverted finals --- .../PipeInsertNodeTabletInsertionEvent.java | 56 ++++++------ .../tablet/TabletInsertionDataContainer.java | 86 +++++++++---------- .../tsfile/TsFileInsertionDataContainer.java | 25 +++--- .../tsfile/TsFileInsertionPointCounter.java | 8 +- .../realtime/epoch/TsFileEpochManager.java | 6 +- .../pattern/CachedSchemaPatternMatcher.java | 10 +-- .../db/pipe/pattern/IoTDBPipePatternTest.java | 46 +++++----- .../pipe/pattern/PrefixPipePatternTest.java | 46 +++++----- 8 files changed, 138 insertions(+), 145 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 02eada5b671f..a7b640540ad6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -64,10 +64,10 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent private ProgressIndex progressIndex; public PipeInsertNodeTabletInsertionEvent( - final WALEntryHandler walEntryHandler, - final ProgressIndex progressIndex, - final boolean isAligned, - final boolean isGeneratedByPipe) { + WALEntryHandler walEntryHandler, + ProgressIndex progressIndex, + boolean isAligned, + boolean isGeneratedByPipe) { this( walEntryHandler, progressIndex, @@ -81,15 +81,15 @@ public PipeInsertNodeTabletInsertionEvent( } private PipeInsertNodeTabletInsertionEvent( - final WALEntryHandler walEntryHandler, - final ProgressIndex progressIndex, - final boolean isAligned, - final boolean isGeneratedByPipe, - final String pipeName, - final PipeTaskMeta pipeTaskMeta, - final PipePattern pattern, - final long startTime, - final long endTime) { + WALEntryHandler walEntryHandler, + ProgressIndex progressIndex, + boolean isAligned, + boolean isGeneratedByPipe, + String pipeName, + PipeTaskMeta pipeTaskMeta, + PipePattern pattern, + long startTime, + long endTime) { super(pipeName, pipeTaskMeta, pattern, startTime, endTime); this.walEntryHandler = walEntryHandler; this.progressIndex = progressIndex; @@ -116,11 +116,11 @@ public InsertNode getInsertNodeViaCacheIfPossible() { /////////////////////////// EnrichedEvent /////////////////////////// @Override - public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { try { PipeResourceManager.wal().pin(walEntryHandler); return true; - } catch (final Exception e) { + } catch (Exception e) { LOGGER.warn( String.format( "Increase reference count for memtable %d error. Holder Message: %s", @@ -131,7 +131,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa } @Override - public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { + public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { try { PipeResourceManager.wal().unpin(walEntryHandler); // Release the containers' memory. @@ -140,7 +140,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa dataContainers = null; } return true; - } catch (final Exception e) { + } catch (Exception e) { LOGGER.warn( String.format( "Decrease reference count for memtable %d error. Holder Message: %s", @@ -151,7 +151,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa } @Override - public void bindProgressIndex(final ProgressIndex progressIndex) { + public void bindProgressIndex(ProgressIndex progressIndex) { this.progressIndex = progressIndex; } @@ -162,11 +162,11 @@ public ProgressIndex getProgressIndex() { @Override public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( - final String pipeName, - final PipeTaskMeta pipeTaskMeta, - final PipePattern pattern, - final long startTime, - final long endTime) { + String pipeName, + PipeTaskMeta pipeTaskMeta, + PipePattern pattern, + long startTime, + long endTime) { return new PipeInsertNodeTabletInsertionEvent( walEntryHandler, progressIndex, @@ -210,7 +210,7 @@ public boolean mayEventTimeOverlappedWithTimeRange() { throw new UnSupportedDataTypeException( String.format("InsertNode type %s is not supported.", insertNode.getClass().getName())); } - } catch (final Exception e) { + } catch (Exception e) { LOGGER.warn( "Exception occurred when determining the event time of PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}]. Returning true to ensure data integrity.", this, @@ -224,8 +224,7 @@ public boolean mayEventTimeOverlappedWithTimeRange() { /////////////////////////// TabletInsertionEvent /////////////////////////// @Override - public Iterable processRowByRow( - final BiConsumer consumer) { + public Iterable processRowByRow(BiConsumer consumer) { return initDataContainers().stream() .map(tabletInsertionDataContainer -> tabletInsertionDataContainer.processRowByRow(consumer)) .flatMap(Collection::stream) @@ -233,8 +232,7 @@ public Iterable processRowByRow( } @Override - public Iterable processTablet( - final BiConsumer consumer) { + public Iterable processTablet(BiConsumer consumer) { return initDataContainers().stream() .map(tabletInsertionDataContainer -> tabletInsertionDataContainer.processTablet(consumer)) .flatMap(Collection::stream) @@ -285,7 +283,7 @@ private List initDataContainers() { } return dataContainers; - } catch (final Exception e) { + } catch (Exception e) { throw new PipeException("Initialize data container error.", e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index 662f71af2953..f3d4b58a8063 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -91,10 +91,10 @@ public class TabletInsertionDataContainer { } public TabletInsertionDataContainer( - final PipeTaskMeta pipeTaskMeta, - final EnrichedEvent sourceEvent, - final InsertNode insertNode, - final PipePattern pattern) { + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + InsertNode insertNode, + PipePattern pattern) { this.pipeTaskMeta = pipeTaskMeta; this.sourceEvent = sourceEvent; @@ -109,11 +109,11 @@ public TabletInsertionDataContainer( } public TabletInsertionDataContainer( - final PipeTaskMeta pipeTaskMeta, - final EnrichedEvent sourceEvent, - final Tablet tablet, - final boolean isAligned, - final PipePattern pattern) { + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + Tablet tablet, + boolean isAligned, + PipePattern pattern) { this.pipeTaskMeta = pipeTaskMeta; this.sourceEvent = sourceEvent; @@ -121,7 +121,7 @@ public TabletInsertionDataContainer( } @TestOnly - public TabletInsertionDataContainer(final InsertNode insertNode, final PipePattern pattern) { + public TabletInsertionDataContainer(InsertNode insertNode, PipePattern pattern) { this(null, null, insertNode, pattern); } @@ -135,7 +135,7 @@ public void markAsNeedToReport() { //////////////////////////// parse //////////////////////////// - private void parse(final InsertRowNode insertRowNode, final PipePattern pattern) { + private void parse(InsertRowNode insertRowNode, PipePattern pattern) { final int originColumnSize = insertRowNode.getMeasurements().length; final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; @@ -143,7 +143,7 @@ private void parse(final InsertRowNode insertRowNode, final PipePattern pattern) this.isAligned = insertRowNode.isAligned(); final long[] originTimestampColumn = new long[] {insertRowNode.getTime()}; - final List rowIndexList = generateRowIndexList(originTimestampColumn); + List rowIndexList = generateRowIndexList(originTimestampColumn); this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); generateColumnIndexMapper( @@ -172,7 +172,7 @@ private void parse(final InsertRowNode insertRowNode, final PipePattern pattern) this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; - final BitMap bitMap = new BitMap(this.timestampColumn.length); + BitMap bitMap = new BitMap(this.timestampColumn.length); if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) { this.valueColumns[filteredColumnIndex] = null; bitMap.markAll(); @@ -202,7 +202,7 @@ private void parse(final InsertRowNode insertRowNode, final PipePattern pattern) } } - private void parse(final InsertTabletNode insertTabletNode, final PipePattern pattern) { + private void parse(InsertTabletNode insertTabletNode, PipePattern pattern) { final int originColumnSize = insertTabletNode.getMeasurements().length; final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; @@ -211,7 +211,7 @@ private void parse(final InsertTabletNode insertTabletNode, final PipePattern pa final long[] originTimestampColumn = insertTabletNode.getTimes(); final int originRowSize = originTimestampColumn.length; - final List rowIndexList = generateRowIndexList(originTimestampColumn); + List rowIndexList = generateRowIndexList(originTimestampColumn); this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); generateColumnIndexMapper( @@ -255,7 +255,7 @@ private void parse(final InsertTabletNode insertTabletNode, final PipePattern pa this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; - final BitMap bitMap = new BitMap(this.timestampColumn.length); + BitMap bitMap = new BitMap(this.timestampColumn.length); if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) { this.valueColumns[filteredColumnIndex] = null; bitMap.markAll(); @@ -285,7 +285,7 @@ private void parse(final InsertTabletNode insertTabletNode, final PipePattern pa } } - private void parse(final Tablet tablet, final boolean isAligned, final PipePattern pattern) { + private void parse(Tablet tablet, boolean isAligned, PipePattern pattern) { final int originColumnSize = tablet.getSchemas().size(); final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; @@ -347,7 +347,7 @@ private void parse(final Tablet tablet, final boolean isAligned, final PipePatte this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList.get(i); this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; - final BitMap bitMap = new BitMap(this.timestampColumn.length); + BitMap bitMap = new BitMap(this.timestampColumn.length); if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) { this.valueColumns[filteredColumnIndex] = null; bitMap.markAll(); @@ -378,9 +378,9 @@ private void parse(final Tablet tablet, final boolean isAligned, final PipePatte } private void generateColumnIndexMapper( - final String[] originMeasurementList, - final PipePattern pattern, - final Integer[] originColumnIndex2FilteredColumnIndexMapperList) { + String[] originMeasurementList, + PipePattern pattern, + Integer[] originColumnIndex2FilteredColumnIndexMapperList) { final int originColumnSize = originMeasurementList.length; // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c @@ -417,7 +417,7 @@ private List generateRowIndexList(final long[] originTimestampColumn) { return generateFullRowIndexList(rowCount); } - final List rowIndexList = new ArrayList<>(); + List rowIndexList = new ArrayList<>(); // We assume that `originTimestampColumn` is ordered. if (originTimestampColumn[originTimestampColumn.length - 1] < sourceEvent.getStartTime() || originTimestampColumn[0] > sourceEvent.getEndTime()) { @@ -434,7 +434,7 @@ private List generateRowIndexList(final long[] originTimestampColumn) { return rowIndexList; } - private static List generateFullRowIndexList(final int rowCount) { + private static List generateFullRowIndexList(int rowCount) { if (rowCount <= CACHED_FULL_ROW_INDEX_LIST_ROW_COUNT_UPPER) { return cachedFullRowIndexList.get(rowCount); } @@ -442,20 +442,20 @@ private static List generateFullRowIndexList(final int rowCount) { } private static Object filterValueColumnsByRowIndexList( - @NonNull final TSDataType type, - @NonNull final Object originValueColumn, - @NonNull final List rowIndexList, - final boolean isSingleOriginValueColumn, - @NonNull final BitMap originNullValueColumnBitmap, - @NonNull final BitMap nullValueColumnBitmap /* output parameters */) { + @NonNull TSDataType type, + @NonNull Object originValueColumn, + @NonNull List rowIndexList, + boolean isSingleOriginValueColumn, + @NonNull BitMap originNullValueColumnBitmap, + @NonNull BitMap nullValueColumnBitmap /* output parameters */) { switch (type) { case INT32: { - final int[] intValueColumns = + int[] intValueColumns = isSingleOriginValueColumn ? new int[] {(int) originValueColumn} : (int[]) originValueColumn; - final int[] valueColumns = new int[rowIndexList.size()]; + int[] valueColumns = new int[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { valueColumns[i] = 0; @@ -468,11 +468,11 @@ private static Object filterValueColumnsByRowIndexList( } case INT64: { - final long[] longValueColumns = + long[] longValueColumns = isSingleOriginValueColumn ? new long[] {(long) originValueColumn} : (long[]) originValueColumn; - final long[] valueColumns = new long[rowIndexList.size()]; + long[] valueColumns = new long[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { valueColumns[i] = 0L; @@ -485,11 +485,11 @@ private static Object filterValueColumnsByRowIndexList( } case FLOAT: { - final float[] floatValueColumns = + float[] floatValueColumns = isSingleOriginValueColumn ? new float[] {(float) originValueColumn} : (float[]) originValueColumn; - final float[] valueColumns = new float[rowIndexList.size()]; + float[] valueColumns = new float[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { valueColumns[i] = 0F; @@ -502,11 +502,11 @@ private static Object filterValueColumnsByRowIndexList( } case DOUBLE: { - final double[] doubleValueColumns = + double[] doubleValueColumns = isSingleOriginValueColumn ? new double[] {(double) originValueColumn} : (double[]) originValueColumn; - final double[] valueColumns = new double[rowIndexList.size()]; + double[] valueColumns = new double[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { valueColumns[i] = 0D; @@ -519,11 +519,11 @@ private static Object filterValueColumnsByRowIndexList( } case BOOLEAN: { - final boolean[] booleanValueColumns = + boolean[] booleanValueColumns = isSingleOriginValueColumn ? new boolean[] {(boolean) originValueColumn} : (boolean[]) originValueColumn; - final boolean[] valueColumns = new boolean[rowIndexList.size()]; + boolean[] valueColumns = new boolean[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { valueColumns[i] = false; @@ -536,11 +536,11 @@ private static Object filterValueColumnsByRowIndexList( } case TEXT: { - final Binary[] binaryValueColumns = + Binary[] binaryValueColumns = isSingleOriginValueColumn ? new Binary[] {(Binary) originValueColumn} : (Binary[]) originValueColumn; - final Binary[] valueColumns = new Binary[rowIndexList.size()]; + Binary[] valueColumns = new Binary[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { if (Objects.isNull(binaryValueColumns[rowIndexList.get(i)]) || Objects.isNull(binaryValueColumns[rowIndexList.get(i)].getValues()) @@ -561,7 +561,7 @@ private static Object filterValueColumnsByRowIndexList( //////////////////////////// process //////////////////////////// - public List processRowByRow(final BiConsumer consumer) { + public List processRowByRow(BiConsumer consumer) { if (valueColumns.length == 0 || timestampColumn.length == 0) { return Collections.emptyList(); } @@ -584,7 +584,7 @@ public List processRowByRow(final BiConsumer processTablet(final BiConsumer consumer) { + public List processTablet(BiConsumer consumer) { final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, sourceEvent); consumer.accept(convertToTablet(), rowCollector); return rowCollector.convertToTabletInsertionEvents(shouldReport); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index 3bb289d97db6..65f32b9fd3b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -78,18 +78,17 @@ public class TsFileInsertionDataContainer implements AutoCloseable { private boolean shouldParsePattern = false; public TsFileInsertionDataContainer( - final File tsFile, final PipePattern pattern, final long startTime, final long endTime) - throws IOException { + File tsFile, PipePattern pattern, long startTime, long endTime) throws IOException { this(tsFile, pattern, startTime, endTime, null, null); } public TsFileInsertionDataContainer( - final File tsFile, - final PipePattern pattern, - final long startTime, - final long endTime, - final PipeTaskMeta pipeTaskMeta, - final EnrichedEvent sourceEvent) + File tsFile, + PipePattern pattern, + long startTime, + long endTime, + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent) throws IOException { this.pattern = pattern; timeFilterExpression = @@ -136,14 +135,14 @@ public TsFileInsertionDataContainer( // No longer need this. Help GC. tsFileSequenceReader.clearCachedDeviceMetadata(); - } catch (final Exception e) { + } catch (Exception e) { close(); throw e; } } private Map> filterDeviceMeasurementsMapByPattern( - final Map> originalDeviceMeasurementsMap) { + Map> originalDeviceMeasurementsMap) { final Map> filteredDeviceMeasurementsMap = new HashMap<>(); for (Map.Entry> entry : originalDeviceMeasurementsMap.entrySet()) { final IDeviceID deviceId = entry.getKey(); @@ -222,7 +221,7 @@ public boolean hasNext() { ((PlainDeviceID) entry.getKey()).toStringID(), entry.getValue(), timeFilterExpression); - } catch (final IOException e) { + } catch (IOException e) { close(); throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e); } @@ -278,7 +277,7 @@ public void close() { if (tsFileReader != null) { tsFileReader.close(); } - } catch (final IOException e) { + } catch (IOException e) { LOGGER.warn("Failed to close TsFileReader", e); } @@ -286,7 +285,7 @@ public void close() { if (tsFileSequenceReader != null) { tsFileSequenceReader.close(); } - } catch (final IOException e) { + } catch (IOException e) { LOGGER.warn("Failed to close TsFileSequenceReader", e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java index 386acf1758ec..7b6803313411 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java @@ -51,8 +51,7 @@ public class TsFileInsertionPointCounter implements AutoCloseable { private long count = 0; - public TsFileInsertionPointCounter(final File tsFile, final PipePattern pattern) - throws IOException { + public TsFileInsertionPointCounter(File tsFile, PipePattern pattern) throws IOException { this.pattern = pattern; try { @@ -80,8 +79,7 @@ private Map> filterDeviceMeasurementsMapByPattern() throw tsFileSequenceReader.getDeviceMeasurementsMap(); final Map> filteredDeviceMeasurementsMap = new HashMap<>(); - for (final Map.Entry> entry : - originalDeviceMeasurementsMap.entrySet()) { + for (Map.Entry> entry : originalDeviceMeasurementsMap.entrySet()) { final IDeviceID deviceId = entry.getKey(); // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c @@ -162,7 +160,7 @@ public void close() { if (tsFileSequenceReader != null) { tsFileSequenceReader.close(); } - } catch (final IOException e) { + } catch (IOException e) { LOGGER.warn("Failed to close TsFileSequenceReader", e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java index 94b6b8c085eb..acae31cfa9b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java @@ -42,7 +42,7 @@ public class TsFileEpochManager { private final ConcurrentMap filePath2Epoch = new ConcurrentHashMap<>(); public PipeRealtimeEvent bindPipeTsFileInsertionEvent( - final PipeTsFileInsertionEvent event, final TsFileResource resource) { + PipeTsFileInsertionEvent event, TsFileResource resource) { final String filePath = resource.getTsFilePath(); // This would not happen, but just in case @@ -68,9 +68,7 @@ public PipeRealtimeEvent bindPipeTsFileInsertionEvent( } public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent( - final PipeInsertNodeTabletInsertionEvent event, - final InsertNode node, - final TsFileResource resource) { + PipeInsertNodeTabletInsertionEvent event, InsertNode node, TsFileResource resource) { final TsFileEpoch epoch = filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new); epoch.updateInsertNodeMinTime(node.getMinTime()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java index 546d8185c3a7..0585b215bfde 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java @@ -62,7 +62,7 @@ public CachedSchemaPatternMatcher() { } @Override - public void register(final PipeRealtimeDataRegionExtractor extractor) { + public void register(PipeRealtimeDataRegionExtractor extractor) { lock.writeLock().lock(); try { extractors.add(extractor); @@ -73,7 +73,7 @@ public void register(final PipeRealtimeDataRegionExtractor extractor) { } @Override - public void deregister(final PipeRealtimeDataRegionExtractor extractor) { + public void deregister(PipeRealtimeDataRegionExtractor extractor) { lock.writeLock().lock(); try { extractors.remove(extractor); @@ -94,7 +94,7 @@ public int getRegisterCount() { } @Override - public Set match(final PipeRealtimeEvent event) { + public Set match(PipeRealtimeEvent event) { final Set matchedExtractors = new HashSet<>(); lock.readLock().lock(); @@ -177,10 +177,10 @@ public Set match(final PipeRealtimeEvent event) return matchedExtractors; } - protected Set filterExtractorsByDevice(final IDeviceID device) { + protected Set filterExtractorsByDevice(IDeviceID device) { final Set filteredExtractors = new HashSet<>(); - for (final PipeRealtimeDataRegionExtractor extractor : extractors) { + for (PipeRealtimeDataRegionExtractor extractor : extractors) { // Return if the extractor only extract deletion if (!extractor.shouldExtractInsertion()) { continue; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java index d443577df26a..f886e6b518c3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java @@ -32,48 +32,48 @@ public class IoTDBPipePatternTest { @Test public void testIotdbPipePattern() { // Test legal and illegal pattern - final String[] legalPatterns = { + String[] legalPatterns = { "root", "root.db", "root.db.d1.s", "root.db.`1`", "root.*.d.*s.s", }; - final String[] illegalPatterns = { + String[] illegalPatterns = { "root.", "roo", "", "root..", "root./", }; - for (final String s : legalPatterns) { + for (String s : legalPatterns) { Assert.assertTrue(new IoTDBPipePattern(s).isLegal()); } - for (final String t : illegalPatterns) { + for (String t : illegalPatterns) { try { Assert.assertFalse(new IoTDBPipePattern(t).isLegal()); - } catch (final Exception e) { + } catch (Exception e) { Assert.assertTrue(e instanceof PipeException); } } // Test pattern cover db - final String db = "root.db"; - final String[] patternsCoverDb = { + String db = "root.db"; + String[] patternsCoverDb = { "root.**", "root.db.**", "root.*db*.**", }; - final String[] patternsNotCoverDb = { + String[] patternsNotCoverDb = { "root.db", "root.*", "root.*.*", "root.db.*.**", "root.db.d1", "root.**.db.**", }; - for (final String s : patternsCoverDb) { + for (String s : patternsCoverDb) { Assert.assertTrue(new IoTDBPipePattern(s).coversDb(db)); } - for (final String t : patternsNotCoverDb) { + for (String t : patternsNotCoverDb) { Assert.assertFalse(new IoTDBPipePattern(t).coversDb(db)); } - final IDeviceID device = new StringArrayDeviceID("root.db.d1"); + IDeviceID device = new StringArrayDeviceID("root.db.d1"); // Test pattern cover device - final String[] patternsCoverDevice = { + String[] patternsCoverDevice = { "root.**", "root.db.**", "root.*.*.*", "root.db.d1.*", "root.*db*.*d*.*", "root.**.*1.*", }; - final String[] patternsNotCoverDevice = { + String[] patternsNotCoverDevice = { "root.*", "root.*.*", "root.db.d1", "root.db.d2.*", "root.**.d2.**", }; - for (final String s : patternsCoverDevice) { + for (String s : patternsCoverDevice) { Assert.assertTrue(new IoTDBPipePattern(s).coversDevice(device)); } for (String t : patternsNotCoverDevice) { @@ -81,31 +81,31 @@ public void testIotdbPipePattern() { } // Test pattern may overlap with device - final String[] patternsOverlapWithDevice = { + String[] patternsOverlapWithDevice = { "root.db.**", "root.db.d1", "root.db.d1.*", "root.db.d1.s1", "root.**.d2.**", "root.*.d*.**", }; - final String[] patternsNotOverlapWithDevice = { + String[] patternsNotOverlapWithDevice = { "root.db.d2.**", "root.db2.d1.**", "root.db.db.d1.**", }; - for (final String s : patternsOverlapWithDevice) { + for (String s : patternsOverlapWithDevice) { Assert.assertTrue(new IoTDBPipePattern(s).mayOverlapWithDevice(device)); } - for (final String t : patternsNotOverlapWithDevice) { + for (String t : patternsNotOverlapWithDevice) { Assert.assertFalse(new IoTDBPipePattern(t).mayOverlapWithDevice(device)); } // Test pattern match measurement - final String measurement = "s1"; - final String[] patternsMatchMeasurement = { + String measurement = "s1"; + String[] patternsMatchMeasurement = { "root.db.d1.s1", "root.db.d1.*", }; - final String[] patternsNotMatchMeasurement = { + String[] patternsNotMatchMeasurement = { "root.db.d1", "root.db.d1", "root.db.d1.*.*", }; - for (final String s : patternsMatchMeasurement) { + for (String s : patternsMatchMeasurement) { Assert.assertTrue(new IoTDBPipePattern(s).matchesMeasurement(device, measurement)); } - for (final String t : patternsNotMatchMeasurement) { + for (String t : patternsNotMatchMeasurement) { Assert.assertFalse(new IoTDBPipePattern(t).matchesMeasurement(device, measurement)); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java index 6008303b3daf..750d9a87a057 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java @@ -31,76 +31,76 @@ public class PrefixPipePatternTest { @Test public void testPrefixPipePattern() { // Test legal and illegal pattern - final String[] legalPatterns = { + String[] legalPatterns = { "root", "root.", "root.db", "root.db.d1.s", "root.db.`1`", }; - final String[] illegalPatterns = { + String[] illegalPatterns = { "roo", "", "root..", "root./", }; - for (final String s : legalPatterns) { + for (String s : legalPatterns) { Assert.assertTrue(new PrefixPipePattern(s).isLegal()); } - for (final String t : illegalPatterns) { + for (String t : illegalPatterns) { Assert.assertFalse(new PrefixPipePattern(t).isLegal()); } // Test pattern cover db - final String db = "root.db"; - final String[] patternsCoverDb = { + String db = "root.db"; + String[] patternsCoverDb = { "root", "root.", "root.d", "root.db", }; - final String[] patternsNotCoverDb = { + String[] patternsNotCoverDb = { "root.**", "root.db.", }; - for (final String s : patternsCoverDb) { + for (String s : patternsCoverDb) { Assert.assertTrue(new PrefixPipePattern(s).coversDb(db)); } - for (final String t : patternsNotCoverDb) { + for (String t : patternsNotCoverDb) { Assert.assertFalse(new PrefixPipePattern(t).coversDb(db)); } - final IDeviceID device = new StringArrayDeviceID("root.db.d1"); + IDeviceID device = new StringArrayDeviceID("root.db.d1"); // Test pattern cover device - final String[] patternsCoverDevice = { + String[] patternsCoverDevice = { "root", "root.", "root.d", "root.db", "root.db.", "root.db.d", "root.db.d1", }; - final String[] patternsNotCoverDevice = { + String[] patternsNotCoverDevice = { "root.db.d1.", "root.db.d1.s1", "root.**", "root.db.d2", }; - for (final String s : patternsCoverDevice) { + for (String s : patternsCoverDevice) { Assert.assertTrue(new PrefixPipePattern(s).coversDevice(device)); } - for (final String t : patternsNotCoverDevice) { + for (String t : patternsNotCoverDevice) { Assert.assertFalse(new PrefixPipePattern(t).coversDevice(device)); } // Test pattern may overlap with device - final String[] patternsOverlapWithDevice = { + String[] patternsOverlapWithDevice = { "root", "root.db.d1", "root.db.d1.", "root.db.d1.s1", }; - final String[] patternsNotOverlapWithDevice = { + String[] patternsNotOverlapWithDevice = { "root.db.d2", "root.**", }; - for (final String s : patternsOverlapWithDevice) { + for (String s : patternsOverlapWithDevice) { Assert.assertTrue(new PrefixPipePattern(s).mayOverlapWithDevice(device)); } - for (final String t : patternsNotOverlapWithDevice) { + for (String t : patternsNotOverlapWithDevice) { Assert.assertFalse(new PrefixPipePattern(t).mayOverlapWithDevice(device)); } // Test pattern match measurement - final String measurement = "s1"; - final String[] patternsMatchMeasurement = { + String measurement = "s1"; + String[] patternsMatchMeasurement = { "root.db.d1", "root.db.d1.", "root.db.d1.s", "root.db.d1.s1", }; - final String[] patternsNotMatchMeasurement = { + String[] patternsNotMatchMeasurement = { "root.db.d1.s11", "root.db.d1.s2", }; - for (final String s : patternsMatchMeasurement) { + for (String s : patternsMatchMeasurement) { Assert.assertTrue(new PrefixPipePattern(s).matchesMeasurement(device, measurement)); } - for (final String t : patternsNotMatchMeasurement) { + for (String t : patternsNotMatchMeasurement) { Assert.assertFalse(new PrefixPipePattern(t).matchesMeasurement(device, measurement)); } } From cc7d4df178f09a8042475dedfa8dc9557c88e2b7 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 20 May 2024 15:30:22 +0800 Subject: [PATCH 03/10] Update TabletInsertionDataContainer.java --- .../tablet/TabletInsertionDataContainer.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index f3d4b58a8063..2e5ca32e42bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -63,6 +63,8 @@ public class TabletInsertionDataContainer { private final EnrichedEvent sourceEvent; // used to report progress and filter value columns by time range + // Used to preserve performance + private String deviceStr; private IDeviceID deviceId; private boolean isAligned; private MeasurementSchema[] measurementSchemaList; @@ -139,6 +141,8 @@ private void parse(InsertRowNode insertRowNode, PipePattern pattern) { final int originColumnSize = insertRowNode.getMeasurements().length; final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; + // The full path is always cached when device path is deserialized + this.deviceStr = insertRowNode.getDevicePath().getFullPath(); this.deviceId = insertRowNode.getDeviceID(); this.isAligned = insertRowNode.isAligned(); @@ -206,6 +210,8 @@ private void parse(InsertTabletNode insertTabletNode, PipePattern pattern) { final int originColumnSize = insertTabletNode.getMeasurements().length; final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; + // The full path is always cached when device path is deserialized + this.deviceStr = insertTabletNode.getDevicePath().getFullPath(); this.deviceId = insertTabletNode.getDeviceID(); this.isAligned = insertTabletNode.isAligned(); @@ -290,6 +296,7 @@ private void parse(Tablet tablet, boolean isAligned, PipePattern pattern) { final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; // Only support tree-model tablet + this.deviceStr = tablet.getDeviceId(); this.deviceId = new StringArrayDeviceID(tablet.getDeviceId()); this.isAligned = isAligned; @@ -571,7 +578,7 @@ public List processRowByRow(BiConsumer consumer.accept( new PipeRow( i, - deviceId.toString(), + getDeviceStr(), isAligned, measurementSchemaList, timestampColumn, @@ -598,7 +605,7 @@ public Tablet convertToTablet() { } final Tablet newTablet = - new Tablet(deviceId.toString(), Arrays.asList(measurementSchemaList), rowCount); + new Tablet(getDeviceStr(), Arrays.asList(measurementSchemaList), rowCount); newTablet.timestamps = timestampColumn; newTablet.bitMaps = nullValueColumnBitmaps; newTablet.values = valueColumns; @@ -608,4 +615,8 @@ public Tablet convertToTablet() { return tablet; } + + private String getDeviceStr() { + return Objects.nonNull(deviceStr) ? deviceStr : deviceId.toString(); + } } From 6c7f5a6498b97e2c2d21bf5780ead4cbb492ba01 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 20 May 2024 16:22:36 +0800 Subject: [PATCH 04/10] Added compatibility --- .../common/tablet/TabletInsertionDataContainer.java | 1 + .../common/tsfile/TsFileInsertionDataContainer.java | 2 +- .../tsfile/TsFileInsertionDataTabletIterator.java | 11 ++++++++--- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index 2e5ca32e42bb..8f059bd5fa05 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -576,6 +576,7 @@ public List processRowByRow(BiConsumer final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, sourceEvent); for (int i = 0; i < rowCount; i++) { consumer.accept( + // Used for tree model new PipeRow( i, getDeviceStr(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index 65f32b9fd3b6..8fdeba2e1fac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -218,7 +218,7 @@ public boolean hasNext() { new TsFileInsertionDataTabletIterator( tsFileReader, measurementDataTypeMap, - ((PlainDeviceID) entry.getKey()).toStringID(), + entry.getKey(), entry.getValue(), timeFilterExpression); } catch (IOException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java index 1e028ae870e7..3985b04b8057 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java @@ -24,6 +24,7 @@ import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.TsFileReader; import org.apache.tsfile.read.common.Field; import org.apache.tsfile.read.common.Path; @@ -47,7 +48,7 @@ public class TsFileInsertionDataTabletIterator implements Iterator { private final TsFileReader tsFileReader; private final Map measurementDataTypeMap; - private final String deviceId; + private final IDeviceID deviceId; private final List measurements; private final IExpression timeFilterExpression; @@ -57,7 +58,7 @@ public class TsFileInsertionDataTabletIterator implements Iterator { public TsFileInsertionDataTabletIterator( TsFileReader tsFileReader, Map measurementDataTypeMap, - String deviceId, + IDeviceID deviceId, List measurements, IExpression timeFilterExpression) throws IOException { @@ -117,7 +118,11 @@ private Tablet buildNextTablet() throws IOException { schemas.add(new MeasurementSchema(measurement, dataType)); } final Tablet tablet = - new Tablet(deviceId, schemas, PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); + new Tablet( + // Used for tree model + deviceId.toString(), + schemas, + PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); tablet.initBitMaps(); while (queryDataSet.hasNext()) { From 2b04a4210ce29f6eb4c39caad6b477476920c2e3 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 20 May 2024 17:36:13 +0800 Subject: [PATCH 05/10] fix --- .../event/common/tsfile/TsFileInsertionDataContainer.java | 4 ++-- .../dataregion/realtime/epoch/TsFileEpochManager.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index 8fdeba2e1fac..dccbba0da1d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -151,7 +151,7 @@ private Map> filterDeviceMeasurementsMapByPattern( // in this case, all data can be matched without checking the measurements if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) { if (!entry.getValue().isEmpty()) { - filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId), entry.getValue()); + filteredDeviceMeasurementsMap.put(deviceId, entry.getValue()); } } @@ -170,7 +170,7 @@ else if (pattern.mayOverlapWithDevice(deviceId)) { } if (!filteredMeasurements.isEmpty()) { - filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId), filteredMeasurements); + filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java index acae31cfa9b7..1b5e2e3218a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java @@ -19,12 +19,13 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch; -import com.google.common.base.Functions; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import com.google.common.base.Functions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 6f2e00a436dea2e7127fc37a0860cafd03313eaf Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 23 May 2024 15:49:24 +0800 Subject: [PATCH 06/10] Update TsFileInsertionDataContainer.java --- .../event/common/tsfile/TsFileInsertionDataContainer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index e8eeccd35b27..81e24a8f23e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -148,7 +148,7 @@ private Map> filterDeviceMeasurementsMapByPattern( // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c // in this case, all data can be matched without checking the measurements - if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceStr)) { + if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) { if (!entry.getValue().isEmpty()) { filteredDeviceMeasurementsMap.put(deviceId, entry.getValue()); } @@ -156,11 +156,11 @@ private Map> filterDeviceMeasurementsMapByPattern( // case 2: for example, pattern is root.a.b.c and device is root.a.b // in this case, we need to check the full path - else if (pattern.mayOverlapWithDevice(deviceStr)) { + else if (pattern.mayOverlapWithDevice(deviceId)) { final List filteredMeasurements = new ArrayList<>(); for (final String measurement : entry.getValue()) { - if (pattern.matchesMeasurement(deviceStr, measurement)) { + if (pattern.matchesMeasurement(deviceId, measurement)) { filteredMeasurements.add(measurement); } else { // Parse pattern iff there are measurements filtered out From c11d74cd77ce14d5efd4f36a5ef86ac9464dd654 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 24 May 2024 14:07:24 +0800 Subject: [PATCH 07/10] spotless --- .../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 4 +--- .../dataregion/realtime/epoch/TsFileEpochManager.java | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index f1dda9a86e80..fd4f12a4a633 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -309,9 +309,7 @@ public boolean shouldParsePattern() { ? ((InsertRowsNode) node) .getInsertRowNodeList().stream() .anyMatch( - insertRowNode -> - !pipePattern.coversDevice( - insertRowNode.getDeviceID())) + insertRowNode -> !pipePattern.coversDevice(insertRowNode.getDeviceID())) : !pipePattern.coversDevice(node.getDeviceID()))); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java index c8a942f286fb..e4244a1335e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java @@ -93,7 +93,7 @@ private Map getDevice2MeasurementsMapFromInsertRowsNode( return insertRowsNode.getInsertRowNodeList().stream() .collect( Collectors.toMap( - InsertNode::getDeviceID, + InsertNode::getDeviceID, InsertNode::getMeasurements, (oldMeasurements, newMeasurements) -> Stream.of(Arrays.asList(oldMeasurements), Arrays.asList(newMeasurements)) From 198bdac948a54f058b529db3ee9a263a3d3b24a0 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 May 2024 11:37:34 +0800 Subject: [PATCH 08/10] Update IoTDBPipePattern.java --- .../commons/pipe/pattern/IoTDBPipePattern.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java index cc158d767de2..b270a2b07001 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java @@ -120,14 +120,17 @@ public boolean matchesMeasurement(final IDeviceID device, final String measureme } /** - * Check if the {@link PipePattern} matches the given prefix path. - * - *

NOTE: In schema transmission, {@link #mayOverlapWithDevice(String)} can be used to detect - * whether the given path can act as a parent path of the {@link PipePattern}, and to transmit - * possibly used schemas like database creation and template setting. + * Check if the {@link PipePattern} matches the given prefix path. In schema transmission, this + * can be used to detect whether the given path can act as a parent path of the {@link + * PipePattern}, and to transmit possibly used schemas like database creation and template + * setting. */ public boolean matchPrefixPath(final String path) { - return mayOverlapWithDevice(path); + try { + return patternPartialPath.matchPrefixPath(new PartialPath(path)); + } catch (final IllegalPathException e) { + return false; + } } /** From d4f14d65d567d6a377e24921112f38bc8163cef5 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 May 2024 13:51:12 +0800 Subject: [PATCH 09/10] bug fix --- .../PipePlanPatternParseVisitor.java | 22 +++++++++++-------- .../PipeStatementPatternParseVisitor.java | 9 +++++--- .../CachedSchemaPatternMatcherTest.java | 21 ++++++++++++------ 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java index 79dbf04ba422..472b1de10739 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java @@ -39,6 +39,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.utils.Pair; import java.util.Arrays; @@ -74,7 +75,7 @@ public Optional visitPlan(final PlanNode node, final IoTDBPipePattern public Optional visitCreateTimeSeries( final CreateTimeSeriesNode node, final IoTDBPipePattern pattern) { return pattern.matchesMeasurement( - node.getPath().getDeviceString(), node.getPath().getMeasurement()) + node.getPath().getIDeviceID(), node.getPath().getMeasurement()) ? Optional.of(node) : Optional.empty(); } @@ -87,7 +88,8 @@ public Optional visitCreateAlignedTimeSeries( .filter( index -> pattern.matchesMeasurement( - node.getDevicePath().getFullPath(), node.getMeasurements().get(index))) + node.getDevicePath().getIDeviceIDAsFullDevice(), + node.getMeasurements().get(index))) .toArray(); return filteredIndexes.length > 0 ? Optional.of( @@ -115,7 +117,7 @@ public Optional visitCreateMultiTimeSeries( new Pair<>( entry.getKey(), trimMeasurementGroup( - entry.getKey().getFullPath(), entry.getValue(), pattern))) + entry.getKey().getIDeviceIDAsFullDevice(), entry.getValue(), pattern))) .filter(pair -> Objects.nonNull(pair.getRight())) .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); return !filteredMeasurementGroupMap.isEmpty() @@ -125,7 +127,7 @@ public Optional visitCreateMultiTimeSeries( } private static MeasurementGroup trimMeasurementGroup( - final String device, final MeasurementGroup group, final IoTDBPipePattern pattern) { + final IDeviceID device, final MeasurementGroup group, final IoTDBPipePattern pattern) { final int[] filteredIndexes = IntStream.range(0, group.size()) .filter(index -> pattern.matchesMeasurement(device, group.getMeasurements().get(index))) @@ -154,7 +156,7 @@ private static MeasurementGroup trimMeasurementGroup( public Optional visitAlterTimeSeries( final AlterTimeSeriesNode node, final IoTDBPipePattern pattern) { return pattern.matchesMeasurement( - node.getPath().getDeviceString(), node.getPath().getMeasurement()) + node.getPath().getIDeviceID(), node.getPath().getMeasurement()) ? Optional.of(node) : Optional.empty(); } @@ -165,7 +167,9 @@ public Optional visitInternalCreateTimeSeries( final MeasurementGroup group = pattern.matchPrefixPath(node.getDevicePath().getFullPath()) ? trimMeasurementGroup( - node.getDevicePath().getFullPath(), node.getMeasurementGroup(), pattern) + node.getDevicePath().getIDeviceIDAsFullDevice(), + node.getMeasurementGroup(), + pattern) : null; return Objects.nonNull(group) ? Optional.of( @@ -209,7 +213,7 @@ public Optional visitInternalCreateMultiTimeSeries( new Pair<>( entry.getValue().getLeft(), trimMeasurementGroup( - entry.getKey().getFullPath(), + entry.getKey().getIDeviceIDAsFullDevice(), entry.getValue().getRight(), pattern)))) .filter(pair -> Objects.nonNull(pair.getRight().getRight())) @@ -241,7 +245,7 @@ public Optional visitCreateLogicalView( .filter( entry -> pattern.matchesMeasurement( - entry.getKey().getDeviceString(), entry.getKey().getMeasurement())) + entry.getKey().getIDeviceID(), entry.getKey().getMeasurement())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); return !filteredViewPathToSourceMap.isEmpty() ? Optional.of(new CreateLogicalViewNode(node.getPlanNodeId(), filteredViewPathToSourceMap)) @@ -256,7 +260,7 @@ public Optional visitAlterLogicalView( .filter( entry -> pattern.matchesMeasurement( - entry.getKey().getDeviceString(), entry.getKey().getMeasurement())) + entry.getKey().getIDeviceID(), entry.getKey().getMeasurement())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); return !filteredViewPathToSourceMap.isEmpty() ? Optional.of(new AlterLogicalViewNode(node.getPlanNodeId(), filteredViewPathToSourceMap)) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java index 6535e6acfe80..26c22f89b3af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java @@ -59,7 +59,7 @@ public Optional visitNode( public Optional visitCreateTimeseries( final CreateTimeSeriesStatement statement, final IoTDBPipePattern pattern) { return pattern.matchesMeasurement( - statement.getPath().getDeviceString(), statement.getPath().getMeasurement()) + statement.getPath().getIDeviceID(), statement.getPath().getMeasurement()) ? Optional.of(statement) : Optional.empty(); } @@ -72,7 +72,7 @@ public Optional visitCreateAlignedTimeseries( .filter( index -> pattern.matchesMeasurement( - statement.getDevicePath().getFullPath(), + statement.getDevicePath().getIDeviceIDAsFullDevice(), statement.getMeasurements().get(index))) .toArray(); if (filteredIndexes.length == 0) { @@ -118,7 +118,10 @@ public Optional visitCreateLogicalView( .filter( index -> pattern.matchesMeasurement( - createLogicalViewStatement.getTargetPathList().get(index).getDeviceString(), + createLogicalViewStatement + .getTargetPathList() + .get(index) + .getIDeviceIDAsFullDevice(), createLogicalViewStatement.getTargetPathList().get(index).getMeasurement())) .toArray(); if (filteredIndexes.length == 0) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java index 1e7587153fb8..07855ebe829f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java @@ -29,6 +29,8 @@ import org.apache.iotdb.pipe.api.event.Event; import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -85,7 +87,7 @@ public void testCachedMatcher() throws Exception { new PipeParameters( new HashMap() { { - put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root." + finalI1); + put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root.db" + finalI1); } }), new PipeTaskRuntimeConfiguration( @@ -101,7 +103,7 @@ public void testCachedMatcher() throws Exception { { put( PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, - "root." + finalI + "." + finalJ); + "root.db" + finalI + ".s" + finalJ); } }), new PipeTaskRuntimeConfiguration( @@ -116,18 +118,22 @@ public void testCachedMatcher() throws Exception { int epochNum = 10000; int deviceNum = 1000; int seriesNum = 100; - Map deviceMap = + Map deviceMap = IntStream.range(0, deviceNum) .mapToObj(String::valueOf) - .collect(Collectors.toMap(s -> "root." + s, s -> new String[0])); + .collect( + Collectors.toMap(s -> new StringArrayDeviceID("root.db" + s), s -> new String[0])); String[] measurements = - IntStream.range(0, seriesNum).mapToObj(String::valueOf).toArray(String[]::new); + IntStream.range(0, seriesNum).mapToObj(num -> "s" + num).toArray(String[]::new); long totalTime = 0; for (int i = 0; i < epochNum; i++) { for (int j = 0; j < deviceNum; j++) { PipeRealtimeEvent event = new PipeRealtimeEvent( - null, null, Collections.singletonMap("root." + i, measurements), null); + null, + null, + Collections.singletonMap(new StringArrayDeviceID("root.db" + i), measurements), + null); long startTime = System.currentTimeMillis(); matcher.match(event).forEach(extractor -> extractor.extract(event)); totalTime += (System.currentTimeMillis() - startTime); @@ -174,7 +180,8 @@ protected void doExtract(PipeRealtimeEvent event) { } else { match[0] = match[0] - || (getPatternString().startsWith(k) || k.startsWith(getPatternString())); + || (getPatternString().startsWith(k.toString()) + || k.toString().startsWith(getPatternString())); } }); Assert.assertTrue(match[0]); From a7a906fe70eb5ba1c17079889ab98d89291b0198 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Wed, 29 May 2024 19:07:22 +0800 Subject: [PATCH 10/10] Update SessionIT.java --- .../src/test/java/org/apache/iotdb/session/it/SessionIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java index 7ec02a955be5..05ee72f60762 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java @@ -269,7 +269,7 @@ public void testInsertStrRecord() { @Test public void testInsertTablet() { try (ISession session = EnvFactory.getEnv().getSessionConnection()) { - List schemaList = new ArrayList<>(); + List schemaList = new ArrayList<>(); String deviceId = "root.db.d1"; schemaList.add(new MeasurementSchema("s1", TSDataType.DATE)); schemaList.add(new MeasurementSchema("s2", TSDataType.TIMESTAMP));