Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch latest snapshot metadata to determine snapshot creation need #597

Merged
merged 18 commits into from
Aug 7, 2023
Merged
13 changes: 12 additions & 1 deletion server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ var (
// ErrDocumentNotFound is returned when the document could not be found.
ErrDocumentNotFound = errors.New("document not found")

// ErrSnapshotNotFound is returned when the snapshot could not be found.
ErrSnapshotNotFound = errors.New("snapshot not found")

// ErrConflictOnUpdate is returned when a conflict occurs during update.
ErrConflictOnUpdate = errors.New("conflict on update")

Expand Down Expand Up @@ -205,8 +208,16 @@ type Database interface {
// CreateSnapshotInfo stores the snapshot of the given document.
CreateSnapshotInfo(ctx context.Context, docID types.ID, doc *document.InternalDocument) error

// FindSnapshotInfoByID returns the snapshot by the given id.
FindSnapshotInfoByID(ctx context.Context, id types.ID) (*SnapshotInfo, error)

// FindClosestSnapshotInfo finds the closest snapshot info in a given serverSeq.
FindClosestSnapshotInfo(ctx context.Context, docID types.ID, serverSeq int64) (*SnapshotInfo, error)
FindClosestSnapshotInfo(
ctx context.Context,
docID types.ID,
serverSeq int64,
includeSnapshot bool,
) (*SnapshotInfo, error)

// FindMinSyncedSeqInfo finds the minimum synced sequence info.
FindMinSyncedSeqInfo(ctx context.Context, docID types.ID) (*SyncedSeqInfo, error)
Expand Down
27 changes: 26 additions & 1 deletion server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,11 +969,27 @@ func (d *DB) CreateSnapshotInfo(
return nil
}

// FindSnapshotInfoByID returns the snapshot by the given id.
func (d *DB) FindSnapshotInfoByID(ctx context.Context, id types.ID) (*database.SnapshotInfo, error) {
txn := d.db.Txn(false)
defer txn.Abort()
raw, err := txn.First(tblSnapshots, "id", id.String())
if err != nil {
return nil, fmt.Errorf("find snapshot by id: %w", err)
}
if raw == nil {
return nil, fmt.Errorf("%s: %w", id, database.ErrSnapshotNotFound)
}

return raw.(*database.SnapshotInfo).DeepCopy(), nil
}

// FindClosestSnapshotInfo finds the last snapshot of the given document.
func (d *DB) FindClosestSnapshotInfo(
ctx context.Context,
docID types.ID,
serverSeq int64,
includeSnapshot bool,
) (*database.SnapshotInfo, error) {
txn := d.db.Txn(false)
defer txn.Abort()
Expand All @@ -992,7 +1008,16 @@ func (d *DB) FindClosestSnapshotInfo(
for raw := iterator.Next(); raw != nil; raw = iterator.Next() {
info := raw.(*database.SnapshotInfo)
if info.DocID == docID {
snapshotInfo = info
snapshotInfo = &database.SnapshotInfo{
ID: info.ID,
DocID: info.DocID,
ServerSeq: info.ServerSeq,
Lamport: info.Lamport,
CreatedAt: info.CreatedAt,
}
if includeSnapshot {
snapshotInfo.Snapshot = info.Snapshot
}
break
}
}
Expand Down
42 changes: 39 additions & 3 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,25 +1049,61 @@ func (c *Client) CreateSnapshotInfo(
return nil
}

// FindSnapshotInfoByID returns the snapshot by the given id.
func (c *Client) FindSnapshotInfoByID(
ctx context.Context,
id types.ID,
) (*database.SnapshotInfo, error) {
encodedID, err := encodeID(id)
if err != nil {
return nil, err
}

result := c.collection(colSnapshots).FindOne(ctx, bson.M{
"_id": encodedID,
})

snapshotInfo := &database.SnapshotInfo{}
if result.Err() == mongo.ErrNoDocuments {
return snapshotInfo, nil
}
if result.Err() != nil {
return nil, fmt.Errorf("find snapshot: %w", result.Err())
}

if err := result.Decode(snapshotInfo); err != nil {
return nil, fmt.Errorf("decode snapshot: %w", err)
}

return snapshotInfo, nil
}

// FindClosestSnapshotInfo finds the last snapshot of the given document.
func (c *Client) FindClosestSnapshotInfo(
ctx context.Context,
docID types.ID,
serverSeq int64,
includeSnapshot bool,
) (*database.SnapshotInfo, error) {
encodedDocID, err := encodeID(docID)
if err != nil {
return nil, err
}

option := options.FindOne().SetSort(bson.M{
"server_seq": -1,
})

if !includeSnapshot {
option.SetProjection(bson.M{"Snapshot": 0})
}

result := c.collection(colSnapshots).FindOne(ctx, bson.M{
"doc_id": encodedDocID,
"server_seq": bson.M{
"$lte": serverSeq,
},
}, options.FindOne().SetSort(bson.M{
"server_seq": -1,
}))
}, option)

snapshotInfo := &database.SnapshotInfo{}
if result.Err() == mongo.ErrNoDocuments {
Expand Down
16 changes: 16 additions & 0 deletions server/backend/database/snapshot_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,19 @@ type SnapshotInfo struct {
// CreatedAt is the time when the snapshot is created.
CreatedAt time.Time `bson:"created_at"`
}

// DeepCopy returns a deep copy of the SnapshotInfo.
func (i *SnapshotInfo) DeepCopy() *SnapshotInfo {
if i == nil {
return nil
}

return &SnapshotInfo{
ID: i.ID,
DocID: i.DocID,
ServerSeq: i.ServerSeq,
Lamport: i.Lamport,
Snapshot: i.Snapshot,
CreatedAt: i.CreatedAt,
}
}
8 changes: 4 additions & 4 deletions server/backend/database/testcases/testcases.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,25 @@ func RunFindClosestSnapshotInfoTest(t *testing.T, db database.Database, projectI
}))

assert.NoError(t, db.CreateSnapshotInfo(ctx, docInfo.ID, doc.InternalDocument()))
snapshot, err := db.FindClosestSnapshotInfo(ctx, docInfo.ID, change.MaxCheckpoint.ServerSeq)
snapshot, err := db.FindClosestSnapshotInfo(ctx, docInfo.ID, change.MaxCheckpoint.ServerSeq, true)
assert.NoError(t, err)
assert.Equal(t, int64(0), snapshot.ServerSeq)

pack := change.NewPack(doc.Key(), doc.Checkpoint().NextServerSeq(1), nil, nil)
assert.NoError(t, doc.ApplyChangePack(pack))
assert.NoError(t, db.CreateSnapshotInfo(ctx, docInfo.ID, doc.InternalDocument()))
snapshot, err = db.FindClosestSnapshotInfo(ctx, docInfo.ID, change.MaxCheckpoint.ServerSeq)
snapshot, err = db.FindClosestSnapshotInfo(ctx, docInfo.ID, change.MaxCheckpoint.ServerSeq, true)
assert.NoError(t, err)
assert.Equal(t, int64(1), snapshot.ServerSeq)

pack = change.NewPack(doc.Key(), doc.Checkpoint().NextServerSeq(2), nil, nil)
assert.NoError(t, doc.ApplyChangePack(pack))
assert.NoError(t, db.CreateSnapshotInfo(ctx, docInfo.ID, doc.InternalDocument()))
snapshot, err = db.FindClosestSnapshotInfo(ctx, docInfo.ID, change.MaxCheckpoint.ServerSeq)
snapshot, err = db.FindClosestSnapshotInfo(ctx, docInfo.ID, change.MaxCheckpoint.ServerSeq, true)
assert.NoError(t, err)
assert.Equal(t, int64(2), snapshot.ServerSeq)

snapshot, err = db.FindClosestSnapshotInfo(ctx, docInfo.ID, 1)
snapshot, err = db.FindClosestSnapshotInfo(ctx, docInfo.ID, 1, true)
assert.NoError(t, err)
assert.Equal(t, int64(1), snapshot.ServerSeq)
})
Expand Down
1 change: 1 addition & 0 deletions server/packs/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func FindChanges(
snapshotInfo, err := be.DB.FindClosestSnapshotInfo(
ctx, docInfo.ID,
minSyncedSeqInfo.ServerSeq+be.Config.SnapshotInterval,
false,
)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func BuildDocumentForServerSeq(
docInfo *database.DocInfo,
serverSeq int64,
) (*document.InternalDocument, error) {
snapshotInfo, err := be.DB.FindClosestSnapshotInfo(ctx, docInfo.ID, serverSeq)
snapshotInfo, err := be.DB.FindClosestSnapshotInfo(ctx, docInfo.ID, serverSeq, true)
if err != nil {
return nil, err
}
Expand Down
19 changes: 13 additions & 6 deletions server/packs/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,38 @@ func storeSnapshot(
docInfo *database.DocInfo,
minSyncedTicket *time.Ticket,
) error {
// 01. get the closest snapshot of this docInfo
// TODO: For performance issue, we only need to read the snapshot's metadata.
snapshotInfo, err := be.DB.FindClosestSnapshotInfo(ctx, docInfo.ID, docInfo.ServerSeq)
// 01. get the closest snapshot's metadata of this docInfo
snapshotMetadata, err := be.DB.FindClosestSnapshotInfo(ctx, docInfo.ID, docInfo.ServerSeq, false)
if err != nil {
return err
}
if snapshotInfo.ServerSeq == docInfo.ServerSeq {
if snapshotMetadata.ServerSeq == docInfo.ServerSeq {
return nil
}
if docInfo.ServerSeq-snapshotInfo.ServerSeq < be.Config.SnapshotInterval {
if docInfo.ServerSeq-snapshotMetadata.ServerSeq < be.Config.SnapshotInterval {
return nil
}

// 02. retrieve the changes between last snapshot and current docInfo
changes, err := be.DB.FindChangesBetweenServerSeqs(
ctx,
docInfo.ID,
snapshotInfo.ServerSeq+1,
snapshotMetadata.ServerSeq+1,
docInfo.ServerSeq,
)
if err != nil {
return err
}

// 03. create document instance of the docInfo
snapshotInfo := snapshotMetadata
if snapshotMetadata.ID != "" {
snapshotInfo, err = be.DB.FindSnapshotInfoByID(ctx, snapshotInfo.ID)
if err != nil {
return err
}
}

doc, err := document.NewInternalDocumentFromSnapshot(
docInfo.Key,
snapshotInfo.ServerSeq,
Expand Down
Loading