Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveYurongSu committed Jun 17, 2024
1 parent b687d55 commit b6318f8
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,13 @@ public void testRealtimeLooseRange() throws Exception {
senderEnv,
Arrays.asList(
"insert into root.db.d1 (time, at1, at2)" + " values (1000, 1, 2), (3000, 3, 4)",
"flush"))) {
return;
}

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"insert into root.db1.d1 (time, at1, at2)" + " values (1000, 1, 2), (3000, 3, 4)",
"flush"))) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,9 @@ public boolean isGeneratedByPipe() {
public boolean mayEventTimeOverlappedWithTimeRange() {
return enrichedEvent.mayEventTimeOverlappedWithTimeRange();
}

@Override
public boolean mayEventPathsOverlappedWithPattern() {
return enrichedEvent.mayEventPathsOverlappedWithPattern();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
return true;
}

@Override
public boolean mayEventPathsOverlappedWithPattern() {
return true;
}

/////////////////////////////// Whether to print ///////////////////////////////

public boolean isShouldPrintMessage() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
}
}

@Override
public boolean mayEventPathsOverlappedWithPattern() {
final String deviceId = getDeviceId();
return Objects.isNull(deviceId) || pipePattern.mayOverlapWithDevice(deviceId);
}

/////////////////////////// TabletInsertionEvent ///////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
return startTime <= timestamps[timestamps.length - 1] && timestamps[0] <= endTime;
}

@Override
public boolean mayEventPathsOverlappedWithPattern() {
final String deviceId = getDeviceId();
return Objects.isNull(deviceId) || pipePattern.mayOverlapWithDevice(deviceId);
}

public void markAsNeedToReport() {
this.needToReport = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
return true;
}

@Override
public boolean mayEventPathsOverlappedWithPattern() {
return true;
}

@Override
public void reportProgress() {
PipeAgent.task().markCompleted(pipeName, dataRegionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,25 @@
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileInsertionEvent {
Expand Down Expand Up @@ -159,10 +165,6 @@ public boolean waitForTsFileClose() throws InterruptedException {
return !resource.isEmpty();
}

public TsFileResource getResource() {
return resource;
}

public File getTsFile() {
return tsFile;
}
Expand Down Expand Up @@ -281,6 +283,34 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
: resource.getFileStartTime() <= endTime;
}

@Override
public boolean mayEventPathsOverlappedWithPattern() {
if (!resource.isClosed()) {
return true;
}

try {
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
PipeResourceManager.tsfile()
.getDeviceIsAlignedMapFromCache(
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
final Set<IDeviceID> deviceSet =
Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() : resource.getDevices();
return deviceSet.stream()
.anyMatch(
// TODO: use IDeviceID
deviceID ->
pipePattern.mayOverlapWithDevice(((PlainDeviceID) deviceID).toStringID()));
} catch (final IOException e) {
LOGGER.warn(
"Pipe {}: failed to get devices from TsFile {}, extract it anyway",
pipeName,
resource.getTsFilePath(),
e);
return true;
}
}

/////////////////////////// TsFileInsertionEvent ///////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
return event.mayEventTimeOverlappedWithTimeRange();
}

@Override
public boolean mayEventPathsOverlappedWithPattern() {
return event.mayEventPathsOverlappedWithPattern();
}

@Override
public String toString() {
return "PipeRealtimeEvent{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,13 @@
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeTimePartitionListener;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
Expand All @@ -50,18 +45,14 @@
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.utils.Pair;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -307,16 +298,21 @@ public final void extract(final PipeRealtimeEvent event) {

// 1. Check if time parsing is necessary. If not, it means that the timestamps of the data
// contained in this event are definitely within the time range [start time, end time].
// Otherwise,
// 2. Check if the timestamps of the data contained in this event intersect with the time range.
// If there is no intersection, it indicates that this data will be filtered out by the
// extractor, and the extract process is skipped.
if (!event.shouldParseTime() || event.getEvent().mayEventTimeOverlappedWithTimeRange()) {
// 2. Check if pattern parsing is necessary. If not, it means that the paths of the data
// contained in this event are definitely covered by the pattern.
// 3. Check if the event's data timestamps may intersect with the time range. If not, it means
// that the data timestamps of this event are definitely not within the time range.
// 4. Check if the event's data paths may intersect with the pattern. If not, it means that the
// data of this event is definitely not overlapped with the pattern.
if (!event.shouldParseTime()
|| !event.shouldParsePattern()
|| event.getEvent().mayEventTimeOverlappedWithTimeRange()
|| event.getEvent().mayEventPathsOverlappedWithPattern()) {
if (sloppyTimeRange) {
// only skip parsing time for events whose data timestamps may intersect with the time range
event.skipParsingTime();
}
if (sloppyPattern && mayEventPathsOverlappedWithPattern(event)) {
if (sloppyPattern) {
// only skip parsing pattern for events whose data paths may intersect with the pattern
event.skipParsingPattern();
}
Expand All @@ -333,49 +329,6 @@ public final void extract(final PipeRealtimeEvent event) {
}
}

private boolean mayEventPathsOverlappedWithPattern(final PipeRealtimeEvent realtimeEvent) {
final EnrichedEvent event = realtimeEvent.getEvent();

if (event instanceof PipeInsertNodeTabletInsertionEvent) {
final String deviceId = ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId();
return Objects.isNull(deviceId) || pipePattern.mayOverlapWithDevice(deviceId);
}

if (event instanceof PipeTsFileInsertionEvent) {
final TsFileResource resource = ((PipeTsFileInsertionEvent) event).getResource();
if (!resource.isClosed()) {
return true;
}

try {
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
PipeResourceManager.tsfile()
.getDeviceIsAlignedMapFromCache(
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(
resource.getTsFile()));
final Set<IDeviceID> deviceSet =
Objects.nonNull(deviceIsAlignedMap)
? deviceIsAlignedMap.keySet()
: resource.getDevices();
return deviceSet.stream()
.anyMatch(
// TODO: use IDeviceID
deviceID ->
pipePattern.mayOverlapWithDevice(((PlainDeviceID) deviceID).toStringID()));
} catch (final IOException e) {
LOGGER.warn(
"Pipe {}@{}: failed to get devices from TsFile {}, extract it anyway",
pipeName,
dataRegionId,
resource.getTsFilePath(),
e);
return true;
}
}

return false;
}

protected abstract void doExtract(final PipeRealtimeEvent event);

protected void extractHeartbeat(final PipeRealtimeEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ public boolean needToCommit() {

public abstract boolean mayEventTimeOverlappedWithTimeRange();

public abstract boolean mayEventPathsOverlappedWithPattern();

public void setCommitterKeyAndCommitId(final String committerKey, final long commitId) {
this.committerKey = committerKey;
this.commitId = commitId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
return true;
}

@Override
public boolean mayEventPathsOverlappedWithPattern() {
return true;
}

/////////////////////////////// Type parsing ///////////////////////////////

public String toSealTypeString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
return true;
}

@Override
public boolean mayEventPathsOverlappedWithPattern() {
return true;
}

/////////////////////////// Object ///////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,9 @@ public boolean isGeneratedByPipe() {
public boolean mayEventTimeOverlappedWithTimeRange() {
return true;
}

@Override
public boolean mayEventPathsOverlappedWithPattern() {
return true;
}
}

0 comments on commit b6318f8

Please sign in to comment.