diff --git a/server/core_storage.go b/server/core_storage.go index 9ab2839768..10d9291018 100644 --- a/server/core_storage.go +++ b/server/core_storage.go @@ -517,6 +517,11 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, me } return writeErr } + + for i, ack := range acks { + ops[i].Object.Version = ack.Version + } + storageIndex.Write(ctx, ops) return nil }); err != nil { if e, ok := err.(*statusError); ok { @@ -526,8 +531,6 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, me return nil, codes.Internal, err } - storageIndex.Write(ctx, ops) - return &api.StorageObjectAcks{Acks: acks}, codes.OK, nil } @@ -658,7 +661,7 @@ func storagePrepBatch(batch *pgx.Batch, authoritativeWrite bool, op *StorageOpWr WITH upd AS ( INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now()) - ON CONFLICT (collection, key, user_id) DO + ON CONFLICT (collection, key, user_id) DO UPDATE SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE TRUE` + writeCheck + ` AND NOT (storage.version = $5 AND storage.read = $6 AND storage.write = $7) -- micro optimization: don't update row unnecessary @@ -725,6 +728,7 @@ func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, s return StatusError(codes.InvalidArgument, "Storage delete rejected.", errors.New("Storage delete rejected - not found, version check failed, or permission denied.")) } } + storageIndex.Delete(ctx, ops) return nil }); err != nil { if e, ok := err.(*statusError); ok { @@ -734,7 +738,5 @@ func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, s return codes.Internal, err } - storageIndex.Delete(ctx, ops) - return codes.OK, nil }