Skip to content

Commit

Permalink
Move MergePolicyForLevel from MergingRun to IncomingRun
Browse files Browse the repository at this point in the history
As part of the changes to the scheduled merges prototype to introduce
table unions, this refactoring was done. This is because the merging run
gets reused in tree merges, where the merging policy makes no sense. So
we move it up one level into something that's specific to the levels
(the incoming run).

So this is the corresponding refactoring as in the prototype, and is
preparation for introducing table unions.
  • Loading branch information
dcoutts committed Dec 19, 2024
1 parent 42711c5 commit ba1f16f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 55 deletions.
67 changes: 33 additions & 34 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ mkLevelsCache reg lvls = do
foldRunAndMergeM k1 k2 ls =
fmap fold $ V.forM ls $ \(Level ir rs) -> do
incoming <- case ir of
Single r -> k1 r
Merging mr -> k2 mr
Single r -> k1 r
Merging _ mr -> k2 mr
(incoming <>) . fold <$> V.forM rs k1

{-# SPECIALISE rebuildCache ::
Expand Down Expand Up @@ -283,7 +283,7 @@ data Level m h = Level {
-- | An incoming run is either a single run, or a merge.
data IncomingRun m h =
Single !(Ref (Run m h))
| Merging !(Ref (MergingRun m h))
| Merging !MergePolicyForLevel !(Ref (MergingRun m h))

mergePolicyForLevel :: MergePolicy -> LevelNo -> Levels m h -> MergePolicyForLevel
mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels
Expand Down Expand Up @@ -329,16 +329,16 @@ duplicateIncomingRun ::
duplicateIncomingRun reg (Single r) =
Single <$> allocateTemp reg (dupRef r) releaseRef

duplicateIncomingRun reg (Merging mr) =
Merging <$> allocateTemp reg (dupRef mr) releaseRef
duplicateIncomingRun reg (Merging mp mr) =
Merging mp <$> allocateTemp reg (dupRef mr) releaseRef

{-# SPECIALISE releaseIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO () #-}
releaseIncomingRun ::
(PrimMonad m, MonadMask m, MonadMVar m)
=> TempRegistry m
-> IncomingRun m h -> m ()
releaseIncomingRun reg (Single r) = freeTemp reg (releaseRef r)
releaseIncomingRun reg (Merging mr) = freeTemp reg (releaseRef mr)
releaseIncomingRun reg (Single r) = freeTemp reg (releaseRef r)
releaseIncomingRun reg (Merging _ mr) = freeTemp reg (releaseRef mr)

{-# SPECIALISE iforLevelM_ :: Levels IO h -> (LevelNo -> Level IO h -> IO ()) -> IO () #-}
iforLevelM_ :: Monad m => Levels m h -> (LevelNo -> Level m h -> m ()) -> m ()
Expand Down Expand Up @@ -618,8 +618,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
expectCompletedMerge :: LevelNo -> IncomingRun m h -> m (Ref (Run m h))
expectCompletedMerge ln ir = do
r <- case ir of
Single r -> pure r
Merging mr -> do
Single r -> pure r
Merging _ mr -> do
r <- allocateTemp reg (MR.expectCompleted mr) releaseRef
freeTemp reg (releaseRef mr)
pure r
Expand Down Expand Up @@ -656,7 +656,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
-- The runs will end up inside the merging run, with fresh references.
-- The original references can be released (but only on the happy path).
mr <- allocateTemp reg
(MR.new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths rs)
(MR.new hfs hbio resolve caching alloc mergeLevel runPaths rs)
releaseRef
V.forM_ rs $ \r -> freeTemp reg (releaseRef r)
case confMergeSchedule of
Expand All @@ -671,7 +671,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
traceWith tr $ AtLevel ln $
TraceCompletedMerge (Run.size r) (Run.runFsPathsNumber r)

return (Merging mr)
return (Merging mergePolicy mr)

-- $setup
-- >>> import Database.LSMTree.Internal.Entry
Expand Down Expand Up @@ -790,8 +790,8 @@ supplyCredits conf c levels =
iforLevelM_ levels $ \ln (Level ir _rs) ->
case ir of
Single{} -> pure ()
Merging mr -> do
let !c' = scaleCreditsForMerge mr c
Merging mp mr -> do
let !c' = scaleCreditsForMerge mp mr c
let !thresh = creditThresholdForLevel conf ln
MR.supplyCredits c' thresh mr

Expand All @@ -801,28 +801,27 @@ supplyCredits conf c levels =
-- Initially, 1 update supplies 1 credit. However, since merging runs have
-- different numbers of input runs/entries, we may have to a more or less
-- merging work than 1 merge step for each credit.
scaleCreditsForMerge :: Ref (MergingRun m h) -> Credits -> MR.Credits
scaleCreditsForMerge :: MergePolicyForLevel -> Ref (MergingRun m h) -> Credits -> MR.Credits
-- A single run is a trivially completed merge, so it requires no credits.
scaleCreditsForMerge (DeRef mr) (Credits c) =
case MR.mergePolicy mr of
LevelTiering ->
-- A tiering merge has 5 runs at most (one could be held back to be merged
-- again) and must be completed before the level is full (once 4 more
-- runs come in).
MR.Credits (c * (1 + 4))
LevelLevelling ->
-- A levelling merge has 1 input run and one resident run, which is (up
-- to) 4x bigger than the others. It needs to be completed before
-- another run comes in.
-- TODO: this is currently assuming a naive worst case, where the
-- resident run is as large as it can be for the current level. We
-- probably have enough information available here to lower the
-- worst-case upper bound by looking at the sizes of the input runs.
-- As a result, merge work would/could be more evenly distributed over
-- time when the resident run is smaller than the worst case.
let NumRuns n = MR.mergeNumRuns mr
-- same as division rounding up: ceiling (c * n / 4)
in MR.Credits ((c * n + 3) `div` 4)
scaleCreditsForMerge LevelTiering _ (Credits c) =
-- A tiering merge has 5 runs at most (one could be held back to be merged
-- again) and must be completed before the level is full (once 4 more
-- runs come in).
MR.Credits (c * (1 + 4))

scaleCreditsForMerge LevelLevelling (DeRef mr) (Credits c) =
-- A levelling merge has 1 input run and one resident run, which is (up
-- to) 4x bigger than the others. It needs to be completed before
-- another run comes in.
-- TODO: this is currently assuming a naive worst case, where the
-- resident run is as large as it can be for the current level. We
-- probably have enough information available here to lower the
-- worst-case upper bound by looking at the sizes of the input runs.
-- As a result, merge work would/could be more evenly distributed over
-- time when the resident run is smaller than the worst case.
let NumRuns n = MR.mergeNumRuns mr
-- same as division rounding up: ceiling (c * n / 4)
in MR.Credits ((c * n + 3) `div` 4)

-- TODO: the thresholds for doing merge work should be different for each level,
-- maybe co-prime?
Expand Down
27 changes: 10 additions & 17 deletions src/Database/LSMTree/Internal/MergingRun.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ import System.FS.API (HasFS)
import System.FS.BlockIO.API (HasBlockIO)

data MergingRun m h = MergingRun {
mergePolicy :: !MergePolicyForLevel
, mergeNumRuns :: !NumRuns
mergeNumRuns :: !NumRuns
-- | Sum of number of entries in the input runs
, mergeNumEntries :: !NumEntries
-- | The number of currently /unspent/ credits
Expand Down Expand Up @@ -118,7 +117,6 @@ instance NFData MergeKnownCompleted where
-> Run.RunDataCaching
-> RunBloomFilterAlloc
-> Merge.Level
-> MergePolicyForLevel
-> RunFsPaths
-> V.Vector (Ref (Run IO h))
-> IO (Ref (MergingRun IO h)) #-}
Expand All @@ -137,11 +135,10 @@ new ::
-> Run.RunDataCaching
-> RunBloomFilterAlloc
-> Merge.Level
-> MergePolicyForLevel
-> RunFsPaths
-> V.Vector (Ref (Run m h))
-> m (Ref (MergingRun m h))
new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns =
new hfs hbio resolve caching alloc mergeLevel runPaths inputRuns =
-- If creating the Merge fails, we must release the references again.
withTempRegistry $ \reg -> do
runs <- V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) inputRuns
Expand All @@ -150,12 +147,11 @@ new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns =
let numInputRuns = NumRuns $ V.length runs
let numInputEntries = V.foldMap' Run.size runs
spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $
unsafeNew numInputRuns numInputEntries MergeMaybeCompleted $
OngoingMerge runs spentCreditsVar merge

{-# SPECIALISE newCompleted ::
MergePolicyForLevel
-> NumRuns
NumRuns
-> NumEntries
-> Ref (Run IO h)
-> IO (Ref (MergingRun IO h)) #-}
Expand All @@ -168,26 +164,24 @@ new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns =
-- failing after internal resources have already been created.
newCompleted ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> MergePolicyForLevel
-> NumRuns
=> NumRuns
-> NumEntries
-> Ref (Run m h)
-> m (Ref (MergingRun m h))
newCompleted mergePolicy numInputRuns numInputEntries inputRun = do
newCompleted numInputRuns numInputEntries inputRun = do
bracketOnError (dupRef inputRun) releaseRef $ \run ->
unsafeNew mergePolicy numInputRuns numInputEntries MergeKnownCompleted $
unsafeNew numInputRuns numInputEntries MergeKnownCompleted $
CompletedMerge run

{-# INLINE unsafeNew #-}
unsafeNew ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> MergePolicyForLevel
-> NumRuns
=> NumRuns
-> NumEntries
-> MergeKnownCompleted
-> MergingRunState m h
-> m (Ref (MergingRun m h))
unsafeNew mergePolicy mergeNumRuns mergeNumEntries knownCompleted state = do
unsafeNew mergeNumRuns mergeNumEntries knownCompleted state = do
mergeUnspentCredits <- UnspentCreditsVar <$> newPrimVar 0
mergeStepsPerformed <- TotalStepsVar <$> newPrimVar 0
case state of
Expand All @@ -197,8 +191,7 @@ unsafeNew mergePolicy mergeNumRuns mergeNumEntries knownCompleted state = do
mergeState <- newMVar $! state
newRef (finalise mergeState) $ \mergeRefCounter ->
MergingRun {
mergePolicy
, mergeNumRuns
mergeNumRuns
, mergeNumEntries
, mergeUnspentCredits
, mergeStepsPerformed
Expand Down
8 changes: 4 additions & 4 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ toSnapIncomingRun (Single r) = pure (SnapSingleRun r)
-- We need to know how many credits were yet unspent so we can restore merge
-- work on snapshot load. No need to snapshot the contents of totalStepsVar
-- here, since we still start counting from 0 again when loading the snapshot.
toSnapIncomingRun (Merging (DeRef MR.MergingRun {..})) = do
toSnapIncomingRun (Merging mergePolicy (DeRef MR.MergingRun {..})) = do
unspentCredits <- readPrimVar (MR.getUnspentCreditsVar mergeUnspentCredits)
smrs <- withMVar mergeState $ \mrs -> toSnapMergingRunState mrs
pure $
Expand Down Expand Up @@ -345,14 +345,14 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
fromSnapIncomingRun (SnapSingleRun run) = do
Single <$> dupRun run
fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do
Merging <$> case smrs of
Merging mpfl <$> case smrs of
SnapCompletedMerge run ->
allocateTemp reg (MR.newCompleted mpfl nr ne run) releaseRef
allocateTemp reg (MR.newCompleted nr ne run) releaseRef

SnapOngoingMerge runs spentCredits lvl -> do
rn <- uniqueToRunNumber <$> incrUniqCounter uc
mr <- allocateTemp reg
(MR.new hfs hbio resolve caching alloc lvl mpfl (mkPath rn) runs)
(MR.new hfs hbio resolve caching alloc lvl (mkPath rn) runs)
releaseRef
-- When a snapshot is created, merge progress is lost, so we
-- have to redo merging work here. UnspentCredits and
Expand Down

0 comments on commit ba1f16f

Please sign in to comment.