diff --git a/server/storage_index.go b/server/storage_index.go index cb126ed9c4..aa62cbb426 100644 --- a/server/storage_index.go +++ b/server/storage_index.go @@ -20,6 +20,8 @@ import ( "encoding/json" "errors" "fmt" + "time" + "github.com/blugelabs/bluge" "github.com/blugelabs/bluge/index" "github.com/blugelabs/bluge/search" @@ -28,12 +30,11 @@ import ( "github.com/jackc/pgtype" "go.uber.org/zap" "google.golang.org/protobuf/types/known/wrapperspb" - "time" ) type StorageIndex interface { - Write(ctx context.Context, objects StorageOpWrites) (map[string]StorageOpWrites, map[string]StorageOpDeletes) - Delete(ctx context.Context, objects StorageOpDeletes) map[string]StorageOpDeletes + Write(ctx context.Context, objects StorageOpWrites) (creates int, deletes int) + Delete(ctx context.Context, objects StorageOpDeletes) (deletes int) List(ctx context.Context, indexName, query string, limit int) (*api.StorageObjects, error) Load(ctx context.Context) error CreateIndex(ctx context.Context, name, collection, key string, fields []string, maxEntries int) error @@ -58,7 +59,7 @@ type LocalStorageIndex struct { } func NewLocalStorageIndex(logger *zap.Logger, db *sql.DB) (StorageIndex, error) { - lsc := &LocalStorageIndex{ + si := &LocalStorageIndex{ logger: logger, db: db, indexByName: make(map[string]*storageIndex), @@ -66,17 +67,15 @@ func NewLocalStorageIndex(logger *zap.Logger, db *sql.DB) (StorageIndex, error) customFilterFunctions: make(map[string]RuntimeStorageIndexFilterFunction), } - return lsc, nil + return si, nil } -func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpWrites) (updates map[string]StorageOpWrites, deletes map[string]StorageOpDeletes) { +func (si *LocalStorageIndex) Write(ctx context.Context, objects StorageOpWrites) (updates int, deletes int) { batches := make(map[*storageIndex]*index.Batch, 0) - updates = make(map[string]StorageOpWrites, 0) - deletes = make(map[string]StorageOpDeletes, 0) updateTime := time.Now() - for _, so := range storageWrites { + for _, so := range objects { indices, found := si.indicesByCollection[so.Object.Collection] if !found { continue @@ -102,25 +101,7 @@ func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpW docId := si.storageIndexDocumentId(so.Object.Collection, so.Object.Key, so.OwnerID) batch.Delete(docId) - if ds, ok := deletes[idx.Name]; ok { - deletes[idx.Name] = append(ds, &StorageOpDelete{ - OwnerID: so.OwnerID, - ObjectID: &api.DeleteStorageObjectId{ - Collection: so.Object.Collection, - Key: so.Object.Key, - // Blank Version as it is irrelevant for storage index deletes. - }, - }) - } else { - deletes[idx.Name] = StorageOpDeletes{&StorageOpDelete{ - OwnerID: so.OwnerID, - ObjectID: &api.DeleteStorageObjectId{ - Collection: so.Object.Collection, - Key: so.Object.Key, - // Blank Version as it is irrelevant for storage index deletes. - }, - }} - } + deletes++ continue } @@ -138,11 +119,7 @@ func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpW batch.Update(doc.ID(), doc) - if u, ok := updates[idx.Name]; ok { - updates[idx.Name] = append(u, so) - } else { - updates[idx.Name] = StorageOpWrites{so} - } + updates++ } } } @@ -191,14 +168,13 @@ func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpW return updates, deletes } -func (si *LocalStorageIndex) Delete(ctx context.Context, deletes StorageOpDeletes) (ops map[string]StorageOpDeletes) { +func (si *LocalStorageIndex) Delete(ctx context.Context, objects StorageOpDeletes) (deletes int) { batches := make(map[*storageIndex]*index.Batch, 0) - ops = make(map[string]StorageOpDeletes) - for _, d := range deletes { + for _, d := range objects { indices, found := si.indicesByCollection[d.ObjectID.Collection] if !found { - return ops + continue } for _, idx := range indices { @@ -211,11 +187,7 @@ func (si *LocalStorageIndex) Delete(ctx context.Context, deletes StorageOpDelete docId := si.storageIndexDocumentId(d.ObjectID.Collection, d.ObjectID.Key, d.OwnerID) batch.Delete(docId) - if dels, ok := ops[idx.Name]; ok { - ops[idx.Name] = append(dels, d) - } else { - ops[idx.Name] = StorageOpDeletes{d} - } + deletes++ } } @@ -226,7 +198,7 @@ func (si *LocalStorageIndex) Delete(ctx context.Context, deletes StorageOpDelete } } - return ops + return deletes } func (si *LocalStorageIndex) List(ctx context.Context, indexName, query string, limit int) (*api.StorageObjects, error) { @@ -410,7 +382,7 @@ LIMIT $2` return nil } -func (sc *LocalStorageIndex) mapIndexStorageFields(userID, collection, key, version, value string, filters []string, updateTime time.Time) (*bluge.Document, error) { +func (si *LocalStorageIndex) mapIndexStorageFields(userID, collection, key, version, value string, filters []string, updateTime time.Time) (*bluge.Document, error) { if collection == "" || key == "" || userID == "" { return nil, errors.New("insufficient fields to create index document id") } @@ -435,7 +407,7 @@ func (sc *LocalStorageIndex) mapIndexStorageFields(userID, collection, key, vers return nil, nil } - rv := bluge.NewDocument(string(sc.storageIndexDocumentId(collection, key, userID))) + rv := bluge.NewDocument(string(si.storageIndexDocumentId(collection, key, userID))) rv.AddField(bluge.NewDateTimeField("update_time", updateTime).StoreValue().Sortable()) rv.AddField(bluge.NewKeywordField("collection", collection).StoreValue()) rv.AddField(bluge.NewKeywordField("key", key).StoreValue())