Skip to content

Commit

Permalink
Process suggestions on PR
Browse files Browse the repository at this point in the history
  • Loading branch information
wenkokke committed Dec 3, 2024
1 parent 05e241a commit 6a7eee5
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 91 deletions.
2 changes: 2 additions & 0 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,8 @@ createSnapshot resolve snap label tableType t = do
FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir)

-- Write the write buffer.
--
-- TODO: Seeing as the write buffer is pure, we probably don't need to keep the read lock while we're writing to disk.
RW.withReadAccess (tableContent thEnv) $ \content -> do
writeBufferRunNumber <- uniqueToRunNumber <$> incrUniqCounter (tableSessionUniqCounter thEnv)
let fsPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) writeBufferRunNumber
Expand Down
12 changes: 12 additions & 0 deletions src/Database/LSMTree/Internal/Entry.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module Database.LSMTree.Internal.Entry (
, hasBlob
, onValue
, onBlobRef
, traverseBlobRef
, NumEntries (..)
, unNumEntries
-- * Value resolution/merging
Expand Down Expand Up @@ -47,6 +48,17 @@ onBlobRef def g = \case
Mupdate{} -> def
Delete -> def

-- | Run an effectful function over the blob reference of an 'Entry', if it
-- has one.
--
-- Only 'InsertWithBlob' carries a blob reference; 'Insert', 'Mupdate' and
-- 'Delete' are rebuilt unchanged with 'pure', so the function is not applied
-- for them.
traverseBlobRef ::
     Applicative t
  => (blobref -> t blobref')
  -> Entry v blobref
  -> t (Entry v blobref')
traverseBlobRef onRef entry = case entry of
    InsertWithBlob val ref -> InsertWithBlob val <$> onRef ref
    Insert val             -> pure (Insert val)
    Mupdate val            -> pure (Mupdate val)
    Delete                 -> pure Delete

instance Bifunctor Entry where
first f = \case
Insert v -> Insert (f v)
Expand Down
2 changes: 2 additions & 0 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ module Database.LSMTree.Internal.MergeSchedule (
, supplyMergeCredits
, CreditThreshold (..)
, creditThresholdForLevel
-- * Exported for testing
, addWriteBufferEntries
) where

import Control.Concurrent.Class.MonadMVar.Strict
Expand Down
2 changes: 1 addition & 1 deletion src/Database/LSMTree/Internal/Paths.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ module Database.LSMTree.Internal.Paths (
, forRunFilterRaw
, forRunIndexRaw
-- * WriteBuffer paths
, WriteBufferFsPaths (WriteBufferFsPaths, writeBufferDir, writeBufferNumber)
, WriteBufferFsPaths (WrapRunFsPaths, WriteBufferFsPaths, writeBufferDir, writeBufferNumber)
, pathsForWriteBufferFiles
, writeBufferKOpsPath
, writeBufferBlobPath
Expand Down
158 changes: 83 additions & 75 deletions src/Database/LSMTree/Internal/WriteBufferWriter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,16 @@ import System.FS.API (HasFS)
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API (HasBlockIO)


{-# SPECIALISE
writeWriteBuffer ::
HasFS IO h
-> HasBlockIO IO h
-> WriteBufferFsPaths
-> WriteBuffer
-> WriteBufferBlobs IO h
-> IO ()
#-}
-- | Write a 'WriteBuffer' to disk.
writeWriteBuffer ::
(MonadSTM m, MonadST m, MonadThrow m)
=> HasFS m h
Expand All @@ -59,111 +68,112 @@ writeWriteBuffer hfs hbio fsPaths buffer blobs = do
addKeyOp writer key (WBB.mkRawBlobRef blobs <$> op)
void $ unsafeFinalise True writer

-- data SerialisedWriteBuffer m h = SerialisedWriteBuffer
-- { -- | The file system paths for all the files used by the serialised write buffer.
-- serialisedWriteBufferFsPaths :: !WriteBufferFsPaths
-- -- | The (read mode) file handles.
-- , serialisedWriteBufferHandles :: !(ForWriteBufferFiles (FS.Handle h))
-- , serialisedWriteBufferHasFS :: !(HasFS m h)
-- , serialisedWriteBufferHasBlockIO :: !(HasBlockIO m h)
-- }

-- | The in-memory representation of an LSM 'WriteBuffer' that is in the process of being serialised to disk.
data WriteBufferWriter m h = WriteBufferWriter
{ -- | The file system paths for all the files used by the serialised write buffer.
writeBufferWriterFsPaths :: !WriteBufferFsPaths,
writerFsPaths :: !WriteBufferFsPaths,
-- | The page accumulator.
writeBufferWriterPageAcc :: !(PageAcc (PrimState m)),
writerPageAcc :: !(PageAcc (PrimState m)),
-- | The byte offset within the blob file for the next blob to be written.
writeBufferWriterBlobOffset :: !(PrimVar (PrimState m) Word64),
writerBlobOffset :: !(PrimVar (PrimState m) Word64),
-- | The (write mode) file handles.
writeBufferWriterHandles :: !(ForWriteBufferFiles (ChecksumHandle (PrimState m) h)),
writeBufferWriterHasFS :: !(HasFS m h),
writeBufferWriterHasBlockIO :: !(HasBlockIO m h)
writerHandles :: !(ForWriteBufferFiles (ChecksumHandle (PrimState m) h)),
writerHasFS :: !(HasFS m h),
writerHasBlockIO :: !(HasBlockIO m h)
}

{-# SPECIALISE new ::
HasFS IO h
-> HasBlockIO IO h
-> WriteBufferFsPaths
-> IO (WriteBufferWriter IO h) #-}
{-# SPECIALISE
new ::
HasFS IO h
-> HasBlockIO IO h
-> WriteBufferFsPaths
-> IO (WriteBufferWriter IO h)
#-}
-- | Create a 'WriteBufferWriter' to start serialising a 'WriteBuffer'.
--
-- See 'Database.LSMTree.Internal.RunBuilder.new'.
--
-- NOTE: 'new' assumes that the directory passed via 'WriteBufferFsPaths' exists.
new ::
(MonadST m, MonadSTM m)
=> HasFS m h
-> HasBlockIO m h
-> WriteBufferFsPaths
-> m (WriteBufferWriter m h)
new hfs hbio fsPaths = do
writeBufferWriterPageAcc <- ST.stToIO PageAcc.newPageAcc
writeBufferWriterBlobOffset <- newPrimVar 0
writeBufferWriterHandles <-
writerPageAcc <- ST.stToIO PageAcc.newPageAcc
writerBlobOffset <- newPrimVar 0
writerHandles <-
traverse (makeHandle hfs) (pathsForWriteBufferFiles fsPaths)
return WriteBufferWriter
{ writeBufferWriterFsPaths = fsPaths,
writeBufferWriterHasFS = hfs,
writeBufferWriterHasBlockIO = hbio,
{ writerFsPaths = fsPaths,
writerHasFS = hfs,
writerHasBlockIO = hbio,
..
}

-- | See 'RunBuilder.unsafeFinalise'.
-- Finalise an incremental 'WriteBufferWriter'.
{-# SPECIALISE
unsafeFinalise ::
Bool
-> WriteBufferWriter IO h
-> IO (HasFS IO h, HasBlockIO IO h, WriteBufferFsPaths)
#-}
-- | Finalise an incremental 'WriteBufferWriter'.
--
-- Do /not/ use a 'WriteBufferWriter' after finalising it.
--
-- See 'Database.LSMTree.Internal.RunBuilder.unsafeFinalise'.
--
-- TODO: Ensure proper cleanup even in presence of exceptions.
unsafeFinalise ::
(MonadST m, MonadSTM m, MonadThrow m)
=> Bool -- ^ drop caches
-> WriteBufferWriter m h
-> m (HasFS m h, HasBlockIO m h, WriteBufferFsPaths)
unsafeFinalise dropCaches WriteBufferWriter {..} = do
-- write final bits
mPage <- ST.stToIO $ flushPageIfNonEmpty writeBufferWriterPageAcc
for_ mPage $ writeRawPage writeBufferWriterHasFS (forWriteBufferKOps writeBufferWriterHandles)
checksums <- toChecksumsFileForWriteBufferFiles <$> traverse readChecksum writeBufferWriterHandles
FS.withFile writeBufferWriterHasFS (writeBufferChecksumsPath writeBufferWriterFsPaths) (FS.WriteMode FS.MustBeNew) $ \h -> do
CRC.writeChecksumsFile' writeBufferWriterHasFS h checksums
FS.hDropCacheAll writeBufferWriterHasBlockIO h
mPage <- ST.stToIO $ flushPageIfNonEmpty writerPageAcc
for_ mPage $ writeRawPage writerHasFS (forWriteBufferKOps writerHandles)
checksums <- toChecksumsFileForWriteBufferFiles <$> traverse readChecksum writerHandles
FS.withFile writerHasFS (writeBufferChecksumsPath writerFsPaths) (FS.WriteMode FS.MustBeNew) $ \h -> do
CRC.writeChecksumsFile' writerHasFS h checksums
FS.hDropCacheAll writerHasBlockIO h
-- drop the KOps and blobs files from the cache if asked for
when dropCaches $ do
dropCache writeBufferWriterHasBlockIO (forWriteBufferKOpsRaw writeBufferWriterHandles)
dropCache writeBufferWriterHasBlockIO (forWriteBufferBlobRaw writeBufferWriterHandles)
for_ writeBufferWriterHandles $ closeHandle writeBufferWriterHasFS
return (writeBufferWriterHasFS, writeBufferWriterHasBlockIO, writeBufferWriterFsPaths)


-- | See 'RunBuilder.addKeyOp'.
dropCache writerHasBlockIO (forWriteBufferKOpsRaw writerHandles)
dropCache writerHasBlockIO (forWriteBufferBlobRaw writerHandles)
for_ writerHandles $ closeHandle writerHasFS
return (writerHasFS, writerHasBlockIO, writerFsPaths)


{-# SPECIALIZE
addKeyOp ::
WriteBufferWriter IO h
-> SerialisedKey
-> Entry SerialisedValue (RawBlobRef IO h)
-> IO ()
#-}
-- | See 'Database.LSMTree.Internal.RunBuilder.addKeyOp'.
addKeyOp ::
(MonadST m, MonadSTM m, MonadThrow m)
=> WriteBufferWriter m h
-> SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m ()
addKeyOp WriteBufferWriter{..} key op = do
op' <- traverse (copyBlob writeBufferWriterHasFS writeBufferWriterBlobOffset (forWriteBufferBlob writeBufferWriterHandles)) op
-- TODO: consider optimisation described in 'Database.LSMTree.Internal.RunBuilder.addKeyOp'.
op' <- traverse (copyBlob writerHasFS writerBlobOffset (forWriteBufferBlob writerHandles)) op
if PageAcc.entryWouldFitInPage key op
then do
mPage <- ST.stToIO $ addSmallKeyOp writeBufferWriterPageAcc key op'
for_ mPage $ writeRawPage writeBufferWriterHasFS (forWriteBufferKOps writeBufferWriterHandles)
mPage <- ST.stToIO $ addSmallKeyOp writerPageAcc key op'
for_ mPage $ writeRawPage writerHasFS (forWriteBufferKOps writerHandles)
else do
(pages, overflowPages) <- ST.stToIO $ addLargeKeyOp writeBufferWriterPageAcc key op'
--TODO: consider optimisation: use writev to write all pages in one go (see RunBuilder)
for_ pages $ writeRawPage writeBufferWriterHasFS (forWriteBufferKOps writeBufferWriterHandles)
writeRawOverflowPages writeBufferWriterHasFS (forWriteBufferKOps writeBufferWriterHandles) overflowPages

-- addLargeSerialisedKeyOp ::
-- (MonadST m, MonadSTM m)
-- => WriteBufferWriter m h
-- -> RawPage
-- -> [RawOverflowPage]
-- -> m ()
-- addLargeSerialisedKeyOp WriteBufferWriter{..} page overflowPages =
-- assert (RawPage.rawPageNumKeys page == 1) $
-- assert (RawPage.rawPageHasBlobSpanAt page 0 == 0) $
-- assert (RawPage.rawPageOverflowPages page > 0) $
-- assert (RawPage.rawPageOverflowPages page == length overflowPages) $ do
-- !pages <- ST.stToIO $ selectPages <$> flushPageIfNonEmpty writeBufferWriterPageAcc <*> pure page
-- for_ pages $ writeRawPage writeBufferWriterHasFS (forWriteBufferKOps writeBufferWriterHandles)
-- writeRawOverflowPages writeBufferWriterHasFS (forWriteBufferKOps writeBufferWriterHandles) overflowPages
(pages, overflowPages) <- ST.stToIO $ addLargeKeyOp writerPageAcc key op'
-- TODO: consider optimisation described in 'Database.LSMTree.Internal.RunBuilder.addKeyOp'.
for_ pages $ writeRawPage writerHasFS (forWriteBufferKOps writerHandles)
writeRawOverflowPages writerHasFS (forWriteBufferKOps writerHandles) overflowPages

-- | See 'RunAcc.addSmallKeyOp'.
-- | See 'Database.LSMTree.Internal.RunAcc.addSmallKeyOp'.
addSmallKeyOp ::
PageAcc s
-> SerialisedKey
Expand All @@ -177,13 +187,18 @@ addSmallKeyOp pageAcc key op =
not <$> PageAcc.pageAccAddElem pageAcc key op
if pageBoundaryNeeded
then do
-- We need a page boundary. If the current page is empty then we have
-- a boundary already, otherwise we need to flush the current page.
mPage <- flushPageIfNonEmpty pageAcc
-- The current page is now empty, either because it was already empty
-- or because we just flushed it. Adding the new key/op to an empty
-- page must now succeed, because we know it fits in a page.
added <- PageAcc.pageAccAddElem pageAcc key op
assert added $ pure mPage
else do
pure Nothing

-- | See 'RunAcc.addLargeKeyOp'.
-- | See 'Database.LSMTree.Internal.RunAcc.addLargeKeyOp'.
addLargeKeyOp
:: PageAcc s
-> SerialisedKey
Expand All @@ -196,12 +211,11 @@ addLargeKeyOp pageAcc key op =
mPagePre <- flushPageIfNonEmpty pageAcc
-- Make the new page and overflow pages. Add the span of pages to the index.
let (page, overflowPages) = PageAcc.singletonPage key op

-- Combine the results with anything we flushed before
let !pages = selectPages mPagePre page
return (pages, overflowPages)

-- | Internal helper. See 'RunAcc.flushPageIfNonEmpty'.
-- | Internal helper. See 'Database.LSMTree.Internal.RunAcc.flushPageIfNonEmpty'.
flushPageIfNonEmpty :: PageAcc s -> ST s (Maybe RawPage)
flushPageIfNonEmpty pageAcc = do
keysCount <- PageAcc.keysCountPageAcc pageAcc
Expand All @@ -213,15 +227,9 @@ flushPageIfNonEmpty pageAcc = do
pure $ Just page
else pure Nothing

-- | Internal helper. See 'RunAcc.selectPagesAndChunks'.
-- | Internal helper. See 'Database.LSMTree.Internal.RunAcc.selectPagesAndChunks'.
selectPages ::
     Maybe RawPage  -- ^ a previously flushed page, if there was one
  -> RawPage        -- ^ the page that must always be emitted
  -> [RawPage]      -- ^ the flushed page (when present) followed by the given page
selectPages mPagePre page = maybeToList mPagePre <> [page]

-- -- | See 'RunBuilder.close'.
-- close :: MonadSTM m => WriteBufferWriter m h -> m ()
-- close WriteBufferWriter {..} = do
-- for_ writeBufferWriterHandles $ closeHandle writeBufferWriterHasFS
-- for_ (pathsForWriteBufferFiles writeBufferWriterFsPaths) $ FS.removeFile writeBufferWriterHasFS
Loading

0 comments on commit 6a7eee5

Please sign in to comment.