Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IoTConsensusV2: Use syncLag for region migration progress management #14624

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeReceiver;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeSelector;
import org.apache.iotdb.consensus.pipe.consensuspipe.ProgressIndexManager;
import org.apache.iotdb.consensus.pipe.consensuspipe.ReplicateProgressManager;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -203,7 +203,7 @@ public static class Pipe {
private final ConsensusPipeDispatcher consensusPipeDispatcher;
private final ConsensusPipeGuardian consensusPipeGuardian;
private final ConsensusPipeSelector consensusPipeSelector;
private final ProgressIndexManager progressIndexManager;
private final ReplicateProgressManager replicateProgressManager;
private final ConsensusPipeReceiver consensusPipeReceiver;
private final long consensusPipeGuardJobIntervalInSeconds;

Expand All @@ -214,7 +214,7 @@ public Pipe(
ConsensusPipeDispatcher consensusPipeDispatcher,
ConsensusPipeGuardian consensusPipeGuardian,
ConsensusPipeSelector consensusPipeSelector,
ProgressIndexManager progressIndexManager,
ReplicateProgressManager replicateProgressManager,
ConsensusPipeReceiver consensusPipeReceiver,
long consensusPipeGuardJobIntervalInSeconds) {
this.extractorPluginName = extractorPluginName;
Expand All @@ -223,7 +223,7 @@ public Pipe(
this.consensusPipeDispatcher = consensusPipeDispatcher;
this.consensusPipeGuardian = consensusPipeGuardian;
this.consensusPipeSelector = consensusPipeSelector;
this.progressIndexManager = progressIndexManager;
this.replicateProgressManager = replicateProgressManager;
this.consensusPipeReceiver = consensusPipeReceiver;
this.consensusPipeGuardJobIntervalInSeconds = consensusPipeGuardJobIntervalInSeconds;
}
Expand Down Expand Up @@ -256,8 +256,8 @@ public ConsensusPipeReceiver getConsensusPipeReceiver() {
return consensusPipeReceiver;
}

public ProgressIndexManager getProgressIndexManager() {
return progressIndexManager;
public ReplicateProgressManager getProgressIndexManager() {
return replicateProgressManager;
}

public long getConsensusPipeGuardJobIntervalInSeconds() {
Expand All @@ -277,7 +277,7 @@ public static class Builder {
private ConsensusPipeDispatcher consensusPipeDispatcher = null;
private ConsensusPipeGuardian consensusPipeGuardian = null;
private ConsensusPipeSelector consensusPipeSelector = null;
private ProgressIndexManager progressIndexManager = null;
private ReplicateProgressManager replicateProgressManager = null;
private ConsensusPipeReceiver consensusPipeReceiver = null;
private long consensusPipeGuardJobIntervalInSeconds = 180L;

Expand Down Expand Up @@ -317,8 +317,9 @@ public Pipe.Builder setConsensusPipeReceiver(ConsensusPipeReceiver consensusPipe
return this;
}

public Pipe.Builder setProgressIndexManager(ProgressIndexManager progressIndexManager) {
this.progressIndexManager = progressIndexManager;
public Pipe.Builder setProgressIndexManager(
ReplicateProgressManager replicateProgressManager) {
this.replicateProgressManager = replicateProgressManager;
return this;
}

Expand All @@ -336,7 +337,7 @@ public Pipe build() {
consensusPipeDispatcher,
consensusPipeGuardian,
consensusPipeSelector,
progressIndexManager,
replicateProgressManager,
consensusPipeReceiver,
consensusPipeGuardJobIntervalInSeconds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.consensus.pipe.consensuspipe.ProgressIndexManager;
import org.apache.iotdb.consensus.pipe.consensuspipe.ReplicateProgressManager;
import org.apache.iotdb.consensus.pipe.metric.PipeConsensusServerMetrics;
import org.apache.iotdb.consensus.pipe.thrift.TCheckConsensusPipeCompletedReq;
import org.apache.iotdb.consensus.pipe.thrift.TCheckConsensusPipeCompletedResp;
Expand Down Expand Up @@ -89,7 +89,7 @@ public class PipeConsensusServerImpl {
private final AtomicBoolean isStarted;
private final String consensusGroupId;
private final ConsensusPipeManager consensusPipeManager;
private final ProgressIndexManager progressIndexManager;
private final ReplicateProgressManager replicateProgressManager;
private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> syncClientManager;
private final PipeConsensusServerMetrics pipeConsensusServerMetrics;
private final ReplicateMode replicateMode;
Expand All @@ -112,7 +112,7 @@ public PipeConsensusServerImpl(
this.isStarted = new AtomicBoolean(false);
this.consensusGroupId = thisNode.getGroupId().toString();
this.consensusPipeManager = consensusPipeManager;
this.progressIndexManager = config.getPipe().getProgressIndexManager();
this.replicateProgressManager = config.getPipe().getProgressIndexManager();
this.syncClientManager = syncClientManager;
this.pipeConsensusServerMetrics = new PipeConsensusServerMetrics(this);
this.replicateMode = config.getReplicateMode();
Expand Down Expand Up @@ -332,7 +332,7 @@ public TSStatus write(IConsensusRequest request) {
long writeToStateMachineStartTime = System.nanoTime();
if (request instanceof ComparableConsensusRequest) {
((ComparableConsensusRequest) request)
.setProgressIndex(progressIndexManager.assignProgressIndex(thisNode.getGroupId()));
.setProgressIndex(replicateProgressManager.assignProgressIndex(thisNode.getGroupId()));
}
TSStatus result = stateMachine.write(request);
long writeToStateMachineEndTime = System.nanoTime();
Expand Down Expand Up @@ -629,20 +629,29 @@ private boolean isRemotePeerConsensusPipesTransmissionCompleted(
}
}

public boolean isConsensusPipesTransmissionCompleted(List<String> consensusPipeNames) {
return consensusPipeNames.stream()
.noneMatch(
pipeName ->
replicateProgressManager.getSyncLagForSpecificConsensusPipe(
thisNode.getGroupId(), new ConsensusPipeName(pipeName))
> 0);
}

public synchronized boolean isConsensusPipesTransmissionCompleted(
List<String> consensusPipeNames, boolean refreshCachedProgressIndex) {
if (refreshCachedProgressIndex) {
cachedProgressIndex =
cachedProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
progressIndexManager.getMaxAssignedProgressIndex(thisNode.getGroupId()));
replicateProgressManager.getMaxAssignedProgressIndex(thisNode.getGroupId()));
}

try {
return consensusPipeNames.stream()
.noneMatch(
name ->
cachedProgressIndex.isAfter(
progressIndexManager.getProgressIndex(new ConsensusPipeName(name))));
replicateProgressManager.getProgressIndex(new ConsensusPipeName(name))));
} catch (PipeException e) {
LOGGER.info(e.getMessage());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.iotdb.consensus.pipe.consensuspipe;

public interface ConsensusPipeConnector {
int getConsensusPipeRestartTimes();

long getConsensusPipeCommitProgress();

long getConsensusPipeReplicateProgress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;

public interface ProgressIndexManager {
public interface ReplicateProgressManager {
ProgressIndex getProgressIndex(ConsensusPipeName consensusPipeName);

ProgressIndex assignProgressIndex(ConsensusGroupId consensusGroupId);

ProgressIndex getMaxAssignedProgressIndex(ConsensusGroupId consensusGroupId);

long getSyncLagForSpecificConsensusPipe(
ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName);

void pinCommitIndexForMigration(
ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName);

void pinRestartTimeForMigration(
ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
package org.apache.iotdb.consensus.pipe.metric;

import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeConnector;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -36,27 +36,92 @@
public class PipeConsensusSyncLagManager {
long syncLag = Long.MIN_VALUE;
ReentrantLock lock = new ReentrantLock();
List<ConsensusPipeConnector> consensusPipeConnectorList = new ArrayList<>();
Map<ConsensusPipeName, ConsensusPipeConnector> consensusPipe2ConnectorMap =
new ConcurrentHashMap<>();
Map<ConsensusPipeName, MaxIndexContainer> consensusPipe2MaxIndexContainerMap =
new ConcurrentHashMap<>();

private long getSyncLagForSpecificConsensusPipe(ConsensusPipeConnector consensusPipeConnector) {
long userWriteProgress = consensusPipeConnector.getConsensusPipeCommitProgress();
long replicateProgress = consensusPipeConnector.getConsensusPipeReplicateProgress();
return Math.max(userWriteProgress - replicateProgress, 0);
/**
* pinnedCommitIndex - currentReplicateProgress. If res <= 0, indicating that replication is
* finished.
*/
public long getSyncLagForRegionMigration(
ConsensusPipeName consensusPipeName, long pinnedCommitIndex, int pinnedPipeRestartTimes) {
return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
.map(
consensusPipeConnector -> {
int pipeRestartTimes = consensusPipeConnector.getConsensusPipeRestartTimes();
if (pipeRestartTimes > pinnedPipeRestartTimes) {
long accumulatedReplicatedProgress = 0;
for (int i = pinnedPipeRestartTimes; i <= pipeRestartTimes; i++) {
accumulatedReplicatedProgress +=
consensusPipe2MaxIndexContainerMap
.get(consensusPipeName)
.getMaxReplicateIndex(i);
}
return Math.max(pinnedCommitIndex - accumulatedReplicatedProgress, 0);
} else {
return Math.max(
pinnedCommitIndex
- consensusPipe2MaxIndexContainerMap
.get(consensusPipeName)
.getMaxReplicateIndex(pinnedPipeRestartTimes),
0L);
}
})
.orElse(0L);
}

public long getCurrentCommitIndex(ConsensusPipeName consensusPipeName) {
return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
.map(ConsensusPipeConnector::getConsensusPipeCommitProgress)
.orElse(0L);
}

public int getCurrentRestartTimes(ConsensusPipeName consensusPipeName) {
return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
.map(ConsensusPipeConnector::getConsensusPipeRestartTimes)
.orElse(0);
}

public long getSyncLagForSpecificConsensusPipe(ConsensusPipeName consensusPipeName) {
return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
.map(
consensusPipeConnector -> {
int pipeRestartTimes = consensusPipeConnector.getConsensusPipeRestartTimes();
long userWriteProgress =
consensusPipe2MaxIndexContainerMap
.get(consensusPipeName)
.computeUserWriteIndex(
pipeRestartTimes,
consensusPipeConnector.getConsensusPipeCommitProgress());
long replicateProgress =
consensusPipe2MaxIndexContainerMap
.get(consensusPipeName)
.computeReplicateIndex(
pipeRestartTimes,
consensusPipeConnector.getConsensusPipeReplicateProgress());
return Math.max(userWriteProgress - replicateProgress, 0L);
})
.orElse(0L);
}

public void addConsensusPipeConnector(ConsensusPipeConnector consensusPipeConnector) {
public void addConsensusPipeConnector(
ConsensusPipeName consensusPipeName, ConsensusPipeConnector consensusPipeConnector) {
try {
lock.lock();
consensusPipeConnectorList.add(consensusPipeConnector);
consensusPipe2ConnectorMap.put(consensusPipeName, consensusPipeConnector);
consensusPipe2MaxIndexContainerMap.computeIfAbsent(
consensusPipeName, k -> new MaxIndexContainer());
} finally {
lock.unlock();
}
}

public void removeConsensusPipeConnector(ConsensusPipeConnector connector) {
public void removeConsensusPipeConnector(ConsensusPipeName consensusPipeName) {
try {
lock.lock();
consensusPipeConnectorList.remove(connector);
consensusPipe2ConnectorMap.remove(consensusPipeName);
} finally {
lock.unlock();
}
Expand All @@ -71,21 +136,65 @@ public long calculateSyncLag() {
try {
lock.lock();
// if there isn't a consensus pipe task, the syncLag is 0
if (consensusPipeConnectorList.isEmpty()) {
if (consensusPipe2ConnectorMap.isEmpty()) {
return 0;
}
// else we find the biggest gap between leader and replicas in all consensus pipe task.
syncLag = Long.MIN_VALUE;
consensusPipeConnectorList.forEach(
consensusPipeConnector ->
syncLag =
Math.max(syncLag, getSyncLagForSpecificConsensusPipe(consensusPipeConnector)));
consensusPipe2ConnectorMap
.keySet()
.forEach(
consensusPipeName ->
syncLag =
Math.max(syncLag, getSyncLagForSpecificConsensusPipe(consensusPipeName)));
return syncLag;
} finally {
lock.unlock();
}
}

public void clear() {
this.consensusPipe2ConnectorMap.clear();
this.consensusPipe2MaxIndexContainerMap.clear();
}

public static class MaxIndexContainer {
// pipe restart times -> max(pipe event commit index)
Map<Integer, Long> pipeRestartTimes2MaxUserWriteIndex = new ConcurrentHashMap<>();
// pipe restart times -> max(replicated event commit index)
Map<Integer, Long> pipeRestartTimes2MaxReplicateIndex = new ConcurrentHashMap<>();

public long computeUserWriteIndex(int restartTimes, long commitIndex) {
return pipeRestartTimes2MaxUserWriteIndex.compute(
restartTimes,
(k, v) -> {
if (v == null) {
return commitIndex;
}
return Math.max(v, commitIndex);
});
}

public long computeReplicateIndex(int restartTimes, long commitIndex) {
return pipeRestartTimes2MaxReplicateIndex.compute(
restartTimes,
(k, v) -> {
if (v == null) {
return commitIndex;
}
return Math.max(v, commitIndex);
});
}

public Long getMaxUserWriteIndex(int restartTimes) {
return pipeRestartTimes2MaxUserWriteIndex.getOrDefault(restartTimes, 0);
}

public Long getMaxReplicateIndex(int restartTimes) {
return pipeRestartTimes2MaxReplicateIndex.getOrDefault(restartTimes, 0);
}
}

private PipeConsensusSyncLagManager() {
// do nothing
}
Expand All @@ -110,6 +219,7 @@ public static PipeConsensusSyncLagManager getInstance(String groupId) {
}

public static void release(String groupId) {
PipeConsensusSyncLagManager.getInstance(groupId).clear();
PipeConsensusSyncLagManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.remove(groupId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeDispatcher;
import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeRuntimeAgentGuardian;
import org.apache.iotdb.db.pipe.consensus.ProgressIndexDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
Expand Down Expand Up @@ -170,7 +170,7 @@ private static ConsensusConfig buildConsensusConfig() {
.setConsensusPipeSelector(
() -> PipeDataNodeAgent.task().getAllConsensusPipe())
.setConsensusPipeReceiver(PipeDataNodeAgent.receiver().pipeConsensus())
.setProgressIndexManager(new ProgressIndexDataNodeManager())
.setProgressIndexManager(new ReplicateProgressDataNodeManager())
.setConsensusPipeGuardJobIntervalInSeconds(300)
.build())
.setReplicateMode(ReplicateMode.fromValue(CONF.getIotConsensusV2Mode()))
Expand Down
Loading
Loading