diff --git a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java index a8c71d0fa59f..a0ea64a04576 100644 --- a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java +++ b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java @@ -75,7 +75,7 @@ public void process(final Event event, final EventCollector eventCollector) thro tablet.addTimestamp(0, System.currentTimeMillis()); tablet.addValue(aggregateSeries.getMeasurement(), 0, writePointCount.get()); eventCollector.collect( - new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null, false)); + new PipeRawTabletInsertionEvent(null, tablet, false, null, 0, null, null, false)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java index 28c2d920fc14..40f31eb827c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.pipe.api.access.Row; @@ -102,6 +103,10 @@ private void collectTabletInsertionEvent() { if (tablet != null) { tabletInsertionEventList.add( new PipeRawTabletInsertionEvent( + // TODO: non-PipeInsertionEvent sourceEvent is not supported? + sourceEvent instanceof PipeInsertionEvent + ? ((PipeInsertionEvent) sourceEvent).getTreeModelDatabaseName() + : null, tablet, isAligned, sourceEvent == null ? null : sourceEvent.getPipeName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 9f719642da39..4f6157928fa5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; -import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; @@ -51,7 +51,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; -public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent +public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent implements TabletInsertionEvent { private static final Logger LOGGER = @@ -68,12 +68,14 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent private ProgressIndex progressIndex; public PipeInsertNodeTabletInsertionEvent( + final String databaseName, final WALEntryHandler walEntryHandler, final PartialPath devicePath, final ProgressIndex progressIndex, final boolean isAligned, final boolean isGeneratedByPipe) { this( + databaseName, walEntryHandler, devicePath, progressIndex, @@ -89,6 +91,7 @@ public PipeInsertNodeTabletInsertionEvent( } private PipeInsertNodeTabletInsertionEvent( + final String databaseName, final WALEntryHandler walEntryHandler, final PartialPath devicePath, final ProgressIndex progressIndex, @@ -101,7 +104,15 @@ private PipeInsertNodeTabletInsertionEvent( final TablePattern tablePattern, final long startTime, final long endTime) { - super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime); + super( + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + startTime, + endTime, + databaseName); this.walEntryHandler = walEntryHandler; // Record device path here so there's no need to get it from InsertNode cache later. this.devicePath = devicePath; @@ -187,6 +198,7 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP final long startTime, final long endTime) { return new PipeInsertNodeTabletInsertionEvent( + getTreeModelDatabaseName(), walEntryHandler, devicePath, progressIndex, @@ -372,6 +384,7 @@ public List toRawTabletInsertionEvents() { .map( container -> new PipeRawTabletInsertionEvent( + getTreeModelDatabaseName(), container.convertToTablet(), container.isAligned(), pipeName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 013ccb2579eb..d09d9579c7ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -40,7 +41,8 @@ import java.util.Objects; import java.util.function.BiConsumer; -public class PipeRawTabletInsertionEvent extends EnrichedEvent implements TabletInsertionEvent { +public class PipeRawTabletInsertionEvent extends PipeInsertionEvent + implements TabletInsertionEvent { private Tablet tablet; private String deviceId; // Only used when the tablet is released. @@ -56,6 +58,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet private volatile ProgressIndex overridingProgressIndex; private PipeRawTabletInsertionEvent( + final String databaseName, final Tablet tablet, final boolean isAligned, final EnrichedEvent sourceEvent, @@ -67,7 +70,15 @@ private PipeRawTabletInsertionEvent( final TablePattern tablePattern, final long startTime, final long endTime) { - super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime); + super( + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + startTime, + endTime, + databaseName); this.tablet = Objects.requireNonNull(tablet); this.isAligned = isAligned; this.sourceEvent = sourceEvent; @@ -75,6 +86,7 @@ private PipeRawTabletInsertionEvent( } public PipeRawTabletInsertionEvent( + final String databaseName, final Tablet tablet, final boolean isAligned, final String pipeName, @@ -83,6 +95,7 @@ public PipeRawTabletInsertionEvent( final EnrichedEvent sourceEvent, final boolean needToReport) { this( + databaseName, tablet, isAligned, sourceEvent, @@ -98,13 +111,26 @@ public PipeRawTabletInsertionEvent( @TestOnly public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean isAligned) { - this(tablet, isAligned, null, false, null, 0, null, null, null, Long.MIN_VALUE, Long.MAX_VALUE); + this( + null, + tablet, + isAligned, + null, + false, + null, + 0, + null, + null, + null, + Long.MIN_VALUE, + Long.MAX_VALUE); } @TestOnly public PipeRawTabletInsertionEvent( final Tablet tablet, final boolean isAligned, final TreePattern treePattern) { this( + null, tablet, isAligned, null, @@ -121,7 +147,7 @@ public PipeRawTabletInsertionEvent( @TestOnly public PipeRawTabletInsertionEvent( final Tablet tablet, final long startTime, final long endTime) { - this(tablet, false, null, false, null, 0, null, null, null, startTime, endTime); + this(null, tablet, false, null, false, null, 0, null, null, null, startTime, endTime); } @Override @@ -188,6 +214,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final long startTime, final long endTime) { return new PipeRawTabletInsertionEvent( + getTreeModelDatabaseName(), tablet, isAligned, sourceEvent, @@ -286,7 +313,14 @@ public long count() { public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() { return new PipeRawTabletInsertionEvent( - convertToTablet(), isAligned, pipeName, creationTime, pipeTaskMeta, this, needToReport); + getTreeModelDatabaseName(), + convertToTablet(), + isAligned, + pipeName, + creationTime, + pipeTaskMeta, + this, + needToReport); } public boolean hasNoNeedParsingAndIsEmpty() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index c78ea9703e4a..55ce63431855 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -24,7 +24,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; -import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider; @@ -50,7 +50,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileInsertionEvent { +public class PipeTsFileInsertionEvent extends PipeInsertionEvent implements TsFileInsertionEvent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileInsertionEvent.class); @@ -76,12 +76,14 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns private volatile ProgressIndex overridingProgressIndex; public PipeTsFileInsertionEvent( + final String databaseName, final TsFileResource resource, final boolean isLoaded, final boolean isGeneratedByPipe, final boolean isGeneratedByHistoricalExtractor) { // The modFile must be copied before the event is assigned to the listening pipes this( + databaseName, resource, true, isLoaded, @@ -97,6 +99,7 @@ public PipeTsFileInsertionEvent( } public PipeTsFileInsertionEvent( + final String databaseName, final TsFileResource resource, final boolean isWithMod, final boolean isLoaded, @@ -109,7 +112,15 @@ public PipeTsFileInsertionEvent( final TablePattern tablePattern, final long startTime, final long endTime) { - super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime); + super( + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + startTime, + endTime, + databaseName); this.resource = resource; tsFile = resource.getTsFile(); @@ -328,6 +339,7 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep final long startTime, final long endTime) { return new PipeTsFileInsertionEvent( + getTreeModelDatabaseName(), resource, isWithMod, isLoaded, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java index 9c1e435f68ff..3ff454d97eb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java @@ -21,7 +21,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; -import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -42,7 +42,7 @@ public abstract class TsFileInsertionDataContainer implements AutoCloseable { protected final GlobalTimeExpression timeFilterExpression; // used to filter data protected final PipeTaskMeta pipeTaskMeta; // used to report progress - protected final EnrichedEvent sourceEvent; // used to report progress + protected final PipeInsertionEvent sourceEvent; // used to report progress protected final PipeMemoryBlock allocatedMemoryBlockForTablet; @@ -53,7 +53,7 @@ protected TsFileInsertionDataContainer( final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, - final EnrichedEvent sourceEvent) { + final PipeInsertionEvent sourceEvent) { this.pattern = pattern; timeFilterExpression = (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java index d7c81a92e193..fecd6eb63455 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java @@ -22,7 +22,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; -import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; @@ -80,7 +80,7 @@ public TsFileInsertionQueryDataContainer( final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, - final EnrichedEvent sourceEvent) + final PipeInsertionEvent sourceEvent) throws IOException { this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null); } @@ -91,7 +91,7 @@ public TsFileInsertionQueryDataContainer( final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, - final EnrichedEvent sourceEvent, + final PipeInsertionEvent sourceEvent, final Map deviceIsAlignedMap) throws IOException { super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent); @@ -307,6 +307,7 @@ public TabletInsertionEvent next() { if (!hasNext()) { next = new PipeRawTabletInsertionEvent( + sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null, tablet, isAligned, sourceEvent != null ? sourceEvent.getPipeName() : null, @@ -318,6 +319,7 @@ public TabletInsertionEvent next() { } else { next = new PipeRawTabletInsertionEvent( + sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null, tablet, isAligned, sourceEvent != null ? sourceEvent.getPipeName() : null, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java index 5bd0e7144136..8f92ecea4cc5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java @@ -21,7 +21,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; -import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -94,7 +94,7 @@ public TsFileInsertionScanDataContainer( final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, - final EnrichedEvent sourceEvent) + final PipeInsertionEvent sourceEvent) throws IOException { super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent); @@ -134,6 +134,7 @@ public TabletInsertionEvent next() { final boolean hasNext = hasNext(); try { return new PipeRawTabletInsertionEvent( + sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null, tablet, currentIsAligned, sourceEvent != null ? sourceEvent.getPipeName() : null, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index 3ef134a96cbd..9bb94e2e48fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -35,17 +35,23 @@ public class PipeRealtimeEventFactory { private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager(); public static PipeRealtimeEvent createRealtimeEvent( - final TsFileResource resource, final boolean isLoaded, final boolean isGeneratedByPipe) { + final String databaseName, + final TsFileResource resource, + final boolean isLoaded, + final boolean isGeneratedByPipe) { return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent( - new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe, false), resource); + new PipeTsFileInsertionEvent(databaseName, resource, isLoaded, isGeneratedByPipe, false), + resource); } public static PipeRealtimeEvent createRealtimeEvent( + final String databaseName, final WALEntryHandler walEntryHandler, final InsertNode insertNode, final TsFileResource resource) { return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent( new PipeInsertNodeTabletInsertionEvent( + databaseName, walEntryHandler, insertNode.getTargetPath(), insertNode.getProgressIndex(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index cfe8b639fa23..3985c85f07ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -608,6 +608,7 @@ public synchronized Event supply() { final PipeTsFileInsertionEvent event = new PipeTsFileInsertionEvent( + resource.getDatabaseName(), resource, shouldTransferModFile, false, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 3eb118701a78..ed6d5c18573c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -96,6 +96,7 @@ public synchronized void stopListenAndAssign( public void listenToTsFile( String dataRegionId, + String databaseName, TsFileResource tsFileResource, boolean isLoaded, boolean isGeneratedByPipe) { @@ -111,11 +112,13 @@ public void listenToTsFile( } assigner.publishToAssign( - PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource, isLoaded, isGeneratedByPipe)); + PipeRealtimeEventFactory.createRealtimeEvent( + databaseName, tsFileResource, isLoaded, isGeneratedByPipe)); } public void listenToInsertNode( String dataRegionId, + String databaseName, WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource tsFileResource) { @@ -131,7 +134,8 @@ public void listenToInsertNode( } assigner.publishToAssign( - PipeRealtimeEventFactory.createRealtimeEvent(walEntryHandler, insertNode, tsFileResource)); + PipeRealtimeEventFactory.createRealtimeEvent( + databaseName, walEntryHandler, insertNode, tsFileResource)); } public void listenToHeartbeat(boolean shouldPrintMessage) { @@ -141,6 +145,7 @@ public void listenToHeartbeat(boolean shouldPrintMessage) { PipeRealtimeEventFactory.createRealtimeEvent(key, shouldPrintMessage))); } + // TODO: record database name in enriched events? public void listenToDeleteData(DeleteDataNode node) { dataRegionId2Assigner.forEach( (key, value) -> value.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(node))); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java index 39eb3b62a788..067bb9603da7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java @@ -248,8 +248,9 @@ private void collectGlobalCountIfNecessary(EventCollector eventCollector) throws tablet.addTimestamp(0, timestampCountPair.left); tablet.addValue(outputSeries.getMeasurement(), 0, timestampCountPair.right); + // TODO: table model database name is not supported eventCollector.collect( - new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null, false)); + new PipeRawTabletInsertionEvent(null, tablet, false, null, 0, null, null, false)); PipeCombineHandlerManager.getInstance() .updateLastCombinedValue(pipeName, creationTime, timestampCountPair); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 2abcfe8ebc32..dedafba9dc39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3273,7 +3273,7 @@ private boolean loadTsFileToUnSequence( // Listen before the tsFile is added into tsFile manager to avoid it being compacted PipeInsertionDataNodeListener.getInstance() - .listenToTsFile(dataRegionId, tsFileResource, true, isGeneratedByPipe); + .listenToTsFile(dataRegionId, databaseName, tsFileResource, true, isGeneratedByPipe); // help tsfile resource degrade tsFileResourceManager.registerSealedTsFileResource(tsFileResource); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 5adcb14bae93..706081ce936d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -327,6 +327,7 @@ public void insert(InsertRowNode insertRowNode, long[] costsForMetrics) PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( dataRegionInfo.getDataRegion().getDataRegionId(), + dataRegionInfo.getDataRegion().getDatabaseName(), walFlushListener.getWalEntryHandler(), insertRowNode, tsFileResource); @@ -414,6 +415,7 @@ public void insert(InsertRowsNode insertRowsNode, long[] costsForMetrics) PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( dataRegionInfo.getDataRegion().getDataRegionId(), + dataRegionInfo.getDataRegion().getDatabaseName(), walFlushListener.getWalEntryHandler(), insertRowsNode, tsFileResource); @@ -562,6 +564,7 @@ public void insertTablet( PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( dataRegionInfo.getDataRegion().getDataRegionId(), + dataRegionInfo.getDataRegion().getDatabaseName(), walFlushListener.getWalEntryHandler(), insertTabletNode, tsFileResource); @@ -1274,6 +1277,7 @@ public Future asyncClose() { PipeInsertionDataNodeListener.getInstance() .listenToTsFile( dataRegionInfo.getDataRegion().getDataRegionId(), + dataRegionInfo.getDataRegion().getDatabaseName(), tsFileResource, false, tmpMemTable.isTotallyGeneratedByPipe()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java index 00d5835a48c8..5437910dbdb3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java @@ -283,6 +283,7 @@ private Future write2DataRegion(int writeNum, String dataRegionId, int startN PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( + dataRegionId, dataRegionId, mock(WALEntryHandler.class), new InsertRowNode( @@ -297,6 +298,7 @@ private Future write2DataRegion(int writeNum, String dataRegionId, int startN resource); PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( + dataRegionId, dataRegionId, mock(WALEntryHandler.class), new InsertRowNode( @@ -310,7 +312,7 @@ private Future write2DataRegion(int writeNum, String dataRegionId, int startN false), resource); PipeInsertionDataNodeListener.getInstance() - .listenToTsFile(dataRegionId, resource, false, false); + .listenToTsFile(dataRegionId, dataRegionId, resource, false, false); } }); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeInsertionEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeInsertionEvent.java new file mode 100644 index 000000000000..938f6e189dc4 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeInsertionEvent.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.event; + +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; + +public abstract class PipeInsertionEvent extends EnrichedEvent { + + private final String treeModelDatabaseName; + private String tableModelDatabaseName; // lazy initialization + + protected PipeInsertionEvent( + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final TreePattern treePattern, + final TablePattern tablePattern, + final long startTime, + final long endTime, + final String treeModelDatabaseName, + final String tableModelDatabaseName) { + super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime); + this.treeModelDatabaseName = treeModelDatabaseName; + this.tableModelDatabaseName = tableModelDatabaseName; + } + + protected PipeInsertionEvent( + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final TreePattern treePattern, + final TablePattern tablePattern, + final long startTime, + final long endTime, + final String databaseNameFromDataRegion) { + this( + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + startTime, + endTime, + databaseNameFromDataRegion, + null); + } + + public String getTreeModelDatabaseName() { + return treeModelDatabaseName; + } + + public String getTableModelDatabaseName() { + return tableModelDatabaseName == null + ? tableModelDatabaseName = + treeModelDatabaseName != null && treeModelDatabaseName.startsWith("root.") + ? treeModelDatabaseName.substring(5) + : treeModelDatabaseName + : tableModelDatabaseName; + } +}