Skip to content

Commit

Permalink
fix(issue392): process feedback on PR
Browse files Browse the repository at this point in the history
Co-authored-by: Joris Dral <[email protected]>
  • Loading branch information
wenkokke and jorisdral committed Jan 2, 2025
1 parent dbd0bcd commit 1efe0e0
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 249 deletions.
9 changes: 8 additions & 1 deletion doc/format-directory.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ Each snapshot contains a metadata file for the snapshot overall and a checksum:
* `${session}/snapshots/${name}/snapshot`
* `${session}/snapshots/${name}/snapshot.checksum`

plus three files for the serialised write buffer:

* `${session}/snapshots/${name}/${n}.keyops`
* `${session}/snapshots/${name}/${n}.blobs`
* `${session}/snapshots/${name}/${n}.checksum`

plus the five files for each LSM run in the snapshot:

* `${session}/snapshots/${name}/${n}.keyops`
Expand All @@ -85,7 +91,7 @@ plus the five files for each LSM run in the snapshot:
* `${session}/snapshots/${name}/${n}.index`
* `${session}/snapshots/${name}/${n}.checksum`

In this case the LSM run files are numbered from 0 within each snapshot.
In this case the serialised write buffer and the LSM run files are numbered from 0 within each snapshot.

## Snapshot metadata

Expand All @@ -97,6 +103,7 @@ The snapshot metadata file contains the following information:
- Page size for all the key/operations files
- Index type and parameters
* Key/value type information for dynamic-type sanity checking
* The number of the serialised write buffer files (should always be 0).
* The shape of the LSM tree overall (runs within levels etc), referencing
the numbered LSM runs.

Expand Down
52 changes: 51 additions & 1 deletion src-extras/Database/LSMTree/Extras/RunData.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ module Database.LSMTree.Extras.RunData (
withRun
, withRuns
, unsafeFlushAsWriteBuffer
-- * Serialise write buffers
, withRunDataAsWriteBuffer
, withSerialisedWriteBuffer
-- * RunData
, RunData (..)
, SerialisedRunData
Expand All @@ -20,18 +23,24 @@ module Database.LSMTree.Extras.RunData (
, liftShrink2Map
) where

import Control.Exception (bracket)
import Control.Exception (bracket, bracket_)
import Control.Monad
import Control.RefCount
import Data.Bifoldable (Bifoldable (bifoldMap))
import Data.Bifunctor
import Data.Foldable (for_)
import qualified Data.Map as M
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Vector as V
import Data.Word (Word64)
import Database.LSMTree.Extras (showPowersOf10)
import Database.LSMTree.Extras.Generators ()
import Database.LSMTree.Internal.Entry
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import Database.LSMTree.Internal.MergeSchedule (addWriteBufferEntries)
import Database.LSMTree.Internal.Paths
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Run (Run, RunDataCaching (..))
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..),
Expand All @@ -40,7 +49,10 @@ import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import Database.LSMTree.Internal.WriteBufferWriter (writeWriteBuffer)
import qualified System.FS.API as FS
import System.FS.API
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API
import Test.QuickCheck

Expand Down Expand Up @@ -96,6 +108,44 @@ unsafeFlushAsWriteBuffer fs hbio fsPaths (RunData m) = do
releaseRef wbblobs
return run

{-------------------------------------------------------------------------------
Serialise write buffers
-------------------------------------------------------------------------------}

-- | Use 'SerialisedRunData' to 'WriteBuffer' and 'WriteBufferBlobs'.
withRunDataAsWriteBuffer ::
FS.HasFS IO h
-> ResolveSerialisedValue
-> WriteBufferFsPaths
-> SerialisedRunData
-> (WB.WriteBuffer -> Ref (WBB.WriteBufferBlobs IO h) -> IO a)
-> IO a
withRunDataAsWriteBuffer hfs f fsPaths rd action = do
let es = V.fromList . M.toList $ unRunData rd
let maxn = NumEntries $ V.length es
let wbbPath = Paths.writeBufferBlobPath fsPaths
bracket (WBB.new hfs wbbPath) releaseRef $ \wbb -> do
(wb, _) <- addWriteBufferEntries hfs f wbb maxn WB.empty es
action wb wbb

-- | Serialise a 'WriteBuffer' and 'WriteBufferBlobs' to disk and perform an 'IO' action.
withSerialisedWriteBuffer ::
FS.HasFS IO h
-> FS.HasBlockIO IO h
-> WriteBufferFsPaths
-> WB.WriteBuffer
-> Ref (WBB.WriteBufferBlobs IO h)
-> IO a
-> IO a
withSerialisedWriteBuffer hfs hbio wbPaths wb wbb =
bracket_ (writeWriteBuffer hfs hbio wbPaths wb wbb) $ do
for_ [ Paths.writeBufferKOpsPath wbPaths
, Paths.writeBufferBlobPath wbPaths
, Paths.writeBufferChecksumsPath wbPaths
] $ \fsPath -> do
fsPathExists <- FS.doesFileExist hfs fsPath
when fsPathExists $ FS.removeFile hfs fsPath

{-------------------------------------------------------------------------------
RunData
-------------------------------------------------------------------------------}
Expand Down
13 changes: 8 additions & 5 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,8 +1160,10 @@ createSnapshot snap label tableType t = do
(FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir))
(\_ -> FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir))

-- Get the table content.
content <- RW.withReadAccess (tableContent thEnv) pure
-- Duplicate references to the table content, so that resources do not disappear
-- from under our feet while taking a snapshot. These references are released
-- again after the snapshot files/directories are written.
content <- RW.withReadAccess (tableContent thEnv) (duplicateTableContent reg)

-- Snapshot the write buffer.
let activeDir = Paths.activeDir (tableSessionRoot thEnv)
Expand All @@ -1180,6 +1182,9 @@ createSnapshot snap label tableType t = do
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData

-- Release the table content
releaseTableContent reg content

{-# SPECIALISE openSnapshot ::
Session IO h
-> SnapshotLabel
Expand Down Expand Up @@ -1231,10 +1236,8 @@ openSnapshot sesh label tableType override snap resolve = do
let activeDir = Paths.activeDir (sessionRoot seshEnv)

-- Read write buffer
activeWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc
let activeWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getActiveDir activeDir) activeWriteBufferNumber
let snapWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer
(tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths
(tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths

-- Hard link runs into the active directory,
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir activeDir snapLevels
Expand Down
12 changes: 0 additions & 12 deletions src/Database/LSMTree/Internal/Entry.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module Database.LSMTree.Internal.Entry (
, hasBlob
, onValue
, onBlobRef
, traverseBlobRef
, NumEntries (..)
, unNumEntries
-- * Value resolution/merging
Expand Down Expand Up @@ -48,17 +47,6 @@ onBlobRef def g = \case
Mupdate{} -> def
Delete -> def

traverseBlobRef ::
Applicative t
=> (blobref -> t blobref')
-> Entry v blobref
-> t (Entry v blobref')
traverseBlobRef f = \case
Insert v -> pure (Insert v)
InsertWithBlob v blobref -> InsertWithBlob v <$> f blobref
Mupdate v -> pure (Mupdate v)
Delete -> pure Delete

instance Bifunctor Entry where
first f = \case
Insert v -> Insert (f v)
Expand Down
66 changes: 19 additions & 47 deletions src/Database/LSMTree/Internal/Paths.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ module Database.LSMTree.Internal.Paths (
, toChecksumsFile
, fromChecksumsFile
-- * Checksums for WriteBuffer files
, checksumFileNamesForWriteBufferFiles
, toChecksumsFileForWriteBufferFiles
, fromChecksumsFileForWriteBufferFiles
-- * ForRunFiles abstraction
, ForKOps (..)
, ForBlob (..)
Expand All @@ -45,22 +43,18 @@ module Database.LSMTree.Internal.Paths (
, forRunIndexRaw
-- * WriteBuffer paths
, WriteBufferFsPaths (WrapRunFsPaths, WriteBufferFsPaths, writeBufferDir, writeBufferNumber)
, pathsForWriteBufferFiles
, writeBufferKOpsPath
, writeBufferBlobPath
, writeBufferChecksumsPath
, writeBufferFilePathWithExt
-- * ForWriteBufferFiles abstraction
, ForWriteBufferFiles (..)
, forWriteBufferKOpsRaw
, forWriteBufferBlobRaw
, writeBufferFileExts
) where

import Control.Applicative (Applicative (..))
import Control.DeepSeq (NFData (..))
import Data.Bifunctor (Bifunctor (..))
import qualified Data.ByteString.Char8 as BS
import Data.Foldable (toList)
import Data.Function ((&))
import qualified Data.Map as Map
import Data.Maybe (fromMaybe)
import Data.String (IsString (..))
Expand Down Expand Up @@ -228,7 +222,7 @@ runFileExts = ForRunFiles {
}

{-------------------------------------------------------------------------------
Checksums
Checksums For Run Files
-------------------------------------------------------------------------------}

checksumFileNamesForRunFiles :: ForRunFiles CRC.ChecksumsFileName
Expand All @@ -243,18 +237,6 @@ fromChecksumsFile file = for checksumFileNamesForRunFiles $ \name ->
Just crc -> Right crc
Nothing -> Left ("key not found: " <> show name)

checksumFileNamesForWriteBufferFiles :: ForWriteBufferFiles CRC.ChecksumsFileName
checksumFileNamesForWriteBufferFiles = fmap (CRC.ChecksumsFileName . BS.pack) writeBufferFileExts

toChecksumsFileForWriteBufferFiles :: ForWriteBufferFiles CRC.CRC32C -> CRC.ChecksumsFile
toChecksumsFileForWriteBufferFiles = Map.fromList . toList . liftA2 (,) checksumFileNamesForWriteBufferFiles

fromChecksumsFileForWriteBufferFiles :: CRC.ChecksumsFile -> Either String (ForWriteBufferFiles CRC.CRC32C)
fromChecksumsFileForWriteBufferFiles file = for checksumFileNamesForWriteBufferFiles $ \name ->
case Map.lookup name file of
Just crc -> Right crc
Nothing -> Left ("key not found: " <> show name)

{-------------------------------------------------------------------------------
Marker newtypes for individual elements of the ForRunFiles and the
ForWriteBufferFiles abstractions
Expand Down Expand Up @@ -314,15 +296,17 @@ pattern WriteBufferFsPaths {writeBufferDir, writeBufferNumber} = WrapRunFsPaths

{-# COMPLETE WriteBufferFsPaths #-}

-- | Paths to all files associated with this run, except 'runChecksumsPath'.
pathsForWriteBufferFiles :: WriteBufferFsPaths -> ForWriteBufferFiles FsPath
pathsForWriteBufferFiles fsPaths = fmap (writeBufferFilePathWithExt fsPaths) writeBufferFileExts
writeBufferKOpsExt :: String
writeBufferKOpsExt = unForKOps . forRunKOps $ runFileExts

writeBufferBlobExt :: String
writeBufferBlobExt = unForBlob . forRunBlob $ runFileExts

writeBufferKOpsPath :: WriteBufferFsPaths -> FsPath
writeBufferKOpsPath = unForKOps . forWriteBufferKOps . pathsForWriteBufferFiles
writeBufferKOpsPath = flip writeBufferFilePathWithExt writeBufferKOpsExt

writeBufferBlobPath :: WriteBufferFsPaths -> FsPath
writeBufferBlobPath = unForBlob . forWriteBufferBlob . pathsForWriteBufferFiles
writeBufferBlobPath = flip writeBufferFilePathWithExt writeBufferBlobExt

writeBufferChecksumsPath :: WriteBufferFsPaths -> FsPath
writeBufferChecksumsPath = flip writeBufferFilePathWithExt "checksums"
Expand All @@ -331,28 +315,16 @@ writeBufferFilePathWithExt :: WriteBufferFsPaths -> String -> FsPath
writeBufferFilePathWithExt (WriteBufferFsPaths dir n) ext =
dir </> mkFsPath [show n] <.> ext

writeBufferFileExts :: ForWriteBufferFiles String
writeBufferFileExts = ForWriteBufferFiles
{ forWriteBufferKOps = forRunKOps runFileExts
, forWriteBufferBlob = forRunBlob runFileExts
}

{-------------------------------------------------------------------------------
ForWriteBuffer abstraction
Checksums For Run Files
-------------------------------------------------------------------------------}

-- | Stores someting for each run file (except the checksums file), allowing to
-- easily do something for all of them without mixing them up.
data ForWriteBufferFiles a = ForWriteBufferFiles { forWriteBufferKOps :: !(ForKOps a), forWriteBufferBlob :: !(ForBlob a) }
deriving stock (Show, Foldable, Functor, Traversable)

forWriteBufferKOpsRaw :: ForWriteBufferFiles a -> a
forWriteBufferKOpsRaw = unForKOps . forWriteBufferKOps

forWriteBufferBlobRaw :: ForWriteBufferFiles a -> a
forWriteBufferBlobRaw = unForBlob . forWriteBufferBlob

instance Applicative ForWriteBufferFiles where
pure x = ForWriteBufferFiles (ForKOps x) (ForBlob x)
ForWriteBufferFiles (ForKOps f1) (ForBlob f2) <*> ForWriteBufferFiles (ForKOps x1) (ForBlob x2) =
ForWriteBufferFiles (ForKOps $ f1 x1) (ForBlob $ f2 x2)
toChecksumsFileForWriteBufferFiles :: (ForKOps CRC.CRC32C, ForBlob CRC.CRC32C) -> CRC.ChecksumsFile
toChecksumsFileForWriteBufferFiles checksums =
Map.fromList . toList $ checksums & bimap
((toChecksumsFileName writeBufferKOpsExt,) . unForKOps)
((toChecksumsFileName writeBufferBlobExt,) . unForBlob)
where
toChecksumsFileName :: String -> CRC.ChecksumsFileName
toChecksumsFileName = CRC.ChecksumsFileName . BS.pack
4 changes: 2 additions & 2 deletions src/Database/LSMTree/Internal/RunReader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ new !offsetKey
let fileSizeInPages = fileSize `div` toEnum pageSize
let indexedPages = getNumPages $ Run.sizeInPages readerRun
assert (indexedPages == fileSizeInPages) $ pure h
-- TODO: Why?
-- Double the file readahead window (only applies to this file descriptor)
-- Advise the OS that this file is being read sequentially, which will
-- double the readahead window in response (only for this file descriptor)
FS.hAdviseAll readerHasBlockIO readerKOpsHandle FS.AdviceSequential

(page, entryNo) <- seekFirstEntry readerKOpsHandle
Expand Down
Loading

0 comments on commit 1efe0e0

Please sign in to comment.