From d68478f602e76563c023479cecbe6397a2533be9 Mon Sep 17 00:00:00 2001 From: Wen Kokke Date: Mon, 9 Dec 2024 19:02:54 +0000 Subject: [PATCH] All tests passing --- src/Database/LSMTree.hs | 9 +- src/Database/LSMTree/Internal.hs | 43 +++-- src/Database/LSMTree/Internal/BlobFile.hs | 4 +- src/Database/LSMTree/Internal/Snapshot.hs | 156 ++++++++++++++++-- .../LSMTree/Internal/WriteBufferBlobs.hs | 28 +++- .../LSMTree/Internal/WriteBufferReader.hs | 97 +++++------ src/Database/LSMTree/Monoidal.hs | 9 +- src/Database/LSMTree/Normal.hs | 2 +- test/Database/LSMTree/Model/Session.hs | 13 +- test/Test/Database/LSMTree/Internal/Run.hs | 29 ++-- 10 files changed, 252 insertions(+), 138 deletions(-) diff --git a/src/Database/LSMTree.hs b/src/Database/LSMTree.hs index 54c7bbe17..b1ac95dc6 100644 --- a/src/Database/LSMTree.hs +++ b/src/Database/LSMTree.hs @@ -469,21 +469,18 @@ retrieveBlobs (Internal.Session' (sesh :: Internal.Session m h)) refs = -------------------------------------------------------------------------------} {-# SPECIALISE createSnapshot :: - ResolveValue v - => Common.SnapshotLabel + Common.SnapshotLabel -> SnapshotName -> Table IO k v b -> IO () #-} createSnapshot :: forall m k v b. 
- ( IOLike m - , ResolveValue v - ) + IOLike m => Common.SnapshotLabel -> SnapshotName -> Table m k v b -> m () createSnapshot label snap (Internal.Table' t) = - void $ Internal.createSnapshot (resolve (Proxy @v)) snap label Internal.SnapFullTable t + void $ Internal.createSnapshot snap label Internal.SnapFullTable t {-# SPECIALISE openSnapshot :: ResolveValue v diff --git a/src/Database/LSMTree/Internal.hs b/src/Database/LSMTree/Internal.hs index 6a5240152..35fd2f777 100644 --- a/src/Database/LSMTree/Internal.hs +++ b/src/Database/LSMTree/Internal.hs @@ -121,12 +121,10 @@ import Database.LSMTree.Internal.Snapshot.Codec import Database.LSMTree.Internal.UniqCounter import qualified Database.LSMTree.Internal.WriteBuffer as WB import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB -import qualified Database.LSMTree.Internal.WriteBufferWriter as WBW import qualified System.FS.API as FS import System.FS.API (FsError, FsErrorPath (..), FsPath, HasFS) import qualified System.FS.BlockIO.API as FS import System.FS.BlockIO.API (HasBlockIO) -import Database.LSMTree.Internal.WriteBufferReader (readWriteBuffer) {------------------------------------------------------------------------------- Existentials @@ -1128,8 +1126,7 @@ readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do -------------------------------------------------------------------------------} {-# SPECIALISE createSnapshot :: - ResolveSerialisedValue - -> SnapshotName + SnapshotName -> SnapshotLabel -> SnapshotTableType -> Table IO h @@ -1137,18 +1134,18 @@ readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do -- | See 'Database.LSMTree.Normal.createSnapshot''. 
createSnapshot :: (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) - => ResolveSerialisedValue - -> SnapshotName + => SnapshotName -> SnapshotLabel -> SnapshotTableType -> Table m h -> m () -createSnapshot resolve snap label tableType t = do +createSnapshot snap label tableType t = do traceWith (tableTracer t) $ TraceSnapshot snap withOpenTable t $ \thEnv -> withTempRegistry $ \reg -> do -- TODO: use the temp registry for all side effects - let hfs = tableHasFS thEnv + let hfs = tableHasFS thEnv hbio = tableHasBlockIO thEnv + uc = tableSessionUniqCounter thEnv -- Guard that the snapshot does not exist already let snapDir = Paths.namedSnapshotDir (tableSessionRoot thEnv) snap @@ -1164,10 +1161,11 @@ createSnapshot resolve snap label tableType t = do -- Get the table content. content <- RW.withReadAccess (tableContent thEnv) pure - -- Write the write buffer. - snapWriteBuffer <- uniqueToRunNumber <$> incrUniqCounter (tableSessionUniqCounter thEnv) - let wbPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer - WBW.writeWriteBuffer hfs hbio wbPaths (tableWriteBuffer content) (tableWriteBufferBlobs content) + -- Snapshot the write buffer. 
+ let activeDir = Paths.activeDir (tableSessionRoot thEnv) + let wb = tableWriteBuffer content + let wbb = tableWriteBufferBlobs content + snapWriteBufferNumber <- Paths.writeBufferNumber <$> snapshotWriteBuffer reg hfs hbio uc activeDir snapDir wb wbb -- Convert to snapshot format snapLevels <- toSnapLevels (tableLevels content) @@ -1175,7 +1173,7 @@ createSnapshot resolve snap label tableType t = do -- Hard link runs into the named snapshot directory snapLevels' <- snapshotRuns reg hbio snapDir snapLevels - let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBuffer snapLevels' + let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBufferNumber snapLevels' SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData @@ -1204,6 +1202,7 @@ openSnapshot sesh label tableType override snap resolve = do withTempRegistry $ \reg -> do let hfs = sessionHasFS seshEnv hbio = sessionHasBlockIO seshEnv + uc = sessionUniqCounter seshEnv -- Guard that the snapshot exists let snapDir = Paths.namedSnapshotDir (sessionRoot seshEnv) snap @@ -1226,22 +1225,20 @@ openSnapshot sesh label tableType override snap resolve = do let conf' = applyOverride override conf am <- newArenaManager - blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$> - incrUniqCounter (sessionUniqCounter seshEnv) - tableWriteBufferBlobs <- allocateTemp reg (WBB.new hfs blobpath) - releaseRef - let actDir = Paths.activeDir (sessionRoot seshEnv) + let activeDir = Paths.activeDir (sessionRoot seshEnv) -- Read write buffer - let wbPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer - tableWriteBuffer <- readWriteBuffer hfs hbio resolve tableWriteBufferBlobs wbPaths + activeWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc + let activeWriteBufferPaths = 
Paths.WriteBufferFsPaths (Paths.getActiveDir activeDir) activeWriteBufferNumber + let snapWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer + (tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths -- Hard link runs into the active directory, - snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir actDir snapLevels - + snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir activeDir snapLevels + -- Convert from the snapshot format, restoring merge progress in the process - tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve actDir snapLevels' + tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve activeDir snapLevels' releaseRuns reg snapLevels' tableCache <- mkLevelsCache reg tableLevels diff --git a/src/Database/LSMTree/Internal/BlobFile.hs b/src/Database/LSMTree/Internal/BlobFile.hs index 24c93d40b..3753223db 100644 --- a/src/Database/LSMTree/Internal/BlobFile.hs +++ b/src/Database/LSMTree/Internal/BlobFile.hs @@ -20,6 +20,7 @@ import Database.LSMTree.Internal.Serialise (SerialisedBlob (..)) import qualified System.FS.API as FS import System.FS.API (HasFS) import qualified System.FS.BlockIO.API as FS +import System.FS.CallStack (HasCallStack) -- | A handle to a file containing blobs. -- @@ -50,9 +51,10 @@ instance NFData BlobSpan where -- | Open the given file to make a 'BlobFile'. The finaliser will close and -- delete the file. 
-{-# SPECIALISE openBlobFile :: HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-} +{-# SPECIALISE openBlobFile :: HasCallStack => HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-} openBlobFile :: PrimMonad m + => HasCallStack => HasFS m h -> FS.FsPath -> FS.OpenMode diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index 2faae9683..681b47c61 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -12,6 +12,9 @@ module Database.LSMTree.Internal.Snapshot ( , SpentCredits (..) -- * Conversion to levels snapshot format , toSnapLevels + -- * Write buffer + , snapshotWriteBuffer + , openWriteBuffer -- * Runs , snapshotRuns , openRuns @@ -26,7 +29,7 @@ module Database.LSMTree.Internal.Snapshot ( import Control.Concurrent.Class.MonadMVar.Strict import Control.Concurrent.Class.MonadSTM (MonadSTM) import Control.DeepSeq (NFData (..)) -import Control.Monad (when) +import Control.Monad (void, when) import Control.Monad.Class.MonadST (MonadST) import Control.Monad.Class.MonadThrow (MonadMask) import Control.Monad.Primitive (PrimMonad) @@ -45,15 +48,22 @@ import Database.LSMTree.Internal.MergeSchedule import Database.LSMTree.Internal.MergingRun (NumRuns (..)) import qualified Database.LSMTree.Internal.MergingRun as MR import Database.LSMTree.Internal.Paths (ActiveDir (..), - NamedSnapshotDir (..), RunFsPaths (..), pathsForRunFiles, - runChecksumsPath) + NamedSnapshotDir (..), RunFsPaths (..), + WriteBufferFsPaths (..), pathsForRunFiles, + runChecksumsPath, writeBufferBlobPath, + writeBufferChecksumsPath, writeBufferKOpsPath) import Database.LSMTree.Internal.Run (Run) import qualified Database.LSMTree.Internal.Run as Run import Database.LSMTree.Internal.RunNumber import Database.LSMTree.Internal.UniqCounter (UniqCounter, incrUniqCounter, uniqueToRunNumber) +import Database.LSMTree.Internal.WriteBuffer (WriteBuffer) +import 
Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs) +import qualified Database.LSMTree.Internal.WriteBufferReader as WBR +import qualified Database.LSMTree.Internal.WriteBufferWriter as WBW import qualified System.FS.API as FS import System.FS.API (HasFS) +import qualified System.FS.API.Lazy as FSL import qualified System.FS.BlockIO.API as FS import System.FS.BlockIO.API (HasBlockIO) @@ -212,6 +222,74 @@ toSnapMergingRunState (MR.OngoingMerge rs (MR.SpentCreditsVar spentCreditsVar) m spentCredits <- readPrimVar spentCreditsVar pure (SnapOngoingMerge rs (SpentCredits spentCredits) (Merge.mergeLevel m)) +{------------------------------------------------------------------------------- + Write Buffer +-------------------------------------------------------------------------------} + +{-# SPECIALISE + snapshotWriteBuffer :: + TempRegistry IO + -> HasFS IO h + -> HasBlockIO IO h + -> UniqCounter IO + -> ActiveDir + -> NamedSnapshotDir + -> WriteBuffer + -> Ref (WriteBufferBlobs IO h) + -> IO WriteBufferFsPaths + #-} +snapshotWriteBuffer :: + (MonadMVar m, MonadSTM m, MonadST m, MonadMask m) + => TempRegistry m + -> HasFS m h + -> HasBlockIO m h + -> UniqCounter m + -> ActiveDir + -> NamedSnapshotDir + -> WriteBuffer + -> Ref (WriteBufferBlobs m h) + -> m WriteBufferFsPaths +snapshotWriteBuffer reg hfs hbio uc activeDir snapDir wb wbb = do + -- Write the write buffer and write buffer blobs to the active directory. + activeWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc + let activeWriteBufferPaths = WriteBufferFsPaths (getActiveDir activeDir) activeWriteBufferNumber + WBW.writeWriteBuffer hfs hbio activeWriteBufferPaths wb wbb + -- Hard link the write buffer and write buffer blobs to the snapshot directory. 
+ snapWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc + let snapWriteBufferPaths = WriteBufferFsPaths (getNamedSnapshotDir snapDir) snapWriteBufferNumber + hardLinkTemp reg hfs hbio NoHardLinkDurable (writeBufferKOpsPath activeWriteBufferPaths) (writeBufferKOpsPath snapWriteBufferPaths) + hardLinkTemp reg hfs hbio NoHardLinkDurable (writeBufferBlobPath activeWriteBufferPaths) (writeBufferBlobPath snapWriteBufferPaths) + hardLinkTemp reg hfs hbio NoHardLinkDurable (writeBufferChecksumsPath activeWriteBufferPaths) (writeBufferChecksumsPath snapWriteBufferPaths) + pure snapWriteBufferPaths + +{-# SPECIALISE + openWriteBuffer :: + TempRegistry IO + -> ResolveSerialisedValue + -> HasFS IO h + -> HasBlockIO IO h + -> WriteBufferFsPaths + -> WriteBufferFsPaths + -> IO (WriteBuffer, Ref (WriteBufferBlobs IO h)) + #-} +openWriteBuffer :: + (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) + => TempRegistry m + -> ResolveSerialisedValue + -> HasFS m h + -> HasBlockIO m h + -> WriteBufferFsPaths + -> WriteBufferFsPaths + -> m (WriteBuffer, Ref (WriteBufferBlobs m h)) +openWriteBuffer reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths = do + -- Hard link the write buffer keyops and checksum files from the snapshot directory into the active directory. + hardLinkTemp reg hfs hbio NoHardLinkDurable (writeBufferKOpsPath snapWriteBufferPaths) (writeBufferKOpsPath activeWriteBufferPaths) + hardLinkTemp reg hfs hbio NoHardLinkDurable (writeBufferChecksumsPath snapWriteBufferPaths) (writeBufferChecksumsPath activeWriteBufferPaths) + -- Copy the write buffer blobs file from the snapshot directory into the active directory. 
+ copyFileTemp reg hfs hbio (writeBufferBlobPath snapWriteBufferPaths) (writeBufferBlobPath activeWriteBufferPaths) + -- Read write buffer + WBR.readWriteBuffer reg resolve hfs hbio activeWriteBufferPaths + {------------------------------------------------------------------------------- Runs -------------------------------------------------------------------------------} @@ -398,12 +476,66 @@ hardLinkRunFiles :: hardLinkRunFiles reg hfs hbio dur sourceRunFsPaths targetRunFsPaths = do let sourcePaths = pathsForRunFiles sourceRunFsPaths targetPaths = pathsForRunFiles targetRunFsPaths - sequenceA_ (hardLinkTemp <$> sourcePaths <*> targetPaths) - hardLinkTemp (runChecksumsPath sourceRunFsPaths) (runChecksumsPath targetRunFsPaths) - where - hardLinkTemp sourcePath targetPath = do - allocateTemp reg - (FS.createHardLink hbio sourcePath targetPath) - (\_ -> FS.removeFile hfs targetPath) - when (dur == HardLinkDurable) $ - FS.synchroniseFile hfs hbio targetPath + sequenceA_ (hardLinkTemp reg hfs hbio dur <$> sourcePaths <*> targetPaths) + hardLinkTemp reg hfs hbio dur (runChecksumsPath sourceRunFsPaths) (runChecksumsPath targetRunFsPaths) + +{------------------------------------------------------------------------------- + Hard link file +-------------------------------------------------------------------------------} + +{-# SPECIALISE + hardLinkTemp :: + TempRegistry IO + -> HasFS IO h + -> HasBlockIO IO h + -> HardLinkDurable + -> FS.FsPath + -> FS.FsPath + -> IO () + #-} +-- | @'hardLinkTemp' reg hfs hbio dur sourcePath targetPath@ creates a hard link +-- from @sourcePath@ to @targetPath@. 
+hardLinkTemp :: + (MonadMask m, MonadMVar m) + => TempRegistry m + -> HasFS m h + -> HasBlockIO m h + -> HardLinkDurable + -> FS.FsPath + -> FS.FsPath + -> m () +hardLinkTemp reg hfs hbio dur sourcePath targetPath = do + allocateTemp reg + (FS.createHardLink hbio sourcePath targetPath) + (\_ -> FS.removeFile hfs targetPath) + when (dur == HardLinkDurable) $ + FS.synchroniseFile hfs hbio targetPath + +{------------------------------------------------------------------------------- + Copy file +-------------------------------------------------------------------------------} + +{-# SPECIALISE + copyFileTemp :: + TempRegistry IO + -> HasFS IO h + -> HasBlockIO IO h + -> FS.FsPath + -> FS.FsPath + -> IO () + #-} +-- | @'copyFileTemp' reg hfs hbio source target@ copies the file at the @source@ path to the @target@ path. +copyFileTemp :: + (MonadMask m, MonadMVar m) + => TempRegistry m + -> HasFS m h + -> HasBlockIO m h + -> FS.FsPath + -> FS.FsPath + -> m () +copyFileTemp reg hfs _hbio sourcePath targetPath = + flip (allocateTemp reg) (\_ -> FS.removeFile hfs targetPath) $ + FS.withFile hfs sourcePath FS.ReadMode $ \sourceHandle -> + FS.withFile hfs targetPath (FS.WriteMode FS.MustBeNew) $ \targetHandle -> do + bs <- FSL.hGetAll hfs sourceHandle + void $ FSL.hPutAll hfs targetHandle bs diff --git a/src/Database/LSMTree/Internal/WriteBufferBlobs.hs b/src/Database/LSMTree/Internal/WriteBufferBlobs.hs index 880c44151..e3b6f4daa 100644 --- a/src/Database/LSMTree/Internal/WriteBufferBlobs.hs +++ b/src/Database/LSMTree/Internal/WriteBufferBlobs.hs @@ -25,6 +25,7 @@ module Database.LSMTree.Internal.WriteBufferBlobs ( WriteBufferBlobs (..), new, + open, addBlob, mkRawBlobRef, mkWeakBlobRef, @@ -33,6 +34,7 @@ module Database.LSMTree.Internal.WriteBufferBlobs ( ) where import Control.DeepSeq (NFData (..)) +import Control.Monad (void) import Control.Monad.Class.MonadThrow import Control.Monad.Primitive (PrimMonad, PrimState) import Control.RefCount @@ -116,15 +118,29 @@ instance RefCounted m 
(WriteBufferBlobs m h) where getRefCounter = writeBufRefCounter {-# SPECIALISE new :: HasFS IO h -> FS.FsPath -> IO (Ref (WriteBufferBlobs IO h)) #-} -new :: (PrimMonad m, MonadMask m) - => HasFS m h - -> FS.FsPath - -> m (Ref (WriteBufferBlobs m h)) -new fs blobFileName = do +new :: + (PrimMonad m, MonadMask m) + => HasFS m h + -> FS.FsPath + -> m (Ref (WriteBufferBlobs m h)) +new fs blobFileName = open fs blobFileName FS.MustBeNew + +{-# SPECIALISE open :: HasFS IO h -> FS.FsPath -> FS.AllowExisting -> IO (Ref (WriteBufferBlobs IO h)) #-} +-- | Open a `WriteBufferBlobs` file and set the file pointer to the end of the file. +open :: + (PrimMonad m, MonadMask m) + => HasFS m h + -> FS.FsPath + -> FS.AllowExisting + -> m (Ref (WriteBufferBlobs m h)) +open fs blobFileName blobFileAllowExisting = do -- Must use read/write mode because we write blobs when adding, but -- we can also be asked to retrieve blobs at any time. - blobFile <- openBlobFile fs blobFileName (FS.ReadWriteMode FS.MustBeNew) + blobFile <- openBlobFile fs blobFileName (FS.ReadWriteMode blobFileAllowExisting) blobFilePointer <- newFilePointer + -- Set the blob file pointer to the end of the file + blobFileSize <- withRef blobFile $ FS.hGetSize fs . blobFileHandle + void . updateFilePointer blobFilePointer . 
fromIntegral $ blobFileSize newRef (releaseRef blobFile) $ \writeBufRefCounter -> WriteBufferBlobs { blobFile, diff --git a/src/Database/LSMTree/Internal/WriteBufferReader.hs b/src/Database/LSMTree/Internal/WriteBufferReader.hs index 121415a44..59f9e3ab2 100644 --- a/src/Database/LSMTree/Internal/WriteBufferReader.hs +++ b/src/Database/LSMTree/Internal/WriteBufferReader.hs @@ -4,24 +4,22 @@ module Database.LSMTree.Internal.WriteBufferReader ( readWriteBuffer ) where +import Control.Concurrent.Class.MonadMVar.Strict import Control.Monad.Class.MonadST (MonadST (..)) import Control.Monad.Class.MonadSTM (MonadSTM (..)) -import Control.Monad.Class.MonadThrow (MonadCatch (..), MonadMask, - MonadThrow (..)) +import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..)) import Control.Monad.Primitive (PrimMonad (..)) import Control.RefCount (Ref, releaseRef) +import Control.TempRegistry import Data.Primitive.MutVar (MutVar, newMutVar, readMutVar, writeMutVar) import Data.Primitive.PrimVar -import qualified Data.Vector as V import Data.Word (Word16) -import Database.LSMTree.Internal.BlobFile (BlobFile, openBlobFile) -import Database.LSMTree.Internal.BlobRef (RawBlobRef (..), - mkRawBlobRef, readRawBlobRef) +import Database.LSMTree.Internal.BlobRef (RawBlobRef (..)) import qualified Database.LSMTree.Internal.Entry as E import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) -import Database.LSMTree.Internal.MergeSchedule (addWriteBufferEntries) import Database.LSMTree.Internal.Paths +import qualified Database.LSMTree.Internal.Paths as Paths import Database.LSMTree.Internal.RawPage import Database.LSMTree.Internal.RunReader (Entry (..), Result (..), mkEntryOverflow, readDiskPage, readOverflowPages, @@ -30,42 +28,38 @@ import Database.LSMTree.Internal.Serialise (SerialisedValue) import Database.LSMTree.Internal.WriteBuffer (WriteBuffer) import qualified Database.LSMTree.Internal.WriteBuffer as WB import Database.LSMTree.Internal.WriteBufferBlobs 
(WriteBufferBlobs) +import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB import qualified System.FS.API as FS import System.FS.API (HasFS) import qualified System.FS.BlockIO.API as FS import System.FS.BlockIO.API (HasBlockIO) -import qualified Database.LSMTree.Internal.Paths as Paths {-# SPECIALISE readWriteBuffer :: - HasFS IO h - -> HasBlockIO IO h + TempRegistry IO -> ResolveSerialisedValue - -> Ref (WriteBufferBlobs IO h) + -> HasFS IO h + -> HasBlockIO IO h -> WriteBufferFsPaths - -> IO WriteBuffer + -> IO (WriteBuffer, Ref (WriteBufferBlobs IO h)) #-} readWriteBuffer :: - (MonadMask m, MonadSTM m, MonadST m) - => HasFS m h - -> HasBlockIO m h + (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) + => TempRegistry m -> ResolveSerialisedValue - -> Ref (WriteBufferBlobs m h) + -> HasFS m h + -> HasBlockIO m h -> WriteBufferFsPaths - -> m WriteBuffer -readWriteBuffer hfs hbio f wbb fsPaths = - bracket (new hfs hbio fsPaths) close $ \reader -> do - let readEntry = - E.traverseBlobRef (readRawBlobRef hfs) . toFullEntry - let readEntries = next reader >>= \case - Empty -> pure [] - ReadEntry key entry -> - readEntry entry >>= \entry' -> - ((key, entry') :) <$> readEntries - es <- V.fromList <$> readEntries - -- TODO: We cannot derive the number of entries from the in-memory index. 
- let maxn = E.NumEntries $ V.length es - fst <$> addWriteBufferEntries hfs f wbb maxn WB.empty es + -> m (WriteBuffer, Ref (WriteBufferBlobs m h)) +readWriteBuffer reg resolve hfs hbio wbPaths = + bracket (new reg hfs hbio wbPaths) close $ \reader@WriteBufferReader{..} -> + (,) <$> readEntries reader <*> pure readerWriteBufferBlobs + where + readEntries reader = readEntriesAcc WB.empty + where + readEntriesAcc acc = next reader >>= \case + Empty -> pure acc + ReadEntry key entry -> readEntriesAcc $ WB.addEntry resolve key (rawBlobRefSpan <$> toFullEntry entry) acc -- | Allows reading the k\/ops of a run incrementally, using its own read-only -- file handle and in-memory cache of the current disk page. @@ -74,34 +68,36 @@ readWriteBuffer hfs hbio f wbb fsPaths = data WriteBufferReader m h = WriteBufferReader { -- | The disk page currently being read. If it is 'Nothing', the reader -- is considered closed. - readerCurrentPage :: !(MutVar (PrimState m) (Maybe RawPage)) + readerCurrentPage :: !(MutVar (PrimState m) (Maybe RawPage)) -- | The index of the entry to be returned by the next call to 'next'. - , readerCurrentEntryNo :: !(PrimVar (PrimState m) Word16) - , readerKOpsHandle :: !(FS.Handle h) - , readerBlobFile :: !(Ref (BlobFile m h)) - , readerHasFS :: !(HasFS m h) - , readerHasBlockIO :: !(HasBlockIO m h) + , readerCurrentEntryNo :: !(PrimVar (PrimState m) Word16) + , readerKOpsHandle :: !(FS.Handle h) + , readerWriteBufferBlobs :: !(Ref (WriteBufferBlobs m h)) + , readerHasFS :: !(HasFS m h) + , readerHasBlockIO :: !(HasBlockIO m h) } {-# SPECIALISE new :: - HasFS IO h + TempRegistry IO + -> HasFS IO h -> HasBlockIO IO h -> WriteBufferFsPaths -> IO (WriteBufferReader IO h) #-} -- | See 'Database.LSMTree.Internal.RunReader.new'. new :: forall m h. 
- (MonadCatch m, MonadSTM m, MonadST m) - => HasFS m h + (MonadMVar m, MonadST m, MonadMask m) + => TempRegistry m + -> HasFS m h -> HasBlockIO m h -> WriteBufferFsPaths -> m (WriteBufferReader m h) -new readerHasFS readerHasBlockIO fsPaths = do +new reg readerHasFS readerHasBlockIO fsPaths = do (readerKOpsHandle :: FS.Handle h) <- FS.hOpen readerHasFS (Paths.writeBufferKOpsPath fsPaths) FS.ReadMode -- Double the file readahead window (only applies to this file descriptor) FS.hAdviseAll readerHasBlockIO readerKOpsHandle FS.AdviceSequential - readerBlobFile <- openBlobFile readerHasFS (Paths.writeBufferBlobPath fsPaths) FS.ReadMode + readerWriteBufferBlobs <- allocateTemp reg (WBB.open readerHasFS (Paths.writeBufferBlobPath fsPaths) FS.AllowExisting) releaseRef -- Load first page from disk, if it exists. readerCurrentEntryNo <- newPrimVar (0 :: Word16) firstPage <- readDiskPage readerHasFS readerKOpsHandle @@ -143,28 +139,17 @@ next WriteBufferReader {..} = do IndexEntry key entry -> do modifyPrimVar readerCurrentEntryNo (+1) let entry' :: E.Entry SerialisedValue (RawBlobRef m h) - entry' = fmap (mkRawBlobRef readerBlobFile) entry + entry' = fmap (WBB.mkRawBlobRef readerWriteBufferBlobs) entry let rawEntry = Entry entry' return (ReadEntry key rawEntry) IndexEntryOverflow key entry lenSuffix -> do -- TODO: we know that we need the next page, could already load? modifyPrimVar readerCurrentEntryNo (+1) let entry' :: E.Entry SerialisedValue (RawBlobRef m h) - entry' = fmap (mkRawBlobRef readerBlobFile) entry + entry' = fmap (WBB.mkRawBlobRef readerWriteBufferBlobs) entry overflowPages <- readOverflowPages readerHasFS readerKOpsHandle lenSuffix let rawEntry = mkEntryOverflow entry' page lenSuffix overflowPages return (ReadEntry key rawEntry) -{-# SPECIALISE - close :: - WriteBufferReader IO h - -> IO () - #-} --- | See 'Database.LSMTree.Internal.RunReader.close'. 
-close :: - (MonadMask m, PrimMonad m) - => WriteBufferReader m h - -> m () -close WriteBufferReader{..} = do - FS.hClose readerHasFS readerKOpsHandle - releaseRef readerBlobFile +close :: WriteBufferReader m h -> m () +close WriteBufferReader{..} = FS.hClose readerHasFS readerKOpsHandle diff --git a/src/Database/LSMTree/Monoidal.hs b/src/Database/LSMTree/Monoidal.hs index 00b461406..ab82df9e7 100644 --- a/src/Database/LSMTree/Monoidal.hs +++ b/src/Database/LSMTree/Monoidal.hs @@ -524,8 +524,7 @@ mupserts t = updates t . fmap (second Mupsert) -------------------------------------------------------------------------------} {-# SPECIALISE createSnapshot :: - ResolveValue v - => Common.SnapshotLabel + Common.SnapshotLabel -> SnapshotName -> Table IO k v -> IO () #-} @@ -552,15 +551,13 @@ mupserts t = updates t . fmap (second Mupsert) -- the snapshot names are distinct (otherwise this would be a race). -- createSnapshot :: forall m k v. - ( IOLike m - , ResolveValue v - ) + IOLike m => Common.SnapshotLabel -> SnapshotName -> Table m k v -> m () createSnapshot label snap (Internal.MonoidalTable t) = - Internal.createSnapshot (resolve @v Proxy) snap label Internal.SnapMonoidalTable t + Internal.createSnapshot snap label Internal.SnapMonoidalTable t {-# SPECIALISE openSnapshot :: ResolveValue v diff --git a/src/Database/LSMTree/Normal.hs b/src/Database/LSMTree/Normal.hs index b214b4161..e05ca418f 100644 --- a/src/Database/LSMTree/Normal.hs +++ b/src/Database/LSMTree/Normal.hs @@ -677,7 +677,7 @@ createSnapshot :: forall m k v b. 
-> Table m k v b -> m () createSnapshot label snap (Internal.NormalTable t) = - Internal.createSnapshot const snap label Internal.SnapNormalTable t + Internal.createSnapshot snap label Internal.SnapNormalTable t {-# SPECIALISE openSnapshot :: Session IO diff --git a/test/Database/LSMTree/Model/Session.hs b/test/Database/LSMTree/Model/Session.hs index 50f4c6368..af8ea4227 100644 --- a/test/Database/LSMTree/Model/Session.hs +++ b/test/Database/LSMTree/Model/Session.hs @@ -458,19 +458,8 @@ createSnapshot :: -> Table k v b -> m () createSnapshot label name t@Table{..} = do - (updc, table) <- guardTableIsOpen t + (_updc, table) <- guardTableIsOpen t snaps <- gets snapshots - -- TODO: For the moment we allow snapshot to invalidate blob refs. - -- Ideally we should change the implementation to not invalidate on - -- snapshot, and then we can remove the artificial invalidation from - -- the model (i.e. delete the lines below that increments updc). - -- Furthermore, we invalidate them _before_ checking if there is a - -- duplicate snapshot. This is a bit barmy, but it matches the - -- implementation. The implementation should be fixed. 
- -- TODO: See https://github.com/IntersectMBO/lsm-tree/issues/392 - modify (\m -> m { - tables = Map.insert tableID (updc + 1, toSomeTable table) (tables m) - }) when (Map.member name snaps) $ throwError ErrSnapshotExists modify (\m -> m { diff --git a/test/Test/Database/LSMTree/Internal/Run.hs b/test/Test/Database/LSMTree/Internal/Run.hs index f1dcf93b6..ec25f8ab2 100644 --- a/test/Test/Database/LSMTree/Internal/Run.hs +++ b/test/Test/Database/LSMTree/Internal/Run.hs @@ -238,22 +238,21 @@ prop_WriteAndOpenWriteBuffer :: -> RunData KeyForIndexCompact SerialisedValue SerialisedBlob -> IO Property prop_WriteAndOpenWriteBuffer hfs hbio rd = do - -- Serialise run data as write buffer: - let srd = serialiseRunData rd - let inPaths = WrapRunFsPaths $ simplePath 1111 - let f (SerialisedValue x) (SerialisedValue y) = SerialisedValue (x <> y) - withRunDataAsWriteBuffer hfs f inPaths srd $ \wb wbb -> do - -- Write write buffer to disk: - let wbPaths = WrapRunFsPaths $ simplePath 1312 - withSerialisedWriteBuffer hfs hbio wbPaths wb wbb $ do - -- Read write buffer from disk: - let outPaths = WrapRunFsPaths $ simplePath 2222 - let outBlobPath = Paths.writeBufferBlobPath outPaths - bracket (WBB.new hfs outBlobPath) releaseRef $ \wbb' -> do - wb' <- readWriteBuffer hfs hbio f wbb' wbPaths + withTempRegistry $ \reg -> do + -- Serialise run data as write buffer: + let srd = serialiseRunData rd + let inPaths = WrapRunFsPaths $ simplePath 1111 + let resolve (SerialisedValue x) (SerialisedValue y) = SerialisedValue (x <> y) + withRunDataAsWriteBuffer hfs resolve inPaths srd $ \wb wbb -> do + -- Write write buffer to disk: + let wbPaths = WrapRunFsPaths $ simplePath 1312 + withSerialisedWriteBuffer hfs hbio wbPaths wb wbb $ do + -- Read write buffer from disk: + (wb', wbb') <- readWriteBuffer reg resolve hfs hbio wbPaths assertEqual "k/ops" wb wb' - -- TODO: return a proper Property instead of using assertEqual etc. 
- return (property True) + releaseRef wbb' + -- TODO: return a proper Property instead of using assertEqual etc. + return (property True) -- | Writing run data to the disk via 'writeWriteBuffer' gives the same key/ops -- and blob files as when written out as a run.