From ba1f16fca317130d5127d0f49aa364b1de53313b Mon Sep 17 00:00:00 2001 From: Duncan Coutts Date: Thu, 19 Dec 2024 15:41:23 +0000 Subject: [PATCH] Move MergePolicyForLevel from MergingRun to IncomingRun As part of the changes to the scheduled merges prototype to introduce table unions, this refactoring was done. This is because the merging run gets reused in tree merges, where the merging policy makes no sense. So we move it up one level into something that's specific to the levels (the incomming run). So this is the corresonding refactoring as in the prototype, and is preparation for introducing table unions. --- .../LSMTree/Internal/MergeSchedule.hs | 67 +++++++++---------- src/Database/LSMTree/Internal/MergingRun.hs | 27 +++----- src/Database/LSMTree/Internal/Snapshot.hs | 8 +-- 3 files changed, 47 insertions(+), 55 deletions(-) diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index 70e273801..85cf9bd46 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -202,8 +202,8 @@ mkLevelsCache reg lvls = do foldRunAndMergeM k1 k2 ls = fmap fold $ V.forM ls $ \(Level ir rs) -> do incoming <- case ir of - Single r -> k1 r - Merging mr -> k2 mr + Single r -> k1 r + Merging _ mr -> k2 mr (incoming <>) . fold <$> V.forM rs k1 {-# SPECIALISE rebuildCache :: @@ -283,7 +283,7 @@ data Level m h = Level { -- | An incoming run is either a single run, or a merge. data IncomingRun m h = Single !(Ref (Run m h)) - | Merging !(Ref (MergingRun m h)) + | Merging !MergePolicyForLevel !(Ref (MergingRun m h)) mergePolicyForLevel :: MergePolicy -> LevelNo -> Levels m h -> MergePolicyForLevel mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels @@ -329,16 +329,16 @@ duplicateIncomingRun :: duplicateIncomingRun reg (Single r) = Single <$> allocateTemp reg (dupRef r) releaseRef -duplicateIncomingRun reg (Merging mr) = - Merging <$> allocateTemp reg (dupRef mr) releaseRef +duplicateIncomingRun reg (Merging mp mr) = + Merging mp <$> allocateTemp reg (dupRef mr) releaseRef {-# SPECIALISE releaseIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO () #-} releaseIncomingRun :: (PrimMonad m, MonadMask m, MonadMVar m) => TempRegistry m -> IncomingRun m h -> m () -releaseIncomingRun reg (Single r) = freeTemp reg (releaseRef r) -releaseIncomingRun reg (Merging mr) = freeTemp reg (releaseRef mr) +releaseIncomingRun reg (Single r) = freeTemp reg (releaseRef r) +releaseIncomingRun reg (Merging _ mr) = freeTemp reg (releaseRef mr) {-# SPECIALISE iforLevelM_ :: Levels IO h -> (LevelNo -> Level IO h -> IO ()) -> IO () #-} iforLevelM_ :: Monad m => Levels m h -> (LevelNo -> Level m h -> m ()) -> m () @@ -618,8 +618,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = expectCompletedMerge :: LevelNo -> IncomingRun m h -> m (Ref (Run m h)) expectCompletedMerge ln ir = do r <- case ir of - Single r -> pure r - Merging mr -> do + Single r -> pure r + Merging _ mr -> do r <- allocateTemp reg (MR.expectCompleted mr) releaseRef freeTemp reg (releaseRef mr) pure r @@ -656,7 +656,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = -- The runs will end up inside the merging run, with fresh references. -- The original references can be released (but only on the happy path). mr <- allocateTemp reg - (MR.new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths rs) + (MR.new hfs hbio resolve caching alloc mergeLevel runPaths rs) releaseRef V.forM_ rs $ \r -> freeTemp reg (releaseRef r) case confMergeSchedule of @@ -671,7 +671,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = traceWith tr $ AtLevel ln $ TraceCompletedMerge (Run.size r) (Run.runFsPathsNumber r) - return (Merging mr) + return (Merging mergePolicy mr) -- $setup -- >>> import Database.LSMTree.Internal.Entry @@ -790,8 +790,8 @@ supplyCredits conf c levels = iforLevelM_ levels $ \ln (Level ir _rs) -> case ir of Single{} -> pure () - Merging mr -> do - let !c' = scaleCreditsForMerge mr c + Merging mp mr -> do + let !c' = scaleCreditsForMerge mp mr c let !thresh = creditThresholdForLevel conf ln MR.supplyCredits c' thresh mr @@ -801,28 +801,27 @@ supplyCredits conf c levels = -- Initially, 1 update supplies 1 credit. However, since merging runs have -- different numbers of input runs/entries, we may have to a more or less -- merging work than 1 merge step for each credit. -scaleCreditsForMerge :: Ref (MergingRun m h) -> Credits -> MR.Credits +scaleCreditsForMerge :: MergePolicyForLevel -> Ref (MergingRun m h) -> Credits -> MR.Credits -- A single run is a trivially completed merge, so it requires no credits. -scaleCreditsForMerge (DeRef mr) (Credits c) = - case MR.mergePolicy mr of - LevelTiering -> - -- A tiering merge has 5 runs at most (one could be held back to merged - -- again) and must be completed before the level is full (once 4 more - -- runs come in). - MR.Credits (c * (1 + 4)) - LevelLevelling -> - -- A levelling merge has 1 input run and one resident run, which is (up - -- to) 4x bigger than the others. It needs to be completed before - -- another run comes in. - -- TODO: this is currently assuming a naive worst case, where the - -- resident run is as large as it can be for the current level. We - -- probably have enough information available here to lower the - -- worst-case upper bound by looking at the sizes of the input runs. - -- As as result, merge work would/could be more evenly distributed over - -- time when the resident run is smaller than the worst case. - let NumRuns n = MR.mergeNumRuns mr - -- same as division rounding up: ceiling (c * n / 4) - in MR.Credits ((c * n + 3) `div` 4) +scaleCreditsForMerge LevelTiering _ (Credits c) = + -- A tiering merge has 5 runs at most (one could be held back to merged + -- again) and must be completed before the level is full (once 4 more + -- runs come in). + MR.Credits (c * (1 + 4)) + +scaleCreditsForMerge LevelLevelling (DeRef mr) (Credits c) = + -- A levelling merge has 1 input run and one resident run, which is (up + -- to) 4x bigger than the others. It needs to be completed before + -- another run comes in. + -- TODO: this is currently assuming a naive worst case, where the + -- resident run is as large as it can be for the current level. We + -- probably have enough information available here to lower the + -- worst-case upper bound by looking at the sizes of the input runs. + -- As as result, merge work would/could be more evenly distributed over + -- time when the resident run is smaller than the worst case. + let NumRuns n = MR.mergeNumRuns mr + -- same as division rounding up: ceiling (c * n / 4) + in MR.Credits ((c * n + 3) `div` 4) -- TODO: the thresholds for doing merge work should be different for each level, -- maybe co-prime? diff --git a/src/Database/LSMTree/Internal/MergingRun.hs b/src/Database/LSMTree/Internal/MergingRun.hs index 214411ef0..cf95aff51 100644 --- a/src/Database/LSMTree/Internal/MergingRun.hs +++ b/src/Database/LSMTree/Internal/MergingRun.hs @@ -50,8 +50,7 @@ import System.FS.API (HasFS) import System.FS.BlockIO.API (HasBlockIO) data MergingRun m h = MergingRun { - mergePolicy :: !MergePolicyForLevel - , mergeNumRuns :: !NumRuns + mergeNumRuns :: !NumRuns -- | Sum of number of entries in the input runs , mergeNumEntries :: !NumEntries -- | The number of currently /unspent/ credits @@ -118,7 +117,6 @@ instance NFData MergeKnownCompleted where -> Run.RunDataCaching -> RunBloomFilterAlloc -> Merge.Level - -> MergePolicyForLevel -> RunFsPaths -> V.Vector (Ref (Run IO h)) -> IO (Ref (MergingRun IO h)) #-} @@ -137,11 +135,10 @@ new :: -> Run.RunDataCaching -> RunBloomFilterAlloc -> Merge.Level - -> MergePolicyForLevel -> RunFsPaths -> V.Vector (Ref (Run m h)) -> m (Ref (MergingRun m h)) -new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns = +new hfs hbio resolve caching alloc mergeLevel runPaths inputRuns = -- If creating the Merge fails, we must release the references again. withTempRegistry $ \reg -> do runs <- V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) inputRuns @@ -150,12 +147,11 @@ new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns = let numInputRuns = NumRuns $ V.length runs let numInputEntries = V.foldMap' Run.size runs spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0 - unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $ + unsafeNew numInputRuns numInputEntries MergeMaybeCompleted $ OngoingMerge runs spentCreditsVar merge {-# SPECIALISE newCompleted :: - MergePolicyForLevel - -> NumRuns + NumRuns -> NumEntries -> Ref (Run IO h) -> IO (Ref (MergingRun IO h)) #-} @@ -168,26 +164,24 @@ new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns = -- failing after internal resources have already been created. newCompleted :: (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) - => MergePolicyForLevel - -> NumRuns + => NumRuns -> NumEntries -> Ref (Run m h) -> m (Ref (MergingRun m h)) -newCompleted mergePolicy numInputRuns numInputEntries inputRun = do +newCompleted numInputRuns numInputEntries inputRun = do bracketOnError (dupRef inputRun) releaseRef $ \run -> - unsafeNew mergePolicy numInputRuns numInputEntries MergeKnownCompleted $ + unsafeNew numInputRuns numInputEntries MergeKnownCompleted $ CompletedMerge run {-# INLINE unsafeNew #-} unsafeNew :: (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) - => MergePolicyForLevel - -> NumRuns + => NumRuns -> NumEntries -> MergeKnownCompleted -> MergingRunState m h -> m (Ref (MergingRun m h)) -unsafeNew mergePolicy mergeNumRuns mergeNumEntries knownCompleted state = do +unsafeNew mergeNumRuns mergeNumEntries knownCompleted state = do mergeUnspentCredits <- UnspentCreditsVar <$> newPrimVar 0 mergeStepsPerformed <- TotalStepsVar <$> newPrimVar 0 case state of @@ -197,8 +191,7 @@ unsafeNew mergePolicy mergeNumRuns mergeNumEntries knownCompleted state = do mergeState <- newMVar $! state newRef (finalise mergeState) $ \mergeRefCounter -> MergingRun { - mergePolicy - , mergeNumRuns + mergeNumRuns , mergeNumEntries , mergeUnspentCredits , mergeStepsPerformed diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index 4d678ffb3..c6307318f 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -185,7 +185,7 @@ toSnapIncomingRun (Single r) = pure (SnapSingleRun r) -- We need to know how many credits were yet unspent so we can restore merge -- work on snapshot load. No need to snapshot the contents of totalStepsVar -- here, since we still start counting from 0 again when loading the snapshot. -toSnapIncomingRun (Merging (DeRef MR.MergingRun {..})) = do +toSnapIncomingRun (Merging mergePolicy (DeRef MR.MergingRun {..})) = do unspentCredits <- readPrimVar (MR.getUnspentCreditsVar mergeUnspentCredits) smrs <- withMVar mergeState $ \mrs -> toSnapMergingRunState mrs pure $ @@ -345,14 +345,14 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve fromSnapIncomingRun (SnapSingleRun run) = do Single <$> dupRun run fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do - Merging <$> case smrs of + Merging mpfl <$> case smrs of SnapCompletedMerge run -> - allocateTemp reg (MR.newCompleted mpfl nr ne run) releaseRef + allocateTemp reg (MR.newCompleted nr ne run) releaseRef SnapOngoingMerge runs spentCredits lvl -> do rn <- uniqueToRunNumber <$> incrUniqCounter uc mr <- allocateTemp reg - (MR.new hfs hbio resolve caching alloc lvl mpfl (mkPath rn) runs) + (MR.new hfs hbio resolve caching alloc lvl (mkPath rn) runs) releaseRef -- When a snapshot is created, merge progress is lost, so we -- have to redo merging work here. UnspentCredits and