Skip to content

Commit

Permalink
Update interface, improve storage object deletion handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed Jul 18, 2023
1 parent f4709ce commit 793baf0
Showing 1 changed file with 17 additions and 45 deletions.
62 changes: 17 additions & 45 deletions server/storage_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/blugelabs/bluge"
"github.com/blugelabs/bluge/index"
"github.com/blugelabs/bluge/search"
Expand All @@ -28,12 +30,11 @@ import (
"github.com/jackc/pgtype"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/wrapperspb"
"time"
)

type StorageIndex interface {
Write(ctx context.Context, objects StorageOpWrites) (map[string]StorageOpWrites, map[string]StorageOpDeletes)
Delete(ctx context.Context, objects StorageOpDeletes) map[string]StorageOpDeletes
Write(ctx context.Context, objects StorageOpWrites) (creates int, deletes int)
Delete(ctx context.Context, objects StorageOpDeletes) (deletes int)
List(ctx context.Context, indexName, query string, limit int) (*api.StorageObjects, error)
Load(ctx context.Context) error
CreateIndex(ctx context.Context, name, collection, key string, fields []string, maxEntries int) error
Expand All @@ -58,25 +59,23 @@ type LocalStorageIndex struct {
}

func NewLocalStorageIndex(logger *zap.Logger, db *sql.DB) (StorageIndex, error) {
lsc := &LocalStorageIndex{
si := &LocalStorageIndex{
logger: logger,
db: db,
indexByName: make(map[string]*storageIndex),
indicesByCollection: make(map[string][]*storageIndex),
customFilterFunctions: make(map[string]RuntimeStorageIndexFilterFunction),
}

return lsc, nil
return si, nil
}

func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpWrites) (updates map[string]StorageOpWrites, deletes map[string]StorageOpDeletes) {
func (si *LocalStorageIndex) Write(ctx context.Context, objects StorageOpWrites) (updates int, deletes int) {
batches := make(map[*storageIndex]*index.Batch, 0)
updates = make(map[string]StorageOpWrites, 0)
deletes = make(map[string]StorageOpDeletes, 0)

updateTime := time.Now()

for _, so := range storageWrites {
for _, so := range objects {
indices, found := si.indicesByCollection[so.Object.Collection]
if !found {
continue
Expand All @@ -102,25 +101,7 @@ func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpW
docId := si.storageIndexDocumentId(so.Object.Collection, so.Object.Key, so.OwnerID)
batch.Delete(docId)

if ds, ok := deletes[idx.Name]; ok {
deletes[idx.Name] = append(ds, &StorageOpDelete{
OwnerID: so.OwnerID,
ObjectID: &api.DeleteStorageObjectId{
Collection: so.Object.Collection,
Key: so.Object.Key,
// Blank Version as it is irrelevant for storage index deletes.
},
})
} else {
deletes[idx.Name] = StorageOpDeletes{&StorageOpDelete{
OwnerID: so.OwnerID,
ObjectID: &api.DeleteStorageObjectId{
Collection: so.Object.Collection,
Key: so.Object.Key,
// Blank Version as it is irrelevant for storage index deletes.
},
}}
}
deletes++

continue
}
Expand All @@ -138,11 +119,7 @@ func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpW

batch.Update(doc.ID(), doc)

if u, ok := updates[idx.Name]; ok {
updates[idx.Name] = append(u, so)
} else {
updates[idx.Name] = StorageOpWrites{so}
}
updates++
}
}
}
Expand Down Expand Up @@ -191,14 +168,13 @@ func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpW
return updates, deletes
}

func (si *LocalStorageIndex) Delete(ctx context.Context, deletes StorageOpDeletes) (ops map[string]StorageOpDeletes) {
func (si *LocalStorageIndex) Delete(ctx context.Context, objects StorageOpDeletes) (deletes int) {
batches := make(map[*storageIndex]*index.Batch, 0)
ops = make(map[string]StorageOpDeletes)

for _, d := range deletes {
for _, d := range objects {
indices, found := si.indicesByCollection[d.ObjectID.Collection]
if !found {
return ops
continue
}

for _, idx := range indices {
Expand All @@ -211,11 +187,7 @@ func (si *LocalStorageIndex) Delete(ctx context.Context, deletes StorageOpDelete
docId := si.storageIndexDocumentId(d.ObjectID.Collection, d.ObjectID.Key, d.OwnerID)
batch.Delete(docId)

if dels, ok := ops[idx.Name]; ok {
ops[idx.Name] = append(dels, d)
} else {
ops[idx.Name] = StorageOpDeletes{d}
}
deletes++
}
}

Expand All @@ -226,7 +198,7 @@ func (si *LocalStorageIndex) Delete(ctx context.Context, deletes StorageOpDelete
}
}

return ops
return deletes
}

func (si *LocalStorageIndex) List(ctx context.Context, indexName, query string, limit int) (*api.StorageObjects, error) {
Expand Down Expand Up @@ -410,7 +382,7 @@ LIMIT $2`
return nil
}

func (sc *LocalStorageIndex) mapIndexStorageFields(userID, collection, key, version, value string, filters []string, updateTime time.Time) (*bluge.Document, error) {
func (si *LocalStorageIndex) mapIndexStorageFields(userID, collection, key, version, value string, filters []string, updateTime time.Time) (*bluge.Document, error) {
if collection == "" || key == "" || userID == "" {
return nil, errors.New("insufficient fields to create index document id")
}
Expand All @@ -435,7 +407,7 @@ func (sc *LocalStorageIndex) mapIndexStorageFields(userID, collection, key, vers
return nil, nil
}

rv := bluge.NewDocument(string(sc.storageIndexDocumentId(collection, key, userID)))
rv := bluge.NewDocument(string(si.storageIndexDocumentId(collection, key, userID)))
rv.AddField(bluge.NewDateTimeField("update_time", updateTime).StoreValue().Sortable())
rv.AddField(bluge.NewKeywordField("collection", collection).StoreValue())
rv.AddField(bluge.NewKeywordField("key", key).StoreValue())
Expand Down

0 comments on commit 793baf0

Please sign in to comment.