Skip to content

Commit

Permalink
All tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
wenkokke committed Dec 9, 2024
1 parent df6995b commit f5ad722
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 82 deletions.
33 changes: 17 additions & 16 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,8 +1088,9 @@ createSnapshot resolve snap label tableType t = do
traceWith (tableTracer t) $ TraceSnapshot snap
withOpenTable t $ \thEnv ->
withTempRegistry $ \reg -> do -- TODO: use the temp registry for all side effects
let hfs = tableHasFS thEnv
let hbio = tableHasBlockIO thEnv
let hfs = tableHasFS thEnv
hbio = tableHasBlockIO thEnv
uc = tableSessionUniqCounter thEnv

-- Guard that the snapshot does not exist already
let snapDir = Paths.namedSnapshotDir (tableSessionRoot thEnv) snap
Expand All @@ -1105,18 +1106,19 @@ createSnapshot resolve snap label tableType t = do
-- Get the table content.
content <- RW.withReadAccess (tableContent thEnv) pure

-- Write the write buffer.
snapWriteBuffer <- uniqueToRunNumber <$> incrUniqCounter (tableSessionUniqCounter thEnv)
let wbPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer
WBW.writeWriteBuffer hfs hbio wbPaths (tableWriteBuffer content) (tableWriteBufferBlobs content)
-- Snapshot the write buffer.
let activeDir = Paths.activeDir (tableSessionRoot thEnv)
let wb = tableWriteBuffer content
let wbb = tableWriteBufferBlobs content
snapWriteBufferNumber <- Paths.writeBufferNumber <$> snapshotWriteBuffer hfs hbio uc activeDir snapDir wb wbb

-- Convert to snapshot format
snapLevels <- toSnapLevels (tableLevels content)

-- Hard link runs into the named snapshot directory
snapLevels' <- snapshotRuns reg snapDir snapLevels

let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBuffer snapLevels'
let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBufferNumber snapLevels'
SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData
Expand Down Expand Up @@ -1145,6 +1147,7 @@ openSnapshot sesh label tableType override snap resolve = do
withTempRegistry $ \reg -> do
let hfs = sessionHasFS seshEnv
hbio = sessionHasBlockIO seshEnv
uc = sessionUniqCounter seshEnv

-- Guard that the snapshot exists
let snapDir = Paths.namedSnapshotDir (sessionRoot seshEnv) snap
Expand All @@ -1167,22 +1170,20 @@ openSnapshot sesh label tableType override snap resolve = do

let conf' = applyOverride override conf
am <- newArenaManager
blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
incrUniqCounter (sessionUniqCounter seshEnv)
tableWriteBufferBlobs <- allocateTemp reg (WBB.new hfs blobpath)
releaseRef

let actDir = Paths.activeDir (sessionRoot seshEnv)
let activeDir = Paths.activeDir (sessionRoot seshEnv)

-- Read write buffer
let wbPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer
tableWriteBuffer <- readWriteBuffer hfs hbio resolve tableWriteBufferBlobs wbPaths
activeWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc
let activeWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getActiveDir activeDir) activeWriteBufferNumber
let snapWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer
(tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths

-- Hard link runs into the active directory,
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir actDir snapLevels
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir activeDir snapLevels

-- Convert from the snapshot format, restoring merge progress in the process
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve actDir snapLevels'
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve activeDir snapLevels'

tableCache <- mkLevelsCache reg tableLevels
newWith reg sesh seshEnv conf' am $! TableContent {
Expand Down
4 changes: 3 additions & 1 deletion src/Database/LSMTree/Internal/BlobFile.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import Database.LSMTree.Internal.Serialise (SerialisedBlob (..))
import qualified System.FS.API as FS
import System.FS.API (HasFS)
import qualified System.FS.BlockIO.API as FS
import System.FS.CallStack (HasCallStack)

-- | A handle to a file containing blobs.
--
Expand Down Expand Up @@ -51,9 +52,10 @@ instance NFData BlobSpan where

-- | Open the given file to make a 'BlobFile'. The finaliser will close and
-- delete the file.
{-# SPECIALISE openBlobFile :: HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-}
{-# SPECIALISE openBlobFile :: HasCallStack => HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-}
openBlobFile ::
PrimMonad m
=> HasCallStack
=> HasFS m h
-> FS.FsPath
-> FS.OpenMode
Expand Down
95 changes: 94 additions & 1 deletion src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ module Database.LSMTree.Internal.Snapshot (
, SpentCredits (..)
-- * Conversion to levels snapshot format
, toSnapLevels
-- * Write buffer
, snapshotWriteBuffer
, openWriteBuffer
-- * Runs
, snapshotRuns
, openRuns
Expand Down Expand Up @@ -42,7 +45,7 @@ import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.MergeSchedule
import Database.LSMTree.Internal.Paths (ActiveDir (..),
NamedSnapshotDir (..), RunFsPaths (..), pathsForRunFiles,
runChecksumsPath)
runChecksumsPath, WriteBufferFsPaths (..), writeBufferKOpsPath, writeBufferBlobPath, writeBufferChecksumsPath)
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunNumber
Expand All @@ -52,6 +55,11 @@ import qualified System.FS.API as FS
import System.FS.API (HasFS)
import qualified System.FS.API.Lazy as FS
import System.FS.BlockIO.API (HasBlockIO)
import Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs)
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified Database.LSMTree.Internal.WriteBufferWriter as WBW
import qualified Database.LSMTree.Internal.WriteBufferReader as WBR

{-------------------------------------------------------------------------------
Snapshot metadata
Expand Down Expand Up @@ -186,6 +194,72 @@ toSnapMergingRunState (OngoingMerge rs (SpentCreditsVar spentCreditsVar) m) = do
spentCredits <- readPrimVar spentCreditsVar
pure (SnapOngoingMerge rs (SpentCredits spentCredits) (Merge.mergeLevel m))

{-------------------------------------------------------------------------------
Write Buffer
-------------------------------------------------------------------------------}

{-# SPECIALISE
snapshotWriteBuffer ::
HasFS IO h
-> HasBlockIO IO h
-> UniqCounter IO
-> ActiveDir
-> NamedSnapshotDir
-> WriteBuffer
-> Ref (WriteBufferBlobs IO h)
-> IO WriteBufferFsPaths
#-}
snapshotWriteBuffer ::
(MonadMVar m, MonadSTM m, MonadST m, MonadThrow m)
=> HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActiveDir
-> NamedSnapshotDir
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m WriteBufferFsPaths
snapshotWriteBuffer hfs hbio uc activeDir snapDir wb wbb = do
-- Write the write buffer and write buffer blobs to the active directory.
activeWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc
let activeWriteBufferPaths = WriteBufferFsPaths (getActiveDir activeDir) activeWriteBufferNumber
WBW.writeWriteBuffer hfs hbio activeWriteBufferPaths wb wbb
-- Hard link the write buffer and write buffer blobs to the snapshot directory.
snapWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc
let snapWriteBufferPaths = WriteBufferFsPaths (getNamedSnapshotDir snapDir) snapWriteBufferNumber
hardLink hfs hbio (writeBufferKOpsPath activeWriteBufferPaths) (writeBufferKOpsPath snapWriteBufferPaths)
hardLink hfs hbio (writeBufferBlobPath activeWriteBufferPaths) (writeBufferBlobPath snapWriteBufferPaths)
hardLink hfs hbio (writeBufferChecksumsPath activeWriteBufferPaths) (writeBufferChecksumsPath snapWriteBufferPaths)
pure snapWriteBufferPaths

{-# SPECIALISE
openWriteBuffer ::
TempRegistry IO
-> ResolveSerialisedValue
-> HasFS IO h
-> HasBlockIO IO h
-> WriteBufferFsPaths
-> WriteBufferFsPaths
-> IO (WriteBuffer, Ref (WriteBufferBlobs IO h))
#-}
openWriteBuffer ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> TempRegistry m
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> WriteBufferFsPaths
-> WriteBufferFsPaths
-> m (WriteBuffer, Ref (WriteBufferBlobs m h))
openWriteBuffer reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths = do
-- Hard link the write buffer keyops and checksum files to the snapshot directory.
hardLink hfs hbio (writeBufferKOpsPath snapWriteBufferPaths) (writeBufferKOpsPath activeWriteBufferPaths)
hardLink hfs hbio (writeBufferChecksumsPath snapWriteBufferPaths) (writeBufferChecksumsPath activeWriteBufferPaths)
-- Copy the write buffer blobs file to the snapshot directory.
copyFile hfs hbio (writeBufferBlobPath snapWriteBufferPaths) (writeBufferBlobPath activeWriteBufferPaths)
-- Read write buffer
WBR.readWriteBuffer reg resolve hfs hbio activeWriteBufferPaths

{-------------------------------------------------------------------------------
Runs
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -396,3 +470,22 @@ hardLink hfs _hbio sourcePath targetPath =
-- not, it is only a temporary placeholder hack.
bs <- FS.hGetAll hfs sourceHandle
void $ FS.hPutAll hfs targetHandle bs


{-------------------------------------------------------------------------------
Copy file
-------------------------------------------------------------------------------}

{-# SPECIALISE copyFile ::
HasFS IO h
-> HasBlockIO IO h
-> FS.FsPath
-> FS.FsPath
-> IO () #-}
-- | @'copyFile' hfs hbio source target@ copies the @source@ path to the @target@ path.
copyFile :: MonadThrow m => HasFS m h -> HasBlockIO m h -> FS.FsPath -> FS.FsPath -> m ()
copyFile hfs _hbio sourcePath targetPath =
FS.withFile hfs sourcePath FS.ReadMode $ \sourceHandle ->
FS.withFile hfs targetPath (FS.WriteMode FS.MustBeNew) $ \targetHandle -> do
bs <- FS.hGetAll hfs sourceHandle
void $ FS.hPutAll hfs targetHandle bs
23 changes: 17 additions & 6 deletions src/Database/LSMTree/Internal/WriteBufferBlobs.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
module Database.LSMTree.Internal.WriteBufferBlobs (
WriteBufferBlobs (..),
new,
open,
addBlob,
mkRawBlobRef,
mkWeakBlobRef,
Expand Down Expand Up @@ -117,14 +118,24 @@ instance RefCounted (WriteBufferBlobs m h) where
getRefCounter = writeBufRefCounter

{-# SPECIALISE new :: HasFS IO h -> FS.FsPath -> IO (Ref (WriteBufferBlobs IO h)) #-}
new :: (PrimMonad m, MonadMask m)
=> HasFS m h
-> FS.FsPath
-> m (Ref (WriteBufferBlobs m h))
new fs blobFileName = do
new ::
(PrimMonad m, MonadMask m)
=> HasFS m h
-> FS.FsPath
-> m (Ref (WriteBufferBlobs m h))
new fs blobFileName = open fs blobFileName FS.MustBeNew

{-# SPECIALISE new :: HasFS IO h -> FS.FsPath -> IO (Ref (WriteBufferBlobs IO h)) #-}
open ::
(PrimMonad m, MonadMask m)
=> HasFS m h
-> FS.FsPath
-> FS.AllowExisting
-> m (Ref (WriteBufferBlobs m h))
open fs blobFileName blobFileAllowExisting = do
-- Must use read/write mode because we write blobs when adding, but
-- we can also be asked to retrieve blobs at any time.
blobFile <- openBlobFile fs blobFileName (FS.ReadWriteMode FS.MustBeNew)
blobFile <- openBlobFile fs blobFileName (FS.ReadWriteMode blobFileAllowExisting)
blobFilePointer <- newFilePointer
newRef (releaseRef blobFile) $ \writeBufRefCounter ->
WriteBufferBlobs {
Expand Down
Loading

0 comments on commit f5ad722

Please sign in to comment.