diff --git a/db.go b/db.go
index 6a5cad9..5a9db26 100644
--- a/db.go
+++ b/db.go
@@ -44,8 +44,13 @@ type DB struct {
 	dirtyPageSet map[uint32]struct{}
 
-	walOffset       int64            // offset of the start of the transaction
-	walFrameOffsets map[uint32]int64 // WAL frame offset of the last version of a given pgno before current tx
+	wal struct {
+		offset           int64            // offset of the start of the transaction
+		byteOrder        binary.ByteOrder // determined by WAL header magic
+		salt1, salt2     uint32           // current WAL header salt values
+		chksum1, chksum2 uint32           // WAL checksum values at wal.offset
+		frameOffsets     map[uint32]int64 // WAL frame offset of the last version of a given pgno before current tx
+	}
 
 	// SQLite database locks
 	pendingLock RWMutex
@@ -69,16 +74,17 @@ type DB struct {
 // NewDB returns a new instance of DB.
 func NewDB(store *Store, name string, path string) *DB {
-	return &DB{
+	db := &DB{
 		store: store,
 		name:  name,
 		path:  path,
 
-		dirtyPageSet:    make(map[uint32]struct{}),
-		walFrameOffsets: make(map[uint32]int64),
+		dirtyPageSet: make(map[uint32]struct{}),
 
 		Now: time.Now,
 	}
+	db.wal.frameOffsets = make(map[uint32]int64)
+	return db
 }
 
 // Name of the database name.
@@ -578,8 +584,28 @@ func (db *DB) WriteWAL(f *os.File, data []byte, offset int64) error {
 	// Reset WAL if header is overwritten.
 	if offset == 0 {
-		db.walOffset = WALHeaderSize
-		db.walFrameOffsets = make(map[uint32]int64)
+		// Require the initial header to be written fully.
+		if len(data) < WALHeaderSize {
+			return fmt.Errorf("short wal header write of %d bytes", len(data))
+		}
+
+		// Determine byte order of checksums.
+		switch magic := binary.BigEndian.Uint32(data[0:]); magic {
+		case 0x377f0682:
+			db.wal.byteOrder = binary.LittleEndian
+		case 0x377f0683:
+			db.wal.byteOrder = binary.BigEndian
+		default:
+			return fmt.Errorf("invalid wal header magic: %x", magic)
+		}
+
+		// Set remaining WAL fields.
+		db.wal.offset = WALHeaderSize
+		db.wal.salt1 = binary.BigEndian.Uint32(data[16:])
+		db.wal.salt2 = binary.BigEndian.Uint32(data[20:])
+		db.wal.chksum1 = binary.BigEndian.Uint32(data[24:])
+		db.wal.chksum2 = binary.BigEndian.Uint32(data[28:])
+		db.wal.frameOffsets = make(map[uint32]int64)
 	}
 
 	// Passthrough write to underlying WAL file.
@@ -587,66 +613,79 @@ func (db *DB) WriteWAL(f *os.File, data []byte, offset int64) error {
 		return err
 	}
 
-	// If this write does not finish at the end of a frame, then exit.
-	walFrameSize := int64(WALFrameHeaderSize + db.pageSize)
-	endOffset := offset + int64(len(data))
-	if endOffset == WALHeaderSize || (endOffset-WALHeaderSize)%walFrameSize != 0 {
-		return nil
-	}
-
-	// Check if the frame is the commit record.
-	fhdr := make([]byte, WALFrameHeaderSize)
-	if _, err := f.Seek(endOffset-walFrameSize, io.SeekStart); err != nil {
-		return fmt.Errorf("seek wal frame start: %w", err)
-	} else if _, err := io.ReadFull(f, fhdr); err != nil {
-		return fmt.Errorf("seek wal frame header: %w", err)
-	}
-	commit := binary.BigEndian.Uint32(fhdr[4:8])
-	if commit == 0 {
-		return nil // not a commit frame, exit
-	}
-
-	if err := db.commitWAL(f, commit); err != nil {
-		return fmt.Errorf("commit wal: %w", err)
-	}
 	return nil
 }
 
-func (db *DB) buildTxFrameOffsets(walFile *os.File) (map[uint32]int64, error) {
+func (db *DB) buildTxFrameOffsets(walFile *os.File) (_ map[uint32]int64, commit, chksum1, chksum2 uint32, endOffset int64, err error) {
 	m := make(map[uint32]int64)
-	if _, err := walFile.Seek(db.walOffset, io.SeekStart); err != nil {
-		return nil, fmt.Errorf("seek wal tx start: %w", err)
-	}
-
-	walFrameSize := WALFrameHeaderSize + int64(db.pageSize)
-	frame := make([]byte, walFrameSize)
+	offset := db.wal.offset
+	chksum1, chksum2 = db.wal.chksum1, db.wal.chksum2
+	frame := make([]byte, WALFrameHeaderSize+int64(db.pageSize))
 	for i := 0; ; i++ {
-		if _, err := io.ReadFull(walFile, frame); err != nil {
-			return nil, fmt.Errorf("read next frame: %w", err)
+		// Read frame data & exit if we hit the end of file.
+		if _, err := internal.ReadFullAt(walFile, frame, offset); err == io.EOF || err == io.ErrUnexpectedEOF {
+			return nil, 0, 0, 0, 0, errNoTransaction
+		} else if err != nil {
+			return nil, 0, 0, 0, 0, fmt.Errorf("read wal frame: %w", err)
 		}
 
-		pgno := binary.BigEndian.Uint32(frame[0:4])
-		m[pgno] = db.walOffset + (int64(i) * walFrameSize)
-		if commit := binary.BigEndian.Uint32(frame[4:8]); commit != 0 {
-			return m, nil // end of tx
+		// If salt doesn't match then we've run into a previous WAL's data.
+		salt1 := binary.BigEndian.Uint32(frame[8:])
+		salt2 := binary.BigEndian.Uint32(frame[12:])
+		if db.wal.salt1 != salt1 || db.wal.salt2 != salt2 {
+			return nil, 0, 0, 0, 0, errNoTransaction
+		}
+
+		// Verify checksum.
+		fchksum1 := binary.BigEndian.Uint32(frame[16:])
+		fchksum2 := binary.BigEndian.Uint32(frame[20:])
+		chksum1, chksum2 = WALChecksum(db.wal.byteOrder, chksum1, chksum2, frame[:8])  // frame header
+		chksum1, chksum2 = WALChecksum(db.wal.byteOrder, chksum1, chksum2, frame[24:]) // frame data
+		if chksum1 != fchksum1 || chksum2 != fchksum2 {
+			return nil, 0, 0, 0, 0, errNoTransaction
+		}
+
+		// Save the offset for the last version of the page to a map.
+		pgno := binary.BigEndian.Uint32(frame[0:])
+		m[pgno] = offset
+
+		// End of transaction, exit loop and return.
+		if commit = binary.BigEndian.Uint32(frame[4:]); commit != 0 {
+			return m, commit, chksum1, chksum2, offset + int64(len(frame)), nil
 		}
+
+		// Move to the next frame.
+		offset += int64(len(frame))
 	}
 }
 
-// commitWAL is called on the last write to the WAL page in a transaction.
+// errNoTransaction is a marker error to indicate that a transaction could not
+// be found at the current WAL offset.
+var errNoTransaction = errors.New("no transaction")
+
+// CommitWAL is called when the client releases the WAL_WRITE_LOCK(120).
 // The transaction data is copied from the WAL into an LTX file and committed.
-func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
+func (db *DB) CommitWAL() error {
 	walFrameSize := int64(WALFrameHeaderSize + db.pageSize)
 
+	walFile, err := os.Open(db.WALPath())
+	if err != nil {
+		return fmt.Errorf("open wal file: %w", err)
+	}
+	defer func() { _ = walFile.Close() }()
+
 	// Sync WAL to disk as this avoids data loss issues with SYNCHRONOUS=normal
 	if err := walFile.Sync(); err != nil {
 		return fmt.Errorf("sync wal: %w", err)
 	}
 
 	// Build offset map for the last version of each page in the WAL transaction.
-	txFrameOffsets, err := db.buildTxFrameOffsets(walFile)
-	if err != nil {
+	// If txFrameOffsets has no entries then a transaction could not be found after db.wal.offset.
+	txFrameOffsets, commit, chksum1, chksum2, endOffset, err := db.buildTxFrameOffsets(walFile)
+	if err == errNoTransaction {
+		return nil
+	} else if err != nil {
 		return fmt.Errorf("build tx frame offsets: %w", err)
 	}
@@ -690,6 +729,10 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
 		MinTXID:          txID,
 		MaxTXID:          txID,
 		PreApplyChecksum: preApplyChecksum,
+		WALSalt1:         db.wal.salt1,
+		WALSalt2:         db.wal.salt2,
+		WALOffset:        db.wal.offset,
+		WALSize:          endOffset - db.wal.offset,
 	}); err != nil {
 		return fmt.Errorf("cannot encode ltx header: %s", err)
 	}
@@ -702,13 +745,10 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
 	sort.Slice(pgnos, func(i, j int) bool { return pgnos[i] < pgnos[j] })
 
 	frame := make([]byte, walFrameSize)
-	var maxOffset int64
 	for _, pgno := range pgnos {
 		// Read next frame from the WAL file.
 		offset := txFrameOffsets[pgno]
-		if _, err := walFile.Seek(offset, io.SeekStart); err != nil {
-			return fmt.Errorf("seek: %w", err)
-		} else if _, err := io.ReadFull(walFile, frame); err != nil {
+		if _, err := internal.ReadFullAt(walFile, frame, offset); err != nil {
 			return fmt.Errorf("read next frame: %w", err)
 		}
 		pgno := binary.BigEndian.Uint32(frame[0:4])
@@ -718,11 +758,6 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
 			return fmt.Errorf("cannot encode ltx page: pgno=%d err=%w", pgno, err)
 		}
 
-		// Track highest offset so we can know where the end of the transaction is in the WAL.
-		if offset > maxOffset {
-			maxOffset = offset
-		}
-
 		// Update rolling checksum.
 		postApplyChecksum ^= ltx.ChecksumPage(pgno, frame[WALFrameHeaderSize:])
 	}
@@ -767,11 +802,14 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
 	// Copy page offsets on commit.
 	for pgno, off := range txFrameOffsets {
-		db.walFrameOffsets[pgno] = off
+		db.wal.frameOffsets[pgno] = off
 	}
 
+	// Move the WAL position forward and reset the segment size.
 	db.pageN = commit
-	db.walOffset = maxOffset + walFrameSize
+	db.wal.offset = endOffset
+	db.wal.chksum1 = chksum1
+	db.wal.chksum2 = chksum2
 
 	// Update transaction for database.
 	if err := db.setPos(Pos{
@@ -794,6 +832,11 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
 	if chksum, err := db.checksum(dbFile, walFile); err != nil {
 		return fmt.Errorf("checksum (wal): %w", err)
 	} else if chksum != postApplyChecksum {
+
+		// TODO: Remove!!
+		fmt.Printf("verification failed (wal): %016x <> %016x\n", chksum, postApplyChecksum)
+		os.Exit(1)
+
 		return fmt.Errorf("verification failed (wal): %016x <> %016x", chksum, postApplyChecksum)
 	}
 }
@@ -804,19 +847,21 @@ func (db *DB) commitWAL(walFile *os.File, commit uint32) error {
 // readPage reads the latest version of the page before the current transaction.
 func (db *DB) readPage(dbFile, walFile *os.File, pgno uint32, buf []byte) error {
 	// Read from previous position in WAL, if available.
-	if off, ok := db.walFrameOffsets[pgno]; ok {
-		if _, err := walFile.Seek(off+WALFrameHeaderSize, io.SeekStart); err != nil {
-			return fmt.Errorf("seek wal page: %w", err)
-		} else if _, err := io.ReadFull(walFile, buf); err != nil {
+	if off, ok := db.wal.frameOffsets[pgno]; ok {
+		hdr := make([]byte, WALFrameHeaderSize)
+		if _, err := internal.ReadFullAt(walFile, hdr, off); err != nil {
+			return fmt.Errorf("read wal page hdr: %w", err)
+		}
+
+		if _, err := internal.ReadFullAt(walFile, buf, off+WALFrameHeaderSize); err != nil {
 			return fmt.Errorf("read wal page: %w", err)
 		}
 		return nil
 	}
 
 	// Otherwise read from the database file.
-	if _, err := dbFile.Seek(int64(pgno-1)*int64(db.pageSize), io.SeekStart); err != nil {
-		return fmt.Errorf("seek database page: %w", err)
-	} else if _, err := io.ReadFull(dbFile, buf); err != nil {
+	offset := int64(pgno-1) * int64(db.pageSize)
+	if _, err := internal.ReadFullAt(dbFile, buf, offset); err != nil {
 		return fmt.Errorf("read database page: %w", err)
 	}
 	return nil
@@ -1066,7 +1111,7 @@ func (db *DB) checksum(dbFile, walFile *os.File) (chksum uint64, err error) {
 	data := make([]byte, db.pageSize)
 	for pgno := uint32(1); pgno <= db.pageN; pgno++ {
 		// Read from either the database file or the WAL depending if the page exists in the WAL.
-		if offset, ok := db.walFrameOffsets[pgno]; !ok {
+		if offset, ok := db.wal.frameOffsets[pgno]; !ok {
 			if _, err := dbFile.Seek(int64(pgno-1)*int64(db.pageSize), io.SeekStart); err != nil {
 				return 0, fmt.Errorf("db seek (pgno=%d): %w", pgno, err)
 			} else if _, err := io.ReadFull(dbFile, data); err != nil {
@@ -1391,8 +1436,8 @@ func (db *DB) WriteSnapshotTo(ctx context.Context, dst io.Writer) (header ltx.He
 	db.mu.Lock()
 	pos := db.pos
 	pageSize, pageN := db.pageSize, db.pageN
-	walFrameOffsets := make(map[uint32]int64, len(db.walFrameOffsets))
-	for k, v := range db.walFrameOffsets {
+	walFrameOffsets := make(map[uint32]int64, len(db.wal.frameOffsets))
+	for k, v := range db.wal.frameOffsets {
 		walFrameOffsets[k] = v
 	}
 	db.mu.Unlock()
@@ -1762,6 +1807,16 @@ const (
 	LockTypeDMS = LockType(128)
 )
 
+// ContainsLockType returns true if a contains typ.
+func ContainsLockType(a []LockType, typ LockType) bool {
+	for _, v := range a {
+		if v == typ {
+			return true
+		}
+	}
+	return false
+}
+
 // ParseDatabaseLockRange returns a list of SQLite database locks that are within a range.
 func ParseDatabaseLockRange(start, end uint64) []LockType {
 	a := make([]LockType, 0, 3)
@@ -1831,6 +1886,42 @@ type GuardSet struct {
 	dms      RWMutexGuard
 }
 
+// Pending returns a reference to the PENDING mutex guard.
+func (s *GuardSet) Pending() *RWMutexGuard { return &s.pending }
+
+// Shared returns a reference to the SHARED mutex guard.
+func (s *GuardSet) Shared() *RWMutexGuard { return &s.shared }
+
+// Reserved returns a reference to the RESERVED mutex guard.
+func (s *GuardSet) Reserved() *RWMutexGuard { return &s.reserved }
+
+// Write returns a reference to the WRITE mutex guard.
+func (s *GuardSet) Write() *RWMutexGuard { return &s.write }
+
+// Ckpt returns a reference to the CKPT mutex guard.
+func (s *GuardSet) Ckpt() *RWMutexGuard { return &s.ckpt }
+
+// Recover returns a reference to the RECOVER mutex guard.
+func (s *GuardSet) Recover() *RWMutexGuard { return &s.recover }
+
+// Read0 returns a reference to the READ0 mutex guard.
+func (s *GuardSet) Read0() *RWMutexGuard { return &s.read0 }
+
+// Read1 returns a reference to the READ1 mutex guard.
+func (s *GuardSet) Read1() *RWMutexGuard { return &s.read1 }
+
+// Read2 returns a reference to the READ2 mutex guard.
+func (s *GuardSet) Read2() *RWMutexGuard { return &s.read2 }
+
+// Read3 returns a reference to the READ3 mutex guard.
+func (s *GuardSet) Read3() *RWMutexGuard { return &s.read3 }
+
+// Read4 returns a reference to the READ4 mutex guard.
+func (s *GuardSet) Read4() *RWMutexGuard { return &s.read4 }
+
+// DMS returns a reference to the DMS mutex guard.
+func (s *GuardSet) DMS() *RWMutexGuard { return &s.dms }
+
 // Guard returns a guard by lock type. Panic on invalid lock type.
 func (s *GuardSet) Guard(lockType LockType) *RWMutexGuard {
 	switch lockType {
diff --git a/fuse/shm_node.go b/fuse/shm_node.go
index 7005ac2..943b316 100644
--- a/fuse/shm_node.go
+++ b/fuse/shm_node.go
@@ -149,6 +149,8 @@ func (h *SHMHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fus
 }
 
 func (h *SHMHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
+	// TODO(wal): If WAL_WRITE_LOCK(120) has an exclusive lock then call db.CommitWAL()
+
 	if gs := h.node.fsys.GuardSet(h.node.db, req.LockOwner); gs != nil {
 		gs.UnlockSHM()
 	}
@@ -189,13 +191,25 @@ func (h *SHMHandle) LockWait(ctx context.Context, req *fuse.LockWaitRequest) err
 	return fuse.Errno(syscall.ENOSYS)
 }
 
-func (h *SHMHandle) Unlock(ctx context.Context, req *fuse.UnlockRequest) error {
-	for _, lockType := range litefs.ParseWALLockRange(req.Lock.Start, req.Lock.End) {
-		if gs := h.node.fsys.GuardSet(h.node.db, req.LockOwner); gs != nil {
-			gs.Guard(lockType).Unlock()
+func (h *SHMHandle) Unlock(ctx context.Context, req *fuse.UnlockRequest) (err error) {
+	gs := h.node.fsys.GuardSet(h.node.db, req.LockOwner)
+	if gs == nil {
+		return nil
+	}
+
+	// Process WAL if we have an exclusive lock on WAL_WRITE_LOCK.
+	lockTypes := litefs.ParseWALLockRange(req.Lock.Start, req.Lock.End)
+	if litefs.ContainsLockType(lockTypes, litefs.LockTypeWrite) && gs.Write().State() == litefs.RWMutexStateExclusive {
+		if err = h.node.db.CommitWAL(); err != nil {
+			log.Printf("commit wal error: %s", err)
 		}
 	}
-	return nil
+
+	// Unlock specified locks.
+	for _, lockType := range lockTypes {
+		gs.Guard(lockType).Unlock()
+	}
+	return err
 }
 
 func (h *SHMHandle) QueryLock(ctx context.Context, req *fuse.QueryLockRequest, resp *fuse.QueryLockResponse) error {
diff --git a/go.mod b/go.mod
index 3b66b23..d80bdea 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@ require (
 	github.com/mattn/go-shellwords v1.0.12
 	github.com/mattn/go-sqlite3 v1.14.16-0.20220918133448-90900be5db1a
 	github.com/prometheus/client_golang v1.13.0
-	github.com/superfly/ltx v0.2.7
+	github.com/superfly/ltx v0.2.8
 	golang.org/x/net v0.0.0-20220909164309-bea034e7d591
 	golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
 	gopkg.in/yaml.v3 v3.0.1
diff --git a/go.sum b/go.sum
index c636fbb..6600692 100644
--- a/go.sum
+++ b/go.sum
@@ -307,8 +307,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/superfly/ltx v0.2.7 h1:nzfhhA3S9iX+zYLO5QkZnFZX6ryxqHpB0NChbJxEGks=
-github.com/superfly/ltx v0.2.7/go.mod h1:aW5e3H7elNGtaW4Cax78hQV6ITIFiYEOeslDPdVwDi0=
+github.com/superfly/ltx v0.2.8 h1:uUnQYAKz65hCFZ7qqI5Lin8J2BfBq36/x4k/GZlh09Q=
+github.com/superfly/ltx v0.2.8/go.mod h1:aW5e3H7elNGtaW4Cax78hQV6ITIFiYEOeslDPdVwDi0=
 github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
 github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c h1:u6SKchux2yDvFQnDHS3lPnIRmfVJ5Sxy3ao2SIdysLQ=
 github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM=
diff --git a/litefs.go b/litefs.go
index 64b4bc3..9c9f9c9 100644
--- a/litefs.go
+++ b/litefs.go
@@ -232,8 +232,8 @@ type WALReader struct {
 	pageSize uint32
 	seq      uint32
 
-	salt0, salt1     uint32
-	chksum0, chksum1 uint32
+	salt1, salt2     uint32
+	chksum1, chksum2 uint32
 }
 
 // NewWALReader returns a new instance of WALReader.
@@ -275,9 +275,9 @@ func (r *WALReader) ReadHeader() error {
 	// If the header checksum doesn't match then we may have failed with
 	// a partial WAL header write during checkpointing.
-	chksum0 := binary.BigEndian.Uint32(hdr[24:])
-	chksum1 := binary.BigEndian.Uint32(hdr[28:])
-	if v0, v1 := WALChecksum(r.bo, 0, 0, hdr[:24]); v0 != chksum0 || v1 != chksum1 {
+	chksum1 := binary.BigEndian.Uint32(hdr[24:])
+	chksum2 := binary.BigEndian.Uint32(hdr[28:])
+	if v0, v1 := WALChecksum(r.bo, 0, 0, hdr[:24]); v0 != chksum1 || v1 != chksum2 {
 		return io.EOF
 	}
 
@@ -288,9 +288,9 @@ func (r *WALReader) ReadHeader() error {
 	r.pageSize = binary.BigEndian.Uint32(hdr[8:])
 	r.seq = binary.BigEndian.Uint32(hdr[12:])
-	r.salt0 = binary.BigEndian.Uint32(hdr[16:])
-	r.salt1 = binary.BigEndian.Uint32(hdr[20:])
-	r.chksum0, r.chksum1 = chksum0, chksum1
+	r.salt1 = binary.BigEndian.Uint32(hdr[16:])
+	r.salt2 = binary.BigEndian.Uint32(hdr[20:])
+	r.chksum1, r.chksum2 = chksum1, chksum2
 
 	return nil
 }
@@ -318,18 +318,18 @@ func (r *WALReader) ReadFrame(data []byte) (pgno, commit uint32, err error) {
 	}
 
 	// Verify salt matches the salt in the header.
-	salt0 := binary.BigEndian.Uint32(hdr[8:])
-	salt1 := binary.BigEndian.Uint32(hdr[12:])
-	if r.salt0 != salt0 || r.salt1 != salt1 {
+	salt1 := binary.BigEndian.Uint32(hdr[8:])
+	salt2 := binary.BigEndian.Uint32(hdr[12:])
+	if r.salt1 != salt1 || r.salt2 != salt2 {
 		return 0, 0, io.EOF
 	}
 
 	// Verify the checksum is valid.
-	chksum0 := binary.BigEndian.Uint32(hdr[16:])
-	chksum1 := binary.BigEndian.Uint32(hdr[20:])
-	r.chksum0, r.chksum1 = WALChecksum(r.bo, r.chksum0, r.chksum1, hdr[:8]) // frame header
-	r.chksum0, r.chksum1 = WALChecksum(r.bo, r.chksum0, r.chksum1, data)   // frame data
-	if r.chksum0 != chksum0 || r.chksum1 != chksum1 {
+	chksum1 := binary.BigEndian.Uint32(hdr[16:])
+	chksum2 := binary.BigEndian.Uint32(hdr[20:])
+	r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, hdr[:8]) // frame header
+	r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, data)   // frame data
+	if r.chksum1 != chksum1 || r.chksum2 != chksum2 {
 		return 0, 0, io.EOF
 	}
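
Note on the validation this change leans on: buildTxFrameOffsets and WALReader both carry a running (chksum1, chksum2) pair forward from the WAL header, fold in each frame's 8-byte header followed by its page data, and compare the result against the checksum stored in that frame, bailing out on the first salt or checksum mismatch. The sketch below illustrates the cumulative checksum that WALChecksum is assumed to implement per the documented SQLite WAL format; the walChecksum and verifyFrame names, and the 8-byte demo "page", are illustrative only and not part of this diff.

package main

import (
	"encoding/binary"
	"fmt"
)

// walChecksum folds b (consumed 8 bytes at a time) into the running (s1, s2)
// pair, reading 32-bit words in the byte order selected by the WAL header
// magic (0x377f0682 = little-endian checksums, 0x377f0683 = big-endian).
func walChecksum(bo binary.ByteOrder, s1, s2 uint32, b []byte) (uint32, uint32) {
	for ; len(b) >= 8; b = b[8:] {
		s1 += bo.Uint32(b[0:4]) + s2
		s2 += bo.Uint32(b[4:8]) + s1
	}
	return s1, s2
}

// verifyFrame checks one frame (24-byte header plus page data) against the
// expected salts and the running checksum, mirroring the tests in
// buildTxFrameOffsets. It returns the advanced checksum state and whether the
// frame is valid.
func verifyFrame(bo binary.ByteOrder, salt1, salt2, s1, s2 uint32, frame []byte) (uint32, uint32, bool) {
	if binary.BigEndian.Uint32(frame[8:]) != salt1 || binary.BigEndian.Uint32(frame[12:]) != salt2 {
		return s1, s2, false // frame belongs to an earlier WAL generation
	}
	s1, s2 = walChecksum(bo, s1, s2, frame[:8])  // pgno + commit fields
	s1, s2 = walChecksum(bo, s1, s2, frame[24:]) // page data
	ok := s1 == binary.BigEndian.Uint32(frame[16:]) && s2 == binary.BigEndian.Uint32(frame[20:])
	return s1, s2, ok
}

func main() {
	// Fabricated frame with an 8-byte "page" (real pages are at least 512 bytes).
	frame := make([]byte, 32)
	binary.BigEndian.PutUint32(frame[0:], 1)  // pgno
	binary.BigEndian.PutUint32(frame[4:], 1)  // commit (database size in pages)
	binary.BigEndian.PutUint32(frame[8:], 7)  // salt1
	binary.BigEndian.PutUint32(frame[12:], 9) // salt2

	// Stamp the frame with the checksum a writer would have produced.
	s1, s2 := walChecksum(binary.BigEndian, 0, 0, frame[:8])
	s1, s2 = walChecksum(binary.BigEndian, s1, s2, frame[24:])
	binary.BigEndian.PutUint32(frame[16:], s1)
	binary.BigEndian.PutUint32(frame[20:], s2)

	_, _, ok := verifyFrame(binary.BigEndian, 7, 9, 0, 0, frame)
	fmt.Println("frame verifies:", ok)
}

Because each frame's checksum chains off everything written before it, a single salt or checksum mismatch means the rest of the WAL is stale or torn, which is why buildTxFrameOffsets returns errNoTransaction at the first bad frame instead of skipping it.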