Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: optimize multiple objects write #1059

Merged
merged 4 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions server/core_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/heroiclabs/nakama/v3/console"
"github.com/jackc/pgconn"
"github.com/jackc/pgtype"
pgx "github.com/jackc/pgx/v4"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -243,7 +244,7 @@ WHERE u.id IN (` + strings.Join(statements, ",") + `)`
}

func UpdateAccounts(ctx context.Context, logger *zap.Logger, db *sql.DB, updates []*accountUpdate) error {
if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
if err := ExecuteInTxPgx(ctx, db, func(tx pgx.Tx) error {
updateErr := updateAccounts(ctx, logger, tx, updates)
if updateErr != nil {
return updateErr
Expand All @@ -260,7 +261,7 @@ func UpdateAccounts(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
return nil
}

func updateAccounts(ctx context.Context, logger *zap.Logger, tx *sql.Tx, updates []*accountUpdate) error {
func updateAccounts(ctx context.Context, logger *zap.Logger, tx pgx.Tx, updates []*accountUpdate) error {
for _, update := range updates {
updateStatements := make([]string, 0, 7)
distinctStatements := make([]string, 0, 7)
Expand Down Expand Up @@ -346,7 +347,7 @@ func updateAccounts(ctx context.Context, logger *zap.Logger, tx *sql.Tx, updates
query := "UPDATE users SET update_time = now(), " + strings.Join(updateStatements, ", ") +
" WHERE id = $1 AND (" + strings.Join(distinctStatements, " OR ") + ")"

if _, err := tx.ExecContext(ctx, query, params...); err != nil {
if _, err := tx.Exec(ctx, query, params...); err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == dbErrorUniqueViolation && strings.Contains(pgErr.Message, "users_username_key") {
return errors.New("Username is already in use.")
Expand Down
3 changes: 2 additions & 1 deletion server/core_multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/heroiclabs/nakama-common/api"
"github.com/heroiclabs/nakama-common/runtime"
pgx "github.com/jackc/pgx/v4"
"go.uber.org/zap"
)

Expand All @@ -31,7 +32,7 @@ func MultiUpdate(ctx context.Context, logger *zap.Logger, db *sql.DB, metrics Me
var storageWriteAcks []*api.StorageObjectAck
var walletUpdateResults []*runtime.WalletUpdateResult

if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
if err := ExecuteInTxPgx(ctx, db, func(tx pgx.Tx) error {
storageWriteAcks = nil
walletUpdateResults = nil

Expand Down
262 changes: 150 additions & 112 deletions server/core_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"database/sql"
"encoding/base64"
"encoding/gob"
"encoding/hex"
"errors"
"fmt"
"sort"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/heroiclabs/nakama-common/runtime"
"github.com/jackc/pgconn"
"github.com/jackc/pgtype"
pgx "github.com/jackc/pgx/v4"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -49,6 +51,28 @@ type StorageOpWrite struct {
Object *api.WriteStorageObject
}

// permissionRead reports the read permission the stored object should carry
// once this write has been applied. If the request leaves PermissionRead
// unset, the value falls back to 1.
func (op *StorageOpWrite) permissionRead() int32 {
	requested := op.Object.PermissionRead
	if requested == nil {
		// No explicit permission supplied by the caller.
		return 1
	}
	return requested.Value
}

// permissionWrite reports the write permission the stored object should carry
// once this write has been applied. If the request leaves PermissionWrite
// unset, the value falls back to 1.
func (op *StorageOpWrite) permissionWrite() int32 {
	requested := op.Object.PermissionWrite
	if requested == nil {
		// No explicit permission supplied by the caller.
		return 1
	}
	return requested.Value
}

// expectedVersion computes the version string the object is expected to have
// once this write has been applied: the lowercase hexadecimal MD5 digest of
// the object's value.
func (op *StorageOpWrite) expectedVersion() string {
	sum := md5.Sum([]byte(op.Object.Value))
	encoded := make([]byte, hex.EncodedLen(len(sum)))
	hex.Encode(encoded, sum[:])
	return string(encoded)
}

// Len returns the number of write operations held in s.
// NOTE(review): sort.Sort is called on a StorageOpWrites value elsewhere in
// this file, so this is presumably the length method of its sort.Interface
// implementation — confirm against the Swap/Less definitions, which are not
// visible here.
func (s StorageOpWrites) Len() int {
return len(s)
}
Expand Down Expand Up @@ -466,11 +490,17 @@ WHERE
func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, metrics Metrics, storageIndex StorageIndex, authoritativeWrite bool, ops StorageOpWrites) (*api.StorageObjectAcks, codes.Code, error) {
var acks []*api.StorageObjectAck

if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
if err := ExecuteInTxPgx(ctx, db, func(tx pgx.Tx) error {
// If the transaction is retried ensure we wipe any acks that may have been prepared by previous attempts.
var writeErr error
acks, writeErr = storageWriteObjects(ctx, logger, metrics, tx, authoritativeWrite, ops)
if writeErr != nil {
if writeErr == runtime.ErrStorageRejectedVersion || writeErr == runtime.ErrStorageRejectedPermission {
logger.Debug("Error writing storage objects.", zap.Error(writeErr))
return StatusError(codes.InvalidArgument, "Storage write rejected.", writeErr)
} else {
logger.Error("Error writing storage objects.", zap.Error(writeErr))
}
return writeErr
}
return nil
Expand All @@ -487,7 +517,7 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, me
return &api.StorageObjectAcks{Acks: acks}, codes.OK, nil
}

func storageWriteObjects(ctx context.Context, logger *zap.Logger, metrics Metrics, tx *sql.Tx, authoritativeWrite bool, ops StorageOpWrites) ([]*api.StorageObjectAck, error) {
func storageWriteObjects(ctx context.Context, logger *zap.Logger, metrics Metrics, tx pgx.Tx, authoritativeWrite bool, ops StorageOpWrites) ([]*api.StorageObjectAck, error) {
// Ensure writes are processed in a consistent order to avoid deadlocks from concurrent operations.
// Sorting done on a copy to ensure we don't modify the input, which may be re-used on transaction retries.
sortedOps := make(StorageOpWrites, 0, len(ops))
Expand All @@ -497,143 +527,151 @@ func storageWriteObjects(ctx context.Context, logger *zap.Logger, metrics Metric
indexedOps[op] = i
}
sort.Sort(sortedOps)

// Run operations in the sorted order.
acks := make([]*api.StorageObjectAck, ops.Len())
for _, op := range sortedOps {
ack, writeErr := storageWriteObject(ctx, logger, metrics, tx, authoritativeWrite, op.OwnerID, op.Object)
if writeErr != nil {
if writeErr == runtime.ErrStorageRejectedVersion || writeErr == runtime.ErrStorageRejectedPermission {
return nil, StatusError(codes.InvalidArgument, "Storage write rejected.", writeErr)
}

logger.Debug("Error writing storage objects.", zap.Error(writeErr))
return nil, writeErr
}
// Acks are returned in the original order.
acks[indexedOps[op]] = ack
batch := &pgx.Batch{}
for _, op := range sortedOps {
storagePrepBatch(batch, authoritativeWrite, op)
}
return acks, nil
}

func storageWriteObject(ctx context.Context, logger *zap.Logger, metrics Metrics, tx *sql.Tx, authoritativeWrite bool, ownerID string, object *api.WriteStorageObject) (*api.StorageObjectAck, error) {
var dbVersion sql.NullString
var dbPermissionWrite sql.NullInt64
var dbPermissionRead sql.NullInt64
err := tx.QueryRowContext(ctx, "SELECT version, read, write FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3", object.Collection, object.Key, ownerID).Scan(&dbVersion, &dbPermissionRead, &dbPermissionWrite)
if err != nil {
if err == sql.ErrNoRows {
if object.Version != "" && object.Version != "*" {
// Conditional write with a specific version but the object did not exist at all.
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1)
br := tx.SendBatch(ctx, batch)
defer br.Close() // TODO: need to "drain" batch, otherwise it logs all unprocessed queries
for _, op := range sortedOps {
object := op.Object
var resultRead int32
var resultWrite int32
var resultVersion string
err := br.QueryRow().Scan(&resultRead, &resultWrite, &resultVersion)
var pgErr *pgconn.PgError
if err != nil && errors.As(err, &pgErr) {
if pgErr.Code == dbErrorUniqueViolation {
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "version"}, 1)
return nil, runtime.ErrStorageRejectedVersion
}
} else {
logger.Debug("Error in write storage object pre-flight.", zap.Any("object", object), zap.Error(err))
return nil, err
} else if err == pgx.ErrNoRows {
// Not every query queued by storagePrepBatch can return no rows, but for
// those that can, it always means ErrStorageRejectedVersion.
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "version"}, 1)
return nil, runtime.ErrStorageRejectedVersion
} else if err != nil {
return nil, err
}
}

if dbVersion.Valid && (object.Version == "*" || (object.Version != "" && object.Version != dbVersion.String)) {
// An object existed, and it's a conditional write that either:
// - Expects no object.
// - Or expects a given version, but it does not match.
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1)
return nil, runtime.ErrStorageRejectedVersion
}

if dbPermissionWrite.Valid && dbPermissionWrite.Int64 == 0 && !authoritativeWrite {
// Non-authoritative write to an existing storage object with permission 0.
return nil, runtime.ErrStorageRejectedPermission
}

newVersion := fmt.Sprintf("%x", md5.Sum([]byte(object.Value)))
newPermissionRead := int32(1)
if object.PermissionRead != nil {
newPermissionRead = object.PermissionRead.Value
}
newPermissionWrite := int32(1)
if object.PermissionWrite != nil {
newPermissionWrite = object.PermissionWrite.Value
}

if dbVersion.Valid && dbVersion.String == newVersion && dbPermissionRead.Int64 == int64(newPermissionRead) && dbPermissionWrite.Int64 == int64(newPermissionWrite) {
// Stored object existed, and exactly matches the new object's version and read/write permissions.
if !(op.permissionRead() == resultRead &&
op.permissionWrite() == resultWrite &&
op.expectedVersion() == resultVersion) {
// Write failed, it can happen for 3 reasons:
// - constraint violation on insert (handled elsewhere)
// - permission: non authoritative write & original row write != 1
// - version mismatch
if !authoritativeWrite && resultWrite != 1 {
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "permission"}, 1)
return nil, runtime.ErrStorageRejectedPermission
} else {
// version check failed
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection, "reason": "version"}, 1)
return nil, runtime.ErrStorageRejectedVersion
}
}
ack := &api.StorageObjectAck{
Collection: object.Collection,
Key: object.Key,
Version: newVersion,
Version: resultVersion,
UserId: op.OwnerID,
}
if ownerID != uuid.Nil.String() {
ack.UserId = ownerID
}
return ack, nil
acks[indexedOps[op]] = ack
}

return acks, nil
}

func storagePrepBatch(batch *pgx.Batch, authoritativeWrite bool, op *StorageOpWrite) {
object := op.Object
ownerID := op.OwnerID

newVersion := op.expectedVersion()
newPermissionRead := op.permissionRead()
newPermissionWrite := op.permissionWrite()

params := []interface{}{object.Collection, object.Key, ownerID, object.Value, newVersion, newPermissionRead, newPermissionWrite}
var query string

writeCheck := ""
// Respect permissions in non-authoritative writes.
if !authoritativeWrite {
writeCheck = " AND storage.write = 1"
}

switch {
case object.Version != "" && object.Version != "*":
// OCC if-match.
query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8"

// Query pattern:
// (UPDATE t ... RETURNING) UNION ALL (SELECT FROM t) LIMIT 1
// This lets us fetch the row state after the update even if the UPDATE's
// WHERE condition does not match.
// That is, the returned values are the final state of the row regardless of
// whether the UPDATE succeeded.
query = `
WITH upd AS (
UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now()
WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8
` + writeCheck + `
AND NOT (storage.version = $5 AND storage.read = $6 AND storage.write = $7) -- micro optimization: don't update row unnecessary
RETURNING read, write, version
)
(SELECT read, write, version from upd)
UNION ALL
(SELECT read, write, version FROM storage WHERE collection = $1 and key = $2 and user_id = $3)
LIMIT 1`

params = append(params, object.Version)
// Respect permissions in non-authoritative writes.
if !authoritativeWrite {
query += " AND write = 1"
}
case dbVersion.Valid && object.Version == "":
// An existing storage object was present, but no OCC of any kind is specified.
query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID"
// Respect permissions in non-authoritative writes.
if !authoritativeWrite {
query += " AND write = 1"
}
case !dbVersion.Valid && object.Version == "":
// An existing storage object was not present, and no OCC of any kind is specified.
// Separate to the case above to handle concurrent non-OCC object creations, where all but the first must become updates.
query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now()) ON CONFLICT (collection, read, key, user_id) DO UPDATE SET value = $4, version = $5, read = $6, write = $7, update_time = now()"
// Respect permissions in non-authoritative writes, where this operation also loses the race to insert the object.
if !authoritativeWrite {
query += " WHERE storage.write = 1"
}
case dbVersion.Valid && object.Version != "*":
// An existing storage object was present, but no OCC if-not-exists required.
query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8"
params = append(params, dbVersion.String)
// Respect permissions in non-authoritative writes.
if !authoritativeWrite {
query += " AND write = 1"
}
default:

// Outcomes:
// - No rows: if no rows returned, then object was not found in DB and can't be updated
// - We have row returned, but now we need to know if update happened, that is its WHERE matched
// * write != 1 means no permission to write
// * dbVersion != original version means OCC failure

case object.Version == "":
// non-OCC write, "last write wins" kind of write

// Similar pattern as in case above, but supports case when row
// didn't exist in the database. Another difference is that there is no version
// check for existing row.
query = `
WITH upd AS (
INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time)
VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now())
ON CONFLICT (collection, key, user_id) DO
UPDATE SET value = $4, version = $5, read = $6, write = $7, update_time = now()
WHERE TRUE` + writeCheck + `
AND NOT (storage.version = $5 AND storage.read = $6 AND storage.write = $7) -- micro optimization: don't update row unnecessary
RETURNING read, write, version
)
(SELECT read, write, version from upd)
UNION ALL
(SELECT read, write, version FROM storage WHERE collection = $1 and key = $2 and user_id = $3)
LIMIT 1`

// Outcomes:
// - Row is always returned, need to know if update happened, that is its WHERE matches
// - write != 1 means no permission to write

case object.Version == "*":
// OCC if-not-exists, and all other non-OCC cases.
query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now())"
// Existing permission checks are not applicable for new storage objects.
}

res, err := tx.ExecContext(ctx, query, params...)
if err != nil {
logger.Debug("Could not write storage object, exec error.", zap.Any("object", object), zap.String("query", query), zap.Error(err))
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == dbErrorUniqueViolation {
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1)
return nil, runtime.ErrStorageRejectedVersion
}
return nil, err
}
if rowsAffected, err := res.RowsAffected(); rowsAffected != 1 {
logger.Debug("Could not write storage object, rowsAffected error.", zap.Any("object", object), zap.String("query", query), zap.Error(err))
metrics.StorageWriteRejectCount(map[string]string{"collection": object.Collection}, 1)
return nil, runtime.ErrStorageRejectedVersion
}
query = `
INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time)
VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now())
RETURNING read, write, version`

ack := &api.StorageObjectAck{
Collection: object.Collection,
Key: object.Key,
Version: newVersion,
UserId: ownerID,
// Outcomes:
// - NoRows - insert failed due to constraint violation (concurrent insert)
}

return ack, nil
batch.Queue(query, params...)
}

func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, storageIndex StorageIndex, authoritativeDelete bool, ops StorageOpDeletes) (codes.Code, error) {
Expand Down
Loading
Loading