From e85125033e70e2affecc9cb934821940f90f25f6 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 3 Oct 2023 16:11:13 -0600 Subject: [PATCH] Exit on partial ltx --- cmd/litefs/mount_test.go | 36 ++++++++++++++++++++++++++++++++++++ db.go | 23 +++++++++++++++++------ http/server.go | 2 +- store.go | 4 ++-- 4 files changed, 56 insertions(+), 9 deletions(-) diff --git a/cmd/litefs/mount_test.go b/cmd/litefs/mount_test.go index 90b7eef..229690b 100644 --- a/cmd/litefs/mount_test.go +++ b/cmd/litefs/mount_test.go @@ -11,6 +11,7 @@ import ( "encoding/json" "fmt" "io" + "io/fs" "log" "math/rand" "net" @@ -1575,6 +1576,41 @@ func TestMultiNode_EnsureReadOnlyReplica(t *testing.T) { } } +func TestMultiNode_ErrApplyLTX(t *testing.T) { + cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil)) + waitForPrimary(t, cmd0) + + mos := mock.NewOS() + cmd1 := newMountCommand(t, t.TempDir(), cmd0) + cmd1.OS = mos + runMountCommand(t, cmd1) + db0 := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db")) + + ch := make(chan int) + cmd1.Store.Exit = func(code int) { + ch <- code + } + mos.OpenFileFunc = func(op, name string, flag int, perm fs.FileMode) (*os.File, error) { + if op == "UPDATESHM" { + return nil, fmt.Errorf("marker") + } + return os.OpenFile(name, flag, perm) + } + + if _, err := db0.Exec(`CREATE TABLE t (x)`); err != nil { + t.Fatal(err) + } + + select { + case exitCode := <-ch: + if got, want := exitCode, 99; got != want { + t.Fatalf("code=%v, want %v", got, want) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for exit code") + } +} + func TestMultiNode_Halt(t *testing.T) { t.Run("Commit", func(t *testing.T) { cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil)) diff --git a/db.go b/db.go index 945cddc..3bb010c 100644 --- a/db.go +++ b/db.go @@ -748,7 +748,7 @@ func (db *DB) CheckpointNoLock(ctx context.Context) (err error) { db.wal.chksums = make(map[uint32][]ltx.Checksum) // Update the SHM file. - if err := db.updateSHM(ctx); err != nil { + if err := db.updateSHM(); err != nil { return fmt.Errorf("update shm: %w", err) } @@ -2429,11 +2429,11 @@ func (db *DB) ApplyLTX(ctx context.Context, path string) error { } defer guard.Unlock() - return db.ApplyLTXNoLock(ctx, path) + return db.ApplyLTXNoLock(path) } // ApplyLTXNoLock applies an LTX file to the database. -func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error { +func (db *DB) ApplyLTXNoLock(path string) (retErr error) { var hdr ltx.Header var trailer ltx.Trailer prevDBMode := db.Mode() @@ -2469,6 +2469,17 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error { defer func() { _ = dbFile.Close() }() } + // After this point, a partial failure will result in a partially written + // database. We don't have the ability to signal to the client that a failure + // occurred so we need to exit. + defer func() { + if retErr != nil { + TraceLog.Printf("[FATAL(%s)]: err=%d\n", db.name, retErr) + log.Printf("fatal error occurred while applying ltx to %q, exiting: %s\n", db.name, retErr) + db.store.Exit(99) + } + }() + pageBuf := make([]byte, dec.Header().PageSize) for i := 0; ; i++ { // Read pgno & page data from LTX file. @@ -2548,7 +2559,7 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error { } // Rewrite SHM so that the transaction is visible. - if err := db.updateSHM(ctx); err != nil { + if err := db.updateSHM(); err != nil { return fmt.Errorf("update shm: %w", err) } @@ -2583,7 +2594,7 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error { } // updateSHM recomputes the SHM header for a replica node (with no WAL frames). -func (db *DB) updateSHM(ctx context.Context) error { +func (db *DB) updateSHM() error { // This lock prevents an issue where triggering SHM invalidation in FUSE // causes a write to be issued through the mmap which overwrites our change. // This lock blocks that from occurring. @@ -2780,7 +2791,7 @@ func (db *DB) Import(ctx context.Context, r io.Reader) error { return err } - return db.ApplyLTXNoLock(ctx, db.LTXPath(pos.TXID, pos.TXID)) + return db.ApplyLTXNoLock(db.LTXPath(pos.TXID, pos.TXID)) } // importToLTX reads a SQLite database and writes it to the next LTX file. diff --git a/http/server.go b/http/server.go index fe59168..bbcb5b4 100644 --- a/http/server.go +++ b/http/server.go @@ -481,7 +481,7 @@ func (s *Server) handlePostTx(w http.ResponseWriter, r *http.Request) { } // Apply transaction to database. - if err := db.ApplyLTXNoLock(r.Context(), ltxPath); err != nil { + if err := db.ApplyLTXNoLock(ltxPath); err != nil { Error(w, r, fmt.Errorf("cannot apply ltx: %s", err), http.StatusInternalServerError) return } diff --git a/store.go b/store.go index b68e12f..4ac1a73 100644 --- a/store.go +++ b/store.go @@ -1326,7 +1326,7 @@ func (s *Store) restoreDBFromBackup(ctx context.Context, name string) (newPos lt } // Apply transaction to database. - if err := db.ApplyLTXNoLock(ctx, ltxPath); err != nil { + if err := db.ApplyLTXNoLock(ltxPath); err != nil { return ltx.Pos{}, fmt.Errorf("cannot apply ltx: %s", err) } newPos = db.Pos() @@ -1605,7 +1605,7 @@ func (s *Store) processLTXStreamFrame(ctx context.Context, frame *LTXStreamFrame } // Attempt to apply the LTX file to the database. - if err := db.ApplyLTXNoLock(ctx, path); err != nil { + if err := db.ApplyLTXNoLock(path); err != nil { return fmt.Errorf("apply ltx: %w", err) }