Skip to content

Commit

Permalink
Merge pull request #478 from IntersectMBO/wenkokke/issue392-2
Browse files Browse the repository at this point in the history
Persist write buffer when creating a snapshot
  • Loading branch information
wenkokke authored Jan 2, 2025
2 parents 3ac3bfa + 1efe0e0 commit 840c26a
Show file tree
Hide file tree
Showing 19 changed files with 874 additions and 127 deletions.
9 changes: 8 additions & 1 deletion doc/format-directory.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ Each snapshot contains a metadata file for the snapshot overall and a checksum:
* `${session}/snapshots/${name}/snapshot`
* `${session}/snapshots/${name}/snapshot.checksum`

plus three files for the serialised write buffer:

* `${session}/snapshots/${name}/${n}.keyops`
* `${session}/snapshots/${name}/${n}.blobs`
* `${session}/snapshots/${name}/${n}.checksum`

plus the five files for each LSM run in the snapshot:

* `${session}/snapshots/${name}/${n}.keyops`
Expand All @@ -85,7 +91,7 @@ plus the five files for each LSM run in the snapshot:
* `${session}/snapshots/${name}/${n}.index`
* `${session}/snapshots/${name}/${n}.checksum`

In this case the LSM run files are numbered from 0 within each snapshot.
In this case the serialised write buffer and the LSM run files are numbered from 0 within each snapshot.

## Snapshot metadata

Expand All @@ -97,6 +103,7 @@ The snapshot metadata file contains the following information:
- Page size for all the key/operations files
- Index type and parameters
* Key/value type information for dynamic-type sanity checking
* The number of the serialised write buffer files (should always be 0).
* The shape of the LSM tree overall (runs within levels etc), referencing
the numbered LSM runs.

Expand Down
2 changes: 2 additions & 0 deletions lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ library
Database.LSMTree.Internal.Vector.Growing
Database.LSMTree.Internal.WriteBuffer
Database.LSMTree.Internal.WriteBufferBlobs
Database.LSMTree.Internal.WriteBufferReader
Database.LSMTree.Internal.WriteBufferWriter
Database.LSMTree.Monoidal
Database.LSMTree.Normal

Expand Down
52 changes: 51 additions & 1 deletion src-extras/Database/LSMTree/Extras/RunData.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ module Database.LSMTree.Extras.RunData (
withRun
, withRuns
, unsafeFlushAsWriteBuffer
-- * Serialise write buffers
, withRunDataAsWriteBuffer
, withSerialisedWriteBuffer
-- * RunData
, RunData (..)
, SerialisedRunData
Expand All @@ -20,18 +23,24 @@ module Database.LSMTree.Extras.RunData (
, liftShrink2Map
) where

import Control.Exception (bracket)
import Control.Exception (bracket, bracket_)
import Control.Monad
import Control.RefCount
import Data.Bifoldable (Bifoldable (bifoldMap))
import Data.Bifunctor
import Data.Foldable (for_)
import qualified Data.Map as M
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Vector as V
import Data.Word (Word64)
import Database.LSMTree.Extras (showPowersOf10)
import Database.LSMTree.Extras.Generators ()
import Database.LSMTree.Internal.Entry
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import Database.LSMTree.Internal.MergeSchedule (addWriteBufferEntries)
import Database.LSMTree.Internal.Paths
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Run (Run, RunDataCaching (..))
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..),
Expand All @@ -40,7 +49,10 @@ import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import Database.LSMTree.Internal.WriteBufferWriter (writeWriteBuffer)
import qualified System.FS.API as FS
import System.FS.API
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API
import Test.QuickCheck

Expand Down Expand Up @@ -96,6 +108,44 @@ unsafeFlushAsWriteBuffer fs hbio fsPaths (RunData m) = do
releaseRef wbblobs
return run

{-------------------------------------------------------------------------------
Serialise write buffers
-------------------------------------------------------------------------------}

-- | Use 'SerialisedRunData' to 'WriteBuffer' and 'WriteBufferBlobs'.
withRunDataAsWriteBuffer ::
FS.HasFS IO h
-> ResolveSerialisedValue
-> WriteBufferFsPaths
-> SerialisedRunData
-> (WB.WriteBuffer -> Ref (WBB.WriteBufferBlobs IO h) -> IO a)
-> IO a
withRunDataAsWriteBuffer hfs f fsPaths rd action = do
let es = V.fromList . M.toList $ unRunData rd
let maxn = NumEntries $ V.length es
let wbbPath = Paths.writeBufferBlobPath fsPaths
bracket (WBB.new hfs wbbPath) releaseRef $ \wbb -> do
(wb, _) <- addWriteBufferEntries hfs f wbb maxn WB.empty es
action wb wbb

-- | Serialise a 'WriteBuffer' and 'WriteBufferBlobs' to disk and perform an 'IO' action.
withSerialisedWriteBuffer ::
FS.HasFS IO h
-> FS.HasBlockIO IO h
-> WriteBufferFsPaths
-> WB.WriteBuffer
-> Ref (WBB.WriteBufferBlobs IO h)
-> IO a
-> IO a
withSerialisedWriteBuffer hfs hbio wbPaths wb wbb =
bracket_ (writeWriteBuffer hfs hbio wbPaths wb wbb) $ do
for_ [ Paths.writeBufferKOpsPath wbPaths
, Paths.writeBufferBlobPath wbPaths
, Paths.writeBufferChecksumsPath wbPaths
] $ \fsPath -> do
fsPathExists <- FS.doesFileExist hfs fsPath
when fsPathExists $ FS.removeFile hfs fsPath

{-------------------------------------------------------------------------------
RunData
-------------------------------------------------------------------------------}
Expand Down
9 changes: 3 additions & 6 deletions src/Database/LSMTree.hs
Original file line number Diff line number Diff line change
Expand Up @@ -469,21 +469,18 @@ retrieveBlobs (Internal.Session' (sesh :: Internal.Session m h)) refs =
-------------------------------------------------------------------------------}

{-# SPECIALISE createSnapshot ::
ResolveValue v
=> Common.SnapshotLabel
Common.SnapshotLabel
-> SnapshotName
-> Table IO k v b
-> IO () #-}
createSnapshot :: forall m k v b.
( IOLike m
, ResolveValue v
)
IOLike m
=> Common.SnapshotLabel
-> SnapshotName
-> Table m k v b
-> m ()
createSnapshot label snap (Internal.Table' t) =
void $ Internal.createSnapshot (resolve (Proxy @v)) snap label Internal.SnapFullTable t
void $ Internal.createSnapshot snap label Internal.SnapFullTable t

{-# SPECIALISE openSnapshot ::
ResolveValue v
Expand Down
79 changes: 32 additions & 47 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import Database.LSMTree.Internal.Config
import qualified Database.LSMTree.Internal.Cursor as Cursor
import Database.LSMTree.Internal.Entry (Entry, unNumEntries)
import Database.LSMTree.Internal.Entry (Entry)
import Database.LSMTree.Internal.Lookup (ByteCountDiscrepancy,
ResolveSerialisedValue, lookupsIO)
import Database.LSMTree.Internal.MergeSchedule
Expand Down Expand Up @@ -1126,28 +1126,26 @@ readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do
-------------------------------------------------------------------------------}

{-# SPECIALISE createSnapshot ::
ResolveSerialisedValue
-> SnapshotName
SnapshotName
-> SnapshotLabel
-> SnapshotTableType
-> Table IO h
-> IO () #-}
-- | See 'Database.LSMTree.Normal.createSnapshot''.
createSnapshot ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> SnapshotName
=> SnapshotName
-> SnapshotLabel
-> SnapshotTableType
-> Table m h
-> m ()
createSnapshot resolve snap label tableType t = do
createSnapshot snap label tableType t = do
traceWith (tableTracer t) $ TraceSnapshot snap
let conf = tableConfig t
withOpenTable t $ \thEnv ->
withTempRegistry $ \reg -> do -- TODO: use the temp registry for all side effects
let hfs = tableHasFS thEnv
let hfs = tableHasFS thEnv
hbio = tableHasBlockIO thEnv
uc = tableSessionUniqCounter thEnv

-- Guard that the snapshot does not exist already
let snapDir = Paths.namedSnapshotDir (tableSessionRoot thEnv) snap
Expand All @@ -1162,46 +1160,31 @@ createSnapshot resolve snap label tableType t = do
(FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir))
(\_ -> FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir))

-- For the temporary implementation it is okay to just flush the buffer
-- before taking the snapshot.
content <- modifyWithTempRegistry
(RW.unsafeAcquireWriteAccess (tableContent thEnv))
(atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv))
$ \innerReg content -> do
-- TODO: When we flush the buffer here, it might be underfull, which
-- could mess up the scheduling. The conservative approach is to supply
-- credits as if the buffer was full, and then flush the (possibly)
-- underfull buffer. However, note that this bit of code
-- here is probably going to change anyway because of #392
let credits = case confWriteBufferAlloc conf of
AllocNumEntries n -> Credits (unNumEntries n)
supplyCredits conf credits (tableLevels content)
content' <- flushWriteBuffer
(TraceMerge `contramap` tableTracer t)
conf
resolve
hfs
(tableHasBlockIO thEnv)
(tableSessionRoot thEnv)
(tableSessionUniqCounter thEnv)
innerReg
content
pure (content', content')
-- At this point, we've flushed the write buffer but we haven't created the
-- snapshot file yet. If an asynchronous exception happens beyond this
-- point, we'll take that loss, as the inner state of the table is still
-- consistent.
-- Duplicate references to the table content, so that resources do not disappear
-- from under our feet while taking a snapshot. These references are released
-- again after the snapshot files/directories are written.
content <- RW.withReadAccess (tableContent thEnv) (duplicateTableContent reg)

-- Snapshot the write buffer.
let activeDir = Paths.activeDir (tableSessionRoot thEnv)
let wb = tableWriteBuffer content
let wbb = tableWriteBufferBlobs content
snapWriteBufferNumber <- Paths.writeBufferNumber <$> snapshotWriteBuffer reg hfs hbio uc activeDir snapDir wb wbb

-- Convert to snapshot format
snapLevels <- toSnapLevels (tableLevels content)

-- Hard link runs into the named snapshot directory
snapLevels' <- snapshotRuns reg hbio snapDir snapLevels

let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapLevels'
let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBufferNumber snapLevels'
SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData

-- Release the table content
releaseTableContent reg content

{-# SPECIALISE openSnapshot ::
Session IO h
-> SnapshotLabel
Expand All @@ -1226,6 +1209,7 @@ openSnapshot sesh label tableType override snap resolve = do
withTempRegistry $ \reg -> do
let hfs = sessionHasFS seshEnv
hbio = sessionHasBlockIO seshEnv
uc = sessionUniqCounter seshEnv

-- Guard that the snapshot exists
let snapDir = Paths.namedSnapshotDir (sessionRoot seshEnv) snap
Expand All @@ -1238,7 +1222,7 @@ openSnapshot sesh label tableType override snap resolve = do
Left e -> throwIO (ErrSnapshotDeserialiseFailure e snap)
Right x -> pure x

let SnapshotMetaData label' tableType' conf snapLevels = snapMetaData
let SnapshotMetaData label' tableType' conf snapWriteBuffer snapLevels = snapMetaData

unless (tableType == tableType') $
throwIO (ErrSnapshotWrongTableType snap tableType tableType')
Expand All @@ -1248,22 +1232,23 @@ openSnapshot sesh label tableType override snap resolve = do

let conf' = applyOverride override conf
am <- newArenaManager
blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
incrUniqCounter (sessionUniqCounter seshEnv)
tableWriteBufferBlobs <- allocateTemp reg (WBB.new hfs blobpath)
releaseRef

let actDir = Paths.activeDir (sessionRoot seshEnv)
let activeDir = Paths.activeDir (sessionRoot seshEnv)

-- Read write buffer
let snapWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer
(tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths

-- Hard link runs into the active directory,
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir actDir snapLevels
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir activeDir snapLevels

-- Convert from the snapshot format, restoring merge progress in the process
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve actDir snapLevels'
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve activeDir snapLevels'
releaseRuns reg snapLevels'

tableCache <- mkLevelsCache reg tableLevels
newWith reg sesh seshEnv conf' am $! TableContent {
tableWriteBuffer = WB.empty
tableWriteBuffer
, tableWriteBufferBlobs
, tableLevels
, tableCache
Expand Down
4 changes: 3 additions & 1 deletion src/Database/LSMTree/Internal/BlobFile.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import Database.LSMTree.Internal.Serialise (SerialisedBlob (..))
import qualified System.FS.API as FS
import System.FS.API (HasFS)
import qualified System.FS.BlockIO.API as FS
import System.FS.CallStack (HasCallStack)

-- | A handle to a file containing blobs.
--
Expand Down Expand Up @@ -50,9 +51,10 @@ instance NFData BlobSpan where

-- | Open the given file to make a 'BlobFile'. The finaliser will close and
-- delete the file.
{-# SPECIALISE openBlobFile :: HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-}
{-# SPECIALISE openBlobFile :: HasCallStack => HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-}
openBlobFile ::
PrimMonad m
=> HasCallStack
=> HasFS m h
-> FS.FsPath
-> FS.OpenMode
Expand Down
2 changes: 2 additions & 0 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ module Database.LSMTree.Internal.MergeSchedule (
, Credits (..)
, supplyCredits
, creditThresholdForLevel
-- * Exported for testing
, addWriteBufferEntries
) where

import Control.Concurrent.Class.MonadMVar.Strict
Expand Down
Loading

0 comments on commit 840c26a

Please sign in to comment.