Skip to content

Commit

Permalink
[BugFix] Lake compaction scheduler might trigger redundant transactio…
Browse files Browse the repository at this point in the history
…n which can cause CN crash (#56263)

Signed-off-by: drake_wang <[email protected]>
  • Loading branch information
wxl24life authored Feb 26, 2025
1 parent 35b66f8 commit 8a6f4ad
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ public class CompactionMgr implements MemoryTrackable {
private CompactionScheduler compactionScheduler;

/**
*
* In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is
* necessary to ensure that the compaction task of the same partition is executed serially, that is, the next
* compaction task can be executed only after the status of the previous compaction task changes to visible or
* canceled.
* So when FE restarted, we should make sure all the active compaction transactions before restarting were tracked,
* and exclude them from choosing as candidates for compaction.
*
* We use `activeCompactionTransactionMap` to track all lake compaction txns that are not published on FE restart.
* The key of the map is the transaction id related to the compaction task, and the value is table id of the
* compaction task. It's possible that multiple keys have the same value, because there might be multiple compaction
Expand Down Expand Up @@ -106,9 +114,10 @@ public void start() {
}

/**
* iterate all transactions and find those with LAKE_COMPACTION labels and are not finished before FE restart.
* iterate all transactions and find those with LAKE_COMPACTION labels and are not finished before FE restart
* or Leader FE changed.
**/
public void rebuildActiveCompactionTransactionMapOnRestart() {
public void buildActiveCompactionTransactionMap() {
Map<Long, Long> activeTxnStates =
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLakeCompactionActiveTxnStats();
for (Map.Entry<Long, Long> txnState : activeTxnStates.entrySet()) {
Expand Down Expand Up @@ -257,16 +266,6 @@ public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockExcepti
public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException {
CompactionMgr compactionManager = reader.readJson(CompactionMgr.class);
partitionStatisticsHashMap = compactionManager.partitionStatisticsHashMap;

// In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is
// necessary to ensure that the compaction task of the same partition is executed serially, that is, the next
// compaction task can be executed only after the status of the previous compaction task changes to visible or
// canceled.
// So when FE restarted, we should make sure all the active compaction transactions before restarting were tracked,
// and exclude them from choosing as candidates for compaction.
// Note here, the map is maintained on leader and follower fe, its keys were removed from the map after compaction
// transaction has finished, and for follower FE, this is done by replay process.
rebuildActiveCompactionTransactionMapOnRestart();
}

public long getPartitionStatsCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1427,6 +1427,10 @@ private void startLeaderOnlyDaemonThreads() {
taskRunStateSynchronizer.start();

if (RunMode.isSharedDataMode()) {
// Need to rebuild active lake compaction transactions before lake scheduler starting to run
// Lake compactionMgr is started on all FE nodes and scheduler only starts to run when the FE is leader
compactionMgr.buildActiveCompactionTransactionMap();

starMgrMetaSyncer.start();
autovacuumDaemon.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testChoosePartitionsToCompactWithActiveTxnFilter() {

}
};
compactionManager.rebuildActiveCompactionTransactionMapOnRestart();
compactionManager.buildActiveCompactionTransactionMap();

Set<PartitionIdentifier> allPartitions = compactionManager.getAllPartitions();
Assert.assertEquals(3, allPartitions.size());
Expand Down Expand Up @@ -297,7 +297,7 @@ public void testActiveCompactionTransactionMapOnRestart() {
};

CompactionMgr compactionMgr = new CompactionMgr();
compactionMgr.rebuildActiveCompactionTransactionMapOnRestart();
compactionMgr.buildActiveCompactionTransactionMap();
ConcurrentHashMap<Long, Long> activeCompactionTransactionMap =
compactionMgr.getRemainedActiveCompactionTxnWhenStart();
Assert.assertEquals(1, activeCompactionTransactionMap.size());
Expand Down

0 comments on commit 8a6f4ad

Please sign in to comment.