Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event stream #401

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions cmd/litefs/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"database/sql"
_ "embed"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -2302,6 +2303,122 @@ func TestMultiNode_WriteSnapshot_LockingProtocol(t *testing.T) {
runMountCommand(t, cmd1)
}

func TestEventStream(t *testing.T) {
t.Run("Tx/Primary", func(t *testing.T) {
cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil))
db := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))

resp, err := http.Get(cmd0.HTTPServer.URL() + "/events")
if err != nil {
t.Fatal(err)
}
defer func() { _ = resp.Body.Close() }()

dec := json.NewDecoder(resp.Body)

var offset ltx.TXID
if testingutil.IsWALMode() {
offset = 1
}

var event litefs.Event
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "init"; got != want {
t.Fatalf("type=%s, want %s", got, want)
}

if _, err := db.Exec(`CREATE TABLE t (x)`); err != nil {
t.Fatal(err)
}
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "tx"; got != want {
t.Fatalf("type=%s, want %s", got, want)
} else if got, want := event.DB, "db"; got != want {
t.Fatalf("db=%s, want %s", got, want)
} else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+1); got != want {
t.Fatalf("data.txid=%s, want %s", got, want)
}

if _, err := db.Exec(`INSERT INTO t VALUES (100)`); err != nil {
t.Fatal(err)
}
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "tx"; got != want {
t.Fatalf("type=%s, want %s", got, want)
} else if got, want := event.DB, "db"; got != want {
t.Fatalf("db=%s, want %s", got, want)
} else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+2); got != want {
t.Fatalf("data.txid=%s, want %s", got, want)
}
})

t.Run("Tx/Replica", func(t *testing.T) {
cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil))
waitForPrimary(t, cmd0)
cmd1 := runMountCommand(t, newMountCommand(t, t.TempDir(), cmd0))
db := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))

resp, err := http.Get(cmd1.HTTPServer.URL() + "/events")
if err != nil {
t.Fatal(err)
}
defer func() { _ = resp.Body.Close() }()

dec := json.NewDecoder(resp.Body)

var event litefs.Event
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "init"; got != want {
t.Fatalf("type=%s, want %s", got, want)
}

var offset ltx.TXID
if testingutil.IsWALMode() {
offset = 1

if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "tx"; got != want {
t.Fatalf("type=%s, want %s", got, want)
} else if got, want := event.DB, "db"; got != want {
t.Fatalf("db=%s, want %s", got, want)
} else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(1); got != want {
t.Fatalf("data.txid=%s, want %s", got, want)
}
}

if _, err := db.Exec(`CREATE TABLE t (x)`); err != nil {
t.Fatal(err)
}
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "tx"; got != want {
t.Fatalf("type=%s, want %s", got, want)
} else if got, want := event.DB, "db"; got != want {
t.Fatalf("db=%s, want %s", got, want)
} else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+1); got != want {
t.Fatalf("data.txid=%s, want %s", got, want)
}

if _, err := db.Exec(`INSERT INTO t VALUES (100)`); err != nil {
t.Fatal(err)
}
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "tx"; got != want {
t.Fatalf("type=%s, want %s", got, want)
} else if got, want := event.DB, "db"; got != want {
t.Fatalf("db=%s, want %s", got, want)
} else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+2); got != want {
t.Fatalf("data.txid=%s, want %s", got, want)
}
})
}

// Ensure multiple nodes can run in a cluster for an extended period of time.
func TestFunctional_OK(t *testing.T) {
if *funTime <= 0 {
Expand Down
57 changes: 55 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,19 @@ func (db *DB) CommitWAL(ctx context.Context) (err error) {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: enc.Header().PageSize,
Commit: enc.Header().Commit,
Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(),
},
})

// Perform full checksum verification, if set. For testing only.
if db.store.StrictVerify {
if chksum, err := db.onDiskChecksum(dbFile, walFile); err != nil {
Expand Down Expand Up @@ -2109,6 +2122,19 @@ func (db *DB) CommitJournal(ctx context.Context, mode JournalMode) (err error) {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: enc.Header().PageSize,
Commit: enc.Header().Commit,
Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(),
},
})

// Calculate checksum for entire database.
if db.store.StrictVerify {
if chksum, err := db.onDiskChecksum(dbFile, nil); err != nil {
Expand Down Expand Up @@ -2237,6 +2263,19 @@ func (db *DB) Drop(ctx context.Context) (err error) {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: enc.Header().PageSize,
Commit: enc.Header().Commit,
Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(),
},
})

return nil
}

Expand Down Expand Up @@ -2521,10 +2560,11 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error {
}

// Update transaction for database.
if err := db.setPos(ltx.Pos{
pos := ltx.Pos{
TXID: dec.Header().MaxTXID,
PostApplyChecksum: dec.Trailer().PostApplyChecksum,
}, dec.Header().Timestamp); err != nil {
}
if err := db.setPos(pos, dec.Header().Timestamp); err != nil {
return fmt.Errorf("set pos: %w", err)
}

Expand All @@ -2543,6 +2583,19 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: dec.Header().PageSize,
Commit: dec.Header().Commit,
Timestamp: time.UnixMilli(dec.Header().Timestamp).UTC(),
},
})

// Calculate latency since LTX file was written.
latency := float64(time.Now().UnixMilli()-dec.Header().Timestamp) / 1000
dbLatencySecondsMetricVec.WithLabelValues(db.name).Set(latency)
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,6 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/superfly/litefs-go v0.0.0-20230227231337-34ea5dcf1e0b h1:+WuhtZFB8fNdPeaMUtuB/U8aknXBXdDW/mBm/HTYJNg=
github.com/superfly/litefs-go v0.0.0-20230227231337-34ea5dcf1e0b/go.mod h1:h+GUx1V2s0C5nY73ZN82760eWEJrpMaiDweF31VmJKk=
github.com/superfly/ltx v0.3.12 h1:Z7z1sc4g34/jUi3XO84+zBlIsbaoh2RJ3b4zTQpBK/M=
github.com/superfly/ltx v0.3.12/go.mod h1:ly+Dq7UVacQVEI5/b0r6j+PSNy9ibwx1yikcWAaSkhE=
github.com/superfly/ltx v0.3.13 h1:IbuocKJ6sY2jYvZbpUGMYmTkvaLSGUderEZwmaIUmJ0=
github.com/superfly/ltx v0.3.13/go.mod h1:ly+Dq7UVacQVEI5/b0r6j+PSNy9ibwx1yikcWAaSkhE=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
Expand Down
36 changes: 35 additions & 1 deletion http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ func (s *Server) serveHTTP(w http.ResponseWriter, r *http.Request) {
Error(w, r, fmt.Errorf("method not allowed"), http.StatusMethodNotAllowed)
}

case "/events":
switch r.Method {
case http.MethodGet:
s.handleGetEvents(w, r)
default:
Error(w, r, fmt.Errorf("method not allowed"), http.StatusMethodNotAllowed)
}

default:
http.NotFound(w, r)
}
Expand Down Expand Up @@ -506,7 +514,7 @@ func (s *Server) handlePostStream(w http.ResponseWriter, r *http.Request) {
defer serverStreamCountMetric.Dec()

// Subscribe to store changes
subscription := s.store.Subscribe(id)
subscription := s.store.SubscribeChangeSet(id)
defer func() { _ = subscription.Close() }()

// Read in pos map.
Expand Down Expand Up @@ -745,6 +753,32 @@ func (s *Server) streamLTXSnapshot(ctx context.Context, w http.ResponseWriter, d
return ltx.Pos{TXID: header.MaxTXID, PostApplyChecksum: trailer.PostApplyChecksum}, nil
}

func (s *Server) handleGetEvents(w http.ResponseWriter, r *http.Request) {
subscription := s.store.SubscribeEvents()
defer func() { subscription.Stop() }()

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

enc := json.NewEncoder(w)
for {
select {
case <-r.Context().Done():
return
case event, ok := <-subscription.C():
if !ok {
log.Printf("http: event stream buffer exceeded, disconnecting")
return
}
if err := enc.Encode(event); err != nil {
log.Printf("http: event stream error: %s", err)
return
}
w.(http.Flusher).Flush()
}
}
}

func Error(w http.ResponseWriter, r *http.Request, err error, code int) {
log.Printf("http: %s %s: error: %s", r.Method, r.URL.Path, err)
http.Error(w, err.Error(), code)
Expand Down
Loading
Loading