diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java index 349a5cd7e1d0..eb6ea361d24d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java @@ -24,7 +24,7 @@ import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian; import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeReceiver; import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeSelector; -import org.apache.iotdb.consensus.pipe.consensuspipe.ProgressIndexManager; +import org.apache.iotdb.consensus.pipe.consensuspipe.ReplicateProgressManager; import java.util.concurrent.TimeUnit; @@ -203,7 +203,7 @@ public static class Pipe { private final ConsensusPipeDispatcher consensusPipeDispatcher; private final ConsensusPipeGuardian consensusPipeGuardian; private final ConsensusPipeSelector consensusPipeSelector; - private final ProgressIndexManager progressIndexManager; + private final ReplicateProgressManager replicateProgressManager; private final ConsensusPipeReceiver consensusPipeReceiver; private final long consensusPipeGuardJobIntervalInSeconds; @@ -214,7 +214,7 @@ public Pipe( ConsensusPipeDispatcher consensusPipeDispatcher, ConsensusPipeGuardian consensusPipeGuardian, ConsensusPipeSelector consensusPipeSelector, - ProgressIndexManager progressIndexManager, + ReplicateProgressManager replicateProgressManager, ConsensusPipeReceiver consensusPipeReceiver, long consensusPipeGuardJobIntervalInSeconds) { this.extractorPluginName = extractorPluginName; @@ -223,7 +223,7 @@ public Pipe( this.consensusPipeDispatcher = consensusPipeDispatcher; this.consensusPipeGuardian = consensusPipeGuardian; this.consensusPipeSelector = consensusPipeSelector; - this.progressIndexManager = progressIndexManager; + this.replicateProgressManager = replicateProgressManager; this.consensusPipeReceiver = consensusPipeReceiver; this.consensusPipeGuardJobIntervalInSeconds = consensusPipeGuardJobIntervalInSeconds; } @@ -256,8 +256,8 @@ public ConsensusPipeReceiver getConsensusPipeReceiver() { return consensusPipeReceiver; } - public ProgressIndexManager getProgressIndexManager() { - return progressIndexManager; + public ReplicateProgressManager getProgressIndexManager() { + return replicateProgressManager; } public long getConsensusPipeGuardJobIntervalInSeconds() { @@ -277,7 +277,7 @@ public static class Builder { private ConsensusPipeDispatcher consensusPipeDispatcher = null; private ConsensusPipeGuardian consensusPipeGuardian = null; private ConsensusPipeSelector consensusPipeSelector = null; - private ProgressIndexManager progressIndexManager = null; + private ReplicateProgressManager replicateProgressManager = null; private ConsensusPipeReceiver consensusPipeReceiver = null; private long consensusPipeGuardJobIntervalInSeconds = 180L; @@ -317,8 +317,9 @@ public Pipe.Builder setConsensusPipeReceiver(ConsensusPipeReceiver consensusPipe return this; } - public Pipe.Builder setProgressIndexManager(ProgressIndexManager progressIndexManager) { - this.progressIndexManager = progressIndexManager; + public Pipe.Builder setProgressIndexManager( + ReplicateProgressManager replicateProgressManager) { + this.replicateProgressManager = replicateProgressManager; return this; } @@ -336,7 +337,7 @@ public Pipe build() { consensusPipeDispatcher, consensusPipeGuardian, consensusPipeSelector, - progressIndexManager, + replicateProgressManager, consensusPipeReceiver, consensusPipeGuardJobIntervalInSeconds); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java index b085abbd64b2..38d7b62570a3 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java @@ -42,7 +42,7 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException; import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager; import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName; -import org.apache.iotdb.consensus.pipe.consensuspipe.ProgressIndexManager; +import org.apache.iotdb.consensus.pipe.consensuspipe.ReplicateProgressManager; import org.apache.iotdb.consensus.pipe.metric.PipeConsensusServerMetrics; import org.apache.iotdb.consensus.pipe.thrift.TCheckConsensusPipeCompletedReq; import org.apache.iotdb.consensus.pipe.thrift.TCheckConsensusPipeCompletedResp; @@ -89,7 +89,7 @@ public class PipeConsensusServerImpl { private final AtomicBoolean isStarted; private final String consensusGroupId; private final ConsensusPipeManager consensusPipeManager; - private final ProgressIndexManager progressIndexManager; + private final ReplicateProgressManager replicateProgressManager; private final IClientManager syncClientManager; private final PipeConsensusServerMetrics pipeConsensusServerMetrics; private final ReplicateMode replicateMode; @@ -112,7 +112,7 @@ public PipeConsensusServerImpl( this.isStarted = new AtomicBoolean(false); this.consensusGroupId = thisNode.getGroupId().toString(); this.consensusPipeManager = consensusPipeManager; - this.progressIndexManager = config.getPipe().getProgressIndexManager(); + this.replicateProgressManager = config.getPipe().getProgressIndexManager(); this.syncClientManager = syncClientManager; this.pipeConsensusServerMetrics = new PipeConsensusServerMetrics(this); this.replicateMode = config.getReplicateMode(); @@ -332,7 +332,7 @@ public TSStatus write(IConsensusRequest request) { long writeToStateMachineStartTime = System.nanoTime(); if (request instanceof ComparableConsensusRequest) { ((ComparableConsensusRequest) request) - .setProgressIndex(progressIndexManager.assignProgressIndex(thisNode.getGroupId())); + .setProgressIndex(replicateProgressManager.assignProgressIndex(thisNode.getGroupId())); } TSStatus result = stateMachine.write(request); long writeToStateMachineEndTime = System.nanoTime(); @@ -629,12 +629,21 @@ private boolean isRemotePeerConsensusPipesTransmissionCompleted( } } + public boolean isConsensusPipesTransmissionCompleted(List consensusPipeNames) { + return consensusPipeNames.stream() + .noneMatch( + pipeName -> + replicateProgressManager.getSyncLagForSpecificConsensusPipe( + thisNode.getGroupId(), new ConsensusPipeName(pipeName)) + > 0); + } + public synchronized boolean isConsensusPipesTransmissionCompleted( List consensusPipeNames, boolean refreshCachedProgressIndex) { if (refreshCachedProgressIndex) { cachedProgressIndex = cachedProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex( - progressIndexManager.getMaxAssignedProgressIndex(thisNode.getGroupId())); + replicateProgressManager.getMaxAssignedProgressIndex(thisNode.getGroupId())); } try { @@ -642,7 +651,7 @@ public synchronized boolean isConsensusPipesTransmissionCompleted( .noneMatch( name -> cachedProgressIndex.isAfter( - progressIndexManager.getProgressIndex(new ConsensusPipeName(name)))); + replicateProgressManager.getProgressIndex(new ConsensusPipeName(name)))); } catch (PipeException e) { LOGGER.info(e.getMessage()); return false; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java index 6f1396db9722..fb82e8dcb59a 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java @@ -19,6 +19,8 @@ package org.apache.iotdb.consensus.pipe.consensuspipe; public interface ConsensusPipeConnector { + int getConsensusPipeRestartTimes(); + long getConsensusPipeCommitProgress(); long getConsensusPipeReplicateProgress(); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ProgressIndexManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java similarity index 75% rename from iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ProgressIndexManager.java rename to iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java index e5dc2e922ab6..1659ecdb1958 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ProgressIndexManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java @@ -22,10 +22,19 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; -public interface ProgressIndexManager { +public interface ReplicateProgressManager { ProgressIndex getProgressIndex(ConsensusPipeName consensusPipeName); ProgressIndex assignProgressIndex(ConsensusGroupId consensusGroupId); ProgressIndex getMaxAssignedProgressIndex(ConsensusGroupId consensusGroupId); + + long getSyncLagForSpecificConsensusPipe( + ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName); + + void pinCommitIndexForMigration( + ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName); + + void pinRestartTimeForMigration( + ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java index a65662d5f5e2..96301ac76ac2 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java @@ -20,10 +20,10 @@ package org.apache.iotdb.consensus.pipe.metric; import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeConnector; +import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName; -import java.util.ArrayList; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -36,27 +36,92 @@ public class PipeConsensusSyncLagManager { long syncLag = Long.MIN_VALUE; ReentrantLock lock = new ReentrantLock(); - List consensusPipeConnectorList = new ArrayList<>(); + Map consensusPipe2ConnectorMap = + new ConcurrentHashMap<>(); + Map consensusPipe2MaxIndexContainerMap = + new ConcurrentHashMap<>(); - private long getSyncLagForSpecificConsensusPipe(ConsensusPipeConnector consensusPipeConnector) { - long userWriteProgress = consensusPipeConnector.getConsensusPipeCommitProgress(); - long replicateProgress = consensusPipeConnector.getConsensusPipeReplicateProgress(); - return Math.max(userWriteProgress - replicateProgress, 0); + /** + * pinnedCommitIndex - currentReplicateProgress. If res <= 0, indicating that replication is + * finished. + */ + public long getSyncLagForRegionMigration( + ConsensusPipeName consensusPipeName, long pinnedCommitIndex, int pinnedPipeRestartTimes) { + return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName)) + .map( + consensusPipeConnector -> { + int pipeRestartTimes = consensusPipeConnector.getConsensusPipeRestartTimes(); + if (pipeRestartTimes > pinnedPipeRestartTimes) { + long accumulatedReplicatedProgress = 0; + for (int i = pinnedPipeRestartTimes; i <= pipeRestartTimes; i++) { + accumulatedReplicatedProgress += + consensusPipe2MaxIndexContainerMap + .get(consensusPipeName) + .getMaxReplicateIndex(i); + } + return Math.max(pinnedCommitIndex - accumulatedReplicatedProgress, 0); + } else { + return Math.max( + pinnedCommitIndex + - consensusPipe2MaxIndexContainerMap + .get(consensusPipeName) + .getMaxReplicateIndex(pinnedPipeRestartTimes), + 0L); + } + }) + .orElse(0L); + } + + public long getCurrentCommitIndex(ConsensusPipeName consensusPipeName) { + return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName)) + .map(ConsensusPipeConnector::getConsensusPipeCommitProgress) + .orElse(0L); + } + + public int getCurrentRestartTimes(ConsensusPipeName consensusPipeName) { + return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName)) + .map(ConsensusPipeConnector::getConsensusPipeRestartTimes) + .orElse(0); + } + + public long getSyncLagForSpecificConsensusPipe(ConsensusPipeName consensusPipeName) { + return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName)) + .map( + consensusPipeConnector -> { + int pipeRestartTimes = consensusPipeConnector.getConsensusPipeRestartTimes(); + long userWriteProgress = + consensusPipe2MaxIndexContainerMap + .get(consensusPipeName) + .computeUserWriteIndex( + pipeRestartTimes, + consensusPipeConnector.getConsensusPipeCommitProgress()); + long replicateProgress = + consensusPipe2MaxIndexContainerMap + .get(consensusPipeName) + .computeReplicateIndex( + pipeRestartTimes, + consensusPipeConnector.getConsensusPipeReplicateProgress()); + return Math.max(userWriteProgress - replicateProgress, 0L); + }) + .orElse(0L); } - public void addConsensusPipeConnector(ConsensusPipeConnector consensusPipeConnector) { + public void addConsensusPipeConnector( + ConsensusPipeName consensusPipeName, ConsensusPipeConnector consensusPipeConnector) { try { lock.lock(); - consensusPipeConnectorList.add(consensusPipeConnector); + consensusPipe2ConnectorMap.put(consensusPipeName, consensusPipeConnector); + consensusPipe2MaxIndexContainerMap.computeIfAbsent( + consensusPipeName, k -> new MaxIndexContainer()); } finally { lock.unlock(); } } - public void removeConsensusPipeConnector(ConsensusPipeConnector connector) { + public void removeConsensusPipeConnector(ConsensusPipeName consensusPipeName) { try { lock.lock(); - consensusPipeConnectorList.remove(connector); + consensusPipe2ConnectorMap.remove(consensusPipeName); } finally { lock.unlock(); } @@ -71,21 +136,65 @@ public long calculateSyncLag() { try { lock.lock(); // if there isn't a consensus pipe task, the syncLag is 0 - if (consensusPipeConnectorList.isEmpty()) { + if (consensusPipe2ConnectorMap.isEmpty()) { return 0; } // else we find the biggest gap between leader and replicas in all consensus pipe task. syncLag = Long.MIN_VALUE; - consensusPipeConnectorList.forEach( - consensusPipeConnector -> - syncLag = - Math.max(syncLag, getSyncLagForSpecificConsensusPipe(consensusPipeConnector))); + consensusPipe2ConnectorMap + .keySet() + .forEach( + consensusPipeName -> + syncLag = + Math.max(syncLag, getSyncLagForSpecificConsensusPipe(consensusPipeName))); return syncLag; } finally { lock.unlock(); } } + public void clear() { + this.consensusPipe2ConnectorMap.clear(); + this.consensusPipe2MaxIndexContainerMap.clear(); + } + + public static class MaxIndexContainer { + // pipe restart times -> max(pipe event commit index) + Map pipeRestartTimes2MaxUserWriteIndex = new ConcurrentHashMap<>(); + // pipe restart times -> max(replicated event commit index) + Map pipeRestartTimes2MaxReplicateIndex = new ConcurrentHashMap<>(); + + public long computeUserWriteIndex(int restartTimes, long commitIndex) { + return pipeRestartTimes2MaxUserWriteIndex.compute( + restartTimes, + (k, v) -> { + if (v == null) { + return commitIndex; + } + return Math.max(v, commitIndex); + }); + } + + public long computeReplicateIndex(int restartTimes, long commitIndex) { + return pipeRestartTimes2MaxReplicateIndex.compute( + restartTimes, + (k, v) -> { + if (v == null) { + return commitIndex; + } + return Math.max(v, commitIndex); + }); + } + + public Long getMaxUserWriteIndex(int restartTimes) { + return pipeRestartTimes2MaxUserWriteIndex.getOrDefault(restartTimes, 0); + } + + public Long getMaxReplicateIndex(int restartTimes) { + return pipeRestartTimes2MaxReplicateIndex.getOrDefault(restartTimes, 0); + } + } + private PipeConsensusSyncLagManager() { // do nothing } @@ -110,6 +219,7 @@ public static PipeConsensusSyncLagManager getInstance(String groupId) { } public static void release(String groupId) { + PipeConsensusSyncLagManager.getInstance(groupId).clear(); PipeConsensusSyncLagManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.remove(groupId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 0994935a6d78..42d0830e711f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -40,7 +40,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeDispatcher; import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeRuntimeAgentGuardian; -import org.apache.iotdb.db.pipe.consensus.ProgressIndexDataNodeManager; +import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; @@ -170,7 +170,7 @@ private static ConsensusConfig buildConsensusConfig() { .setConsensusPipeSelector( () -> PipeDataNodeAgent.task().getAllConsensusPipe()) .setConsensusPipeReceiver(PipeDataNodeAgent.receiver().pipeConsensus()) - .setProgressIndexManager(new ProgressIndexDataNodeManager()) + .setProgressIndexManager(new ReplicateProgressDataNodeManager()) .setConsensusPipeGuardJobIntervalInSeconds(300) .build()) .setReplicateMode(ReplicateMode.fromValue(CONF.getIotConsensusV2Mode())) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java index c0fa792da799..8c5e95a94db4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java @@ -33,6 +33,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeConnector; +import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName; import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager; import org.apache.iotdb.consensus.pipe.thrift.TCommitId; import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq; @@ -131,7 +132,7 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati // initialize metric components pipeConsensusConnectorMetrics = new PipeConsensusConnectorMetrics(this); PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr()) - .addConsensusPipeConnector(this); + .addConsensusPipeConnector(new ConsensusPipeName(consensusPipeName), this); MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetrics); // In PipeConsensus, one pipeConsensusTask corresponds to a pipeConsensusConnector. Thus, @@ -584,7 +585,7 @@ public synchronized void close() { } PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr()) - .removeConsensusPipeConnector(this); + .removeConsensusPipeConnector(new ConsensusPipeName(consensusPipeName)); MetricService.getInstance().removeMetricSet(this.pipeConsensusConnectorMetrics); } @@ -598,6 +599,15 @@ public int getRetryBufferSize() { return retryEventQueue.size(); } + @Override + public int getConsensusPipeRestartTimes() { + return PipeEventCommitManager.getInstance() + .getGivenConsensusPipeRestartTimes( + consensusPipeName, + PipeDataNodeAgent.task().getPipeCreationTime(consensusPipeName), + consensusGroupId); + } + @Override public long getConsensusPipeCommitProgress() { return PipeEventCommitManager.getInstance() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java similarity index 80% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java index e9ae4ff4454d..bef488298245 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java @@ -27,7 +27,8 @@ import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName; -import org.apache.iotdb.consensus.pipe.consensuspipe.ProgressIndexManager; +import org.apache.iotdb.consensus.pipe.consensuspipe.ReplicateProgressManager; +import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.storageengine.StorageEngine; @@ -40,11 +41,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -public class ProgressIndexDataNodeManager implements ProgressIndexManager { +public class ReplicateProgressDataNodeManager implements ReplicateProgressManager { private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); private final Map groupId2MaxProgressIndex; + private long pinnedCommitIndexForMigration; + private int pinnedPipeRestartTimesForMigration; - public ProgressIndexDataNodeManager() { + public ReplicateProgressDataNodeManager() { this.groupId2MaxProgressIndex = new ConcurrentHashMap<>(); recoverMaxProgressIndexFromDataRegion(); @@ -129,4 +132,30 @@ public ProgressIndex assignProgressIndex(ConsensusGroupId consensusGroupId) { public ProgressIndex getMaxAssignedProgressIndex(ConsensusGroupId consensusGroupId) { return groupId2MaxProgressIndex.getOrDefault(consensusGroupId, MinimumProgressIndex.INSTANCE); } + + @Override + public long getSyncLagForSpecificConsensusPipe( + ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName) { + return PipeConsensusSyncLagManager.getInstance(consensusGroupId.toString()) + .getSyncLagForRegionMigration( + consensusPipeName, + this.pinnedCommitIndexForMigration, + this.pinnedPipeRestartTimesForMigration); + } + + @Override + public void pinCommitIndexForMigration( + ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName) { + this.pinnedCommitIndexForMigration = + PipeConsensusSyncLagManager.getInstance(consensusGroupId.toString()) + .getCurrentCommitIndex(consensusPipeName); + } + + @Override + public void pinRestartTimeForMigration( + ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName) { + this.pinnedCommitIndexForMigration = + PipeConsensusSyncLagManager.getInstance(consensusGroupId.toString()) + .getCurrentRestartTimes(consensusPipeName); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java index 0f6ff3c63152..7e7dec38c661 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java @@ -26,7 +26,7 @@ import org.apache.iotdb.consensus.pipe.PipeConsensus; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; -import org.apache.iotdb.db.pipe.consensus.ProgressIndexDataNodeManager; +import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager; import org.apache.iotdb.db.pipe.consensus.deletion.persist.DeletionBuffer; import org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer; import org.apache.iotdb.db.pipe.consensus.deletion.recover.DeletionReader; @@ -176,7 +176,7 @@ private synchronized void removeDeletionResource(DeletionResource deletionResour deleteNode2ResourcesMap.remove(deletionResource.getDeleteDataNode()); // Clean disk ProgressIndex currentProgressIndex = - ProgressIndexDataNodeManager.extractLocalSimpleProgressIndex( + ReplicateProgressDataNodeManager.extractLocalSimpleProgressIndex( deletionResource.getProgressIndex()); try (Stream pathStream = Files.walk(Paths.get(storageDir.getPath()), 1)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java index 289bcfc53fac..f9f6965f40d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java @@ -25,7 +25,7 @@ import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.pipe.consensus.ProgressIndexDataNodeManager; +import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager; import org.apache.iotdb.db.utils.MmapUtil; @@ -222,7 +222,8 @@ private void switchLoggingFile() throws IOException { try { // PipeConsensus ensures that deleteDataNodes use recoverProgressIndex. ProgressIndex curProgressIndex = - ProgressIndexDataNodeManager.extractLocalSimpleProgressIndex(maxProgressIndexInLastFile); + ReplicateProgressDataNodeManager.extractLocalSimpleProgressIndex( + maxProgressIndexInLastFile); if (!(curProgressIndex instanceof SimpleProgressIndex)) { throw new IOException("Invalid deletion progress index: " + curProgressIndex); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java index b37bd07d1d9b..2ac078edf0c1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java @@ -176,6 +176,17 @@ public void setCommitRateMarker(final BiConsumer commitRateMark this.commitRateMarker = commitRateMarker; } + public int getGivenConsensusPipeRestartTimes( + final String consensusPipeName, final long creationTime, final int consensusGroupId) { + final CommitterKey committerKey = + generateCommitterKey(consensusPipeName, creationTime, consensusGroupId); + final PipeEventCommitter committer = eventCommitterMap.get(committerKey); + if (committer == null) { + return 0; + } + return committer.getPipeTaskRestartTimes(); + } + public long getGivenConsensusPipeCommitId( final String consensusPipeName, final long creationTime, final int consensusGroupId) { final CommitterKey committerKey = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java index ba61f03f841e..f47130119daa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java @@ -124,6 +124,10 @@ public int getRegionId() { return committerKey.getRegionId(); } + public int getPipeTaskRestartTimes() { + return committerKey.getRestartTimes(); + } + public long commitQueueSize() { return commitQueue.size(); }