Skip to content

Commit

Permalink
Incremental log replication instead of crazy re-apply on every mismatch
Browse files Browse the repository at this point in the history
  • Loading branch information
maxpert committed Oct 12, 2022
1 parent de70aa7 commit 2f4b42a
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 103 deletions.
2 changes: 2 additions & 0 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (

var Cleanup = flag.Bool("cleanup", false, "Only cleanup marmot triggers and changelogs")
var SaveSnapshot = flag.Bool("save-snapshot", false, "Only take snapshot and upload")

var EnableSnapshot = flag.Bool("enable-snapshot", true, "Restore snapshot at boot")
var SeqMapPath = flag.String("seq-map-path", "/tmp/seq-map.cbor", "Path to stream sequence map")
var DBPathString = flag.String("db-path", "/tmp/marmot.db", "Path to SQLite database")
var NodeID = flag.Uint64("node-id", rand.Uint64(), "Node ID")
var NatsAddr = flag.String("nats-url", nats.DefaultURL, "NATS server URL")
Expand Down
95 changes: 54 additions & 41 deletions db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,46 @@ type ColumnInfo struct {
IsPrimaryKey bool `db:"pk"`
}

func RestoreFrom(destPath, bkFilePath string) error {
dnsTpl := "%s?_journal_mode=wal&_foreign_keys=false&_busy_timeout=30000&_txlock=%s"
dns := fmt.Sprintf(dnsTpl, destPath, snapshotTransactionMode)
destDB, dest, err := pool.OpenRaw(dns)
if err != nil {
return err
}
defer dest.Close()

dns = fmt.Sprintf(dnsTpl, bkFilePath, snapshotTransactionMode)
srcDB, src, err := pool.OpenRaw(dns)
if err != nil {
return err
}
defer src.Close()

dgSQL := goqu.New("sqlite", destDB)
sgSQL := goqu.New("sqlite", srcDB)

// Source locking is required so that any lock related metadata is mirrored in destination
// Transacting on both src and dest in immediate mode makes sure nobody
// else is modifying or interacting with DB
err = sgSQL.WithTx(func(dtx *goqu.TxDatabase) error {
return dgSQL.WithTx(func(_ *goqu.TxDatabase) error {
return copyFile(destPath, bkFilePath)
})
})

if err != nil {
return err
}

err = performCheckpoint(dgSQL)
if err != nil {
return err
}

return nil
}

func GetAllDBTables(path string) ([]string, error) {
connectionStr := fmt.Sprintf("%s?_journal_mode=wal", path)
conn, rawConn, err := pool.OpenRaw(connectionStr)
Expand Down Expand Up @@ -67,6 +107,17 @@ func OpenStreamDB(path string) (*SqliteStreamDB, error) {
return nil, err
}

conn, err := dbPool.Borrow()
if err != nil {
return nil, err
}
defer conn.Return()

err = performCheckpoint(conn.DB())
if err != nil {
return nil, err
}

ret := &SqliteStreamDB{
pool: dbPool,
dbPath: path,
Expand Down Expand Up @@ -231,46 +282,6 @@ func (conn *SqliteStreamDB) BackupTo(bkFilePath string) error {
return nil
}

func (conn *SqliteStreamDB) RestoreFrom(bkFilePath string) error {
dnsTpl := "%s?_journal_mode=wal&_foreign_keys=false&_busy_timeout=30000&_txlock=%s"
dns := fmt.Sprintf(dnsTpl, conn.dbPath, snapshotTransactionMode)
destDB, dest, err := pool.OpenRaw(dns)
if err != nil {
return err
}
defer dest.Close()

dns = fmt.Sprintf(dnsTpl, bkFilePath, snapshotTransactionMode)
srcDB, src, err := pool.OpenRaw(dns)
if err != nil {
return err
}
defer src.Close()

dgSQL := goqu.New("sqlite", destDB)
sgSQL := goqu.New("sqlite", srcDB)

// Source locking is required so that any lock related metadata is mirrored in destination
// Transacting on both src and dest in immediate mode makes sure nobody
// else is modifying or interacting with DB
err = sgSQL.WithTx(func(dtx *goqu.TxDatabase) error {
return dgSQL.WithTx(func(_ *goqu.TxDatabase) error {
return copyFile(conn.dbPath, bkFilePath)
})
})

if err != nil {
return err
}

err = performCheckpoint(dgSQL)
if err != nil {
return err
}

return nil
}

func (conn *SqliteStreamDB) GetRawConnection() *sqlite3.SQLiteConn {
return conn.rawConnection
}
Expand All @@ -286,7 +297,7 @@ func copyFile(toPath, fromPath string) error {
}
defer fi.Close()

fo, err := os.OpenFile(toPath, os.O_WRONLY|os.O_TRUNC, 0)
fo, err := os.OpenFile(toPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC|os.O_SYNC, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -317,6 +328,8 @@ func listDBTables(names *[]string, gSQL *goqu.TxDatabase) error {

func performCheckpoint(gSQL *goqu.Database) error {
rBusy, rLog, rCheckpoint := int64(1), int64(0), int64(0)
log.Debug().Msg("Forcing WAL checkpoint")

for rBusy != 0 {
row := gSQL.QueryRow("PRAGMA wal_checkpoint(truncate);")
err := row.Scan(&rBusy, &rLog, &rCheckpoint)
Expand Down
79 changes: 79 additions & 0 deletions logstream/replication_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package logstream

import (
"errors"
"io"
"os"
"sync"

"github.com/fxamacker/cbor/v2"
"github.com/maxpert/marmot/cfg"
)

var ErrNotInitialized = errors.New("not initialized")

type replicationState struct {
seq map[string]uint64
lock *sync.RWMutex
fl *os.File
}

func (r *replicationState) init() error {
fl, err := os.OpenFile(*cfg.SeqMapPath, os.O_RDWR|os.O_CREATE|os.O_SYNC, 0666)
if err != nil {
return err
}

r.seq = make(map[string]uint64)
r.lock = &sync.RWMutex{}
r.fl = fl

idx, err := fl.Seek(0, io.SeekEnd)
if idx < 1 {
return nil
}

_, err = fl.Seek(0, io.SeekStart)
if err != nil {
return err
}

return cbor.NewDecoder(fl).Decode(&r.seq)
}

func (r *replicationState) save(streamName string, seq uint64) (uint64, error) {
r.lock.Lock()
defer r.lock.Unlock()
if r.fl == nil {
return 0, ErrNotInitialized
}

if old, found := r.seq[streamName]; found && seq <= old {
return old, nil
}

_, err := r.fl.Seek(0, io.SeekStart)
if err != nil {
return 0, err
}
defer r.fl.Sync()

r.seq[streamName] = seq
err = cbor.NewEncoder(r.fl).Encode(r.seq)
if err != nil {
return 0, err
}

return seq, nil
}

func (r *replicationState) get(streamName string) uint64 {
r.lock.RLock()
defer r.lock.RUnlock()

if old, found := r.seq[streamName]; found {
return old
}

return 0
}
Loading

0 comments on commit 2f4b42a

Please sign in to comment.