diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index 22a24dc387a2..d809bd72a818 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -56,13 +56,16 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { @@ -97,6 +100,8 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { private boolean shouldTransferModFile; // Whether to transfer mods + private boolean sloppyTimeRange; // true to disable time range filter after extraction + // This queue is used to store pending events extracted by the method extract(). The method // supply() will poll events from this queue and send them to the next pipe plugin. protected final UnboundedBlockingPendingQueue pendingQueue = @@ -194,6 +199,19 @@ public void customize( parameters.getBooleanOrDefault( Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion); + + sloppyTimeRange = + Arrays.stream( + parameters + .getStringOrDefault( + Arrays.asList( + EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY), + "") + .split(",")) + .map(String::trim) + .map(String::toLowerCase) + .collect(Collectors.toSet()) + .contains("time"); } @Override @@ -259,6 +277,10 @@ public final void extract(final PipeRealtimeEvent event) { // If there is no intersection, it indicates that this data will be filtered out by the // extractor, and the extract process is skipped. if (!event.shouldParseTime() || event.getEvent().mayEventTimeOverlappedWithTimeRange()) { + if (sloppyTimeRange) { + // only skip parsing time for events whose data timestamps may intersect with the time range + event.skipParsingTime(); + } doExtract(event); } else { event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), false); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index 6a60d77cd0e9..b5ee9c153dcc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -79,6 +79,8 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE = "forced-log"; public static final String EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE = "stream"; public static final String EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE = "batch"; + public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_KEY = "extractor.realtime.loose-range"; + public static final String SOURCE_REALTIME_LOOSE_RANGE_KEY = "source.realtime.loose-range"; public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time"; public static final String SOURCE_START_TIME_KEY = "source.start-time";