Skip to content

Commit

Permalink
Pipe: add parameter realtime.loose-range to support subscription loos…
Browse files Browse the repository at this point in the history
…e range semantics
  • Loading branch information
VGalaxies committed May 23, 2024
1 parent 0ebac6b commit 6a6d0a5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;

public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
Expand Down Expand Up @@ -97,6 +100,8 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {

private boolean shouldTransferModFile; // Whether to transfer mods

private boolean sloppyTimeRange; // true to disable time range filter after extraction

// This queue is used to store pending events extracted by the method extract(). The method
// supply() will poll events from this queue and send them to the next pipe plugin.
protected final UnboundedBlockingPendingQueue<Event> pendingQueue =
Expand Down Expand Up @@ -194,6 +199,19 @@ public void customize(
parameters.getBooleanOrDefault(
Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion);

sloppyTimeRange =
Arrays.stream(
parameters
.getStringOrDefault(
Arrays.asList(
EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY),
"")
.split(","))
.map(String::trim)
.map(String::toLowerCase)
.collect(Collectors.toSet())
.contains("time");
}

@Override
Expand Down Expand Up @@ -259,6 +277,10 @@ public final void extract(final PipeRealtimeEvent event) {
// If there is no intersection, it indicates that this data will be filtered out by the
// extractor, and the extract process is skipped.
if (!event.shouldParseTime() || event.getEvent().mayEventTimeOverlappedWithTimeRange()) {
if (sloppyTimeRange) {
// only skip parsing time for events whose data timestamps may intersect with the time range
event.skipParsingTime();
}
doExtract(event);
} else {
event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE = "forced-log";
public static final String EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE = "stream";
public static final String EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE = "batch";
public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_KEY = "extractor.realtime.loose-range";
public static final String SOURCE_REALTIME_LOOSE_RANGE_KEY = "source.realtime.loose-range";

public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time";
public static final String SOURCE_START_TIME_KEY = "source.start-time";
Expand Down

0 comments on commit 6a6d0a5

Please sign in to comment.