Skip to content

Commit

Permalink
storage: optimize multiple objects write
Browse files Browse the repository at this point in the history
Previously multiple objects write required 2 round trips
to DB per object: one to fetch object state from DB and second
to update object. Each query is indexed and fast, but
with latency to DB comparable to query execution time it adds
significant overhead.

This change optimizes multiple objects write in 2 steps:

1. Instead of reading DB state for each object and then deciding
   on possible write operation, perform write operation unconditionally
   with correct predicates (version and permissions) defined where
   applicable. That said write operation might not succeed if row doesn't
   match predicate. Write query is structured in such way, that
   final state of the row in the database is returned, regardless
   whether writeop successed or not. By inspecting returned row we can
   infer whether it was success, version conflict or permission error.

2. Now that each object is written to DB in a single query, there
   is no dependencies between queries and all of them can be blasted
   to DB in a batch without waiting for result of each. Whole batch
   continues to be executed in a single transaction, so outcome is
   the same, but batching negates latency penalty.
  • Loading branch information
redbaron committed Jul 16, 2023
1 parent e113dec commit 823efaf
Showing 1 changed file with 147 additions and 110 deletions.
257 changes: 147 additions & 110 deletions server/core_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"database/sql"
"encoding/base64"
"encoding/gob"
"encoding/hex"
"errors"
"fmt"
"sort"
Expand Down Expand Up @@ -50,6 +51,28 @@ type StorageOpWrite struct {
Object *api.WriteStorageObject
}

// Desired `read` persmission after this Op completes
func (op *StorageOpWrite) permissionRead() int32 {
if op.Object.PermissionRead != nil {
return op.Object.PermissionRead.Value
}
return 1
}

// Desired `write` persmission after this Op completes
func (op *StorageOpWrite) permissionWrite() int32 {
if op.Object.PermissionWrite != nil {
return op.Object.PermissionWrite.Value
}
return 1
}

// Expected object version after this Op completes
func (op *StorageOpWrite) expectedVersion() string {
hash := md5.Sum([]byte(op.Object.Value))
return hex.EncodeToString(hash[:])
}

func (s StorageOpWrites) Len() int {
return len(s)
}
Expand Down Expand Up @@ -473,6 +496,12 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, me
var writeErr error
acks, writeErr = storageWriteObjects(ctx, logger, metrics, tx, authoritativeWrite, ops)
if writeErr != nil {
if writeErr == runtime.ErrStorageRejectedVersion || writeErr == runtime.ErrStorageRejectedPermission {
logger.Debug("Error writing storage objects.", zap.Error(writeErr))
return StatusError(codes.InvalidArgument, "Storage write rejected.", writeErr)
} else {
logger.Error("Error writing storage objects.", zap.Error(writeErr))
}
return writeErr
}
return nil
Expand All @@ -497,143 +526,151 @@ func storageWriteObjects(ctx context.Context, logger *zap.Logger, metrics Metric
indexedOps[op] = i
}
sort.Sort(sortedOps)

// Run operations in the sorted order.
acks := make([]*api.StorageObjectAck, ops.Len())
for _, op := range sortedOps {
ack, writeErr := storageWriteObject(ctx, logger, metrics, tx, authoritativeWrite, op.OwnerID, op.Object)
if writeErr != nil {
if writeErr == runtime.ErrStorageRejectedVersion || writeErr == runtime.ErrStorageRejectedPermission {
return nil, StatusError(codes.InvalidArgument, "Storage write rejected.", writeErr)
}

logger.Debug("Error writing storage objects.", zap.Error(writeErr))
return nil, writeErr
}
// Acks are returned in the original order.
acks[indexedOps[op]] = ack
batch := &pgx.Batch{}
for _, op := range sortedOps {
storagePrepBatch(batch, authoritativeWrite, op)
}
return acks, nil
}

func storageWriteObject(ctx context.Context, logger *zap.Logger, metrics Metrics, tx pgx.Tx, authoritativeWrite bool, ownerID string, object *api.WriteStorageObject) (*api.StorageObjectAck, error) {
var dbVersion sql.NullString
var dbPermissionWrite sql.NullInt64
var dbPermissionRead sql.NullInt64
err := tx.QueryRow(ctx, "SELECT version, read, write FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3", object.Collection, object.Key, ownerID).Scan(&dbVersion, &dbPermissionRead, &dbPermissionWrite)
if err != nil {
if err == sql.ErrNoRows {
if object.Version != "" && object.Version != "*" {
// Conditional write with a specific version but the object did not exist at all.
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1)
br := tx.SendBatch(ctx, batch)
defer br.Close() // TODO: need to "drain" batch, otherwise it logs all unprocessed queries
for _, op := range sortedOps {
object := op.Object
var resultRead int32
var resultWrite int32
var resultVersion string
err := br.QueryRow().Scan(&resultRead, &resultWrite, &resultVersion)
var pgErr *pgconn.PgError
if err != nil && errors.As(err, &pgErr) {
if pgErr.Code == dbErrorUniqueViolation {
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "version"}, 1)
return nil, runtime.ErrStorageRejectedVersion
}
} else {
logger.Debug("Error in write storage object pre-flight.", zap.Any("object", object), zap.Error(err))
return nil, err
} else if err == pgx.ErrNoRows {
// Not every case from storagePrepWriteObject can return NoRows, but those
// which do it is always ErrStorageRejectedVersion
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "version"}, 1)
return nil, runtime.ErrStorageRejectedVersion
} else if err != nil {
return nil, err
}
}

if dbVersion.Valid && (object.Version == "*" || (object.Version != "" && object.Version != dbVersion.String)) {
// An object existed, and it's a conditional write that either:
// - Expects no object.
// - Or expects a given version, but it does not match.
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1)
return nil, runtime.ErrStorageRejectedVersion
}

if dbPermissionWrite.Valid && dbPermissionWrite.Int64 == 0 && !authoritativeWrite {
// Non-authoritative write to an existing storage object with permission 0.
return nil, runtime.ErrStorageRejectedPermission
}

newVersion := fmt.Sprintf("%x", md5.Sum([]byte(object.Value)))
newPermissionRead := int32(1)
if object.PermissionRead != nil {
newPermissionRead = object.PermissionRead.Value
}
newPermissionWrite := int32(1)
if object.PermissionWrite != nil {
newPermissionWrite = object.PermissionWrite.Value
}

if dbVersion.Valid && dbVersion.String == newVersion && dbPermissionRead.Int64 == int64(newPermissionRead) && dbPermissionWrite.Int64 == int64(newPermissionWrite) {
// Stored object existed, and exactly matches the new object's version and read/write permissions.
if !(op.permissionRead() == resultRead &&
op.permissionWrite() == resultWrite &&
op.expectedVersion() == resultVersion) {
// Write failed, it can happen for 3 reasons:
// - constraint violation on insert (handles elsewhere)
// - permission: non authoritative write & original row write != 1
// - version mismatch
if !authoritativeWrite && resultWrite != 1 {
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "permission"}, 1)
return nil, runtime.ErrStorageRejectedPermission
} else {
// version check failed
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "version"}, 1)
return nil, runtime.ErrStorageRejectedVersion
}
}
ack := &api.StorageObjectAck{
Collection: object.Collection,
Key: object.Key,
Version: newVersion,
}
if ownerID != uuid.Nil.String() {
ack.UserId = ownerID
Version: resultVersion,
UserId: op.OwnerID,
}
return ack, nil
acks[indexedOps[op]] = ack
}

return acks, nil
}

func storagePrepBatch(batch *pgx.Batch, authoritativeWrite bool, op *StorageOpWrite) {
object := op.Object
ownerID := op.OwnerID

newVersion := op.expectedVersion()
newPermissionRead := op.permissionRead()
newPermissionWrite := op.permissionWrite()

params := []interface{}{object.Collection, object.Key, ownerID, object.Value, newVersion, newPermissionRead, newPermissionWrite}
var query string

writeCheck := ""
// Respect permissions in non-authoritative writes.
if !authoritativeWrite {
writeCheck = " AND storage.write = 1"
}

switch {
case object.Version != "" && object.Version != "*":
// OCC if-match.
query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8"

// Query pattern
// (UPDATE t ... RETURNING) UNION ALL (SELECT FROM t) LIMIT 1
// allows us to fetch row state after update even if update itself fails WHERE
// condition.
// That is returned values are final state of the row regardless of UPDATE success
query = `
WITH upd AS (
UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now()
WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8
` + writeCheck + `
AND NOT (storage.version = $5 AND storage.read = $6 AND storage.write = $7) -- micro optimization: don't update row unnecessary
RETURNING read, write, version
)
(SELECT read, write, version from upd)
UNION ALL
(SELECT read, write, version FROM storage WHERE collection = $1 and key = $2 and user_id = $3)
LIMIT 1`

params = append(params, object.Version)
// Respect permissions in non-authoritative writes.
if !authoritativeWrite {
query += " AND write = 1"
}
case dbVersion.Valid && object.Version == "":
// An existing storage object was present, but no OCC of any kind is specified.
query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID"
// Respect permissions in non-authoritative writes.
if !authoritativeWrite {
query += " AND write = 1"
}
case !dbVersion.Valid && object.Version == "":
// An existing storage object was not present, and no OCC of any kind is specified.
// Separate to the case above to handle concurrent non-OCC object creations, where all but the first must become updates.
query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now()) ON CONFLICT (collection, read, key, user_id) DO UPDATE SET value = $4, version = $5, read = $6, write = $7, update_time = now()"
// Respect permissions in non-authoritative writes, where this operation also loses the race to insert the object.
if !authoritativeWrite {
query += " WHERE storage.write = 1"
}
case dbVersion.Valid && object.Version != "*":
// An existing storage object was present, but no OCC if-not-exists required.
query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8"
params = append(params, dbVersion.String)
// Respect permissions in non-authoritative writes.
if !authoritativeWrite {
query += " AND write = 1"
}
default:

// Outcomes:
// - No rows: if no rows returned, then object was not found in DB and can't be updated
// - We have row returned, but now we need to know if update happened, that is its WHERE matched
// * write != 1 means no permission to write
// * dbVersion != original version means OCC failure

case object.Version == "":
// non-OCC write, "last write wins" kind of write

// Similar pattern as in case above, but supports case when row
// didn't exist in the database. Another difference is that there is no version
// check for existing row.
query = `
WITH upd AS (
INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time)
VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now())
ON CONFLICT (collection, key, user_id) DO
UPDATE SET value = $4, version = $5, read = $6, write = $7, update_time = now()
WHERE TRUE` + writeCheck + `
AND NOT (storage.version = $5 AND storage.read = $6 AND storage.write = $7) -- micro optimization: don't update row unnecessary
RETURNING read, write, version
)
(SELECT read, write, version from upd)
UNION ALL
(SELECT read, write, version FROM storage WHERE collection = $1 and key = $2 and user_id = $3)
LIMIT 1`

// Outcomes:
// - Row is always returned, need to know if update happened, that is its WHERE matches
// - write != 1 means no permission to write

case object.Version == "*":
// OCC if-not-exists, and all other non-OCC cases.
query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now())"
// Existing permission checks are not applicable for new storage objects.
}

res, err := tx.Exec(ctx, query, params...)
if err != nil {
logger.Debug("Could not write storage object, exec error.", zap.Any("object", object), zap.String("query", query), zap.Error(err))
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == dbErrorUniqueViolation {
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1)
return nil, runtime.ErrStorageRejectedVersion
}
return nil, err
}
if rowsAffected := res.RowsAffected(); rowsAffected != 1 {
logger.Debug("Could not write storage object, rowsAffected error.", zap.Any("object", object), zap.String("query", query), zap.Error(err))
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1)
return nil, runtime.ErrStorageRejectedVersion
}
query = `
INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time)
VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now())
RETURNING read, write, version`

ack := &api.StorageObjectAck{
Collection: object.Collection,
Key: object.Key,
Version: newVersion,
UserId: ownerID,
// Outcomes:
// - NoRows - insert failed due to constraint violation (concurrent insert)
}

return ack, nil
batch.Queue(query, params...)
}

func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, authoritativeDelete bool, ops StorageOpDeletes) (codes.Code, error) {
Expand Down

0 comments on commit 823efaf

Please sign in to comment.