Skip to content

Commit

Permalink
Move WAL commit to after WAL_WRITE_LOCK release (#205)
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson authored Nov 29, 2022
1 parent 8e6a005 commit a66fc96
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 91 deletions.
225 changes: 158 additions & 67 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ type DB struct {

dirtyPageSet map[uint32]struct{}

walOffset int64 // offset of the start of the transaction
walFrameOffsets map[uint32]int64 // WAL frame offset of the last version of a given pgno before current tx
wal struct {
offset int64 // offset of the start of the transaction
byteOrder binary.ByteOrder // determine by WAL header magic
salt1, salt2 uint32 // current WAL header salt values
chksum1, chksum2 uint32 // WAL checksum values at wal.offset
frameOffsets map[uint32]int64 // WAL frame offset of the last version of a given pgno before current tx
}

// SQLite database locks
pendingLock RWMutex
Expand All @@ -69,16 +74,17 @@ type DB struct {

// NewDB returns a new instance of DB.
func NewDB(store *Store, name string, path string) *DB {
return &DB{
db := &DB{
store: store,
name: name,
path: path,

dirtyPageSet: make(map[uint32]struct{}),
walFrameOffsets: make(map[uint32]int64),
dirtyPageSet: make(map[uint32]struct{}),

Now: time.Now,
}
db.wal.frameOffsets = make(map[uint32]int64)
return db
}

// Name of the database name.
Expand Down Expand Up @@ -578,75 +584,108 @@ func (db *DB) WriteWAL(f *os.File, data []byte, offset int64) error {

// Reset WAL if header is overwritten.
if offset == 0 {
db.walOffset = WALHeaderSize
db.walFrameOffsets = make(map[uint32]int64)
// Require the initial header to be written fully.
if len(data) < WALHeaderSize {
return fmt.Errorf("short wal header write of %d bytes", len(data))
}

// Determine byte order of checksums.
switch magic := binary.BigEndian.Uint32(data[0:]); magic {
case 0x377f0682:
db.wal.byteOrder = binary.LittleEndian
case 0x377f0683:
db.wal.byteOrder = binary.BigEndian
default:
return fmt.Errorf("invalid wal header magic: %x", magic)
}

// Set remaining WAL fields.
db.wal.offset = WALHeaderSize
db.wal.salt1 = binary.BigEndian.Uint32(data[16:])
db.wal.salt2 = binary.BigEndian.Uint32(data[20:])
db.wal.chksum1 = binary.BigEndian.Uint32(data[24:])
db.wal.chksum2 = binary.BigEndian.Uint32(data[28:])
db.wal.frameOffsets = make(map[uint32]int64)
}

// Passthrough write to underlying WAL file.
if _, err := f.WriteAt(data, offset); err != nil {
return err
}

// If this write does not finish at the end of a frame, then exit.
walFrameSize := int64(WALFrameHeaderSize + db.pageSize)
endOffset := offset + int64(len(data))
if endOffset == WALHeaderSize || (endOffset-WALHeaderSize)%walFrameSize != 0 {
return nil
}

// Check if the frame is the commit record.
fhdr := make([]byte, WALFrameHeaderSize)
if _, err := f.Seek(endOffset-walFrameSize, io.SeekStart); err != nil {
return fmt.Errorf("seek wal frame start: %w", err)
} else if _, err := io.ReadFull(f, fhdr); err != nil {
return fmt.Errorf("seek wal frame header: %w", err)
}
commit := binary.BigEndian.Uint32(fhdr[4:8])
if commit == 0 {
return nil // not a commit frame, exit
}

if err := db.commitWAL(f, commit); err != nil {
return fmt.Errorf("commit wal: %w", err)
}
return nil
}

func (db *DB) buildTxFrameOffsets(walFile *os.File) (map[uint32]int64, error) {
func (db *DB) buildTxFrameOffsets(walFile *os.File) (_ map[uint32]int64, commit, chksum1, chksum2 uint32, endOffset int64, err error) {
m := make(map[uint32]int64)

if _, err := walFile.Seek(db.walOffset, io.SeekStart); err != nil {
return nil, fmt.Errorf("seek wal tx start: %w", err)
}

walFrameSize := WALFrameHeaderSize + int64(db.pageSize)
frame := make([]byte, walFrameSize)
offset := db.wal.offset
chksum1, chksum2 = db.wal.chksum1, db.wal.chksum2
frame := make([]byte, WALFrameHeaderSize+int64(db.pageSize))
for i := 0; ; i++ {
if _, err := io.ReadFull(walFile, frame); err != nil {
return nil, fmt.Errorf("read next frame: %w", err)
// Read frame data & exit if we hit the end of file.
if _, err := internal.ReadFullAt(walFile, frame, offset); err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, 0, 0, 0, 0, errNoTransaction
} else if err != nil {
return nil, 0, 0, 0, 0, fmt.Errorf("read wal frame: %w", err)
}
pgno := binary.BigEndian.Uint32(frame[0:4])
m[pgno] = db.walOffset + (int64(i) * walFrameSize)

if commit := binary.BigEndian.Uint32(frame[4:8]); commit != 0 {
return m, nil // end of tx
// If salt doesn't match then we've run into a previous WAL's data.
salt1 := binary.BigEndian.Uint32(frame[8:])
salt2 := binary.BigEndian.Uint32(frame[12:])
if db.wal.salt1 != salt1 || db.wal.salt2 != salt2 {
return nil, 0, 0, 0, 0, errNoTransaction
}

// Verify checksum
fchksum1 := binary.BigEndian.Uint32(frame[16:])
fchksum2 := binary.BigEndian.Uint32(frame[20:])
chksum1, chksum2 = WALChecksum(db.wal.byteOrder, chksum1, chksum2, frame[:8]) // frame header
chksum1, chksum2 = WALChecksum(db.wal.byteOrder, chksum1, chksum2, frame[24:]) // frame data
if chksum1 != fchksum1 || chksum2 != fchksum2 {
return nil, 0, 0, 0, 0, errNoTransaction
}

// Save the offset for the last version of the page to a map.
pgno := binary.BigEndian.Uint32(frame[0:])
m[pgno] = offset

// End of transaction, exit loop and return.
if commit = binary.BigEndian.Uint32(frame[4:]); commit != 0 {
return m, commit, chksum1, chksum2, offset + int64(len(frame)), nil
}

// Move to the next frame.
offset += int64(len(frame))
}
}

// commitWAL is called on the last write to the WAL page in a transaction.
// errNoTransaction is a marker error to indicate that a transaction could not
// be found at the current WAL offset.
var errNoTransaction = errors.New("no transaction")

// commitWAL is called when the client releases the WAL_WRITE_LOCK(120).
// The transaction data is copied from the WAL into an LTX file and committed.
func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
func (db *DB) CommitWAL() error {
walFrameSize := int64(WALFrameHeaderSize + db.pageSize)

walFile, err := os.Open(db.WALPath())
if err != nil {
return fmt.Errorf("open wal file: %w", err)
}
defer func() { _ = walFile.Close() }()

// Sync WAL to disk as this avoids data loss issues with SYNCHRONOUS=normal
if err := walFile.Sync(); err != nil {
return fmt.Errorf("sync wal: %w", err)
}

// Build offset map for the last version of each page in the WAL transaction.
txFrameOffsets, err := db.buildTxFrameOffsets(walFile)
if err != nil {
// If txFrameOffsets has no entries then a transaction could not be found after db.wal.offset.
txFrameOffsets, commit, chksum1, chksum2, endOffset, err := db.buildTxFrameOffsets(walFile)
if err == errNoTransaction {
return nil
} else if err != nil {
return fmt.Errorf("build tx frame offsets: %w", err)
}

Expand Down Expand Up @@ -690,6 +729,10 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
MinTXID: txID,
MaxTXID: txID,
PreApplyChecksum: preApplyChecksum,
WALSalt1: db.wal.salt1,
WALSalt2: db.wal.salt2,
WALOffset: db.wal.offset,
WALSize: endOffset - db.wal.offset,
}); err != nil {
return fmt.Errorf("cannot encode ltx header: %s", err)
}
Expand All @@ -702,13 +745,10 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
sort.Slice(pgnos, func(i, j int) bool { return pgnos[i] < pgnos[j] })

frame := make([]byte, walFrameSize)
var maxOffset int64
for _, pgno := range pgnos {
// Read next frame from the WAL file.
offset := txFrameOffsets[pgno]
if _, err := walFile.Seek(offset, io.SeekStart); err != nil {
return fmt.Errorf("seek: %w", err)
} else if _, err := io.ReadFull(walFile, frame); err != nil {
if _, err := internal.ReadFullAt(walFile, frame, offset); err != nil {
return fmt.Errorf("read next frame: %w", err)
}
pgno := binary.BigEndian.Uint32(frame[0:4])
Expand All @@ -718,11 +758,6 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
return fmt.Errorf("cannot encode ltx page: pgno=%d err=%w", pgno, err)
}

// Track highest offset so we can know where the end of the transaction is in the WAL.
if offset > maxOffset {
maxOffset = offset
}

// Update rolling checksum.
postApplyChecksum ^= ltx.ChecksumPage(pgno, frame[WALFrameHeaderSize:])
}
Expand Down Expand Up @@ -767,11 +802,14 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {

// Copy page offsets on commit.
for pgno, off := range txFrameOffsets {
db.walFrameOffsets[pgno] = off
db.wal.frameOffsets[pgno] = off
}

// Move the WAL position forward and reset the segment size.
db.pageN = commit
db.walOffset = maxOffset + walFrameSize
db.wal.offset = endOffset
db.wal.chksum1 = chksum1
db.wal.chksum2 = chksum2

// Update transaction for database.
if err := db.setPos(Pos{
Expand All @@ -794,6 +832,11 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
if chksum, err := db.checksum(dbFile, walFile); err != nil {
return fmt.Errorf("checksum (wal): %w", err)
} else if chksum != postApplyChecksum {

// TODO: Remove!!
fmt.Printf("verification failed (wal): %016x <> %016x\n", chksum, postApplyChecksum)
os.Exit(1)

return fmt.Errorf("verification failed (wal): %016x <> %016x", chksum, postApplyChecksum)
}
}
Expand All @@ -804,19 +847,21 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
// readPage reads the latest version of the page before the current transaction.
func (db *DB) readPage(dbFile, walFile *os.File, pgno uint32, buf []byte) error {
// Read from previous position in WAL, if available.
if off, ok := db.walFrameOffsets[pgno]; ok {
if _, err := walFile.Seek(off+WALFrameHeaderSize, io.SeekStart); err != nil {
return fmt.Errorf("seek wal page: %w", err)
} else if _, err := io.ReadFull(walFile, buf); err != nil {
if off, ok := db.wal.frameOffsets[pgno]; ok {
hdr := make([]byte, WALFrameHeaderSize)
if _, err := internal.ReadFullAt(walFile, hdr, off); err != nil {
return fmt.Errorf("read wal page hdr: %w", err)
}

if _, err := internal.ReadFullAt(walFile, buf, off+WALFrameHeaderSize); err != nil {
return fmt.Errorf("read wal page: %w", err)
}
return nil
}

// Otherwise read from the database file.
if _, err := dbFile.Seek(int64(pgno-1)*int64(db.pageSize), io.SeekStart); err != nil {
return fmt.Errorf("seek database page: %w", err)
} else if _, err := io.ReadFull(dbFile, buf); err != nil {
offset := int64(pgno-1) * int64(db.pageSize)
if _, err := internal.ReadFullAt(dbFile, buf, offset); err != nil {
return fmt.Errorf("read database page: %w", err)
}
return nil
Expand Down Expand Up @@ -1066,7 +1111,7 @@ func (db *DB) checksum(dbFile, walFile *os.File) (chksum uint64, err error) {
data := make([]byte, db.pageSize)
for pgno := uint32(1); pgno <= db.pageN; pgno++ {
// Read from either the database file or the WAL depending if the page exists in the WAL.
if offset, ok := db.walFrameOffsets[pgno]; !ok {
if offset, ok := db.wal.frameOffsets[pgno]; !ok {
if _, err := dbFile.Seek(int64(pgno-1)*int64(db.pageSize), io.SeekStart); err != nil {
return 0, fmt.Errorf("db seek (pgno=%d): %w", pgno, err)
} else if _, err := io.ReadFull(dbFile, data); err != nil {
Expand Down Expand Up @@ -1391,8 +1436,8 @@ func (db *DB) WriteSnapshotTo(ctx context.Context, dst io.Writer) (header ltx.He
db.mu.Lock()
pos := db.pos
pageSize, pageN := db.pageSize, db.pageN
walFrameOffsets := make(map[uint32]int64, len(db.walFrameOffsets))
for k, v := range db.walFrameOffsets {
walFrameOffsets := make(map[uint32]int64, len(db.wal.frameOffsets))
for k, v := range db.wal.frameOffsets {
walFrameOffsets[k] = v
}
db.mu.Unlock()
Expand Down Expand Up @@ -1762,6 +1807,16 @@ const (
LockTypeDMS = LockType(128)
)

// ContainsLockType returns true if a contains typ.
func ContainsLockType(a []LockType, typ LockType) bool {
for _, v := range a {
if v == typ {
return true
}
}
return false
}

// ParseDatabaseLockRange returns a list of SQLite database locks that are within a range.
func ParseDatabaseLockRange(start, end uint64) []LockType {
a := make([]LockType, 0, 3)
Expand Down Expand Up @@ -1831,6 +1886,42 @@ type GuardSet struct {
dms RWMutexGuard
}

// Pending returns a reference to the PENDING mutex guard.
func (s *GuardSet) Pending() *RWMutexGuard { return &s.pending }

// Shared returns a reference to the SHARED mutex guard.
func (s *GuardSet) Shared() *RWMutexGuard { return &s.shared }

// Reserved returns a reference to the RESERVED mutex guard.
func (s *GuardSet) Reserved() *RWMutexGuard { return &s.reserved }

// Write returns a reference to the WRITE mutex guard.
func (s *GuardSet) Write() *RWMutexGuard { return &s.write }

// Ckpt returns a reference to the CKPT mutex guard.
func (s *GuardSet) Ckpt() *RWMutexGuard { return &s.ckpt }

// Recover returns a reference to the RECOVER mutex guard.
func (s *GuardSet) Recover() *RWMutexGuard { return &s.recover }

// Read0 returns a reference to the READ0 mutex guard.
func (s *GuardSet) Read0() *RWMutexGuard { return &s.read0 }

// Read1 returns a reference to the READ1 mutex guard.
func (s *GuardSet) Read1() *RWMutexGuard { return &s.read1 }

// Read2 returns a reference to the READ2 mutex guard.
func (s *GuardSet) Read2() *RWMutexGuard { return &s.read2 }

// Read3 returns a reference to the READ3 mutex guard.
func (s *GuardSet) Read3() *RWMutexGuard { return &s.read3 }

// Read4 returns a reference to the READ4 mutex guard.
func (s *GuardSet) Read4() *RWMutexGuard { return &s.read4 }

// DMS returns a reference to the DMS mutex guard.
func (s *GuardSet) DMS() *RWMutexGuard { return &s.dms }

// Guard returns a guard by lock type. Panic on invalid lock type.
func (s *GuardSet) Guard(lockType LockType) *RWMutexGuard {
switch lockType {
Expand Down
Loading

0 comments on commit a66fc96

Please sign in to comment.