Skip to content

Commit

Permalink
Add iteration to firestore backend
Browse files Browse the repository at this point in the history
  • Loading branch information
rosstimothy committed Feb 8, 2025
1 parent 3a0f9b8 commit c1cd47a
Showing 1 changed file with 238 additions and 29 deletions.
267 changes: 238 additions & 29 deletions lib/backend/firestore/firestorebk.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"encoding/base64"
"errors"
"iter"
"log/slog"
"strconv"
"strings"
Expand All @@ -34,6 +35,7 @@ import (
"cloud.google.com/go/firestore/apiv1/admin/adminpb"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -541,45 +543,252 @@ func (b *Backend) getRangeDocs(ctx context.Context, startKey, endKey backend.Key
return allDocs, nil
}

// GetRange returns range of elements
func (b *Backend) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
docSnaps, err := b.getRangeDocs(ctx, startKey, endKey, limit)
if err != nil {
return nil, trace.Wrap(err)
func (b *Backend) iterateDocs(ctx context.Context, startKey, endKey backend.Key, limit int, order backend.IterationOrder) (iter.Seq2[*firestore.DocumentSnapshot, error], error) {
if startKey.IsZero() {
return nil, trace.BadParameter("missing parameter startKey")
}
if endKey.IsZero() {
return nil, trace.BadParameter("missing parameter endKey")
}
if limit <= 0 {
limit = backend.DefaultRangeLimit
}
var values []backend.Item
for _, docSnap := range docSnaps {
r, err := newRecordFromDoc(docSnap)
if err != nil {
return nil, trace.Wrap(err)
}

if r.isExpired(b.clock.Now()) {
if _, err := docSnap.Ref.Delete(ctx, firestore.LastUpdateTime(docSnap.UpdateTime)); err != nil && status.Code(err) == codes.FailedPrecondition {
// If the document has been updated, then attempt one additional get to see if the
// resource was updated and is no longer expired.
docSnap, err := b.svc.Collection(b.CollectionName).
Doc(docSnap.Ref.ID).
Get(ctx)
if err != nil {
return nil, ConvertGRPCError(err)
}
sort := firestore.Asc
if order == backend.IterateDescend {
sort = firestore.Desc
}

return func(yield func(*firestore.DocumentSnapshot, error) bool) {
docsIter := newDocIter(b.svc.Collection(b.CollectionName).
Where(keyDocProperty, ">=", []byte(startKey.String())).
Where(keyDocProperty, "<=", []byte(endKey.String())).
Limit(limit).
OrderBy(keyDocProperty, sort).
Documents(ctx))

legacyDocsIter := newDocIter(b.svc.Collection(b.CollectionName).
Where(keyDocProperty, ">=", startKey.String()).
Where(keyDocProperty, "<=", endKey.String()).
Limit(limit).
OrderBy(keyDocProperty, sort).
Documents(ctx))

brokenDocsIter := newDocIter(b.svc.Collection(b.CollectionName).
Where(keyDocProperty, ">=", brokenKey(startKey.String())).
Where(keyDocProperty, "<=", brokenKey(endKey.String())).
Limit(limit).
OrderBy(keyDocProperty, sort).
Documents(ctx))

defer func() {
docsIter.stop()
legacyDocsIter.stop()
brokenDocsIter.stop()
}()

for {
docSnap, docSnapErr := docsIter.next()
legacySnap, legacySnapErr := legacyDocsIter.next()
brokenSnap, brokenSnapErr := brokenDocsIter.next()

// All items have been exhausted.
if errors.Is(docSnapErr, iterator.Done) &&
errors.Is(legacySnapErr, iterator.Done) &&
errors.Is(brokenSnapErr, iterator.Done) {
return
}

// All iterators failed.
if docSnapErr != nil && legacySnapErr != nil && brokenSnapErr != nil {
yield(nil, trace.NewAggregate(docSnapErr, legacySnapErr, brokenSnapErr))
return
}

// Find the iterator with the next key in the sequence.
var docKey, legacyKey, brokenKey []byte
if docSnap != nil {
r, err := newRecordFromDoc(docSnap)
if err != nil {
return nil, trace.Wrap(err)
if err == nil {
docKey = r.Key
}
}

if !r.isExpired(b.clock.Now()) {
values = append(values, r.backendItem())
if legacySnap != nil {
r, err := newRecordFromDoc(legacySnap)
if err == nil {
legacyKey = r.Key
}
}
// Do not include this document in the results.
continue

if brokenSnap != nil {
r, err := newRecordFromDoc(brokenSnap)
if err == nil {
brokenKey = r.Key
}
}

compareKeys := func(key, other1, other2 []byte) bool {
valid := -1
if order == backend.IterateDescend {
valid = 1
}

switch {
case len(key) == 0:
return false
case len(other1) == 0 && len(other2) == 0:
return true
case len(other1) == 0:
return bytes.Compare(key, other2) == valid
case len(other2) == 0:
return bytes.Compare(key, other1) == valid
default:
return bytes.Compare(key, other1) == valid && bytes.Compare(key, other2) == valid
}
}

switch {
case compareKeys(docKey, legacyKey, brokenKey):
docsIter.consume()
if !yield(docSnap, nil) {
return
}
case compareKeys(legacyKey, docKey, brokenKey):
legacyDocsIter.consume()
if !yield(legacySnap, nil) {
return
}
case compareKeys(brokenKey, legacyKey, docKey):
brokenDocsIter.consume()
if !yield(brokenSnap, nil) {
return
}
default:
yield(nil, errors.New("no valid snapshots found"))
return
}
}
}, nil
}

values = append(values, r.backendItem())
type docIter struct {
iter *firestore.DocumentIterator
snap *firestore.DocumentSnapshot
err error
}

func newDocIter(iter *firestore.DocumentIterator) *docIter {
return &docIter{iter: iter}
}

func (d *docIter) next() (*firestore.DocumentSnapshot, error) {
if d.snap == nil && d.err == nil {
d.snap, d.err = d.iter.Next()
}

return d.snap, d.err
}

func (d *docIter) consume() {
d.snap, d.err = nil, nil
}

func (d *docIter) stop() {
d.snap, d.err = nil, nil
d.iter.Stop()
}

func (b *Backend) Iterate(ctx context.Context, startKey, endKey backend.Key, limit int, order backend.IterationOrder) (iter.Seq2[backend.Item, error], error) {
if startKey.IsZero() {
return nil, trace.BadParameter("missing parameter startKey")
}
if endKey.IsZero() {
return nil, trace.BadParameter("missing parameter endKey")
}

return func(yield func(backend.Item, error) bool) {
count := 0
iter, err := b.iterateDocs(ctx, startKey, endKey, limit, order)
if err != nil {
yield(backend.Item{}, trace.Wrap(err))
return
}

for docSnap, err := range iter {
if err != nil {
yield(backend.Item{}, trace.Wrap(err))
return
}

r, err := newRecordFromDoc(docSnap)
if err != nil {
yield(backend.Item{}, trace.Wrap(err))
return
}

if r.isExpired(b.clock.Now()) {
if _, err := docSnap.Ref.Delete(ctx, firestore.LastUpdateTime(docSnap.UpdateTime)); err != nil && status.Code(err) == codes.FailedPrecondition {
// If the document has been updated, then attempt one additional get to see if the
// resource was updated and is no longer expired.
docSnap, err := b.svc.Collection(b.CollectionName).
Doc(docSnap.Ref.ID).
Get(ctx)
if err != nil {
yield(backend.Item{}, trace.Wrap(err))
return
}

r, err = newRecordFromDoc(docSnap)
if err != nil {
yield(backend.Item{}, trace.Wrap(err))
return
}

if r.isExpired(b.clock.Now()) {
continue
}
}
}

if !yield(r.backendItem(), nil) {
return
}
count++

if limit != backend.NoLimit && count >= limit {
return
}
}
}, nil
}

// GetRange returns range of elements
func (b *Backend) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
if startKey.IsZero() {
return nil, trace.BadParameter("missing parameter startKey")
}
if endKey.IsZero() {
return nil, trace.BadParameter("missing parameter endKey")
}

if limit <= 0 {
limit = backend.DefaultRangeLimit
}

iter, err := b.Iterate(ctx, startKey, endKey, limit, backend.IterateAscend)
if err != nil {
return nil, trace.Wrap(err)
}

var result backend.GetResult
for item, err := range iter {
if err != nil {
return nil, trace.Wrap(err)
}
result.Items = append(result.Items, item)
}
return &backend.GetResult{Items: values}, nil
return &result, nil
}

// DeleteRange deletes range of items with keys between startKey and endKey
Expand Down

0 comments on commit c1cd47a

Please sign in to comment.