Skip to content

Commit

Permalink
Merge pull request #504 from IntersectMBO/dcoutts/move-mergepolicy-fr…
Browse files Browse the repository at this point in the history
…om-mergingrun-to-incomingrun

Move MergePolicyForLevel from MergingRun to IncomingRun
  • Loading branch information
dcoutts authored Dec 19, 2024
2 parents 58e4bb2 + ba1f16f commit 4ce3c9d
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 55 deletions.
67 changes: 33 additions & 34 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ mkLevelsCache reg lvls = do
foldRunAndMergeM k1 k2 ls =
fmap fold $ V.forM ls $ \(Level ir rs) -> do
incoming <- case ir of
Single r -> k1 r
Merging mr -> k2 mr
Single r -> k1 r
Merging _ mr -> k2 mr
(incoming <>) . fold <$> V.forM rs k1

{-# SPECIALISE rebuildCache ::
Expand Down Expand Up @@ -283,7 +283,7 @@ data Level m h = Level {
-- | An incoming run is either a single run, or a merge.
data IncomingRun m h =
Single !(Ref (Run m h))
| Merging !(Ref (MergingRun m h))
| Merging !MergePolicyForLevel !(Ref (MergingRun m h))

mergePolicyForLevel :: MergePolicy -> LevelNo -> Levels m h -> MergePolicyForLevel
mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels
Expand Down Expand Up @@ -329,16 +329,16 @@ duplicateIncomingRun ::
duplicateIncomingRun reg (Single r) =
Single <$> allocateTemp reg (dupRef r) releaseRef

duplicateIncomingRun reg (Merging mr) =
Merging <$> allocateTemp reg (dupRef mr) releaseRef
duplicateIncomingRun reg (Merging mp mr) =
Merging mp <$> allocateTemp reg (dupRef mr) releaseRef

{-# SPECIALISE releaseIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO () #-}
releaseIncomingRun ::
(PrimMonad m, MonadMask m, MonadMVar m)
=> TempRegistry m
-> IncomingRun m h -> m ()
releaseIncomingRun reg (Single r) = freeTemp reg (releaseRef r)
releaseIncomingRun reg (Merging mr) = freeTemp reg (releaseRef mr)
releaseIncomingRun reg (Single r) = freeTemp reg (releaseRef r)
releaseIncomingRun reg (Merging _ mr) = freeTemp reg (releaseRef mr)

{-# SPECIALISE iforLevelM_ :: Levels IO h -> (LevelNo -> Level IO h -> IO ()) -> IO () #-}
iforLevelM_ :: Monad m => Levels m h -> (LevelNo -> Level m h -> m ()) -> m ()
Expand Down Expand Up @@ -618,8 +618,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
expectCompletedMerge :: LevelNo -> IncomingRun m h -> m (Ref (Run m h))
expectCompletedMerge ln ir = do
r <- case ir of
Single r -> pure r
Merging mr -> do
Single r -> pure r
Merging _ mr -> do
r <- allocateTemp reg (MR.expectCompleted mr) releaseRef
freeTemp reg (releaseRef mr)
pure r
Expand Down Expand Up @@ -656,7 +656,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
-- The runs will end up inside the merging run, with fresh references.
-- The original references can be released (but only on the happy path).
mr <- allocateTemp reg
(MR.new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths rs)
(MR.new hfs hbio resolve caching alloc mergeLevel runPaths rs)
releaseRef
V.forM_ rs $ \r -> freeTemp reg (releaseRef r)
case confMergeSchedule of
Expand All @@ -671,7 +671,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
traceWith tr $ AtLevel ln $
TraceCompletedMerge (Run.size r) (Run.runFsPathsNumber r)

return (Merging mr)
return (Merging mergePolicy mr)

-- $setup
-- >>> import Database.LSMTree.Internal.Entry
Expand Down Expand Up @@ -790,8 +790,8 @@ supplyCredits conf c levels =
iforLevelM_ levels $ \ln (Level ir _rs) ->
case ir of
Single{} -> pure ()
Merging mr -> do
let !c' = scaleCreditsForMerge mr c
Merging mp mr -> do
let !c' = scaleCreditsForMerge mp mr c
let !thresh = creditThresholdForLevel conf ln
MR.supplyCredits c' thresh mr

Expand All @@ -801,28 +801,27 @@ supplyCredits conf c levels =
-- Initially, 1 update supplies 1 credit. However, since merging runs have
-- different numbers of input runs/entries, we may have to a more or less
-- merging work than 1 merge step for each credit.
scaleCreditsForMerge :: Ref (MergingRun m h) -> Credits -> MR.Credits
scaleCreditsForMerge :: MergePolicyForLevel -> Ref (MergingRun m h) -> Credits -> MR.Credits
-- A single run is a trivially completed merge, so it requires no credits.
scaleCreditsForMerge (DeRef mr) (Credits c) =
case MR.mergePolicy mr of
LevelTiering ->
-- A tiering merge has 5 runs at most (one could be held back to merged
-- again) and must be completed before the level is full (once 4 more
-- runs come in).
MR.Credits (c * (1 + 4))
LevelLevelling ->
-- A levelling merge has 1 input run and one resident run, which is (up
-- to) 4x bigger than the others. It needs to be completed before
-- another run comes in.
-- TODO: this is currently assuming a naive worst case, where the
-- resident run is as large as it can be for the current level. We
-- probably have enough information available here to lower the
-- worst-case upper bound by looking at the sizes of the input runs.
-- As as result, merge work would/could be more evenly distributed over
-- time when the resident run is smaller than the worst case.
let NumRuns n = MR.mergeNumRuns mr
-- same as division rounding up: ceiling (c * n / 4)
in MR.Credits ((c * n + 3) `div` 4)
scaleCreditsForMerge LevelTiering _ (Credits c) =
-- A tiering merge has 5 runs at most (one could be held back to merged
-- again) and must be completed before the level is full (once 4 more
-- runs come in).
MR.Credits (c * (1 + 4))

scaleCreditsForMerge LevelLevelling (DeRef mr) (Credits c) =
-- A levelling merge has 1 input run and one resident run, which is (up
-- to) 4x bigger than the others. It needs to be completed before
-- another run comes in.
-- TODO: this is currently assuming a naive worst case, where the
-- resident run is as large as it can be for the current level. We
-- probably have enough information available here to lower the
-- worst-case upper bound by looking at the sizes of the input runs.
-- As as result, merge work would/could be more evenly distributed over
-- time when the resident run is smaller than the worst case.
let NumRuns n = MR.mergeNumRuns mr
-- same as division rounding up: ceiling (c * n / 4)
in MR.Credits ((c * n + 3) `div` 4)

-- TODO: the thresholds for doing merge work should be different for each level,
-- maybe co-prime?
Expand Down
27 changes: 10 additions & 17 deletions src/Database/LSMTree/Internal/MergingRun.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ import System.FS.API (HasFS)
import System.FS.BlockIO.API (HasBlockIO)

data MergingRun m h = MergingRun {
mergePolicy :: !MergePolicyForLevel
, mergeNumRuns :: !NumRuns
mergeNumRuns :: !NumRuns
-- | Sum of number of entries in the input runs
, mergeNumEntries :: !NumEntries
-- | The number of currently /unspent/ credits
Expand Down Expand Up @@ -118,7 +117,6 @@ instance NFData MergeKnownCompleted where
-> Run.RunDataCaching
-> RunBloomFilterAlloc
-> Merge.Level
-> MergePolicyForLevel
-> RunFsPaths
-> V.Vector (Ref (Run IO h))
-> IO (Ref (MergingRun IO h)) #-}
Expand All @@ -137,11 +135,10 @@ new ::
-> Run.RunDataCaching
-> RunBloomFilterAlloc
-> Merge.Level
-> MergePolicyForLevel
-> RunFsPaths
-> V.Vector (Ref (Run m h))
-> m (Ref (MergingRun m h))
new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns =
new hfs hbio resolve caching alloc mergeLevel runPaths inputRuns =
-- If creating the Merge fails, we must release the references again.
withTempRegistry $ \reg -> do
runs <- V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) inputRuns
Expand All @@ -150,12 +147,11 @@ new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns =
let numInputRuns = NumRuns $ V.length runs
let numInputEntries = V.foldMap' Run.size runs
spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $
unsafeNew numInputRuns numInputEntries MergeMaybeCompleted $
OngoingMerge runs spentCreditsVar merge

{-# SPECIALISE newCompleted ::
MergePolicyForLevel
-> NumRuns
NumRuns
-> NumEntries
-> Ref (Run IO h)
-> IO (Ref (MergingRun IO h)) #-}
Expand All @@ -168,26 +164,24 @@ new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns =
-- failing after internal resources have already been created.
newCompleted ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> MergePolicyForLevel
-> NumRuns
=> NumRuns
-> NumEntries
-> Ref (Run m h)
-> m (Ref (MergingRun m h))
newCompleted mergePolicy numInputRuns numInputEntries inputRun = do
newCompleted numInputRuns numInputEntries inputRun = do
bracketOnError (dupRef inputRun) releaseRef $ \run ->
unsafeNew mergePolicy numInputRuns numInputEntries MergeKnownCompleted $
unsafeNew numInputRuns numInputEntries MergeKnownCompleted $
CompletedMerge run

{-# INLINE unsafeNew #-}
unsafeNew ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> MergePolicyForLevel
-> NumRuns
=> NumRuns
-> NumEntries
-> MergeKnownCompleted
-> MergingRunState m h
-> m (Ref (MergingRun m h))
unsafeNew mergePolicy mergeNumRuns mergeNumEntries knownCompleted state = do
unsafeNew mergeNumRuns mergeNumEntries knownCompleted state = do
mergeUnspentCredits <- UnspentCreditsVar <$> newPrimVar 0
mergeStepsPerformed <- TotalStepsVar <$> newPrimVar 0
case state of
Expand All @@ -197,8 +191,7 @@ unsafeNew mergePolicy mergeNumRuns mergeNumEntries knownCompleted state = do
mergeState <- newMVar $! state
newRef (finalise mergeState) $ \mergeRefCounter ->
MergingRun {
mergePolicy
, mergeNumRuns
mergeNumRuns
, mergeNumEntries
, mergeUnspentCredits
, mergeStepsPerformed
Expand Down
8 changes: 4 additions & 4 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ toSnapIncomingRun (Single r) = pure (SnapSingleRun r)
-- We need to know how many credits were yet unspent so we can restore merge
-- work on snapshot load. No need to snapshot the contents of totalStepsVar
-- here, since we still start counting from 0 again when loading the snapshot.
toSnapIncomingRun (Merging (DeRef MR.MergingRun {..})) = do
toSnapIncomingRun (Merging mergePolicy (DeRef MR.MergingRun {..})) = do
unspentCredits <- readPrimVar (MR.getUnspentCreditsVar mergeUnspentCredits)
smrs <- withMVar mergeState $ \mrs -> toSnapMergingRunState mrs
pure $
Expand Down Expand Up @@ -345,14 +345,14 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
fromSnapIncomingRun (SnapSingleRun run) = do
Single <$> dupRun run
fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do
Merging <$> case smrs of
Merging mpfl <$> case smrs of
SnapCompletedMerge run ->
allocateTemp reg (MR.newCompleted mpfl nr ne run) releaseRef
allocateTemp reg (MR.newCompleted nr ne run) releaseRef

SnapOngoingMerge runs spentCredits lvl -> do
rn <- uniqueToRunNumber <$> incrUniqCounter uc
mr <- allocateTemp reg
(MR.new hfs hbio resolve caching alloc lvl mpfl (mkPath rn) runs)
(MR.new hfs hbio resolve caching alloc lvl (mkPath rn) runs)
releaseRef
-- When a snapshot is created, merge progress is lost, so we
-- have to redo merging work here. UnspentCredits and
Expand Down

0 comments on commit 4ce3c9d

Please sign in to comment.