Skip to content

Commit

Permalink
Pipe: support new synonym pipe parameters for table model (#13799)
Browse files Browse the repository at this point in the history
Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
VGalaxies and SteveYurongSu authored Oct 18, 2024
1 parent 2b8d6c6 commit 2e5b5f1
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 85 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,21 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STRICT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT;
import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
Expand Down Expand Up @@ -144,28 +150,39 @@ public void validate(final PipeParameterValidator validator) {
throw new PipeParameterNotValidException(e.getMessage());
}

final String extractorHistoryLooseRangeValue =
parameters
.getStringOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY),
EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE)
.trim();
if (EXTRACTOR_HISTORY_LOOSE_RANGE_ALL_VALUE.equalsIgnoreCase(extractorHistoryLooseRangeValue)) {
sloppyTimeRange = true;
sloppyPattern = true;
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) {
final boolean isStrictMode =
parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY),
EXTRACTOR_MODE_STRICT_DEFAULT_VALUE);
sloppyTimeRange = !isStrictMode;
sloppyPattern = !isStrictMode;
} else {
final Set<String> sloppyOptionSet =
Arrays.stream(extractorHistoryLooseRangeValue.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(String::toLowerCase)
.collect(Collectors.toSet());
sloppyTimeRange = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE);
sloppyPattern = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE);
if (!sloppyOptionSet.isEmpty()) {
throw new PipeParameterNotValidException(
String.format(
"Parameters in set %s are not allowed in 'history.loose-range'", sloppyOptionSet));
final String extractorHistoryLooseRangeValue =
parameters
.getStringOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY),
EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE)
.trim();
if (EXTRACTOR_HISTORY_LOOSE_RANGE_ALL_VALUE.equalsIgnoreCase(
extractorHistoryLooseRangeValue)) {
sloppyTimeRange = true;
sloppyPattern = true;
} else {
final Set<String> sloppyOptionSet =
Arrays.stream(extractorHistoryLooseRangeValue.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(String::toLowerCase)
.collect(Collectors.toSet());
sloppyTimeRange = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE);
sloppyPattern = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE);
if (!sloppyOptionSet.isEmpty()) {
throw new PipeParameterNotValidException(
String.format(
"Parameters in set %s are not allowed in 'history.loose-range'",
sloppyOptionSet));
}
}
}

Expand Down Expand Up @@ -333,12 +350,21 @@ public void customize(
}
}

shouldTransferModFile =
parameters.getBooleanOrDefault(
Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE
|| // Should extract deletion
listeningOptionPair.getRight());
if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
shouldTransferModFile =
parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY),
EXTRACTOR_MODS_DEFAULT_VALUE
|| // Should extract deletion
listeningOptionPair.getRight());
} else {
shouldTransferModFile =
parameters.getBooleanOrDefault(
Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE
|| // Should extract deletion
listeningOptionPair.getRight());
}

final String extractorModeValue =
parameters.getStringOrDefault(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@

import static com.google.common.base.MoreObjects.toStringHelper;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_ALL_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY;
Expand All @@ -73,6 +75,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;

Expand Down Expand Up @@ -240,10 +243,17 @@ public void customize(
PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);

shouldTransferModFile =
parameters.getBooleanOrDefault(
Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion);
if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
shouldTransferModFile =
parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY),
EXTRACTOR_MODS_DEFAULT_VALUE || shouldExtractDeletion);
} else {
shouldTransferModFile =
parameters.getBooleanOrDefault(
Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion);
}

if (LOGGER.isInfoEnabled()) {
LOGGER.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.PathUtils;
Expand Down Expand Up @@ -71,6 +70,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant._PROCESSOR_OUTPUT_SERIES_KEY;

public class TwoStageCountProcessor implements PipeProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageCountProcessor.class);
Expand Down Expand Up @@ -98,17 +100,34 @@ public class TwoStageCountProcessor implements PipeProcessor {

@Override
public void validate(PipeParameterValidator validator) throws Exception {
validator.validateRequiredAttribute(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
checkInvalidParameters(validator.getParameters());

final String rawOutputSeries;
if (!validator.getParameters().hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)) {
validator.validateRequiredAttribute(_PROCESSOR_OUTPUT_SERIES_KEY);
rawOutputSeries = validator.getParameters().getString(_PROCESSOR_OUTPUT_SERIES_KEY);
} else {
rawOutputSeries = validator.getParameters().getString(PROCESSOR_OUTPUT_SERIES_KEY);
}

final String rawOutputSeries =
validator.getParameters().getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
try {
PathUtils.isLegalPath(rawOutputSeries);
} catch (IllegalPathException e) {
throw new IllegalArgumentException("Illegal output series path: " + rawOutputSeries);
}
}

private void checkInvalidParameters(final PipeParameters parameters) {
// Check coexistence of output.series and output-series
if (parameters.hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)
&& parameters.hasAttribute(_PROCESSOR_OUTPUT_SERIES_KEY)) {
LOGGER.warn(
"When {} is specified, specifying {} is invalid.",
PROCESSOR_OUTPUT_SERIES_KEY,
_PROCESSOR_OUTPUT_SERIES_KEY);
}
}

@Override
public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
throws Exception {
Expand All @@ -119,8 +138,7 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati
regionId = runtimeEnvironment.getRegionId();
pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta();

outputSeries =
new PartialPath(parameters.getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY));
outputSeries = new PartialPath(parameters.getString(_PROCESSOR_OUTPUT_SERIES_KEY));

if (Objects.nonNull(pipeTaskMeta) && Objects.nonNull(pipeTaskMeta.getProgressIndex())) {
if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_MODS_ENABLE_KEY = "extractor.mods.enable";
public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable";
public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;
public static final String EXTRACTOR_MODS_KEY = "extractor.mods";
public static final String SOURCE_MODS_KEY = "source.mods";
public static final boolean EXTRACTOR_MODS_DEFAULT_VALUE = EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;

public static final String EXTRACTOR_REALTIME_ENABLE_KEY = "extractor.realtime.enable";
public static final String SOURCE_REALTIME_ENABLE_KEY = "source.realtime.enable";
Expand All @@ -101,16 +104,29 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_ALL_VALUE = "all";
public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE = "";

public static final String EXTRACTOR_MODE_STREAMING_KEY = "extractor.mode.streaming";
public static final String SOURCE_MODE_STREAMING_KEY = "source.mode.streaming";
public static final boolean EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE = true;
public static final String EXTRACTOR_MODE_STRICT_KEY = "extractor.mode.strict";
public static final String SOURCE_MODE_STRICT_KEY = "source.mode.strict";
public static final boolean EXTRACTOR_MODE_STRICT_DEFAULT_VALUE = true;
public static final String EXTRACTOR_MODE_SNAPSHOT_KEY = "extractor.mode.snapshot";
public static final String SOURCE_MODE_SNAPSHOT_KEY = "source.mode.snapshot";
public static final boolean EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE = false;

public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time";
public static final String SOURCE_START_TIME_KEY = "source.start-time";
public static final String EXTRACTOR_END_TIME_KEY = "extractor.end-time";
public static final String SOURCE_END_TIME_KEY = "source.end-time";

public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark-interval-ms";
public static final String SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark-interval-ms";
public static final String _EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark-interval-ms";
public static final String _SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark-interval-ms";
public static final long EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE = -1; // -1 means no watermark
public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark.interval-ms";
public static final String SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark.interval-ms";

///////////////////// pipe consensus /////////////////////

public static final String EXTRACTOR_CONSENSUS_GROUP_ID_KEY = "extractor.consensus.group-id";
public static final String EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY =
"extractor.consensus.sender-dn-id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public class PipeProcessorConstant {
public static final long PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE =
Long.MAX_VALUE;

public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series";
public static final String _PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series";
public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output.series";

private PipeProcessorConstant() {
throw new IllegalStateException("Utility class");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public boolean isRoot() {
}

/**
* Interpret from source parameters and get a {@link PipePattern}.
* Interpret from source parameters and get a {@link TreePattern}.
*
* @return The interpreted {@link TreePattern} which is not {@code null}.
*/
Expand Down

0 comments on commit 2e5b5f1

Please sign in to comment.