Skip to content

Commit

Permalink
fix(issue392): set blob file pointer to end
Browse files Browse the repository at this point in the history
This commit fixes the error in the open function for WriteBufferBlobs, but
also contains a significant amount of refactoring based on feedback on the PR.
  • Loading branch information
wenkokke committed Jan 2, 2025
1 parent 923f2dd commit dbd0bcd
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 138 deletions.
9 changes: 3 additions & 6 deletions src/Database/LSMTree.hs
Original file line number Diff line number Diff line change
Expand Up @@ -469,21 +469,18 @@ retrieveBlobs (Internal.Session' (sesh :: Internal.Session m h)) refs =
-------------------------------------------------------------------------------}

{-# SPECIALISE createSnapshot ::
ResolveValue v
=> Common.SnapshotLabel
Common.SnapshotLabel
-> SnapshotName
-> Table IO k v b
-> IO () #-}
createSnapshot :: forall m k v b.
( IOLike m
, ResolveValue v
)
IOLike m
=> Common.SnapshotLabel
-> SnapshotName
-> Table m k v b
-> m ()
createSnapshot label snap (Internal.Table' t) =
void $ Internal.createSnapshot (resolve (Proxy @v)) snap label Internal.SnapFullTable t
void $ Internal.createSnapshot snap label Internal.SnapFullTable t

{-# SPECIALISE openSnapshot ::
ResolveValue v
Expand Down
43 changes: 20 additions & 23 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,10 @@ import Database.LSMTree.Internal.Snapshot.Codec
import Database.LSMTree.Internal.UniqCounter
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified Database.LSMTree.Internal.WriteBufferWriter as WBW
import qualified System.FS.API as FS
import System.FS.API (FsError, FsErrorPath (..), FsPath, HasFS)
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API (HasBlockIO)
import Database.LSMTree.Internal.WriteBufferReader (readWriteBuffer)

{-------------------------------------------------------------------------------
Existentials
Expand Down Expand Up @@ -1128,27 +1126,26 @@ readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do
-------------------------------------------------------------------------------}

{-# SPECIALISE createSnapshot ::
ResolveSerialisedValue
-> SnapshotName
SnapshotName
-> SnapshotLabel
-> SnapshotTableType
-> Table IO h
-> IO () #-}
-- | See 'Database.LSMTree.Normal.createSnapshot'.
createSnapshot ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> SnapshotName
=> SnapshotName
-> SnapshotLabel
-> SnapshotTableType
-> Table m h
-> m ()
createSnapshot resolve snap label tableType t = do
createSnapshot snap label tableType t = do
traceWith (tableTracer t) $ TraceSnapshot snap
withOpenTable t $ \thEnv ->
withTempRegistry $ \reg -> do -- TODO: use the temp registry for all side effects
let hfs = tableHasFS thEnv
let hfs = tableHasFS thEnv
hbio = tableHasBlockIO thEnv
uc = tableSessionUniqCounter thEnv

-- Guard that the snapshot does not exist already
let snapDir = Paths.namedSnapshotDir (tableSessionRoot thEnv) snap
Expand All @@ -1166,18 +1163,19 @@ createSnapshot resolve snap label tableType t = do
-- Get the table content.
content <- RW.withReadAccess (tableContent thEnv) pure

-- Write the write buffer.
snapWriteBuffer <- uniqueToRunNumber <$> incrUniqCounter (tableSessionUniqCounter thEnv)
let wbPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer
WBW.writeWriteBuffer hfs hbio wbPaths (tableWriteBuffer content) (tableWriteBufferBlobs content)
-- Snapshot the write buffer.
let activeDir = Paths.activeDir (tableSessionRoot thEnv)
let wb = tableWriteBuffer content
let wbb = tableWriteBufferBlobs content
snapWriteBufferNumber <- Paths.writeBufferNumber <$> snapshotWriteBuffer reg hfs hbio uc activeDir snapDir wb wbb

-- Convert to snapshot format
snapLevels <- toSnapLevels (tableLevels content)

-- Hard link runs into the named snapshot directory
snapLevels' <- snapshotRuns reg hbio snapDir snapLevels

let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBuffer snapLevels'
let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapWriteBufferNumber snapLevels'
SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData
Expand Down Expand Up @@ -1206,6 +1204,7 @@ openSnapshot sesh label tableType override snap resolve = do
withTempRegistry $ \reg -> do
let hfs = sessionHasFS seshEnv
hbio = sessionHasBlockIO seshEnv
uc = sessionUniqCounter seshEnv

-- Guard that the snapshot exists
let snapDir = Paths.namedSnapshotDir (sessionRoot seshEnv) snap
Expand All @@ -1228,22 +1227,20 @@ openSnapshot sesh label tableType override snap resolve = do

let conf' = applyOverride override conf
am <- newArenaManager
blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$>
incrUniqCounter (sessionUniqCounter seshEnv)
tableWriteBufferBlobs <- allocateTemp reg (WBB.new hfs blobpath)
releaseRef

let actDir = Paths.activeDir (sessionRoot seshEnv)
let activeDir = Paths.activeDir (sessionRoot seshEnv)

-- Read write buffer
let wbPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer
tableWriteBuffer <- readWriteBuffer hfs hbio resolve tableWriteBufferBlobs wbPaths
activeWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc
let activeWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getActiveDir activeDir) activeWriteBufferNumber
let snapWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer
(tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths

-- Hard link runs into the active directory,
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir actDir snapLevels
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir activeDir snapLevels

-- Convert from the snapshot format, restoring merge progress in the process
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve actDir snapLevels'
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve activeDir snapLevels'
releaseRuns reg snapLevels'

tableCache <- mkLevelsCache reg tableLevels
Expand Down
4 changes: 3 additions & 1 deletion src/Database/LSMTree/Internal/BlobFile.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import Database.LSMTree.Internal.Serialise (SerialisedBlob (..))
import qualified System.FS.API as FS
import System.FS.API (HasFS)
import qualified System.FS.BlockIO.API as FS
import System.FS.CallStack (HasCallStack)

-- | A handle to a file containing blobs.
--
Expand Down Expand Up @@ -50,9 +51,10 @@ instance NFData BlobSpan where

-- | Open the given file to make a 'BlobFile'. The finaliser will close and
-- delete the file.
{-# SPECIALISE openBlobFile :: HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-}
{-# SPECIALISE openBlobFile :: HasCallStack => HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-}
openBlobFile ::
PrimMonad m
=> HasCallStack
=> HasFS m h
-> FS.FsPath
-> FS.OpenMode
Expand Down
156 changes: 144 additions & 12 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ module Database.LSMTree.Internal.Snapshot (
, SpentCredits (..)
-- * Conversion to levels snapshot format
, toSnapLevels
-- * Write buffer
, snapshotWriteBuffer
, openWriteBuffer
-- * Runs
, snapshotRuns
, openRuns
Expand All @@ -26,7 +29,7 @@ module Database.LSMTree.Internal.Snapshot (
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM (MonadSTM)
import Control.DeepSeq (NFData (..))
import Control.Monad (when)
import Control.Monad (void, when)
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadThrow (MonadMask)
import Control.Monad.Primitive (PrimMonad)
Expand All @@ -45,15 +48,22 @@ import Database.LSMTree.Internal.MergeSchedule
import Database.LSMTree.Internal.MergingRun (NumRuns (..))
import qualified Database.LSMTree.Internal.MergingRun as MR
import Database.LSMTree.Internal.Paths (ActiveDir (..),
NamedSnapshotDir (..), RunFsPaths (..), pathsForRunFiles,
runChecksumsPath)
NamedSnapshotDir (..), RunFsPaths (..),
WriteBufferFsPaths (..), pathsForRunFiles,
runChecksumsPath, writeBufferBlobPath,
writeBufferChecksumsPath, writeBufferKOpsPath)
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.UniqCounter (UniqCounter,
incrUniqCounter, uniqueToRunNumber)
import Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs)
import qualified Database.LSMTree.Internal.WriteBufferReader as WBR
import qualified Database.LSMTree.Internal.WriteBufferWriter as WBW
import qualified System.FS.API as FS
import System.FS.API (HasFS)
import qualified System.FS.API.Lazy as FSL
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API (HasBlockIO)

Expand Down Expand Up @@ -212,6 +222,74 @@ toSnapMergingRunState (MR.OngoingMerge rs (MR.SpentCreditsVar spentCreditsVar) m
spentCredits <- readPrimVar spentCreditsVar
pure (SnapOngoingMerge rs (SpentCredits spentCredits) (Merge.mergeLevel m))

{-------------------------------------------------------------------------------
Write Buffer
-------------------------------------------------------------------------------}

{-# SPECIALISE
  snapshotWriteBuffer ::
       TempRegistry IO
    -> HasFS IO h
    -> HasBlockIO IO h
    -> UniqCounter IO
    -> ActiveDir
    -> NamedSnapshotDir
    -> WriteBuffer
    -> Ref (WriteBufferBlobs IO h)
    -> IO WriteBufferFsPaths
  #-}
-- | @'snapshotWriteBuffer' reg hfs hbio uc activeDir snapDir wb wbb@ writes
-- the write buffer @wb@ and its blobs @wbb@ to files in @activeDir@, and then
-- hard links those files (key\/ops, blobs and checksums) into @snapDir@.
--
-- Two numbers are drawn from @uc@: the first names the files in the active
-- directory, the second names the links in the snapshot directory. The paths
-- of the links in the snapshot directory are returned.
--
-- NOTE(review): only the hard links are registered with @reg@ (through
-- 'hardLinkTemp'). The files written to the active directory are not
-- registered here; confirm whether 'WBW.writeWriteBuffer' cleans up after
-- itself on failure.
snapshotWriteBuffer ::
     (MonadMVar m, MonadSTM m, MonadST m, MonadMask m)
  => TempRegistry m
  -> HasFS m h
  -> HasBlockIO m h
  -> UniqCounter m
  -> ActiveDir
  -> NamedSnapshotDir
  -> WriteBuffer
  -> Ref (WriteBufferBlobs m h)
  -> m WriteBufferFsPaths
snapshotWriteBuffer reg hfs hbio uc activeDir snapDir wb wbb = do
    -- Serialise the write buffer and its blobs under a fresh number in the
    -- active directory.
    activeNumber <- fmap uniqueToRunNumber (incrUniqCounter uc)
    let activePaths = WriteBufferFsPaths (getActiveDir activeDir) activeNumber
    WBW.writeWriteBuffer hfs hbio activePaths wb wbb
    -- Hard link each of the written files into the snapshot directory, under
    -- a second fresh number.
    snapNumber <- fmap uniqueToRunNumber (incrUniqCounter uc)
    let snapPaths = WriteBufferFsPaths (getNamedSnapshotDir snapDir) snapNumber
        linkIntoSnapshot pathOf =
          hardLinkTemp reg hfs hbio NoHardLinkDurable
            (pathOf activePaths)
            (pathOf snapPaths)
    mapM_ linkIntoSnapshot
      [ writeBufferKOpsPath
      , writeBufferBlobPath
      , writeBufferChecksumsPath
      ]
    pure snapPaths

{-# SPECIALISE
  openWriteBuffer ::
       TempRegistry IO
    -> ResolveSerialisedValue
    -> HasFS IO h
    -> HasBlockIO IO h
    -> WriteBufferFsPaths
    -> WriteBufferFsPaths
    -> IO (WriteBuffer, Ref (WriteBufferBlobs IO h))
  #-}
-- | @'openWriteBuffer' reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths@
-- makes the write buffer files at @snapWriteBufferPaths@ available at
-- @activeWriteBufferPaths@ and then reads the write buffer from the latter.
--
-- Note the argument order: the /source/ (snapshot) paths come first, the
-- /target/ (active) paths second.
--
-- The key\/ops and checksums files are hard linked, whereas the blob file is
-- copied. NOTE(review): presumably the copy is needed because the opened
-- table keeps appending to its blob file, which must not alter the
-- snapshot's file -- confirm.
openWriteBuffer ::
     (MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
  => TempRegistry m
  -> ResolveSerialisedValue
  -> HasFS m h
  -> HasBlockIO m h
  -> WriteBufferFsPaths
  -> WriteBufferFsPaths
  -> m (WriteBuffer, Ref (WriteBufferBlobs m h))
openWriteBuffer reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths = do
    -- Hard link the key/ops file and the checksums file from the snapshot
    -- paths to the active paths.
    mapM_ linkIntoActive [writeBufferKOpsPath, writeBufferChecksumsPath]
    -- Copy (rather than link) the blob file from the snapshot paths to the
    -- active paths.
    copyFileTemp reg hfs hbio
      (writeBufferBlobPath snapWriteBufferPaths)
      (writeBufferBlobPath activeWriteBufferPaths)
    -- Read the write buffer back from the files at the active paths.
    WBR.readWriteBuffer reg resolve hfs hbio activeWriteBufferPaths
  where
    linkIntoActive pathOf =
      hardLinkTemp reg hfs hbio NoHardLinkDurable
        (pathOf snapWriteBufferPaths)
        (pathOf activeWriteBufferPaths)

{-------------------------------------------------------------------------------
Runs
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -398,12 +476,66 @@ hardLinkRunFiles ::
hardLinkRunFiles reg hfs hbio dur sourceRunFsPaths targetRunFsPaths = do
let sourcePaths = pathsForRunFiles sourceRunFsPaths
targetPaths = pathsForRunFiles targetRunFsPaths
sequenceA_ (hardLinkTemp <$> sourcePaths <*> targetPaths)
hardLinkTemp (runChecksumsPath sourceRunFsPaths) (runChecksumsPath targetRunFsPaths)
where
hardLinkTemp sourcePath targetPath = do
allocateTemp reg
(FS.createHardLink hbio sourcePath targetPath)
(\_ -> FS.removeFile hfs targetPath)
when (dur == HardLinkDurable) $
FS.synchroniseFile hfs hbio targetPath
sequenceA_ (hardLinkTemp reg hfs hbio dur <$> sourcePaths <*> targetPaths)
hardLinkTemp reg hfs hbio dur (runChecksumsPath sourceRunFsPaths) (runChecksumsPath targetRunFsPaths)

{-------------------------------------------------------------------------------
Hard link file
-------------------------------------------------------------------------------}

{-# SPECIALISE
  hardLinkTemp ::
       TempRegistry IO
    -> HasFS IO h
    -> HasBlockIO IO h
    -> HardLinkDurable
    -> FS.FsPath
    -> FS.FsPath
    -> IO ()
  #-}
-- | @'hardLinkTemp' reg hfs hbio dur sourcePath targetPath@ creates a hard
-- link at @targetPath@ to the file at @sourcePath@.
--
-- The link is created through 'allocateTemp' on @reg@, with removal of
-- @targetPath@ as the associated release action. If @dur@ is
-- 'HardLinkDurable', @targetPath@ is additionally passed to
-- 'FS.synchroniseFile' after the link has been created.
hardLinkTemp ::
     (MonadMask m, MonadMVar m)
  => TempRegistry m
  -> HasFS m h
  -> HasBlockIO m h
  -> HardLinkDurable
  -> FS.FsPath
  -> FS.FsPath
  -> m ()
hardLinkTemp reg hfs hbio dur sourcePath targetPath = do
    allocateTemp reg createLink removeLink
    when (dur == HardLinkDurable) syncTarget
  where
    -- Acquire: create the link itself.
    createLink = FS.createHardLink hbio sourcePath targetPath
    -- Release: remove the link again; the acquired value is not needed.
    removeLink _ = FS.removeFile hfs targetPath
    -- Only run when the caller asked for a durable link.
    syncTarget = FS.synchroniseFile hfs hbio targetPath

{-------------------------------------------------------------------------------
Copy file
-------------------------------------------------------------------------------}

{-# SPECIALISE
  copyFileTemp ::
       TempRegistry IO
    -> HasFS IO h
    -> HasBlockIO IO h
    -> FS.FsPath
    -> FS.FsPath
    -> IO ()
  #-}
-- | @'copyFileTemp' reg hfs hbio sourcePath targetPath@ copies the contents
-- of the file at @sourcePath@ into a new file at @targetPath@.
--
-- The target is opened with 'FS.MustBeNew'. The copy is performed through
-- 'allocateTemp' on @reg@, with removal of @targetPath@ as the associated
-- release action. The @hbio@ argument is currently unused.
--
-- The whole source file is read with 'FSL.hGetAll' before it is written out
-- with 'FSL.hPutAll'.
copyFileTemp ::
     (MonadMask m, MonadMVar m)
  => TempRegistry m
  -> HasFS m h
  -> HasBlockIO m h
  -> FS.FsPath
  -> FS.FsPath
  -> m ()
copyFileTemp reg hfs _hbio sourcePath targetPath =
    allocateTemp reg copyContents removeTarget
  where
    -- Acquire: read everything from the source and write it to the target.
    copyContents =
      FS.withFile hfs sourcePath FS.ReadMode $ \src ->
        FS.withFile hfs targetPath (FS.WriteMode FS.MustBeNew) $ \dst ->
          FSL.hGetAll hfs src >>= void . FSL.hPutAll hfs dst
    -- Release: remove the copy again; the acquired value is not needed.
    removeTarget _ = FS.removeFile hfs targetPath
Loading

0 comments on commit dbd0bcd

Please sign in to comment.