From 1efe0e057ec463ba5f65981e035849e06758ed4d Mon Sep 17 00:00:00 2001 From: Wen Kokke Date: Fri, 27 Dec 2024 13:34:38 +0000 Subject: [PATCH] fix(issue392): process feedback on PR Co-authored-by: Joris Dral --- doc/format-directory.md | 9 +- src-extras/Database/LSMTree/Extras/RunData.hs | 52 +++++++++- src/Database/LSMTree/Internal.hs | 13 ++- src/Database/LSMTree/Internal/Entry.hs | 12 --- src/Database/LSMTree/Internal/Paths.hs | 66 ++++--------- src/Database/LSMTree/Internal/RunReader.hs | 4 +- src/Database/LSMTree/Internal/Snapshot.hs | 91 ++++++++++------- .../LSMTree/Internal/WriteBufferBlobs.hs | 12 ++- .../LSMTree/Internal/WriteBufferReader.hs | 98 ++++++++++--------- .../LSMTree/Internal/WriteBufferWriter.hs | 82 ++++++++-------- test/Test/Database/LSMTree/Internal/Run.hs | 75 +++----------- 11 files changed, 265 insertions(+), 249 deletions(-) diff --git a/doc/format-directory.md b/doc/format-directory.md index ef2f229a8..44e9e1173 100644 --- a/doc/format-directory.md +++ b/doc/format-directory.md @@ -77,6 +77,12 @@ Each snapshot contains a metadata file for the snapshot overall and a checksum: * `${session}/snapshots/${name}/snapshot` * `${session}/snapshots/${name}/snapshot.checksum` +plus three files for the serialised write buffer: + + * `${session}/snapshots/${name}/${n}.keyops` + * `${session}/snapshots/${name}/${n}.blobs` + * `${session}/snapshots/${name}/${n}.checksum` + plus the five files for each LSM run in the snapshot: * `${session}/snapshots/${name}/${n}.keyops` @@ -85,7 +91,7 @@ plus the five files for each LSM run in the snapshot: * `${session}/snapshots/${name}/${n}.index` * `${session}/snapshots/${name}/${n}.checksum` -In this case the LSM run files are numbered from 0 within each snapshot. +In this case the serialised write buffer and the LSM run files are numbered from 0 within each snapshot. 
## Snapshot metadata @@ -97,6 +103,7 @@ The snapshot metadata file contains the following information: - Page size for all the key/operations files - Index type and parameters * Key/value type information for dynamic-type sanity checking + * The number of the serialised write buffer files (should always be 0). * The shape of the LSM tree overall (runs within levels etc), referencing the numbered LSM runs. diff --git a/src-extras/Database/LSMTree/Extras/RunData.hs b/src-extras/Database/LSMTree/Extras/RunData.hs index e421ada28..620e60952 100644 --- a/src-extras/Database/LSMTree/Extras/RunData.hs +++ b/src-extras/Database/LSMTree/Extras/RunData.hs @@ -6,6 +6,9 @@ module Database.LSMTree.Extras.RunData ( withRun , withRuns , unsafeFlushAsWriteBuffer + -- * Serialise write buffers + , withRunDataAsWriteBuffer + , withSerialisedWriteBuffer -- * RunData , RunData (..) , SerialisedRunData @@ -20,18 +23,24 @@ module Database.LSMTree.Extras.RunData ( , liftShrink2Map ) where -import Control.Exception (bracket) +import Control.Exception (bracket, bracket_) import Control.Monad import Control.RefCount import Data.Bifoldable (Bifoldable (bifoldMap)) import Data.Bifunctor +import Data.Foldable (for_) +import qualified Data.Map as M import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map +import qualified Data.Vector as V import Data.Word (Word64) import Database.LSMTree.Extras (showPowersOf10) import Database.LSMTree.Extras.Generators () import Database.LSMTree.Internal.Entry +import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) +import Database.LSMTree.Internal.MergeSchedule (addWriteBufferEntries) import Database.LSMTree.Internal.Paths +import qualified Database.LSMTree.Internal.Paths as Paths import Database.LSMTree.Internal.Run (Run, RunDataCaching (..)) import qualified Database.LSMTree.Internal.Run as Run import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..), @@ -40,7 +49,10 @@ import Database.LSMTree.Internal.RunNumber import 
Database.LSMTree.Internal.Serialise import qualified Database.LSMTree.Internal.WriteBuffer as WB import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB +import Database.LSMTree.Internal.WriteBufferWriter (writeWriteBuffer) +import qualified System.FS.API as FS import System.FS.API +import qualified System.FS.BlockIO.API as FS import System.FS.BlockIO.API import Test.QuickCheck @@ -96,6 +108,44 @@ unsafeFlushAsWriteBuffer fs hbio fsPaths (RunData m) = do releaseRef wbblobs return run +{------------------------------------------------------------------------------- + Serialise write buffers +-------------------------------------------------------------------------------} + +-- | Convert 'SerialisedRunData' to a 'WriteBuffer' and 'WriteBufferBlobs', and run an 'IO' action on them. +withRunDataAsWriteBuffer :: + FS.HasFS IO h + -> ResolveSerialisedValue + -> WriteBufferFsPaths + -> SerialisedRunData + -> (WB.WriteBuffer -> Ref (WBB.WriteBufferBlobs IO h) -> IO a) + -> IO a +withRunDataAsWriteBuffer hfs f fsPaths rd action = do + let es = V.fromList . M.toList $ unRunData rd + let maxn = NumEntries $ V.length es + let wbbPath = Paths.writeBufferBlobPath fsPaths + bracket (WBB.new hfs wbbPath) releaseRef $ \wbb -> do + (wb, _) <- addWriteBufferEntries hfs f wbb maxn WB.empty es + action wb wbb + +-- | Serialise a 'WriteBuffer' and 'WriteBufferBlobs' to disk and perform an 'IO' action. 
+withSerialisedWriteBuffer :: + FS.HasFS IO h + -> FS.HasBlockIO IO h + -> WriteBufferFsPaths + -> WB.WriteBuffer + -> Ref (WBB.WriteBufferBlobs IO h) + -> IO a + -> IO a +withSerialisedWriteBuffer hfs hbio wbPaths wb wbb = + bracket_ (writeWriteBuffer hfs hbio wbPaths wb wbb) $ do + for_ [ Paths.writeBufferKOpsPath wbPaths + , Paths.writeBufferBlobPath wbPaths + , Paths.writeBufferChecksumsPath wbPaths + ] $ \fsPath -> do + fsPathExists <- FS.doesFileExist hfs fsPath + when fsPathExists $ FS.removeFile hfs fsPath + {------------------------------------------------------------------------------- RunData -------------------------------------------------------------------------------} diff --git a/src/Database/LSMTree/Internal.hs b/src/Database/LSMTree/Internal.hs index 7239603fd..19ca2f344 100644 --- a/src/Database/LSMTree/Internal.hs +++ b/src/Database/LSMTree/Internal.hs @@ -1160,8 +1160,10 @@ createSnapshot snap label tableType t = do (FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir)) (\_ -> FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir)) - -- Get the table content. - content <- RW.withReadAccess (tableContent thEnv) pure + -- Duplicate references to the table content, so that resources do not disappear + -- from under our feet while taking a snapshot. These references are released + -- again after the snapshot files/directories are written. + content <- RW.withReadAccess (tableContent thEnv) (duplicateTableContent reg) -- Snapshot the write buffer. 
let activeDir = Paths.activeDir (tableSessionRoot thEnv) @@ -1180,6 +1182,9 @@ createSnapshot snap label tableType t = do SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData + -- Release the table content + releaseTableContent reg content + {-# SPECIALISE openSnapshot :: Session IO h -> SnapshotLabel @@ -1231,10 +1236,8 @@ openSnapshot sesh label tableType override snap resolve = do let activeDir = Paths.activeDir (sessionRoot seshEnv) -- Read write buffer - activeWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc - let activeWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getActiveDir activeDir) activeWriteBufferNumber let snapWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer - (tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths + (tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths -- Hard link runs into the active directory, snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir activeDir snapLevels diff --git a/src/Database/LSMTree/Internal/Entry.hs b/src/Database/LSMTree/Internal/Entry.hs index d9bb89854..94951ad88 100644 --- a/src/Database/LSMTree/Internal/Entry.hs +++ b/src/Database/LSMTree/Internal/Entry.hs @@ -3,7 +3,6 @@ module Database.LSMTree.Internal.Entry ( , hasBlob , onValue , onBlobRef - , traverseBlobRef , NumEntries (..) 
, unNumEntries -- * Value resolution/merging @@ -48,17 +47,6 @@ onBlobRef def g = \case Mupdate{} -> def Delete -> def -traverseBlobRef :: - Applicative t - => (blobref -> t blobref') - -> Entry v blobref - -> t (Entry v blobref') -traverseBlobRef f = \case - Insert v -> pure (Insert v) - InsertWithBlob v blobref -> InsertWithBlob v <$> f blobref - Mupdate v -> pure (Mupdate v) - Delete -> pure Delete - instance Bifunctor Entry where first f = \case Insert v -> Insert (f v) diff --git a/src/Database/LSMTree/Internal/Paths.hs b/src/Database/LSMTree/Internal/Paths.hs index 05a57c82c..488388ae1 100644 --- a/src/Database/LSMTree/Internal/Paths.hs +++ b/src/Database/LSMTree/Internal/Paths.hs @@ -30,9 +30,7 @@ module Database.LSMTree.Internal.Paths ( , toChecksumsFile , fromChecksumsFile -- * Checksums for WriteBuffer files - , checksumFileNamesForWriteBufferFiles , toChecksumsFileForWriteBufferFiles - , fromChecksumsFileForWriteBufferFiles -- * ForRunFiles abstraction , ForKOps (..) , ForBlob (..) @@ -45,22 +43,18 @@ module Database.LSMTree.Internal.Paths ( , forRunIndexRaw -- * WriteBuffer paths , WriteBufferFsPaths (WrapRunFsPaths, WriteBufferFsPaths, writeBufferDir, writeBufferNumber) - , pathsForWriteBufferFiles , writeBufferKOpsPath , writeBufferBlobPath , writeBufferChecksumsPath , writeBufferFilePathWithExt - -- * ForWriteBufferFiles abstraction - , ForWriteBufferFiles (..) 
- , forWriteBufferKOpsRaw - , forWriteBufferBlobRaw - , writeBufferFileExts ) where import Control.Applicative (Applicative (..)) import Control.DeepSeq (NFData (..)) +import Data.Bifunctor (Bifunctor (..)) import qualified Data.ByteString.Char8 as BS import Data.Foldable (toList) +import Data.Function ((&)) import qualified Data.Map as Map import Data.Maybe (fromMaybe) import Data.String (IsString (..)) @@ -228,7 +222,7 @@ runFileExts = ForRunFiles { } {------------------------------------------------------------------------------- - Checksums + Checksums For Run Files -------------------------------------------------------------------------------} checksumFileNamesForRunFiles :: ForRunFiles CRC.ChecksumsFileName @@ -243,18 +237,6 @@ fromChecksumsFile file = for checksumFileNamesForRunFiles $ \name -> Just crc -> Right crc Nothing -> Left ("key not found: " <> show name) -checksumFileNamesForWriteBufferFiles :: ForWriteBufferFiles CRC.ChecksumsFileName -checksumFileNamesForWriteBufferFiles = fmap (CRC.ChecksumsFileName . BS.pack) writeBufferFileExts - -toChecksumsFileForWriteBufferFiles :: ForWriteBufferFiles CRC.CRC32C -> CRC.ChecksumsFile -toChecksumsFileForWriteBufferFiles = Map.fromList . toList . liftA2 (,) checksumFileNamesForWriteBufferFiles - -fromChecksumsFileForWriteBufferFiles :: CRC.ChecksumsFile -> Either String (ForWriteBufferFiles CRC.CRC32C) -fromChecksumsFileForWriteBufferFiles file = for checksumFileNamesForWriteBufferFiles $ \name -> - case Map.lookup name file of - Just crc -> Right crc - Nothing -> Left ("key not found: " <> show name) - {------------------------------------------------------------------------------- Marker newtypes for individual elements of the ForRunFiles and the ForWriteBufferFiles abstractions @@ -314,15 +296,17 @@ pattern WriteBufferFsPaths {writeBufferDir, writeBufferNumber} = WrapRunFsPaths {-# COMPLETE WriteBufferFsPaths #-} --- | Paths to all files associated with this run, except 'runChecksumsPath'. 
-pathsForWriteBufferFiles :: WriteBufferFsPaths -> ForWriteBufferFiles FsPath -pathsForWriteBufferFiles fsPaths = fmap (writeBufferFilePathWithExt fsPaths) writeBufferFileExts +writeBufferKOpsExt :: String +writeBufferKOpsExt = unForKOps . forRunKOps $ runFileExts + +writeBufferBlobExt :: String +writeBufferBlobExt = unForBlob . forRunBlob $ runFileExts writeBufferKOpsPath :: WriteBufferFsPaths -> FsPath -writeBufferKOpsPath = unForKOps . forWriteBufferKOps . pathsForWriteBufferFiles +writeBufferKOpsPath = flip writeBufferFilePathWithExt writeBufferKOpsExt writeBufferBlobPath :: WriteBufferFsPaths -> FsPath -writeBufferBlobPath = unForBlob . forWriteBufferBlob . pathsForWriteBufferFiles +writeBufferBlobPath = flip writeBufferFilePathWithExt writeBufferBlobExt writeBufferChecksumsPath :: WriteBufferFsPaths -> FsPath writeBufferChecksumsPath = flip writeBufferFilePathWithExt "checksums" @@ -331,28 +315,16 @@ writeBufferFilePathWithExt :: WriteBufferFsPaths -> String -> FsPath writeBufferFilePathWithExt (WriteBufferFsPaths dir n) ext = dir mkFsPath [show n] <.> ext -writeBufferFileExts :: ForWriteBufferFiles String -writeBufferFileExts = ForWriteBufferFiles - { forWriteBufferKOps = forRunKOps runFileExts - , forWriteBufferBlob = forRunBlob runFileExts - } {------------------------------------------------------------------------------- - ForWriteBuffer abstraction + Checksums For WriteBuffer Files -------------------------------------------------------------------------------} --- | Stores someting for each run file (except the checksums file), allowing to --- easily do something for all of them without mixing them up. -data ForWriteBufferFiles a = ForWriteBufferFiles { forWriteBufferKOps :: !(ForKOps a), forWriteBufferBlob :: !(ForBlob a) } - deriving stock (Show, Foldable, Functor, Traversable) - -forWriteBufferKOpsRaw :: ForWriteBufferFiles a -> a -forWriteBufferKOpsRaw = unForKOps . 
forWriteBufferKOps - -forWriteBufferBlobRaw :: ForWriteBufferFiles a -> a -forWriteBufferBlobRaw = unForBlob . forWriteBufferBlob - -instance Applicative ForWriteBufferFiles where - pure x = ForWriteBufferFiles (ForKOps x) (ForBlob x) - ForWriteBufferFiles (ForKOps f1) (ForBlob f2) <*> ForWriteBufferFiles (ForKOps x1) (ForBlob x2) = - ForWriteBufferFiles (ForKOps $ f1 x1) (ForBlob $ f2 x2) +toChecksumsFileForWriteBufferFiles :: (ForKOps CRC.CRC32C, ForBlob CRC.CRC32C) -> CRC.ChecksumsFile +toChecksumsFileForWriteBufferFiles checksums = + Map.fromList . toList $ checksums & bimap + ((toChecksumsFileName writeBufferKOpsExt,) . unForKOps) + ((toChecksumsFileName writeBufferBlobExt,) . unForBlob) + where + toChecksumsFileName :: String -> CRC.ChecksumsFileName + toChecksumsFileName = CRC.ChecksumsFileName . BS.pack diff --git a/src/Database/LSMTree/Internal/RunReader.hs b/src/Database/LSMTree/Internal/RunReader.hs index c04b44a5b..396eee0f6 100644 --- a/src/Database/LSMTree/Internal/RunReader.hs +++ b/src/Database/LSMTree/Internal/RunReader.hs @@ -112,8 +112,8 @@ new !offsetKey let fileSizeInPages = fileSize `div` toEnum pageSize let indexedPages = getNumPages $ Run.sizeInPages readerRun assert (indexedPages == fileSizeInPages) $ pure h - -- TODO: Why? 
- -- Double the file readahead window (only applies to this file descriptor) + -- Advise the OS that this file is being read sequentially, which will + -- double the readahead window in response (only for this file descriptor) FS.hAdviseAll readerHasBlockIO readerKOpsHandle FS.AdviceSequential (page, entryNo) <- seekFirstEntry readerKOpsHandle diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index c38043da3..8e10b5967 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -47,7 +47,7 @@ import qualified Database.LSMTree.Internal.Merge as Merge import Database.LSMTree.Internal.MergeSchedule import Database.LSMTree.Internal.MergingRun (NumRuns (..)) import qualified Database.LSMTree.Internal.MergingRun as MR -import Database.LSMTree.Internal.Paths (ActiveDir (..), +import Database.LSMTree.Internal.Paths (ActiveDir (..), ForKOps (..), NamedSnapshotDir (..), RunFsPaths (..), WriteBufferFsPaths (..), pathsForRunFiles, runChecksumsPath, writeBufferBlobPath, @@ -56,13 +56,14 @@ import Database.LSMTree.Internal.Run (Run) import qualified Database.LSMTree.Internal.Run as Run import Database.LSMTree.Internal.RunNumber import Database.LSMTree.Internal.UniqCounter (UniqCounter, - incrUniqCounter, uniqueToRunNumber) + incrUniqCounter, uniqueToRunNumber, uniqueToWord64) import Database.LSMTree.Internal.WriteBuffer (WriteBuffer) import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs) +import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB import qualified Database.LSMTree.Internal.WriteBufferReader as WBR import qualified Database.LSMTree.Internal.WriteBufferWriter as WBW import qualified System.FS.API as FS -import System.FS.API (HasFS) +import System.FS.API (HasFS, (<.>), ()) import qualified System.FS.API.Lazy as FSL import qualified System.FS.BlockIO.API as FS import System.FS.BlockIO.API (HasBlockIO) @@ -109,7 +110,7 @@ data SnapshotMetaData = 
SnapshotMetaData { -- opened: see 'TableConfigOverride'. , snapMetaConfig :: !TableConfig -- | The write buffer. - , snapWriteBuffer :: !(RunNumber) + , snapWriteBuffer :: !RunNumber -- | The shape of the levels of the LSM tree. , snapMetaLevels :: !(SnapLevels RunNumber) } @@ -253,13 +254,25 @@ snapshotWriteBuffer reg hfs hbio uc activeDir snapDir wb wbb = do -- Write the write buffer and write buffer blobs to the active directory. activeWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc let activeWriteBufferPaths = WriteBufferFsPaths (getActiveDir activeDir) activeWriteBufferNumber - WBW.writeWriteBuffer hfs hbio activeWriteBufferPaths wb wbb + allocateTemp reg + (WBW.writeWriteBuffer hfs hbio activeWriteBufferPaths wb wbb) + -- TODO: it should probably be the responsibility of writeWriteBuffer to do + -- cleanup + $ \() -> do + -- TODO: check files exist before removing them + FS.removeFile hfs (writeBufferKOpsPath activeWriteBufferPaths) + FS.removeFile hfs (writeBufferBlobPath activeWriteBufferPaths) -- Hard link the write buffer and write buffer blobs to the snapshot directory. 
- snapWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc - let snapWriteBufferPaths = WriteBufferFsPaths (getNamedSnapshotDir snapDir) snapWriteBufferNumber - hardLinkTemp reg hfs hbio NoHardLinkDurable (writeBufferKOpsPath activeWriteBufferPaths) (writeBufferKOpsPath snapWriteBufferPaths) - hardLinkTemp reg hfs hbio NoHardLinkDurable (writeBufferBlobPath activeWriteBufferPaths) (writeBufferBlobPath snapWriteBufferPaths) - hardLinkTemp reg hfs hbio NoHardLinkDurable (writeBufferChecksumsPath activeWriteBufferPaths) (writeBufferChecksumsPath snapWriteBufferPaths) + let snapWriteBufferPaths = WriteBufferFsPaths (getNamedSnapshotDir snapDir) activeWriteBufferNumber + hardLink reg hfs hbio HardLinkDurable + (writeBufferKOpsPath activeWriteBufferPaths) + (writeBufferKOpsPath snapWriteBufferPaths) + hardLink reg hfs hbio HardLinkDurable + (writeBufferBlobPath activeWriteBufferPaths) + (writeBufferBlobPath snapWriteBufferPaths) + hardLink reg hfs hbio HardLinkDurable + (writeBufferChecksumsPath activeWriteBufferPaths) + (writeBufferChecksumsPath snapWriteBufferPaths) pure snapWriteBufferPaths {-# SPECIALISE @@ -268,7 +281,8 @@ snapshotWriteBuffer reg hfs hbio uc activeDir snapDir wb wbb = do -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h - -> WriteBufferFsPaths + -> UniqCounter IO + -> ActiveDir -> WriteBufferFsPaths -> IO (WriteBuffer, Ref (WriteBufferBlobs IO h)) #-} @@ -278,17 +292,26 @@ openWriteBuffer :: -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h - -> WriteBufferFsPaths + -> UniqCounter m + -> ActiveDir -> WriteBufferFsPaths -> m (WriteBuffer, Ref (WriteBufferBlobs m h)) -openWriteBuffer reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths = do - -- Hard link the write buffer keyops and checksum files to the snapshot directory. 
- hardLinkTemp reg hfs hbio NoHardLinkDurable (writeBufferKOpsPath snapWriteBufferPaths) (writeBufferKOpsPath activeWriteBufferPaths) - hardLinkTemp reg hfs hbio NoHardLinkDurable (writeBufferChecksumsPath snapWriteBufferPaths) (writeBufferChecksumsPath activeWriteBufferPaths) - -- Copy the write buffer blobs file to the snapshot directory. - copyFileTemp reg hfs hbio (writeBufferBlobPath snapWriteBufferPaths) (writeBufferBlobPath activeWriteBufferPaths) - -- Read write buffer - WBR.readWriteBuffer reg resolve hfs hbio activeWriteBufferPaths +openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths = do + -- Copy the write buffer blobs file to the active directory and open it. + activeWriteBufferNumber <- uniqueToWord64 <$> incrUniqCounter uc + let activeWriteBufferBlobPath = + getActiveDir activeDir FS.mkFsPath [show activeWriteBufferNumber] <.> "wbblobs" + copyFile reg hfs hbio (writeBufferBlobPath snapWriteBufferPaths) activeWriteBufferBlobPath + writeBufferBlobs <- + allocateTemp reg + (WBB.open hfs activeWriteBufferBlobPath FS.AllowExisting) + releaseRef + -- Read write buffer key/ops + let kOpsPath = ForKOps (writeBufferKOpsPath snapWriteBufferPaths) + writeBuffer <- + withRef writeBufferBlobs $ \wbb -> + WBR.readWriteBuffer resolve hfs hbio kOpsPath (WBB.blobFile wbb) + pure (writeBuffer, writeBufferBlobs) {------------------------------------------------------------------------------- Runs @@ -476,15 +499,11 @@ hardLinkRunFiles :: hardLinkRunFiles reg hfs hbio dur sourceRunFsPaths targetRunFsPaths = do let sourcePaths = pathsForRunFiles sourceRunFsPaths targetPaths = pathsForRunFiles targetRunFsPaths - sequenceA_ (hardLinkTemp reg hfs hbio dur <$> sourcePaths <*> targetPaths) - hardLinkTemp reg hfs hbio dur (runChecksumsPath sourceRunFsPaths) (runChecksumsPath targetRunFsPaths) - -{------------------------------------------------------------------------------- - Hard link file 
--------------------------------------------------------------------------------} + sequenceA_ (hardLink reg hfs hbio dur <$> sourcePaths <*> targetPaths) + hardLink reg hfs hbio dur (runChecksumsPath sourceRunFsPaths) (runChecksumsPath targetRunFsPaths) {-# SPECIALISE - hardLinkTemp :: + hardLink :: TempRegistry IO -> HasFS IO h -> HasBlockIO IO h @@ -493,9 +512,9 @@ hardLinkRunFiles reg hfs hbio dur sourceRunFsPaths targetRunFsPaths = do -> FS.FsPath -> IO () #-} --- | @'hardLinkTemp' reg hfs hbio dur sourcePath targetPath@ creates a hard link +-- | @'hardLink' reg hfs hbio dur sourcePath targetPath@ creates a hard link -- from @sourcePath@ to @targetPath@. -hardLinkTemp :: +hardLink :: (MonadMask m, MonadMVar m) => TempRegistry m -> HasFS m h @@ -504,7 +523,7 @@ hardLinkTemp :: -> FS.FsPath -> FS.FsPath -> m () -hardLinkTemp reg hfs hbio dur sourcePath targetPath = do +hardLink reg hfs hbio dur sourcePath targetPath = do allocateTemp reg (FS.createHardLink hbio sourcePath targetPath) (\_ -> FS.removeFile hfs targetPath) @@ -516,16 +535,16 @@ hardLinkTemp reg hfs hbio dur sourcePath targetPath = do -------------------------------------------------------------------------------} {-# SPECIALISE - copyFileTemp :: + copyFile :: TempRegistry IO -> HasFS IO h -> HasBlockIO IO h -> FS.FsPath -> FS.FsPath -> IO () - #-} --- | @'copyFile' hfs hbio source target@ copies the @source@ path to the @target@ path. -copyFileTemp :: + #-} +-- | @'copyFile' reg hfs hbio source target@ copies the @source@ path to the @target@ path. 
+copyFile :: (MonadMask m, MonadMVar m) => TempRegistry m -> HasFS m h @@ -533,8 +552,8 @@ copyFileTemp :: -> FS.FsPath -> FS.FsPath -> m () -copyFileTemp reg hfs _hbio sourcePath targetPath = - flip (allocateTemp reg) (\_ -> FS.removeFile hfs targetPath) $ +copyFile reg hfs _hbio sourcePath targetPath = + flip (allocateTemp reg) (\_ -> FS.removeFile hfs targetPath) $ FS.withFile hfs sourcePath FS.ReadMode $ \sourceHandle -> FS.withFile hfs targetPath (FS.WriteMode FS.MustBeNew) $ \targetHandle -> do bs <- FSL.hGetAll hfs sourceHandle diff --git a/src/Database/LSMTree/Internal/WriteBufferBlobs.hs b/src/Database/LSMTree/Internal/WriteBufferBlobs.hs index e3b6f4daa..31265872a 100644 --- a/src/Database/LSMTree/Internal/WriteBufferBlobs.hs +++ b/src/Database/LSMTree/Internal/WriteBufferBlobs.hs @@ -24,6 +24,7 @@ -- module Database.LSMTree.Internal.WriteBufferBlobs ( WriteBufferBlobs (..), + fromBlobFile, new, open, addBlob, @@ -136,7 +137,16 @@ open :: open fs blobFileName blobFileAllowExisting = do -- Must use read/write mode because we write blobs when adding, but -- we can also be asked to retrieve blobs at any time. - blobFile <- openBlobFile fs blobFileName (FS.ReadWriteMode blobFileAllowExisting) + fromBlobFile fs =<< openBlobFile fs blobFileName (FS.ReadWriteMode blobFileAllowExisting) + +{-# SPECIALISE fromBlobFile :: HasFS IO h -> Ref (BlobFile IO h) -> IO (Ref (WriteBufferBlobs IO h)) #-} +-- | Make a `WriteBufferBlobs` from a `BlobFile` and set the file pointer to the end of the file. +fromBlobFile :: + (PrimMonad m, MonadMask m) + => HasFS m h + -> Ref (BlobFile m h) + -> m (Ref (WriteBufferBlobs m h)) +fromBlobFile fs blobFile = do blobFilePointer <- newFilePointer -- Set the blob file pointer to the end of the file blobFileSize <- withRef blobFile $ FS.hGetSize fs . 
blobFileHandle diff --git a/src/Database/LSMTree/Internal/WriteBufferReader.hs b/src/Database/LSMTree/Internal/WriteBufferReader.hs index 59f9e3ab2..2b4a9b855 100644 --- a/src/Database/LSMTree/Internal/WriteBufferReader.hs +++ b/src/Database/LSMTree/Internal/WriteBufferReader.hs @@ -9,17 +9,17 @@ import Control.Monad.Class.MonadST (MonadST (..)) import Control.Monad.Class.MonadSTM (MonadSTM (..)) import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..)) import Control.Monad.Primitive (PrimMonad (..)) -import Control.RefCount (Ref, releaseRef) -import Control.TempRegistry +import Control.RefCount (Ref, dupRef, releaseRef) import Data.Primitive.MutVar (MutVar, newMutVar, readMutVar, writeMutVar) import Data.Primitive.PrimVar import Data.Word (Word16) -import Database.LSMTree.Internal.BlobRef (RawBlobRef (..)) +import Database.LSMTree.Internal.BlobFile (BlobFile) +import Database.LSMTree.Internal.BlobRef (RawBlobRef (..), + mkRawBlobRef) import qualified Database.LSMTree.Internal.Entry as E import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) import Database.LSMTree.Internal.Paths -import qualified Database.LSMTree.Internal.Paths as Paths import Database.LSMTree.Internal.RawPage import Database.LSMTree.Internal.RunReader (Entry (..), Result (..), mkEntryOverflow, readDiskPage, readOverflowPages, @@ -27,77 +27,79 @@ import Database.LSMTree.Internal.RunReader (Entry (..), Result (..), import Database.LSMTree.Internal.Serialise (SerialisedValue) import Database.LSMTree.Internal.WriteBuffer (WriteBuffer) import qualified Database.LSMTree.Internal.WriteBuffer as WB -import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs) -import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB import qualified System.FS.API as FS import System.FS.API (HasFS) import qualified System.FS.BlockIO.API as FS import System.FS.BlockIO.API (HasBlockIO) {-# SPECIALISE - readWriteBuffer :: - TempRegistry IO - -> ResolveSerialisedValue - -> HasFS IO h - -> 
HasBlockIO IO h - -> WriteBufferFsPaths - -> IO (WriteBuffer, Ref (WriteBufferBlobs IO h)) + readWriteBuffer :: + ResolveSerialisedValue + -> HasFS IO h + -> HasBlockIO IO h + -> ForKOps FS.FsPath + -> Ref (BlobFile IO h) + -> IO WriteBuffer #-} +-- | Read a serialised `WriteBuffer` back into memory. +-- +-- NOTE: The `BlobFile` argument /must be/ the blob file associated with the +-- write buffer; @`readWriteBuffer`@ does not check this. readWriteBuffer :: (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) - => TempRegistry m - -> ResolveSerialisedValue + => ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h - -> WriteBufferFsPaths - -> m (WriteBuffer, Ref (WriteBufferBlobs m h)) -readWriteBuffer reg resolve hfs hbio wbPaths = - bracket (new reg hfs hbio wbPaths) close $ \reader@WriteBufferReader{..} -> - (,) <$> readEntries reader <*> pure readerWriteBufferBlobs + -> ForKOps FS.FsPath + -> Ref (BlobFile m h) + -> m WriteBuffer +readWriteBuffer resolve hfs hbio kOpsPath blobFile = + bracket (new hfs hbio kOpsPath blobFile) close $ readEntries where readEntries reader = readEntriesAcc WB.empty where readEntriesAcc acc = next reader >>= \case Empty -> pure acc - ReadEntry key entry -> readEntriesAcc $ WB.addEntry resolve key (rawBlobRefSpan <$> toFullEntry entry) acc + ReadEntry key entry -> readEntriesAcc $ + WB.addEntry resolve key (rawBlobRefSpan <$> toFullEntry entry) acc --- | Allows reading the k\/ops of a run incrementally, using its own read-only --- file handle and in-memory cache of the current disk page. +-- | Allows reading the k\/ops of a serialised write buffer incrementally, +-- using its own read-only file handle and in-memory cache of the current disk page. -- -- New pages are loaded when trying to read their first entry. data WriteBufferReader m h = WriteBufferReader { -- | The disk page currently being read. If it is 'Nothing', the reader -- is considered closed. 
- readerCurrentPage :: !(MutVar (PrimState m) (Maybe RawPage)) + readerCurrentPage :: !(MutVar (PrimState m) (Maybe RawPage)) -- | The index of the entry to be returned by the next call to 'next'. - , readerCurrentEntryNo :: !(PrimVar (PrimState m) Word16) - , readerKOpsHandle :: !(FS.Handle h) - , readerWriteBufferBlobs :: !(Ref (WriteBufferBlobs m h)) - , readerHasFS :: !(HasFS m h) - , readerHasBlockIO :: !(HasBlockIO m h) + , readerCurrentEntryNo :: !(PrimVar (PrimState m) Word16) + , readerKOpsHandle :: !(FS.Handle h) + , readerBlobFile :: !(Ref (BlobFile m h)) + , readerHasFS :: !(HasFS m h) + , readerHasBlockIO :: !(HasBlockIO m h) } {-# SPECIALISE - new :: - TempRegistry IO - -> HasFS IO h - -> HasBlockIO IO h - -> WriteBufferFsPaths - -> IO (WriteBufferReader IO h) + new :: + HasFS IO h + -> HasBlockIO IO h + -> ForKOps FS.FsPath + -> Ref (BlobFile IO h) + -> IO (WriteBufferReader IO h) #-} -- | See 'Database.LSMTree.Internal.RunReader.new'. new :: forall m h. (MonadMVar m, MonadST m, MonadMask m) - => TempRegistry m - -> HasFS m h + => HasFS m h -> HasBlockIO m h - -> WriteBufferFsPaths + -> ForKOps FS.FsPath + -> Ref (BlobFile m h) -> m (WriteBufferReader m h) -new reg readerHasFS readerHasBlockIO fsPaths = do - (readerKOpsHandle :: FS.Handle h) <- FS.hOpen readerHasFS (Paths.writeBufferKOpsPath fsPaths) FS.ReadMode +new readerHasFS readerHasBlockIO kOpsPath blobFile = do + readerKOpsHandle <- FS.hOpen readerHasFS (unForKOps kOpsPath) FS.ReadMode -- Double the file readahead window (only applies to this file descriptor) FS.hAdviseAll readerHasBlockIO readerKOpsHandle FS.AdviceSequential - readerWriteBufferBlobs <- allocateTemp reg (WBB.open readerHasFS (Paths.writeBufferBlobPath fsPaths) FS.AllowExisting) releaseRef + readerBlobFile <- dupRef blobFile -- Load first page from disk, if it exists. 
readerCurrentEntryNo <- newPrimVar (0 :: Word16) firstPage <- readDiskPage readerHasFS readerKOpsHandle @@ -139,17 +141,23 @@ next WriteBufferReader {..} = do IndexEntry key entry -> do modifyPrimVar readerCurrentEntryNo (+1) let entry' :: E.Entry SerialisedValue (RawBlobRef m h) - entry' = fmap (WBB.mkRawBlobRef readerWriteBufferBlobs) entry + entry' = fmap (mkRawBlobRef readerBlobFile) entry let rawEntry = Entry entry' return (ReadEntry key rawEntry) IndexEntryOverflow key entry lenSuffix -> do -- TODO: we know that we need the next page, could already load? modifyPrimVar readerCurrentEntryNo (+1) let entry' :: E.Entry SerialisedValue (RawBlobRef m h) - entry' = fmap (WBB.mkRawBlobRef readerWriteBufferBlobs) entry + entry' = fmap (mkRawBlobRef readerBlobFile) entry overflowPages <- readOverflowPages readerHasFS readerKOpsHandle lenSuffix let rawEntry = mkEntryOverflow entry' page lenSuffix overflowPages return (ReadEntry key rawEntry) -close :: WriteBufferReader m h -> m () -close WriteBufferReader{..} = FS.hClose readerHasFS readerKOpsHandle +{-# SPECIALISE close :: WriteBufferReader IO h -> IO () #-} +close :: + (MonadMask m, PrimMonad m) + => WriteBufferReader m h + -> m () +close WriteBufferReader{..} = do + FS.hClose readerHasFS readerKOpsHandle + releaseRef readerBlobFile diff --git a/src/Database/LSMTree/Internal/WriteBufferWriter.hs b/src/Database/LSMTree/Internal/WriteBufferWriter.hs index f8902eafa..7e3764d12 100644 --- a/src/Database/LSMTree/Internal/WriteBufferWriter.hs +++ b/src/Database/LSMTree/Internal/WriteBufferWriter.hs @@ -25,11 +25,10 @@ import Database.LSMTree.Internal.Entry (Entry) import Database.LSMTree.Internal.PageAcc (PageAcc) import qualified Database.LSMTree.Internal.PageAcc as PageAcc import qualified Database.LSMTree.Internal.PageAcc1 as PageAcc -import Database.LSMTree.Internal.Paths (ForWriteBufferFiles (..), - WriteBufferFsPaths, forWriteBufferBlobRaw, - forWriteBufferKOpsRaw, pathsForWriteBufferFiles, - 
toChecksumsFileForWriteBufferFiles, - writeBufferChecksumsPath) +import Database.LSMTree.Internal.Paths (ForBlob (..), ForKOps (..), + WriteBufferFsPaths, toChecksumsFileForWriteBufferFiles, + writeBufferBlobPath, writeBufferChecksumsPath, + writeBufferKOpsPath) import Database.LSMTree.Internal.RawOverflowPage (RawOverflowPage) import Database.LSMTree.Internal.RawPage (RawPage) import Database.LSMTree.Internal.Serialise (SerialisedKey, @@ -45,14 +44,14 @@ import System.FS.BlockIO.API (HasBlockIO) {-# SPECIALISE - writeWriteBuffer :: - HasFS IO h - -> HasBlockIO IO h - -> WriteBufferFsPaths - -> WriteBuffer - -> Ref (WriteBufferBlobs IO h) - -> IO () - #-} + writeWriteBuffer :: + HasFS IO h + -> HasBlockIO IO h + -> WriteBufferFsPaths + -> WriteBuffer + -> Ref (WriteBufferBlobs IO h) + -> IO () + #-} -- | Write a 'WriteBuffer' to disk. writeWriteBuffer :: (MonadSTM m, MonadST m, MonadThrow m) @@ -78,14 +77,15 @@ data WriteBufferWriter m h = WriteBufferWriter -- | The byte offset within the blob file for the next blob to be written. writerBlobOffset :: !(PrimVar (PrimState m) Word64), -- | The (write mode) file handles. 
- writerHandles :: !(ForWriteBufferFiles (ChecksumHandle (PrimState m) h)), + writerKOpsHandle :: !(ForKOps (ChecksumHandle (PrimState m) h)), + writerBlobHandle :: !(ForBlob (ChecksumHandle (PrimState m) h)), writerHasFS :: !(HasFS m h), writerHasBlockIO :: !(HasBlockIO m h) } {-# SPECIALISE new :: - HasFS IO h + HasFS IO h -> HasBlockIO IO h -> WriteBufferFsPaths -> IO (WriteBufferWriter IO h) @@ -104,8 +104,8 @@ new :: new hfs hbio fsPaths = do writerPageAcc <- ST.stToIO PageAcc.newPageAcc writerBlobOffset <- newPrimVar 0 - writerHandles <- - traverse (makeHandle hfs) (pathsForWriteBufferFiles fsPaths) + writerKOpsHandle <- ForKOps <$> makeHandle hfs (writeBufferKOpsPath fsPaths) + writerBlobHandle <- ForBlob <$> makeHandle hfs (writeBufferBlobPath fsPaths) return WriteBufferWriter { writerFsPaths = fsPaths, writerHasFS = hfs, @@ -114,10 +114,10 @@ new hfs hbio fsPaths = do } {-# SPECIALISE - unsafeFinalise :: - Bool - -> WriteBufferWriter IO h - -> IO (HasFS IO h, HasBlockIO IO h, WriteBufferFsPaths) + unsafeFinalise :: + Bool + -> WriteBufferWriter IO h + -> IO (HasFS IO h, HasBlockIO IO h, WriteBufferFsPaths) #-} -- | Finalise an incremental 'WriteBufferWriter'. 
-- @@ -134,26 +134,29 @@ unsafeFinalise :: unsafeFinalise dropCaches WriteBufferWriter {..} = do -- write final bits mPage <- ST.stToIO $ flushPageIfNonEmpty writerPageAcc - for_ mPage $ writeRawPage writerHasFS (forWriteBufferKOps writerHandles) - checksums <- toChecksumsFileForWriteBufferFiles <$> traverse readChecksum writerHandles + for_ mPage $ writeRawPage writerHasFS writerKOpsHandle + kOpsChecksum <- traverse readChecksum writerKOpsHandle + blobChecksum <- traverse readChecksum writerBlobHandle + let checksums = toChecksumsFileForWriteBufferFiles (kOpsChecksum, blobChecksum) FS.withFile writerHasFS (writeBufferChecksumsPath writerFsPaths) (FS.WriteMode FS.MustBeNew) $ \h -> do CRC.writeChecksumsFile' writerHasFS h checksums FS.hDropCacheAll writerHasBlockIO h -- drop the KOps and blobs files from the cache if asked for when dropCaches $ do - dropCache writerHasBlockIO (forWriteBufferKOpsRaw writerHandles) - dropCache writerHasBlockIO (forWriteBufferBlobRaw writerHandles) - for_ writerHandles $ closeHandle writerHasFS + dropCache writerHasBlockIO (unForKOps writerKOpsHandle) + dropCache writerHasBlockIO (unForBlob writerBlobHandle) + closeHandle writerHasFS (unForKOps writerKOpsHandle) + closeHandle writerHasFS (unForBlob writerBlobHandle) return (writerHasFS, writerHasBlockIO, writerFsPaths) {-# SPECIALIZE - addKeyOp :: - WriteBufferWriter IO h - -> SerialisedKey - -> Entry SerialisedValue (RawBlobRef IO h) - -> IO () - #-} + addKeyOp :: + WriteBufferWriter IO h + -> SerialisedKey + -> Entry SerialisedValue (RawBlobRef IO h) + -> IO () + #-} -- | See 'Database.LSMTree.Internal.RunBuilder.addKeyOp'. addKeyOp :: (MonadST m, MonadSTM m, MonadThrow m) @@ -163,16 +166,16 @@ addKeyOp :: -> m () addKeyOp WriteBufferWriter{..} key op = do -- TODO: consider optimisation described in 'Database.LSMTree.Internal.RunBuilder.addKeyOp'. 
- op' <- traverse (copyBlob writerHasFS writerBlobOffset (forWriteBufferBlob writerHandles)) op + op' <- traverse (copyBlob writerHasFS writerBlobOffset writerBlobHandle) op if PageAcc.entryWouldFitInPage key op then do mPage <- ST.stToIO $ addSmallKeyOp writerPageAcc key op' - for_ mPage $ writeRawPage writerHasFS (forWriteBufferKOps writerHandles) + for_ mPage $ writeRawPage writerHasFS writerKOpsHandle else do (pages, overflowPages) <- ST.stToIO $ addLargeKeyOp writerPageAcc key op' -- TODO: consider optimisation described in 'Database.LSMTree.Internal.RunBuilder.addKeyOp'. - for_ pages $ writeRawPage writerHasFS (forWriteBufferKOps writerHandles) - writeRawOverflowPages writerHasFS (forWriteBufferKOps writerHandles) overflowPages + for_ pages $ writeRawPage writerHasFS writerKOpsHandle + writeRawOverflowPages writerHasFS writerKOpsHandle overflowPages -- | See 'Database.LSMTree.Internal.RunAcc.addSmallKeyOp'. addSmallKeyOp :: @@ -229,8 +232,9 @@ flushPageIfNonEmpty pageAcc = do else pure Nothing -- | Internal helper. See 'Database.LSMTree.Internal.RunAcc.selectPagesAndChunks'. 
-selectPages :: Maybe RawPage - -> RawPage - -> [RawPage] +selectPages :: + Maybe RawPage + -> RawPage + -> [RawPage] selectPages mPagePre page = maybeToList mPagePre ++ [page] diff --git a/test/Test/Database/LSMTree/Internal/Run.hs b/test/Test/Database/LSMTree/Internal/Run.hs index ec25f8ab2..b5db368b9 100644 --- a/test/Test/Database/LSMTree/Internal/Run.hs +++ b/test/Test/Database/LSMTree/Internal/Run.hs @@ -5,27 +5,20 @@ module Test.Database.LSMTree.Internal.Run ( tests, ) where -import Control.Exception (bracket, bracket_) -import Control.Monad (when) import Control.RefCount import Control.TempRegistry (withTempRegistry) import Data.ByteString (ByteString) import qualified Data.ByteString as BS import qualified Data.ByteString.Short as SBS import Data.Coerce (coerce) -import Data.Foldable (for_) -import qualified Data.Map as M import qualified Data.Map.Strict as Map import Data.Maybe (fromJust) import qualified Data.Primitive.ByteArray as BA -import qualified Data.Vector as V import Database.LSMTree.Extras.Generators (KeyForIndexCompact (..)) import Database.LSMTree.Extras.RunData import Database.LSMTree.Internal.BlobRef (BlobSpan (..)) import qualified Database.LSMTree.Internal.CRC32C as CRC import Database.LSMTree.Internal.Entry -import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) -import Database.LSMTree.Internal.MergeSchedule (addWriteBufferEntries) import Database.LSMTree.Internal.Paths (RunFsPaths (..), WriteBufferFsPaths (..)) import qualified Database.LSMTree.Internal.Paths as Paths @@ -35,12 +28,8 @@ import Database.LSMTree.Internal.Run as Run import Database.LSMTree.Internal.RunNumber import Database.LSMTree.Internal.Serialise import Database.LSMTree.Internal.Snapshot -import Database.LSMTree.Internal.WriteBuffer (WriteBuffer) -import qualified Database.LSMTree.Internal.WriteBuffer as WB -import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs) import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB import 
Database.LSMTree.Internal.WriteBufferReader (readWriteBuffer) -import Database.LSMTree.Internal.WriteBufferWriter (writeWriteBuffer) import qualified FormatPage as Proto import System.FilePath import qualified System.FS.API as FS @@ -231,28 +220,28 @@ prop_WriteAndOpen fs hbio wb = return (property True) -- | Writing and loading a 'WriteBuffer' gives the same in-memory --- representation as the original run. +-- representation as the original write buffer. prop_WriteAndOpenWriteBuffer :: FS.HasFS IO h -> FS.HasBlockIO IO h -> RunData KeyForIndexCompact SerialisedValue SerialisedBlob -> IO Property prop_WriteAndOpenWriteBuffer hfs hbio rd = do - withTempRegistry $ \reg -> do - -- Serialise run data as write buffer: - let srd = serialiseRunData rd - let inPaths = WrapRunFsPaths $ simplePath 1111 - let resolve (SerialisedValue x) (SerialisedValue y) = SerialisedValue (x <> y) - withRunDataAsWriteBuffer hfs resolve inPaths srd $ \wb wbb -> do - -- Write write buffer to disk: - let wbPaths = WrapRunFsPaths $ simplePath 1312 - withSerialisedWriteBuffer hfs hbio wbPaths wb wbb $ do - -- Read write buffer from disk: - (wb', wbb') <- readWriteBuffer reg resolve hfs hbio wbPaths + -- Serialise run data as write buffer: + let srd = serialiseRunData rd + let inPaths = WrapRunFsPaths $ simplePath 1111 + let resolve (SerialisedValue x) (SerialisedValue y) = SerialisedValue (x <> y) + withRunDataAsWriteBuffer hfs resolve inPaths srd $ \wb wbb -> do + -- Write write buffer to disk: + let wbPaths = WrapRunFsPaths $ simplePath 1312 + withSerialisedWriteBuffer hfs hbio wbPaths wb wbb $ do + -- Read write buffer from disk: + let kOpsPath = Paths.ForKOps (Paths.writeBufferKOpsPath wbPaths) + withRef wbb $ \wbb' -> do + wb' <- readWriteBuffer resolve hfs hbio kOpsPath (WBB.blobFile wbb') assertEqual "k/ops" wb wb' - releaseRef wbb' - -- TODO: return a proper Property instead of using assertEqual etc. 
- return (property True) + -- TODO: return a proper Property instead of using assertEqual etc. + return (property True) -- | Writing run data to the disk via 'writeWriteBuffer' gives the same key/ops -- and blob files as when written out as a run. @@ -290,37 +279,3 @@ prop_WriteRunEqWriteWriteBuffer hfs hbio rd = do assertEqual "writtenRunBlob/writtenWriteBufferBlob" rdBlob wbBlob -- TODO: return a proper Property instead of using assertEqual etc. return (property True) - --- | Use 'SerialisedRunData' to 'WriteBuffer' and 'WriteBufferBlobs'. -withRunDataAsWriteBuffer :: - FS.HasFS IO h - -> ResolveSerialisedValue - -> WriteBufferFsPaths - -> SerialisedRunData - -> (WriteBuffer -> Ref (WriteBufferBlobs IO h) -> IO a) - -> IO a -withRunDataAsWriteBuffer hfs f fsPaths rd action = do - let es = V.fromList . M.toList $ unRunData rd - let maxn = NumEntries $ V.length es - let wbbPath = Paths.writeBufferBlobPath fsPaths - bracket (WBB.new hfs wbbPath) releaseRef $ \wbb -> do - (wb, _) <- addWriteBufferEntries hfs f wbb maxn WB.empty es - action wb wbb - --- | Serialise a 'WriteBuffer' and 'WriteBufferBlobs' to disk and perform an 'IO' action. -withSerialisedWriteBuffer :: - FS.HasFS IO h - -> FS.HasBlockIO IO h - -> WriteBufferFsPaths - -> WriteBuffer - -> Ref (WriteBufferBlobs IO h) - -> IO a - -> IO a -withSerialisedWriteBuffer hfs hbio wbPaths wb wbb = - bracket_ (writeWriteBuffer hfs hbio wbPaths wb wbb) $ do - for_ [ Paths.writeBufferKOpsPath wbPaths - , Paths.writeBufferBlobPath wbPaths - , Paths.writeBufferChecksumsPath wbPaths - ] $ \fsPath -> do - fsPathExists <- FS.doesFileExist hfs fsPath - when fsPathExists $ FS.removeFile hfs fsPath