diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index d7e33ee0a49f..7065324aa35d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -61,16 +61,25 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STREAMING_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE; @@ -84,13 +93,22 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_SNAPSHOT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STREAMING_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STRICT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._EXTRACTOR_WATERMARK_INTERVAL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._SOURCE_WATERMARK_INTERVAL_KEY; public class IoTDBDataRegionExtractor extends IoTDBExtractor { @@ -225,7 +243,13 @@ public void validate(final PipeParameterValidator validator) throws Exception { .getBooleanOrDefault( Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE) - || validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, SOURCE_END_TIME_KEY)) { + || validator + .getParameters() + .hasAnyAttributes( + SOURCE_START_TIME_KEY, + EXTRACTOR_START_TIME_KEY, + SOURCE_END_TIME_KEY, + EXTRACTOR_END_TIME_KEY)) { validator.validateAttributeValueRange( validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY) ? EXTRACTOR_REALTIME_MODE_KEY @@ -239,26 +263,7 @@ public void validate(final PipeParameterValidator validator) throws Exception { EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE); } - // Validate source.start-time and source.end-time - if (validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, SOURCE_END_TIME_KEY) - && validator - .getParameters() - .hasAnyAttributes( - EXTRACTOR_HISTORY_ENABLE_KEY, - EXTRACTOR_REALTIME_ENABLE_KEY, - SOURCE_HISTORY_ENABLE_KEY, - SOURCE_REALTIME_ENABLE_KEY)) { - LOGGER.warn( - "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is invalid.", - SOURCE_START_TIME_KEY, - EXTRACTOR_START_TIME_KEY, - SOURCE_END_TIME_KEY, - EXTRACTOR_END_TIME_KEY, - SOURCE_HISTORY_START_TIME_KEY, - EXTRACTOR_HISTORY_START_TIME_KEY, - SOURCE_HISTORY_END_TIME_KEY, - EXTRACTOR_HISTORY_END_TIME_KEY); - } + checkInvalidParameters(validator.getParameters()); constructHistoricalExtractor(); constructRealtimeExtractor(validator.getParameters()); @@ -290,6 +295,120 @@ private void validatePattern(final TreePattern treePattern, final TablePattern t } } + private void checkInvalidParameters(final PipeParameters parameters) { + // Enable history and realtime if specifying start-time or end-time + if (parameters.hasAnyAttributes( + SOURCE_START_TIME_KEY, + EXTRACTOR_START_TIME_KEY, + SOURCE_END_TIME_KEY, + EXTRACTOR_END_TIME_KEY) + && parameters.hasAnyAttributes( + EXTRACTOR_HISTORY_ENABLE_KEY, + EXTRACTOR_REALTIME_ENABLE_KEY, + SOURCE_HISTORY_ENABLE_KEY, + SOURCE_REALTIME_ENABLE_KEY)) { + LOGGER.warn( + "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is invalid.", + SOURCE_START_TIME_KEY, + EXTRACTOR_START_TIME_KEY, + SOURCE_END_TIME_KEY, + EXTRACTOR_END_TIME_KEY, + SOURCE_HISTORY_START_TIME_KEY, + EXTRACTOR_HISTORY_START_TIME_KEY, + SOURCE_HISTORY_END_TIME_KEY, + EXTRACTOR_HISTORY_END_TIME_KEY); + } + + // Check coexistence of mode.snapshot and mode + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY) + && parameters.hasAnyAttributes(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_MODE_SNAPSHOT_KEY, + SOURCE_MODE_SNAPSHOT_KEY, + EXTRACTOR_MODE_KEY, + SOURCE_MODE_KEY); + } + + // Check coexistence of mode.streaming and realtime.mode + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) + && parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_MODE_STREAMING_KEY, + SOURCE_MODE_STREAMING_KEY, + EXTRACTOR_REALTIME_MODE_KEY, + SOURCE_REALTIME_MODE_KEY); + } + + // Check coexistence of mode.strict, history.loose-range and realtime.loose-range + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) { + if (parameters.hasAnyAttributes( + EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_MODE_STRICT_KEY, + SOURCE_MODE_STRICT_KEY, + EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, + SOURCE_HISTORY_LOOSE_RANGE_KEY); + } + if (parameters.hasAnyAttributes( + EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_MODE_STRICT_KEY, + SOURCE_MODE_STRICT_KEY, + EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, + SOURCE_REALTIME_LOOSE_RANGE_KEY); + } + } + + // Check coexistence of mods and mods.enable + if (parameters.hasAnyAttributes(EXTRACTOR_MODS_ENABLE_KEY, SOURCE_MODS_ENABLE_KEY) + && parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_MODS_KEY, + SOURCE_MODS_KEY, + EXTRACTOR_MODS_ENABLE_KEY, + SOURCE_MODS_ENABLE_KEY); + } + + // Check coexistence of watermark.interval-ms and watermark-interval-ms + if (parameters.hasAnyAttributes(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY) + && parameters.hasAnyAttributes( + _EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_WATERMARK_INTERVAL_KEY, + SOURCE_WATERMARK_INTERVAL_KEY, + _EXTRACTOR_WATERMARK_INTERVAL_KEY, + _SOURCE_WATERMARK_INTERVAL_KEY); + } + + // Check if specifying mode.snapshot or mode.streaming when disable realtime extractor + if (!parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), + EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) { + LOGGER.info( + "When '{}' ('{}') is set to false, specifying {} and {} is invalid.", + EXTRACTOR_REALTIME_ENABLE_KEY, + SOURCE_REALTIME_ENABLE_KEY, + EXTRACTOR_MODE_SNAPSHOT_KEY, + SOURCE_MODE_SNAPSHOT_KEY); + } + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)) { + LOGGER.info( + "When '{}' ('{}') is set to false, specifying {} and {} is invalid.", + EXTRACTOR_REALTIME_ENABLE_KEY, + SOURCE_REALTIME_ENABLE_KEY, + EXTRACTOR_MODE_STREAMING_KEY, + SOURCE_MODE_STREAMING_KEY); + } + } + } + private void constructHistoricalExtractor() { // Enable historical extractor by default historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor(); @@ -303,31 +422,59 @@ private void constructRealtimeExtractor(final PipeParameters parameters) EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor(); LOGGER.info( - "Pipe: '{}' is set to false, use heartbeat realtime extractor.", - EXTRACTOR_REALTIME_ENABLE_KEY); + "Pipe: '{}' ('{}') is set to false, use heartbeat realtime extractor.", + EXTRACTOR_REALTIME_ENABLE_KEY, + SOURCE_REALTIME_ENABLE_KEY); return; } + final boolean isSnapshotMode; + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) { + isSnapshotMode = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY), + EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE); + } else { + final String extractorModeValue = + parameters.getStringOrDefault( + Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE); + isSnapshotMode = + extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE) + || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE); + } + // Use heartbeat only extractor if enable snapshot mode - final String extractorModeValue = - parameters.getStringOrDefault( - Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE); - if (extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE) - || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)) { + if (isSnapshotMode) { realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor(); - LOGGER.info( - "Pipe: '{}' is set to {}, use heartbeat realtime extractor.", - EXTRACTOR_MODE_KEY, - EXTRACTOR_MODE_SNAPSHOT_VALUE); + LOGGER.info("Pipe: snapshot mode is enabled, use heartbeat realtime extractor."); return; } // Use hybrid mode by default - if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { + if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) + && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { checkWalEnable(parameters); realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); LOGGER.info( - "Pipe: '{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE_KEY); + "Pipe: '{}' ('{}') and '{}' ('{}') is not set, use hybrid mode by default.", + EXTRACTOR_MODE_STREAMING_KEY, + SOURCE_MODE_STREAMING_KEY, + EXTRACTOR_REALTIME_MODE_KEY, + SOURCE_REALTIME_MODE_KEY); + return; + } + + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)) { + final boolean isStreamingMode = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY), + EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE); + if (isStreamingMode) { + checkWalEnable(parameters); + realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); + } else { + realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor(); + } return; } @@ -381,20 +528,27 @@ public void customize( realtimeExtractor.customize(parameters, configuration); // Set watermark injector + long watermarkIntervalInMs = EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE; if (parameters.hasAnyAttributes( EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) { - final long watermarkIntervalInMs = + watermarkIntervalInMs = parameters.getLongOrDefault( - Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY), + Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY), EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE); - if (watermarkIntervalInMs > 0) { - watermarkInjector = new DataRegionWatermarkInjector(regionId, watermarkIntervalInMs); - LOGGER.info( - "Pipe {}@{}: Set watermark injector with interval {} ms.", - pipeName, - regionId, - watermarkInjector.getInjectionIntervalInMs()); - } + } else if (parameters.hasAnyAttributes( + _EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) { + watermarkIntervalInMs = + parameters.getLongOrDefault( + Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY), + EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE); + } + if (watermarkIntervalInMs > 0) { + watermarkInjector = new DataRegionWatermarkInjector(regionId, watermarkIntervalInMs); + LOGGER.info( + "Pipe {}@{}: Set watermark injector with interval {} ms.", + pipeName, + regionId, + watermarkInjector.getInjectionIntervalInMs()); } // register metric after generating taskID diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 3acc589ca313..f56ec5c7199a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -76,15 +76,21 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; @@ -144,28 +150,39 @@ public void validate(final PipeParameterValidator validator) { throw new PipeParameterNotValidException(e.getMessage()); } - final String extractorHistoryLooseRangeValue = - parameters - .getStringOrDefault( - Arrays.asList(EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY), - EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE) - .trim(); - if (EXTRACTOR_HISTORY_LOOSE_RANGE_ALL_VALUE.equalsIgnoreCase(extractorHistoryLooseRangeValue)) { - sloppyTimeRange = true; - sloppyPattern = true; + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) { + final boolean isStrictMode = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY), + EXTRACTOR_MODE_STRICT_DEFAULT_VALUE); + sloppyTimeRange = !isStrictMode; + sloppyPattern = !isStrictMode; } else { - final Set sloppyOptionSet = - Arrays.stream(extractorHistoryLooseRangeValue.split(",")) - .map(String::trim) - .filter(s -> !s.isEmpty()) - .map(String::toLowerCase) - .collect(Collectors.toSet()); - sloppyTimeRange = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE); - sloppyPattern = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE); - if (!sloppyOptionSet.isEmpty()) { - throw new PipeParameterNotValidException( - String.format( - "Parameters in set %s are not allowed in 'history.loose-range'", sloppyOptionSet)); + final String extractorHistoryLooseRangeValue = + parameters + .getStringOrDefault( + Arrays.asList(EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY), + EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE) + .trim(); + if (EXTRACTOR_HISTORY_LOOSE_RANGE_ALL_VALUE.equalsIgnoreCase( + extractorHistoryLooseRangeValue)) { + sloppyTimeRange = true; + sloppyPattern = true; + } else { + final Set sloppyOptionSet = + Arrays.stream(extractorHistoryLooseRangeValue.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .map(String::toLowerCase) + .collect(Collectors.toSet()); + sloppyTimeRange = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE); + sloppyPattern = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE); + if (!sloppyOptionSet.isEmpty()) { + throw new PipeParameterNotValidException( + String.format( + "Parameters in set %s are not allowed in 'history.loose-range'", + sloppyOptionSet)); + } } } @@ -333,12 +350,21 @@ public void customize( } } - shouldTransferModFile = - parameters.getBooleanOrDefault( - Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), - EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE - || // Should extract deletion - listeningOptionPair.getRight()); + if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { + shouldTransferModFile = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY), + EXTRACTOR_MODS_DEFAULT_VALUE + || // Should extract deletion + listeningOptionPair.getRight()); + } else { + shouldTransferModFile = + parameters.getBooleanOrDefault( + Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), + EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE + || // Should extract deletion + listeningOptionPair.getRight()); + } final String extractorModeValue = parameters.getStringOrDefault( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index d992d87b3d78..2924eb4fc0ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -63,8 +63,10 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_ALL_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY; @@ -73,6 +75,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; @@ -240,10 +243,17 @@ public void customize( PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); - shouldTransferModFile = - parameters.getBooleanOrDefault( - Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), - EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion); + if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { + shouldTransferModFile = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY), + EXTRACTOR_MODS_DEFAULT_VALUE || shouldExtractDeletion); + } else { + shouldTransferModFile = + parameters.getBooleanOrDefault( + Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), + EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion); + } if (LOGGER.isInfoEnabled()) { LOGGER.info( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java index 59f8cb498c98..f5a2a84c4739 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.utils.PathUtils; @@ -71,6 +70,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant._PROCESSOR_OUTPUT_SERIES_KEY; + public class TwoStageCountProcessor implements PipeProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageCountProcessor.class); @@ -98,10 +100,16 @@ public class TwoStageCountProcessor implements PipeProcessor { @Override public void validate(PipeParameterValidator validator) throws Exception { - validator.validateRequiredAttribute(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY); + checkInvalidParameters(validator.getParameters()); + + final String rawOutputSeries; + if (!validator.getParameters().hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)) { + validator.validateRequiredAttribute(_PROCESSOR_OUTPUT_SERIES_KEY); + rawOutputSeries = validator.getParameters().getString(_PROCESSOR_OUTPUT_SERIES_KEY); + } else { + rawOutputSeries = validator.getParameters().getString(PROCESSOR_OUTPUT_SERIES_KEY); + } - final String rawOutputSeries = - validator.getParameters().getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY); try { PathUtils.isLegalPath(rawOutputSeries); } catch (IllegalPathException e) { @@ -109,6 +117,17 @@ public void validate(PipeParameterValidator validator) throws Exception { } } + private void checkInvalidParameters(final PipeParameters parameters) { + // Check coexistence of output.series and output-series + if (parameters.hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY) + && parameters.hasAttribute(_PROCESSOR_OUTPUT_SERIES_KEY)) { + LOGGER.warn( + "When {} is specified, specifying {} is invalid.", + PROCESSOR_OUTPUT_SERIES_KEY, + _PROCESSOR_OUTPUT_SERIES_KEY); + } + } + @Override public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) throws Exception { @@ -119,8 +138,7 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati regionId = runtimeEnvironment.getRegionId(); pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta(); - outputSeries = - new PartialPath(parameters.getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY)); + outputSeries = new PartialPath(parameters.getString(_PROCESSOR_OUTPUT_SERIES_KEY)); if (Objects.nonNull(pipeTaskMeta) && Objects.nonNull(pipeTaskMeta.getProgressIndex())) { if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index 1ddca0c3edff..f1e32337368b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -82,6 +82,9 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_MODS_ENABLE_KEY = "extractor.mods.enable"; public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable"; public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false; + public static final String EXTRACTOR_MODS_KEY = "extractor.mods"; + public static final String SOURCE_MODS_KEY = "source.mods"; + public static final boolean EXTRACTOR_MODS_DEFAULT_VALUE = EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; public static final String EXTRACTOR_REALTIME_ENABLE_KEY = "extractor.realtime.enable"; public static final String SOURCE_REALTIME_ENABLE_KEY = "source.realtime.enable"; @@ -101,16 +104,29 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_ALL_VALUE = "all"; public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE = ""; + public static final String EXTRACTOR_MODE_STREAMING_KEY = "extractor.mode.streaming"; + public static final String SOURCE_MODE_STREAMING_KEY = "source.mode.streaming"; + public static final boolean EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE = true; + public static final String EXTRACTOR_MODE_STRICT_KEY = "extractor.mode.strict"; + public static final String SOURCE_MODE_STRICT_KEY = "source.mode.strict"; + public static final boolean EXTRACTOR_MODE_STRICT_DEFAULT_VALUE = true; + public static final String EXTRACTOR_MODE_SNAPSHOT_KEY = "extractor.mode.snapshot"; + public static final String SOURCE_MODE_SNAPSHOT_KEY = "source.mode.snapshot"; + public static final boolean EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE = false; + public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time"; public static final String SOURCE_START_TIME_KEY = "source.start-time"; public static final String EXTRACTOR_END_TIME_KEY = "extractor.end-time"; public static final String SOURCE_END_TIME_KEY = "source.end-time"; - public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark-interval-ms"; - public static final String SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark-interval-ms"; + public static final String _EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark-interval-ms"; + public static final String _SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark-interval-ms"; public static final long EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE = -1; // -1 means no watermark + public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark.interval-ms"; + public static final String SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark.interval-ms"; ///////////////////// pipe consensus ///////////////////// + public static final String EXTRACTOR_CONSENSUS_GROUP_ID_KEY = "extractor.consensus.group-id"; public static final String EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY = "extractor.consensus.sender-dn-id"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java index 22bc87b2917b..f8aef880bd96 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java @@ -77,7 +77,8 @@ public class PipeProcessorConstant { public static final long PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE = Long.MAX_VALUE; - public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series"; + public static final String _PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series"; + public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output.series"; private PipeProcessorConstant() { throw new IllegalStateException("Utility class"); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java index cafc2a5288b8..7358ce32ec57 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java @@ -64,7 +64,7 @@ public boolean isRoot() { } /** - * Interpret from source parameters and get a {@link PipePattern}. + * Interpret from source parameters and get a {@link TreePattern}. * * @return The interpreted {@link TreePattern} which is not {@code null}. */