Skip to content

Commit

Permalink
Pipe: record database name in PipeInsertionEvent (#13712)
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveYurongSu authored Oct 10, 2024
1 parent 1b4d1fb commit d25cee0
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void process(final Event event, final EventCollector eventCollector) thro
tablet.addTimestamp(0, System.currentTimeMillis());
tablet.addValue(aggregateSeries.getMeasurement(), 0, writePointCount.get());
eventCollector.collect(
new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null, false));
new PipeRawTabletInsertionEvent(null, tablet, false, null, 0, null, null, false));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.pipe.api.access.Row;
Expand Down Expand Up @@ -102,6 +103,10 @@ private void collectTabletInsertionEvent() {
if (tablet != null) {
tabletInsertionEventList.add(
new PipeRawTabletInsertionEvent(
// TODO: non-PipeInsertionEvent sourceEvent is not supported?
sourceEvent instanceof PipeInsertionEvent
? ((PipeInsertionEvent) sourceEvent).getTreeModelDatabaseName()
: null,
tablet,
isAligned,
sourceEvent == null ? null : sourceEvent.getPipeName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
Expand All @@ -51,7 +51,7 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
implements TabletInsertionEvent {

private static final Logger LOGGER =
Expand All @@ -68,12 +68,14 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
private ProgressIndex progressIndex;

public PipeInsertNodeTabletInsertionEvent(
final String databaseName,
final WALEntryHandler walEntryHandler,
final PartialPath devicePath,
final ProgressIndex progressIndex,
final boolean isAligned,
final boolean isGeneratedByPipe) {
this(
databaseName,
walEntryHandler,
devicePath,
progressIndex,
Expand All @@ -89,6 +91,7 @@ public PipeInsertNodeTabletInsertionEvent(
}

private PipeInsertNodeTabletInsertionEvent(
final String databaseName,
final WALEntryHandler walEntryHandler,
final PartialPath devicePath,
final ProgressIndex progressIndex,
Expand All @@ -101,7 +104,15 @@ private PipeInsertNodeTabletInsertionEvent(
final TablePattern tablePattern,
final long startTime,
final long endTime) {
super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime);
super(
pipeName,
creationTime,
pipeTaskMeta,
treePattern,
tablePattern,
startTime,
endTime,
databaseName);
this.walEntryHandler = walEntryHandler;
// Record device path here so there's no need to get it from InsertNode cache later.
this.devicePath = devicePath;
Expand Down Expand Up @@ -187,6 +198,7 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP
final long startTime,
final long endTime) {
return new PipeInsertNodeTabletInsertionEvent(
getTreeModelDatabaseName(),
walEntryHandler,
devicePath,
progressIndex,
Expand Down Expand Up @@ -372,6 +384,7 @@ public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
.map(
container ->
new PipeRawTabletInsertionEvent(
getTreeModelDatabaseName(),
container.convertToTablet(),
container.isAligned(),
pipeName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
Expand All @@ -40,7 +41,8 @@
import java.util.Objects;
import java.util.function.BiConsumer;

public class PipeRawTabletInsertionEvent extends EnrichedEvent implements TabletInsertionEvent {
public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
implements TabletInsertionEvent {

private Tablet tablet;
private String deviceId; // Only used when the tablet is released.
Expand All @@ -56,6 +58,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
private volatile ProgressIndex overridingProgressIndex;

private PipeRawTabletInsertionEvent(
final String databaseName,
final Tablet tablet,
final boolean isAligned,
final EnrichedEvent sourceEvent,
Expand All @@ -67,14 +70,23 @@ private PipeRawTabletInsertionEvent(
final TablePattern tablePattern,
final long startTime,
final long endTime) {
super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime);
super(
pipeName,
creationTime,
pipeTaskMeta,
treePattern,
tablePattern,
startTime,
endTime,
databaseName);
this.tablet = Objects.requireNonNull(tablet);
this.isAligned = isAligned;
this.sourceEvent = sourceEvent;
this.needToReport = needToReport;
}

public PipeRawTabletInsertionEvent(
final String databaseName,
final Tablet tablet,
final boolean isAligned,
final String pipeName,
Expand All @@ -83,6 +95,7 @@ public PipeRawTabletInsertionEvent(
final EnrichedEvent sourceEvent,
final boolean needToReport) {
this(
databaseName,
tablet,
isAligned,
sourceEvent,
Expand All @@ -98,13 +111,26 @@ public PipeRawTabletInsertionEvent(

@TestOnly
public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean isAligned) {
this(tablet, isAligned, null, false, null, 0, null, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
this(
null,
tablet,
isAligned,
null,
false,
null,
0,
null,
null,
null,
Long.MIN_VALUE,
Long.MAX_VALUE);
}

@TestOnly
public PipeRawTabletInsertionEvent(
final Tablet tablet, final boolean isAligned, final TreePattern treePattern) {
this(
null,
tablet,
isAligned,
null,
Expand All @@ -121,7 +147,7 @@ public PipeRawTabletInsertionEvent(
@TestOnly
public PipeRawTabletInsertionEvent(
final Tablet tablet, final long startTime, final long endTime) {
this(tablet, false, null, false, null, 0, null, null, null, startTime, endTime);
this(null, tablet, false, null, false, null, 0, null, null, null, startTime, endTime);
}

@Override
Expand Down Expand Up @@ -188,6 +214,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final long startTime,
final long endTime) {
return new PipeRawTabletInsertionEvent(
getTreeModelDatabaseName(),
tablet,
isAligned,
sourceEvent,
Expand Down Expand Up @@ -286,7 +313,14 @@ public long count() {

public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
return new PipeRawTabletInsertionEvent(
convertToTablet(), isAligned, pipeName, creationTime, pipeTaskMeta, this, needToReport);
getTreeModelDatabaseName(),
convertToTablet(),
isAligned,
pipeName,
creationTime,
pipeTaskMeta,
this,
needToReport);
}

public boolean hasNoNeedParsingAndIsEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
Expand All @@ -50,7 +50,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileInsertionEvent {
public class PipeTsFileInsertionEvent extends PipeInsertionEvent implements TsFileInsertionEvent {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);

Expand All @@ -76,12 +76,14 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
private volatile ProgressIndex overridingProgressIndex;

public PipeTsFileInsertionEvent(
final String databaseName,
final TsFileResource resource,
final boolean isLoaded,
final boolean isGeneratedByPipe,
final boolean isGeneratedByHistoricalExtractor) {
// The modFile must be copied before the event is assigned to the listening pipes
this(
databaseName,
resource,
true,
isLoaded,
Expand All @@ -97,6 +99,7 @@ public PipeTsFileInsertionEvent(
}

public PipeTsFileInsertionEvent(
final String databaseName,
final TsFileResource resource,
final boolean isWithMod,
final boolean isLoaded,
Expand All @@ -109,7 +112,15 @@ public PipeTsFileInsertionEvent(
final TablePattern tablePattern,
final long startTime,
final long endTime) {
super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime);
super(
pipeName,
creationTime,
pipeTaskMeta,
treePattern,
tablePattern,
startTime,
endTime,
databaseName);

this.resource = resource;
tsFile = resource.getTsFile();
Expand Down Expand Up @@ -328,6 +339,7 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep
final long startTime,
final long endTime) {
return new PipeTsFileInsertionEvent(
getTreeModelDatabaseName(),
resource,
isWithMod,
isLoaded,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand All @@ -42,7 +42,7 @@ public abstract class TsFileInsertionDataContainer implements AutoCloseable {
protected final GlobalTimeExpression timeFilterExpression; // used to filter data

protected final PipeTaskMeta pipeTaskMeta; // used to report progress
protected final EnrichedEvent sourceEvent; // used to report progress
protected final PipeInsertionEvent sourceEvent; // used to report progress

protected final PipeMemoryBlock allocatedMemoryBlockForTablet;

Expand All @@ -53,7 +53,7 @@ protected TsFileInsertionDataContainer(
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent) {
final PipeInsertionEvent sourceEvent) {
this.pattern = pattern;
timeFilterExpression =
(startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
Expand Down Expand Up @@ -80,7 +80,7 @@ public TsFileInsertionQueryDataContainer(
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent)
final PipeInsertionEvent sourceEvent)
throws IOException {
this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null);
}
Expand All @@ -91,7 +91,7 @@ public TsFileInsertionQueryDataContainer(
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent,
final PipeInsertionEvent sourceEvent,
final Map<IDeviceID, Boolean> deviceIsAlignedMap)
throws IOException {
super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
Expand Down Expand Up @@ -307,6 +307,7 @@ public TabletInsertionEvent next() {
if (!hasNext()) {
next =
new PipeRawTabletInsertionEvent(
sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null,
tablet,
isAligned,
sourceEvent != null ? sourceEvent.getPipeName() : null,
Expand All @@ -318,6 +319,7 @@ public TabletInsertionEvent next() {
} else {
next =
new PipeRawTabletInsertionEvent(
sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null,
tablet,
isAligned,
sourceEvent != null ? sourceEvent.getPipeName() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
Expand Down Expand Up @@ -94,7 +94,7 @@ public TsFileInsertionScanDataContainer(
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent)
final PipeInsertionEvent sourceEvent)
throws IOException {
super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);

Expand Down Expand Up @@ -134,6 +134,7 @@ public TabletInsertionEvent next() {
final boolean hasNext = hasNext();
try {
return new PipeRawTabletInsertionEvent(
sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null,
tablet,
currentIsAligned,
sourceEvent != null ? sourceEvent.getPipeName() : null,
Expand Down
Loading

0 comments on commit d25cee0

Please sign in to comment.