Skip to content

Commit

Permalink
Pipe: Smoothed the rate in pipe's remaining time calculations (#12699)
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi authored Jun 17, 2024
1 parent d00c766 commit 89af73d
Show file tree
Hide file tree
Showing 39 changed files with 672 additions and 339 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,28 @@ public class CountPointProcessor implements PipeProcessor {
private PartialPath aggregateSeries;

@Override
public void validate(PipeParameterValidator validator) {
public void validate(final PipeParameterValidator validator) {
validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY);
}

@Override
public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
public void customize(
final PipeParameters parameters, final PipeProcessorRuntimeConfiguration configuration)
throws Exception {
this.aggregateSeries = new PartialPath(parameters.getString(AGGREGATE_SERIES_KEY));
}

@Override
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) {
public void process(
final TabletInsertionEvent tabletInsertionEvent, final EventCollector eventCollector) {
tabletInsertionEvent.processTablet(
(tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize));
}

@Override
public void process(Event event, EventCollector eventCollector) throws Exception {
public void process(final Event event, final EventCollector eventCollector) throws Exception {
if (event instanceof PipeHeartbeatEvent) {
Tablet tablet =
final Tablet tablet =
new Tablet(
aggregateSeries.getDevice(),
Collections.singletonList(
Expand All @@ -73,7 +75,7 @@ public void process(Event event, EventCollector eventCollector) throws Exception
tablet.addTimestamp(0, System.currentTimeMillis());
tablet.addValue(aggregateSeries.getMeasurement(), 0, writePointCount.get());
eventCollector.collect(
new PipeRawTabletInsertionEvent(tablet, false, null, null, null, false));
new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null, false));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ protected boolean isShutdown() {
return PipeConfigNodeAgent.runtime().isShutdown();
}

@Override
protected void thawRate(final String pipeName, final long creationTime) {
PipeConfigNodeRemainingTimeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
}

@Override
protected void freezeRate(final String pipeName, final long creationTime) {
PipeConfigNodeRemainingTimeMetrics.getInstance().freezeRate(pipeName + "_" + creationTime);
}

@Override
protected Map<Integer, PipeTask> buildPipeTasks(final PipeMeta pipeMetaFromConfigNode)
throws IllegalPathException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,23 @@ public PipeConfigRegionSnapshotEvent() {

public PipeConfigRegionSnapshotEvent(
final String snapshotPath, final String templateFilePath, final CNSnapshotFileType type) {
this(snapshotPath, templateFilePath, type, null, null, null);
this(snapshotPath, templateFilePath, type, null, 0, null, null);
}

public PipeConfigRegionSnapshotEvent(
final String snapshotPath,
final String templateFilePath,
final CNSnapshotFileType type,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern) {
super(pipeName, pipeTaskMeta, pattern, PipeConfigNodeSnapshotResourceManager.getInstance());
super(
pipeName,
creationTime,
pipeTaskMeta,
pattern,
PipeConfigNodeSnapshotResourceManager.getInstance());
this.snapshotPath = snapshotPath;
this.templateFilePath = Objects.nonNull(templateFilePath) ? templateFilePath : "";
this.fileType = type;
Expand Down Expand Up @@ -157,12 +163,13 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeConfigRegionSnapshotEvent(
snapshotPath, templateFilePath, fileType, pipeName, pipeTaskMeta, pattern);
snapshotPath, templateFilePath, fileType, pipeName, creationTime, pipeTaskMeta, pattern);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ public PipeConfigRegionWritePlanEvent() {

public PipeConfigRegionWritePlanEvent(
final ConfigPhysicalPlan configPhysicalPlan, final boolean isGeneratedByPipe) {
this(configPhysicalPlan, null, null, null, isGeneratedByPipe);
this(configPhysicalPlan, null, 0, null, null, isGeneratedByPipe);
}

public PipeConfigRegionWritePlanEvent(
final ConfigPhysicalPlan configPhysicalPlan,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final boolean isGeneratedByPipe) {
super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
this.configPhysicalPlan = configPhysicalPlan;
}

Expand All @@ -61,12 +62,13 @@ public ConfigPhysicalPlan getConfigPhysicalPlan() {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeConfigRegionWritePlanEvent(
configPhysicalPlan, pipeName, pipeTaskMeta, pattern, false);
configPhysicalPlan, pipeName, creationTime, pipeTaskMeta, pattern, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.confignode.manager.pipe.metric;

import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
Expand Down Expand Up @@ -110,6 +109,26 @@ public void register(final IoTDBConfigRegionExtractor extractor) {
}
}

public void thawRate(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
"Failed to thaw pipe remaining time rate, RemainingTimeOperator({}) does not exist",
pipeID);
return;
}
remainingTimeOperatorMap.get(pipeID).thawRate(true);
}

public void freezeRate(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
"Failed to freeze pipe remaining time rate, RemainingTimeOperator({}) does not exist",
pipeID);
return;
}
remainingTimeOperatorMap.get(pipeID).freezeRate(true);
}

public void deregister(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
Expand All @@ -122,12 +141,7 @@ public void deregister(final String pipeID) {
}
}

public void markRegionCommit(final PipeTaskRuntimeEnvironment pipeTaskRuntimeEnvironment) {
// Filter commit attempt from assigner
final String pipeName = pipeTaskRuntimeEnvironment.getPipeName();
final long creationTime = pipeTaskRuntimeEnvironment.getCreationTime();
final String pipeID = pipeName + "_" + creationTime;

public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
if (Objects.isNull(metricService)) {
return;
}
Expand All @@ -137,12 +151,6 @@ public void markRegionCommit(final PipeTaskRuntimeEnvironment pipeTaskRuntimeEnv
"Failed to mark pipe region commit, RemainingTimeOperator({}) does not exist", pipeID);
return;
}
// Prevent not set pipeName / creation times & potential differences between pipeNames and
// creation times
if (!Objects.equals(pipeName, operator.getPipeName())
|| !Objects.equals(creationTime, operator.getCreationTime())) {
return;
}

operator.markConfigRegionCommit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,95 +19,110 @@

package org.apache.iotdb.confignode.manager.pipe.metric;

import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
import org.apache.iotdb.confignode.manager.pipe.execution.PipeConfigNodeSubtask;
import org.apache.iotdb.confignode.manager.pipe.extractor.IoTDBConfigRegionExtractor;

import com.codahale.metrics.Clock;
import com.codahale.metrics.ExponentialMovingAverages;
import com.codahale.metrics.Meter;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

class PipeConfigNodeRemainingTimeOperator {
class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator {

private static final long CONFIG_NODE_REMAINING_MAX_SECONDS = 365 * 24 * 60 * 60L; // 1 year
private final Set<IoTDBConfigRegionExtractor> configRegionExtractors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicReference<Meter> configRegionCommitMeter = new AtomicReference<>(null);

private String pipeName;
private long creationTime = 0;

private final ConcurrentMap<IoTDBConfigRegionExtractor, IoTDBConfigRegionExtractor>
configRegionExtractors = new ConcurrentHashMap<>();
private final Meter configRegionCommitMeter =
new Meter(new ExponentialMovingAverages(), Clock.defaultClock());

private double lastConfigRegionCommitSmoothingValue = Long.MIN_VALUE;

//////////////////////////// Tags ////////////////////////////

String getPipeName() {
return pipeName;
}

long getCreationTime() {
return creationTime;
}
private double lastConfigRegionCommitSmoothingValue = Long.MAX_VALUE;

//////////////////////////// Remaining time calculation ////////////////////////////

/**
* This will calculate the estimated remaining time of the given pipe's config region subTask.
* This will calculate the estimated remaining time of the given pipe's {@link
* PipeConfigNodeSubtask}.
*
* @return The estimated remaining time
*/
double getRemainingTime() {
final double pipeRemainingTimeCommitRateSmoothingFactor =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateSmoothingFactor();
final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();

// Do not calculate heartbeat event
final long totalConfigRegionWriteEventCount =
configRegionExtractors.keySet().stream()
configRegionExtractors.stream()
.map(IoTDBConfigRegionExtractor::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);

lastConfigRegionCommitSmoothingValue =
lastConfigRegionCommitSmoothingValue == Long.MIN_VALUE
? configRegionCommitMeter.getOneMinuteRate()
: pipeRemainingTimeCommitRateSmoothingFactor
* configRegionCommitMeter.getOneMinuteRate()
+ (1 - pipeRemainingTimeCommitRateSmoothingFactor)
* lastConfigRegionCommitSmoothingValue;
configRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
lastConfigRegionCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
}
return meter;
});

final double configRegionRemainingTime;
if (totalConfigRegionWriteEventCount <= 0) {
notifyEmpty();
configRegionRemainingTime = 0;
} else {
notifyNonEmpty();
configRegionRemainingTime =
lastConfigRegionCommitSmoothingValue <= 0
? Double.MAX_VALUE
: totalConfigRegionWriteEventCount / lastConfigRegionCommitSmoothingValue;
}

return configRegionRemainingTime >= CONFIG_NODE_REMAINING_MAX_SECONDS
? CONFIG_NODE_REMAINING_MAX_SECONDS
return configRegionRemainingTime >= REMAINING_MAX_SECONDS
? REMAINING_MAX_SECONDS
: configRegionRemainingTime;
}

//////////////////////////// Register & deregister (pipe integration) ////////////////////////////

void register(final IoTDBConfigRegionExtractor extractor) {
setNameAndCreationTime(extractor.getPipeName(), extractor.getCreationTime());
configRegionExtractors.put(extractor, extractor);
}

private void setNameAndCreationTime(final String pipeName, final long creationTime) {
this.pipeName = pipeName;
this.creationTime = creationTime;
configRegionExtractors.add(extractor);
}

//////////////////////////// Rate ////////////////////////////

void markConfigRegionCommit() {
configRegionCommitMeter.mark();
configRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
meter.mark();
}
return meter;
});
}

//////////////////////////// Switch ////////////////////////////

@Override
public synchronized void thawRate(final boolean isStartPipe) {
super.thawRate(isStartPipe);
// The stopped pipe's rate should only be thawed by "startPipe" command
if (isStopped) {
return;
}
configRegionCommitMeter.compareAndSet(
null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock()));
}

@Override
public synchronized void freezeRate(final boolean isStopPipe) {
super.freezeRate(isStopPipe);
configRegionCommitMeter.set(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public class PipeConfigNodeTaskStage extends PipeTaskStage {
private final PipeConfigNodeSubtask subtask;

public PipeConfigNodeTaskStage(
String pipeName,
long creationTime,
Map<String, String> extractorAttributes,
Map<String, String> processorAttributes,
Map<String, String> connectorAttributes,
PipeTaskMeta pipeTaskMeta) {
final String pipeName,
final long creationTime,
final Map<String, String> extractorAttributes,
final Map<String, String> processorAttributes,
final Map<String, String> connectorAttributes,
final PipeTaskMeta pipeTaskMeta) {

try {
subtask =
Expand All @@ -48,7 +48,7 @@ public PipeConfigNodeTaskStage(
processorAttributes,
connectorAttributes,
pipeTaskMeta);
} catch (Exception e) {
} catch (final Exception e) {
throw new PipeException(
String.format(
"Failed to create subtask for pipe %s, creation time %d", pipeName, creationTime),
Expand All @@ -63,8 +63,6 @@ public void createSubtask() throws PipeException {

@Override
public void startSubtask() throws PipeException {
// IoTDBConfigRegionExtractor is started by executor because starting
// here may cause deadlock when triggering snapshot
PipeConfigNodeSubtaskExecutor.getInstance().start(subtask.getTaskID());
}

Expand Down
Loading

0 comments on commit 89af73d

Please sign in to comment.