Skip to content

Commit

Permalink
Exit on partial ltx
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Oct 3, 2023
1 parent 8a42ed5 commit e851250
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 9 deletions.
36 changes: 36 additions & 0 deletions cmd/litefs/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/fs"
"log"
"math/rand"
"net"
Expand Down Expand Up @@ -1575,6 +1576,41 @@ func TestMultiNode_EnsureReadOnlyReplica(t *testing.T) {
}
}

// TestMultiNode_ErrApplyLTX starts a first node (cmd0), waits for it to become
// primary, then starts a second node (cmd1) backed by a mock OS layer. The mock
// fails every OpenFile call whose op is "UPDATESHM", and the second node's
// Store.Exit hook is replaced so the requested exit code can be observed
// instead of terminating the test process. After a write is issued through
// cmd0, the test expects the hook to be invoked with exit code 99 within five
// seconds.
//
// NOTE(review): passing cmd0 to newMountCommand presumably makes cmd1 a
// replica of cmd0 — confirm against the helper.
func TestMultiNode_ErrApplyLTX(t *testing.T) {
	cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil))
	waitForPrimary(t, cmd0)

	mos := mock.NewOS()
	cmd1 := newMountCommand(t, t.TempDir(), cmd0)
	cmd1.OS = mos
	runMountCommand(t, cmd1)
	db0 := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))

	// The hook does not actually exit, so it may be invoked more than once
	// (or after the test has already timed out). Use a buffered channel with
	// a non-blocking send so that only the first code is recorded and later
	// invocations never block the calling goroutine.
	ch := make(chan int, 1)
	cmd1.Store.Exit = func(code int) {
		select {
		case ch <- code:
		default:
		}
	}

	// Inject a failure only for the SHM update; every other open passes
	// through to the real filesystem.
	mos.OpenFileFunc = func(op, name string, flag int, perm fs.FileMode) (*os.File, error) {
		if op == "UPDATESHM" {
			return nil, fmt.Errorf("marker")
		}
		return os.OpenFile(name, flag, perm)
	}

	// Issue a write through the first node.
	if _, err := db0.Exec(`CREATE TABLE t (x)`); err != nil {
		t.Fatal(err)
	}

	select {
	case exitCode := <-ch:
		if got, want := exitCode, 99; got != want {
			t.Fatalf("code=%v, want %v", got, want)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("timeout waiting for exit code")
	}
}

func TestMultiNode_Halt(t *testing.T) {
t.Run("Commit", func(t *testing.T) {
cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil))
Expand Down
23 changes: 17 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func (db *DB) CheckpointNoLock(ctx context.Context) (err error) {
db.wal.chksums = make(map[uint32][]ltx.Checksum)

// Update the SHM file.
if err := db.updateSHM(ctx); err != nil {
if err := db.updateSHM(); err != nil {
return fmt.Errorf("update shm: %w", err)
}

Expand Down Expand Up @@ -2429,11 +2429,11 @@ func (db *DB) ApplyLTX(ctx context.Context, path string) error {
}
defer guard.Unlock()

return db.ApplyLTXNoLock(ctx, path)
return db.ApplyLTXNoLock(path)
}

// ApplyLTXNoLock applies an LTX file to the database.
func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error {
func (db *DB) ApplyLTXNoLock(path string) (retErr error) {
var hdr ltx.Header
var trailer ltx.Trailer
prevDBMode := db.Mode()
Expand Down Expand Up @@ -2469,6 +2469,17 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error {
defer func() { _ = dbFile.Close() }()
}

// After this point, a partial failure will result in a partially written
// database. We don't have the ability to signal to the client that a failure
// occurred so we need to exit.
defer func() {
	if retErr != nil {
		// retErr is an error value, so it must be formatted with %s (or %v);
		// %d would print a reflection dump rather than the error message.
		TraceLog.Printf("[FATAL(%s)]: err=%s\n", db.name, retErr)
		log.Printf("fatal error occurred while applying ltx to %q, exiting: %s\n", db.name, retErr)
		db.store.Exit(99)
	}
}()

pageBuf := make([]byte, dec.Header().PageSize)
for i := 0; ; i++ {
// Read pgno & page data from LTX file.
Expand Down Expand Up @@ -2548,7 +2559,7 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error {
}

// Rewrite SHM so that the transaction is visible.
if err := db.updateSHM(ctx); err != nil {
if err := db.updateSHM(); err != nil {
return fmt.Errorf("update shm: %w", err)
}

Expand Down Expand Up @@ -2583,7 +2594,7 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error {
}

// updateSHM recomputes the SHM header for a replica node (with no WAL frames).
func (db *DB) updateSHM(ctx context.Context) error {
func (db *DB) updateSHM() error {
// This lock prevents an issue where triggering SHM invalidation in FUSE
// causes a write to be issued through the mmap which overwrites our change.
// This lock blocks that from occurring.
Expand Down Expand Up @@ -2780,7 +2791,7 @@ func (db *DB) Import(ctx context.Context, r io.Reader) error {
return err
}

return db.ApplyLTXNoLock(ctx, db.LTXPath(pos.TXID, pos.TXID))
return db.ApplyLTXNoLock(db.LTXPath(pos.TXID, pos.TXID))
}

// importToLTX reads a SQLite database and writes it to the next LTX file.
Expand Down
2 changes: 1 addition & 1 deletion http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (s *Server) handlePostTx(w http.ResponseWriter, r *http.Request) {
}

// Apply transaction to database.
if err := db.ApplyLTXNoLock(r.Context(), ltxPath); err != nil {
if err := db.ApplyLTXNoLock(ltxPath); err != nil {
Error(w, r, fmt.Errorf("cannot apply ltx: %s", err), http.StatusInternalServerError)
return
}
Expand Down
4 changes: 2 additions & 2 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ func (s *Store) restoreDBFromBackup(ctx context.Context, name string) (newPos lt
}

// Apply transaction to database.
if err := db.ApplyLTXNoLock(ctx, ltxPath); err != nil {
if err := db.ApplyLTXNoLock(ltxPath); err != nil {
return ltx.Pos{}, fmt.Errorf("cannot apply ltx: %s", err)
}
newPos = db.Pos()
Expand Down Expand Up @@ -1605,7 +1605,7 @@ func (s *Store) processLTXStreamFrame(ctx context.Context, frame *LTXStreamFrame
}

// Attempt to apply the LTX file to the database.
if err := db.ApplyLTXNoLock(ctx, path); err != nil {
if err := db.ApplyLTXNoLock(path); err != nil {
return fmt.Errorf("apply ltx: %w", err)
}

Expand Down

0 comments on commit e851250

Please sign in to comment.