Skip to content

Commit

Permalink
Exit on WAL commit failure
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Sep 20, 2023
1 parent a960f04 commit feb7e0f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 35 deletions.
6 changes: 5 additions & 1 deletion cmd/litefs/mount_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ type MountCommand struct {

Config Config

OS litefs.OS
OS litefs.OS
Exit func(int)

Store *litefs.Store
Leaser litefs.Leaser
FileSystem *fuse.FileSystem
Expand All @@ -52,6 +54,7 @@ func NewMountCommand() *MountCommand {
execCh: make(chan error),
Config: NewConfig(),
OS: &internal.SystemOS{},
Exit: os.Exit,
}
}

Expand Down Expand Up @@ -351,6 +354,7 @@ func (c *MountCommand) initConsul(ctx context.Context) (err error) {
func (c *MountCommand) initStore(ctx context.Context) error {
c.Store = litefs.NewStore(c.Config.Data.Dir, c.Config.Lease.Candidate)
c.Store.OS = c.OS
c.Store.Exit = c.Exit
c.Store.StrictVerify = c.Config.StrictVerify
c.Store.Compress = c.Config.Data.Compress
c.Store.Retention = c.Config.Data.Retention
Expand Down
55 changes: 55 additions & 0 deletions cmd/litefs/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
main "github.com/superfly/litefs/cmd/litefs"
"github.com/superfly/litefs/internal"
"github.com/superfly/litefs/internal/testingutil"
"github.com/superfly/litefs/mock"
"github.com/superfly/ltx"
"golang.org/x/exp/slog"
"golang.org/x/net/http2"
Expand Down Expand Up @@ -328,6 +329,60 @@ func TestSingleNode_DropDB(t *testing.T) {
}
}

func TestSingleNode_ErrCommitWAL(t *testing.T) {
if !testingutil.IsWALMode() {
t.Skip("test failure only applies to WAL mode, skipping")
}

mos := mock.NewOS()
dir := t.TempDir()
cmd0 := newMountCommand(t, dir, nil)
cmd0.OS = mos
runMountCommand(t, cmd0)
db := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))

if _, err := db.Exec(`CREATE TABLE t (x)`); err != nil {
t.Fatal(err)
} else if _, err := db.Exec(`INSERT INTO t VALUES (100)`); err != nil {
t.Fatal(err)
}

var exitCode int
cmd0.Store.Exit = func(code int) {
exitCode = code
}
mos.OpenFunc = func(name string) (*os.File, error) {
if base := filepath.Base(name); base == "wal" {
return nil, fmt.Errorf("marker")
}
return os.Open(name)
}
if _, err := db.Exec(`INSERT INTO t VALUES (200)`); err != nil {
t.Fatal(err)
}

if got, want := exitCode, 99; got != want {
t.Fatalf("exit=%d, want=%d", got, want)
}
if err := db.Close(); err != nil {
t.Fatal(err)
} else if err := cmd0.Close(); err != nil {
t.Fatal(err)
}

// Restart the mount command.
cmd0 = runMountCommand(t, newMountCommand(t, dir, nil))
db = testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))

// Ensure the second insert was not processed.
var x int
if err := db.QueryRow(`SELECT SUM(x) FROM t`).Scan(&x); err != nil {
t.Fatal(err)
} else if got, want := x, 100; got != want {
t.Fatalf("x=%d, want %d", got, want)
}
}

func TestSingleNode_BackupClient(t *testing.T) {
t.Run("OK", func(t *testing.T) {
cmd0 := newMountCommand(t, t.TempDir(), nil)
Expand Down
41 changes: 9 additions & 32 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1518,19 +1518,17 @@ func (db *DB) CommitWAL(ctx context.Context) (err error) {
TraceLog.Printf("[CommitWALBegin(%s)]: prev=%s offset=%d salt1=%08x salt2=%08x chksum1=%08x chksum2=%08x remote=%v",
db.name, prevPos, db.wal.offset, db.wal.salt1, db.wal.salt2, db.wal.chksum1, db.wal.chksum2, db.HasRemoteHaltLock())

// Ensure the WAL is truncated back to the previous offset if an error
// occurs so that the WAL pages do not get checkpointed back to the main
// database file. If we can't truncate then we have to abort the process.
//
// On restart, the recovery will ensure the LTX files and WAL file are
// synced up being proceeding so that is safe.
// WAL commit occurs when the write lock is released so returning an error
// doesn't affect the SQLite client and it will continue operating as usual.
// For now, we need to fatally stop LiteFS when we encounter an error in the
// final commit phase as we don't have control of the SQLite clients. This
// prevents SQLite and LiteFS from becoming out of sync. On next startup,
// LiteFS will perform a recovery and sync the database & LTX files correctly.
defer func() {
if err != nil {
TraceLog.Printf("[CommitWAL.TruncateOnError(%s)]: offset=%d\n", db.name, db.wal.offset)

if e := db.truncateWALFileAfterError(db.WALPath(), db.wal.offset); e != nil {
panic("litefs.DB.CommitWAL(): failed to truncate WAL file after commit error: " + e.Error())
}
TraceLog.Printf("[FATAL(%s)]: err=%d\n", db.name, err)
log.Printf("fatal error occurred while committing WAL to %q: %s\n", db.name, err)
db.store.Exit(99)
}
}()

Expand Down Expand Up @@ -1762,27 +1760,6 @@ func (db *DB) CommitWAL(ctx context.Context) (err error) {
return nil
}

// truncateWALFileAfterError is called when CommitWAL() fails and we need to
// revert to the original position. It is wrapped in a function so we can panic
// in the caller if this fails.
func (db *DB) truncateWALFileAfterError(name string, offset int64) error {
f, err := db.os.OpenFile(name, os.O_RDWR, 0o666)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
defer func() { _ = f.Close() }()

if err := f.Truncate(offset); err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
return f.Close()
}

// readPage reads the latest version of the page before the current transaction.
func (db *DB) readPage(dbFile, walFile *os.File, pgno uint32, buf []byte) error {
// Read from previous position in WAL, if available.
Expand Down
6 changes: 4 additions & 2 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ type Store struct {
g errgroup.Group

// The operating system interface to use for system calls. Defaults to SystemOS.
OS OS
OS OS
Exit func(int)

// Client used to connect to other LiteFS instances.
Client Client
Expand Down Expand Up @@ -152,7 +153,8 @@ func NewStore(path string, candidate bool) *Store {
readyCh: make(chan struct{}),
demoteCh: make(chan struct{}),

OS: &internal.SystemOS{},
OS: &internal.SystemOS{},
Exit: os.Exit,

ReconnectDelay: DefaultReconnectDelay,
DemoteDelay: DefaultDemoteDelay,
Expand Down

0 comments on commit feb7e0f

Please sign in to comment.