Skip to content

Commit

Permalink
Fix Pos() deadlock with file system invalidation (#206)
Browse files: browse the repository at this point in the history
  • Loading branch information
benbjohnson authored Nov 29, 2022
1 parent a66fc96 commit 43de126
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 42 deletions.
58 changes: 17 additions & 41 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -33,14 +33,13 @@ const (

// DB represents a SQLite database.
type DB struct {
mu sync.Mutex
store *Store // parent store
name string // name of database
path string // full on-disk path
pageSize uint32 // database page size, if known
pageN uint32 // database size, in pages
pos Pos // current tx position
mode DBMode // database journaling mode (rollback, wal)
store *Store // parent store
name string // name of database
path string // full on-disk path
pageSize uint32 // database page size, if known
pageN uint32 // database size, in pages
pos atomic.Value // current tx position (Pos)
mode DBMode // database journaling mode (rollback, wal)

dirtyPageSet map[uint32]struct{}

Expand Down Expand Up @@ -83,6 +82,7 @@ func NewDB(store *Store, name string, path string) *DB {

Now: time.Now,
}
db.pos.Store(Pos{})
db.wal.frameOffsets = make(map[uint32]int64)
return db
}
Expand Down Expand Up @@ -137,23 +137,14 @@ func (db *DB) WALPath() string { return filepath.Join(db.path, "wal") }
// SHMPath returns the path to the underlying shared memory file.
func (db *DB) SHMPath() string { return filepath.Join(db.path, "shm") }

// PageSize returns the page size of the underlying database.
func (db *DB) PageSize() uint32 {
db.mu.Lock()
defer db.mu.Unlock()
return db.pageSize
}

// Pos returns the current transaction position of the database.
func (db *DB) Pos() Pos {
db.mu.Lock()
defer db.mu.Unlock()
return db.pos
return db.pos.Load().(Pos)
}

// setPos sets the current transaction position of the database.
func (db *DB) setPos(pos Pos) error {
db.pos = pos
db.pos.Store(pos)

// Invalidate page cache.
if invalidator := db.store.Invalidator; invalidator != nil {
Expand All @@ -163,7 +154,7 @@ func (db *DB) setPos(pos Pos) error {
}

// Update metrics.
dbTXIDMetricVec.WithLabelValues(db.name).Set(float64(db.pos.TXID))
dbTXIDMetricVec.WithLabelValues(db.name).Set(float64(pos.TXID))

return nil
}
Expand Down Expand Up @@ -491,8 +482,8 @@ func (db *DB) verifyDatabaseFile() error {
}

// Ensure database checksum matches checksum in current position.
if chksum != db.pos.PostApplyChecksum {
return fmt.Errorf("database checksum (%016x) does not match latest LTX checksum (%016x)", chksum, db.pos.PostApplyChecksum)
if pos := db.Pos(); chksum != pos.PostApplyChecksum {
return fmt.Errorf("database checksum (%016x) does not match latest LTX checksum (%016x)", chksum, pos.PostApplyChecksum)
}

return nil
Expand All @@ -513,9 +504,6 @@ func (db *DB) OpenLTXFile(txID uint64) (*os.File, error) {

// WriteDatabase writes data to the main database file.
func (db *DB) WriteDatabase(f *os.File, data []byte, offset int64) error {
db.mu.Lock()
defer db.mu.Unlock()

// Return an error if the current process is not the leader.
if !db.store.IsPrimary() {
return ErrReadOnlyReplica
Expand Down Expand Up @@ -568,9 +556,6 @@ func (db *DB) CreateWAL() (*os.File, error) {
// WriteWAL writes data to the WAL file. On final commit write, an LTX file is
// generated for the transaction.
func (db *DB) WriteWAL(f *os.File, data []byte, offset int64) error {
db.mu.Lock()
defer db.mu.Unlock()

// Return an error if the current process is not the leader.
if !db.store.IsPrimary() {
return ErrReadOnlyReplica
Expand Down Expand Up @@ -703,7 +688,7 @@ func (db *DB) CommitWAL() error {
prevPageN := binary.BigEndian.Uint32(page[SQLITE_DATABASE_SIZE_OFFSET:])

// Determine transaction ID of the in-process transaction.
pos := db.pos
pos := db.Pos()
txID := pos.TXID + 1

// Compute rolling checksum based off previous LTX database checksum.
Expand Down Expand Up @@ -908,9 +893,6 @@ func isByteSliceZero(b []byte) bool {

// CommitJournal deletes the journal file which commits or rolls back the transaction.
func (db *DB) CommitJournal(mode JournalMode) error {
db.mu.Lock()
defer db.mu.Unlock()

// Return an error if the current process is not the leader.
if !db.store.IsPrimary() {
return ErrReadOnlyReplica
Expand All @@ -933,7 +915,7 @@ func (db *DB) CommitJournal(mode JournalMode) error {
}

// Determine transaction ID of the in-process transaction.
pos := db.pos
pos := db.Pos()
txID := pos.TXID + 1

dbFile, err := os.Open(db.DatabasePath())
Expand Down Expand Up @@ -1262,9 +1244,6 @@ func (db *DB) ApplyLTX(ctx context.Context, path string) error {
return fmt.Errorf("sync database file: %w", err)
}

db.mu.Lock()
defer db.mu.Unlock()

db.mode = dbMode

if db.pageSize == 0 {
Expand Down Expand Up @@ -1433,14 +1412,12 @@ func (db *DB) WriteSnapshotTo(ctx context.Context, dst io.Writer) (header ltx.He
}

// Determine current position & snapshot overriding WAL frames.
db.mu.Lock()
pos := db.pos
pos := db.Pos()
pageSize, pageN := db.pageSize, db.pageN
walFrameOffsets := make(map[uint32]int64, len(db.wal.frameOffsets))
for k, v := range db.wal.frameOffsets {
walFrameOffsets[k] = v
}
db.mu.Unlock()

// Log transaction ID for the snapshot.
log.Printf("writing snapshot %q @ %s", db.name, ltx.FormatTXID(pos.TXID))
Expand Down Expand Up @@ -1560,7 +1537,6 @@ func (db *DB) EnforceRetention(ctx context.Context, minTime time.Time) error {

type dbVarJSON struct {
Name string `json:"name"`
PageSize uint32 `json:"pageSize"`
TXID string `json:"txid"`
Checksum string `json:"checksum"`

Expand Down
1 change: 0 additions & 1 deletion store.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,6 @@ func (v *StoreVar) String() string {

dbJSON := &dbVarJSON{
Name: db.Name(),
PageSize: db.PageSize(),
TXID: ltx.FormatTXID(pos.TXID),
Checksum: fmt.Sprintf("%016x", pos.PostApplyChecksum),
}
Expand Down

0 comments on commit 43de126

Please sign in to comment.