From 823efafb4d4ec3cf7933b49cc2dff38a02fbd2f9 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sat, 15 Jul 2023 18:00:25 +0100 Subject: [PATCH] storage: optimize multiple objects write Previously multiple objects write required 2 round trips to DB per object: one to fetch object state from DB and second to update object. Each query is indexed and fast, but with latency to DB comparable to query execution time it adds significant overhead. This change optimizes multiple objects write in 2 steps: 1. Instead of reading DB state for each object and then deciding on possible write operation, perform write operation unconditionally with correct predicates (version and permissions) defined where applicable. That said write operation might not succeed if row doesn't match predicate. Write query is structured in such way, that final state of the row in the database is returned, regardless whether writeop successed or not. By inspecting returned row we can infer whether it was success, version conflict or permission error. 2. Now that each object is written to DB in a single query, there is no dependencies between queries and all of them can be blasted to DB in a batch without waiting for result of each. Whole batch continues to be executed in a single transaction, so outcome is the same, but batching negates latency penalty. --- server/core_storage.go | 257 +++++++++++++++++++++++------------------ 1 file changed, 147 insertions(+), 110 deletions(-) diff --git a/server/core_storage.go b/server/core_storage.go index e23f49ac77..c8125ccc95 100644 --- a/server/core_storage.go +++ b/server/core_storage.go @@ -21,6 +21,7 @@ import ( "database/sql" "encoding/base64" "encoding/gob" + "encoding/hex" "errors" "fmt" "sort" @@ -50,6 +51,28 @@ type StorageOpWrite struct { Object *api.WriteStorageObject } +// Desired `read` persmission after this Op completes +func (op *StorageOpWrite) permissionRead() int32 { + if op.Object.PermissionRead != nil { + return op.Object.PermissionRead.Value + } + return 1 +} + +// Desired `write` persmission after this Op completes +func (op *StorageOpWrite) permissionWrite() int32 { + if op.Object.PermissionWrite != nil { + return op.Object.PermissionWrite.Value + } + return 1 +} + +// Expected object version after this Op completes +func (op *StorageOpWrite) expectedVersion() string { + hash := md5.Sum([]byte(op.Object.Value)) + return hex.EncodeToString(hash[:]) +} + func (s StorageOpWrites) Len() int { return len(s) } @@ -473,6 +496,12 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, me var writeErr error acks, writeErr = storageWriteObjects(ctx, logger, metrics, tx, authoritativeWrite, ops) if writeErr != nil { + if writeErr == runtime.ErrStorageRejectedVersion || writeErr == runtime.ErrStorageRejectedPermission { + logger.Debug("Error writing storage objects.", zap.Error(writeErr)) + return StatusError(codes.InvalidArgument, "Storage write rejected.", writeErr) + } else { + logger.Error("Error writing storage objects.", zap.Error(writeErr)) + } return writeErr } return nil @@ -497,143 +526,151 @@ func storageWriteObjects(ctx context.Context, logger *zap.Logger, metrics Metric indexedOps[op] = i } sort.Sort(sortedOps) - // Run operations in the sorted order. acks := make([]*api.StorageObjectAck, ops.Len()) - for _, op := range sortedOps { - ack, writeErr := storageWriteObject(ctx, logger, metrics, tx, authoritativeWrite, op.OwnerID, op.Object) - if writeErr != nil { - if writeErr == runtime.ErrStorageRejectedVersion || writeErr == runtime.ErrStorageRejectedPermission { - return nil, StatusError(codes.InvalidArgument, "Storage write rejected.", writeErr) - } - logger.Debug("Error writing storage objects.", zap.Error(writeErr)) - return nil, writeErr - } - // Acks are returned in the original order. - acks[indexedOps[op]] = ack + batch := &pgx.Batch{} + for _, op := range sortedOps { + storagePrepBatch(batch, authoritativeWrite, op) } - return acks, nil -} -func storageWriteObject(ctx context.Context, logger *zap.Logger, metrics Metrics, tx pgx.Tx, authoritativeWrite bool, ownerID string, object *api.WriteStorageObject) (*api.StorageObjectAck, error) { - var dbVersion sql.NullString - var dbPermissionWrite sql.NullInt64 - var dbPermissionRead sql.NullInt64 - err := tx.QueryRow(ctx, "SELECT version, read, write FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3", object.Collection, object.Key, ownerID).Scan(&dbVersion, &dbPermissionRead, &dbPermissionWrite) - if err != nil { - if err == sql.ErrNoRows { - if object.Version != "" && object.Version != "*" { - // Conditional write with a specific version but the object did not exist at all. - metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1) + br := tx.SendBatch(ctx, batch) + defer br.Close() // TODO: need to "drain" batch, otherwise it logs all unprocessed queries + for _, op := range sortedOps { + object := op.Object + var resultRead int32 + var resultWrite int32 + var resultVersion string + err := br.QueryRow().Scan(&resultRead, &resultWrite, &resultVersion) + var pgErr *pgconn.PgError + if err != nil && errors.As(err, &pgErr) { + if pgErr.Code == dbErrorUniqueViolation { + metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "version"}, 1) return nil, runtime.ErrStorageRejectedVersion } - } else { - logger.Debug("Error in write storage object pre-flight.", zap.Any("object", object), zap.Error(err)) + return nil, err + } else if err == pgx.ErrNoRows { + // Not every case from storagePrepWriteObject can return NoRows, but those + // which do it is always ErrStorageRejectedVersion + metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "version"}, 1) + return nil, runtime.ErrStorageRejectedVersion + } else if err != nil { return nil, err } - } - - if dbVersion.Valid && (object.Version == "*" || (object.Version != "" && object.Version != dbVersion.String)) { - // An object existed, and it's a conditional write that either: - // - Expects no object. - // - Or expects a given version, but it does not match. - metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1) - return nil, runtime.ErrStorageRejectedVersion - } - - if dbPermissionWrite.Valid && dbPermissionWrite.Int64 == 0 && !authoritativeWrite { - // Non-authoritative write to an existing storage object with permission 0. - return nil, runtime.ErrStorageRejectedPermission - } - - newVersion := fmt.Sprintf("%x", md5.Sum([]byte(object.Value))) - newPermissionRead := int32(1) - if object.PermissionRead != nil { - newPermissionRead = object.PermissionRead.Value - } - newPermissionWrite := int32(1) - if object.PermissionWrite != nil { - newPermissionWrite = object.PermissionWrite.Value - } - if dbVersion.Valid && dbVersion.String == newVersion && dbPermissionRead.Int64 == int64(newPermissionRead) && dbPermissionWrite.Int64 == int64(newPermissionWrite) { - // Stored object existed, and exactly matches the new object's version and read/write permissions. + if !(op.permissionRead() == resultRead && + op.permissionWrite() == resultWrite && + op.expectedVersion() == resultVersion) { + // Write failed, it can happen for 3 reasons: + // - constraint violation on insert (handles elsewhere) + // - permission: non authoritative write & original row write != 1 + // - version mismatch + if !authoritativeWrite && resultWrite != 1 { + metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "permission"}, 1) + return nil, runtime.ErrStorageRejectedPermission + } else { + // version check failed + metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "version"}, 1) + return nil, runtime.ErrStorageRejectedVersion + } + } ack := &api.StorageObjectAck{ Collection: object.Collection, Key: object.Key, - Version: newVersion, - } - if ownerID != uuid.Nil.String() { - ack.UserId = ownerID + Version: resultVersion, + UserId: op.OwnerID, } - return ack, nil + acks[indexedOps[op]] = ack } + return acks, nil +} + +func storagePrepBatch(batch *pgx.Batch, authoritativeWrite bool, op *StorageOpWrite) { + object := op.Object + ownerID := op.OwnerID + + newVersion := op.expectedVersion() + newPermissionRead := op.permissionRead() + newPermissionWrite := op.permissionWrite() + params := []interface{}{object.Collection, object.Key, ownerID, object.Value, newVersion, newPermissionRead, newPermissionWrite} var query string + + writeCheck := "" + // Respect permissions in non-authoritative writes. + if !authoritativeWrite { + writeCheck = " AND storage.write = 1" + } + switch { case object.Version != "" && object.Version != "*": // OCC if-match. - query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8" + + // Query pattern + // (UPDATE t ... RETURNING) UNION ALL (SELECT FROM t) LIMIT 1 + // allows us to fetch row state after update even if update itself fails WHERE + // condition. + // That is returned values are final state of the row regardless of UPDATE success + query = ` + WITH upd AS ( + UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() + WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8 + ` + writeCheck + ` + AND NOT (storage.version = $5 AND storage.read = $6 AND storage.write = $7) -- micro optimization: don't update row unnecessary + RETURNING read, write, version + ) + (SELECT read, write, version from upd) + UNION ALL + (SELECT read, write, version FROM storage WHERE collection = $1 and key = $2 and user_id = $3) + LIMIT 1` + params = append(params, object.Version) - // Respect permissions in non-authoritative writes. - if !authoritativeWrite { - query += " AND write = 1" - } - case dbVersion.Valid && object.Version == "": - // An existing storage object was present, but no OCC of any kind is specified. - query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID" - // Respect permissions in non-authoritative writes. - if !authoritativeWrite { - query += " AND write = 1" - } - case !dbVersion.Valid && object.Version == "": - // An existing storage object was not present, and no OCC of any kind is specified. - // Separate to the case above to handle concurrent non-OCC object creations, where all but the first must become updates. - query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now()) ON CONFLICT (collection, read, key, user_id) DO UPDATE SET value = $4, version = $5, read = $6, write = $7, update_time = now()" - // Respect permissions in non-authoritative writes, where this operation also loses the race to insert the object. - if !authoritativeWrite { - query += " WHERE storage.write = 1" - } - case dbVersion.Valid && object.Version != "*": - // An existing storage object was present, but no OCC if-not-exists required. - query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8" - params = append(params, dbVersion.String) - // Respect permissions in non-authoritative writes. - if !authoritativeWrite { - query += " AND write = 1" - } - default: + + // Outcomes: + // - No rows: if no rows returned, then object was not found in DB and can't be updated + // - We have row returned, but now we need to know if update happened, that is its WHERE matched + // * write != 1 means no permission to write + // * dbVersion != original version means OCC failure + + case object.Version == "": + // non-OCC write, "last write wins" kind of write + + // Similar pattern as in case above, but supports case when row + // didn't exist in the database. Another difference is that there is no version + // check for existing row. + query = ` + WITH upd AS ( + INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) + VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now()) + ON CONFLICT (collection, key, user_id) DO + UPDATE SET value = $4, version = $5, read = $6, write = $7, update_time = now() + WHERE TRUE` + writeCheck + ` + AND NOT (storage.version = $5 AND storage.read = $6 AND storage.write = $7) -- micro optimization: don't update row unnecessary + RETURNING read, write, version + ) + (SELECT read, write, version from upd) + UNION ALL + (SELECT read, write, version FROM storage WHERE collection = $1 and key = $2 and user_id = $3) + LIMIT 1` + + // Outcomes: + // - Row is always returned, need to know if update happened, that is its WHERE matches + // - write != 1 means no permission to write + + case object.Version == "*": // OCC if-not-exists, and all other non-OCC cases. - query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now())" // Existing permission checks are not applicable for new storage objects. - } - - res, err := tx.Exec(ctx, query, params...) - if err != nil { - logger.Debug("Could not write storage object, exec error.", zap.Any("object", object), zap.String("query", query), zap.Error(err)) - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) && pgErr.Code == dbErrorUniqueViolation { - metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1) - return nil, runtime.ErrStorageRejectedVersion - } - return nil, err - } - if rowsAffected := res.RowsAffected(); rowsAffected != 1 { - logger.Debug("Could not write storage object, rowsAffected error.", zap.Any("object", object), zap.String("query", query), zap.Error(err)) - metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1) - return nil, runtime.ErrStorageRejectedVersion - } + query = ` + INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) + VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now()) + RETURNING read, write, version` - ack := &api.StorageObjectAck{ - Collection: object.Collection, - Key: object.Key, - Version: newVersion, - UserId: ownerID, + // Outcomes: + // - NoRows - insert failed due to constraint violation (concurrent insert) } - return ack, nil + batch.Queue(query, params...) } func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, authoritativeDelete bool, ops StorageOpDeletes) (codes.Code, error) {