Skip to content

Commit

Permalink
Add isolation option to the vault lock
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Feb 10, 2025
1 parent c14c908 commit f1a0818
Show file tree
Hide file tree
Showing 26 changed files with 496 additions and 677 deletions.
273 changes: 57 additions & 216 deletions platform/common/core/generic/vault/helpers.go

Large diffs are not rendered by default.

11 changes: 1 addition & 10 deletions platform/common/core/generic/vault/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type VersionedQueryExecutor interface {
GetStateMetadata(ctx context.Context, namespace, key string) (driver.Metadata, driver.RawVersion, error)
GetState(ctx context.Context, namespace, key string) (*driver.VaultRead, error)
Done()
Done() error
}

type VersionComparator interface {
Expand Down Expand Up @@ -82,15 +82,6 @@ func NewInterceptor[V driver.ValidationCode](
}

func (i *Interceptor[V]) IsValid() error {
tx, err := i.vaultStore.GetTxStatus(context.Background(), i.txID)
if err != nil {
return err
}
i.logger.Infof("found it at version [%v]", tx.Code)
if tx.Code == driver.Valid {
return errors.Errorf("duplicate txid %s", i.txID)
}

i.RLock()
defer i.RUnlock()
if i.qe == nil {
Expand Down
3 changes: 2 additions & 1 deletion platform/common/core/generic/vault/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func (qe MockQE) GetState(_ context.Context, _ driver.Namespace, pkey driver.PKe
}, nil
}

func (qe MockQE) Done() {
func (qe MockQE) Done() error {
return nil
}

type MockTxStatusStore struct {
Expand Down
62 changes: 0 additions & 62 deletions platform/common/core/generic/vault/queryexec.go

This file was deleted.

75 changes: 23 additions & 52 deletions platform/common/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func New[V driver.ValidationCode](
}

func (db *Vault[V]) NewQueryExecutor(ctx context.Context) (dbdriver.QueryExecutor, error) {
return newGlobalLockQueryExecutor(ctx, db.vaultStore)
return db.vaultStore.NewGlobalLockVaultReader(ctx)
}

func (db *Vault[V]) Status(ctx context.Context, txID driver.TxID) (V, string, error) {
Expand Down Expand Up @@ -243,48 +243,45 @@ func (db *Vault[V]) SetDiscarded(ctx context.Context, txID driver.TxID, message
}

func (db *Vault[V]) NewRWSet(ctx context.Context, txID driver.TxID) (api2.RWSet, error) {
return db.NewInspector(ctx, txID)
db.logger.Debugf("NewRWSet[%s]", txID)

return db.newRWSet(ctx, txID, EmptyRWSet(), driver.LevelDefault)
}

func (db *Vault[V]) NewInspector(ctx context.Context, txID driver.TxID) (TxInterceptor, error) {
newCtx, _ := db.tracer.Start(ctx, "inspector")
func (db *Vault[V]) NewRWSetWithIsolationLevel(ctx context.Context, txID driver.TxID, isolationLevel driver.IsolationLevel) (api2.RWSet, error) {
db.logger.Debugf("NewRWSet[%s]", txID)
qe, err := newTxLockQueryExecutor(ctx, db.vaultStore, txID)
if err != nil {
return nil, err
}
i := db.newInterceptor(db.logger, newCtx, EmptyRWSet(), qe, db.vaultStore, txID)

db.interceptorsLock.Lock()
defer db.interceptorsLock.Unlock()
if _, in := db.interceptors[txID]; in {
return nil, errors.Errorf("duplicate read-write set for txid %s", txID)
}
db.interceptors[txID] = i

return i, nil
return db.newRWSet(ctx, txID, EmptyRWSet(), isolationLevel)
}

func (db *Vault[V]) GetRWSet(ctx context.Context, txID driver.TxID, rwsetBytes []byte) (driver.RWSet, error) {
span := trace.SpanFromContext(ctx)
span.AddEvent("start_get_rw_set")
defer span.AddEvent("end_get_rw_set")
db.logger.Debugf("GetRWSet[%s]", txID)
func (db *Vault[V]) NewRWSetFromBytes(ctx context.Context, txID driver.TxID, rwsetBytes []byte) (driver.RWSet, error) {
db.logger.Debugf("NewRWSetFromBytes[%s]", txID)
rwSet, err := db.populator.Populate(rwsetBytes)
if err != nil {
return nil, errors.Wrapf(err, "failed populating tx [%s]", txID)
}

qe, err := newTxLockQueryExecutor(ctx, db.vaultStore, txID)
return db.newRWSet(ctx, txID, rwSet, driver.LevelDefault)
}

func (db *Vault[V]) newRWSet(ctx context.Context, txID driver.TxID, rws ReadWriteSet, isolationLevel driver.IsolationLevel) (driver.RWSet, error) {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start new RW set")
defer span.AddEvent("End new RW set")

qe, err := db.vaultStore.NewTxLockVaultReader(ctx, txID, isolationLevel)
if err != nil {
return nil, err
}
i := db.newInterceptor(db.logger, ctx, rwSet, qe, db.vaultStore, txID)
if err != nil {
return nil, err
}
i := db.newInterceptor(db.logger, ctx, rws, qe, db.vaultStore, txID)

db.interceptorsLock.Lock()
defer db.interceptorsLock.Unlock()
if i, in := db.interceptors[txID]; in && !i.IsClosed() {
return nil, errors.Errorf("programming error: previous read-write set for %s has not been closed", txID)
if interceptor, ok := db.interceptors[txID]; ok && !interceptor.IsClosed() {
return nil, errors.Errorf("programming error: rwset already exists for [%s]", txID)
}
db.interceptors[txID] = i

Expand Down Expand Up @@ -366,32 +363,6 @@ func (db *Vault[V]) Statuses(ctx context.Context, txIDs ...driver.TxID) ([]drive
return statuses, nil
}

func (db *Vault[V]) GetExistingRWSet(ctx context.Context, txID driver.TxID) (driver.RWSet, error) {
span := trace.SpanFromContext(ctx)
span.AddEvent("start_get_existing_rwset")
defer span.AddEvent("end_get_existing_rwset")
db.logger.Debugf("GetExistingRWSet[%s]", txID)

db.interceptorsLock.Lock()
defer db.interceptorsLock.Unlock()
interceptor, in := db.interceptors[txID]
if !in {
return nil, errors.Errorf("rws for [%s] not found", txID)
}
if !interceptor.IsClosed() {
return nil, errors.Errorf("programming error: previous read-write set for %s has not been closed", txID)
}
qe, err := newTxLockQueryExecutor(ctx, db.vaultStore, txID)
if err != nil {
return nil, err
}
if err := interceptor.Reopen(qe); err != nil {
return nil, errors.Errorf("failed to reopen rwset [%s]", txID)
}

return interceptor, nil
}

func (db *Vault[V]) SetStatus(ctx context.Context, txID driver.TxID, code V) error {
return db.vaultStore.SetStatuses(ctx, db.vcProvider.ToInt32(code), "", txID)
}
2 changes: 1 addition & 1 deletion platform/common/core/generic/vault/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func TestPostgres(t *testing.T) {
}
artifactProvider := &testArtifactProvider{}

for _, c := range SingleDBCases {
for _, c := range append(SingleDBCases, ReadCommittedDBCases...) {
ddb, terminate, err := vault.OpenPostgresVault("common-sdk-node1")
assert.NoError(t, err)
t.Run(c.Name, func(xt *testing.T) {
Expand Down
76 changes: 48 additions & 28 deletions platform/common/driver/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ type VersionedResultsIterator = collections.Iterator[*VaultRead]
type QueryExecutor interface {
GetState(ctx context.Context, namespace Namespace, key PKey) (*VaultRead, error)
GetStateMetadata(ctx context.Context, namespace Namespace, key PKey) (Metadata, RawVersion, error)
GetStateRangeScanIterator(ctx context.Context, namespace Namespace, startKey PKey, endKey PKey) (VersionedResultsIterator, error)
Done()
GetStateRange(ctx context.Context, namespace Namespace, startKey PKey, endKey PKey) (VersionedResultsIterator, error)
Done() error
}

type TxValidationStatus[V comparable] struct {
Expand All @@ -87,11 +87,13 @@ type Vault[V comparable] interface {
// by way of the supplied txid
NewRWSet(ctx context.Context, txID TxID) (RWSet, error)

// GetRWSet returns a RWSet for this ledger whose content is unmarshalled
// NewRWSetFromBytes creates a new RWSet in the vault for this ledger whose content is unmarshalled
// from the passed bytes.
// A client may obtain more than one such simulator; they are made unique
// by way of the supplied txid
GetRWSet(ctx context.Context, txID TxID, rwset []byte) (RWSet, error)
// by way of the supplied txid.
// Example: alice creates a new RWSet using NewRWSet and marshals and sends the serialized RW set to bob.
// Then bob creates an identical RWSet using GetRWSet using the marshaled RW set from alice
NewRWSetFromBytes(ctx context.Context, txID TxID, rwset []byte) (RWSet, error)

SetDiscarded(ctx context.Context, txID TxID, message string) error

Expand All @@ -110,26 +112,7 @@ type MetaWrites map[Namespace]map[PKey]VaultMetadataValue

type Writes map[Namespace]map[PKey]VaultValue

// VaultLock represents a lock over a transaction or the whole vault
type VaultLock interface {
// Release releases the locked resources
Release() error
}

type VaultStore interface {
// AcquireTxIDRLock acquires a read lock on a specific transaction.
// While holding this lock, other routines:
// - cannot update the transaction states or statuses of the locked transactions
// - can read the locked transactions
AcquireTxIDRLock(ctx context.Context, txID TxID) (VaultLock, error)

// AcquireGlobalLock acquires a global exclusive read lock on the vault.
// While holding this lock, other routines:
// - cannot acquire this read lock (exclusive)
// - cannot update any transaction states or statuses
// - can read any transaction
AcquireGlobalLock(ctx context.Context) (VaultLock, error)

type VaultReader interface {
// GetStateMetadata returns the metadata for the given specific namespace - key pair
GetStateMetadata(ctx context.Context, namespace Namespace, key PKey) (Metadata, RawVersion, error)

Expand All @@ -145,9 +128,6 @@ type VaultStore interface {
// GetAllStates returns all states for a given namespace. Only used for testing purposes.
GetAllStates(ctx context.Context, namespace Namespace) (TxStateIterator, error)

// Store stores atomically the transaction statuses, writes and metadata writes
Store(ctx context.Context, txIDs []TxID, writes Writes, metaWrites MetaWrites) error

// GetLast returns the status of the latest non-pending transaction
GetLast(ctx context.Context) (*TxStatus, error)

Expand All @@ -159,6 +139,46 @@ type VaultStore interface {

// GetAllTxStatuses returns the statuses of the all transactions in the vault
GetAllTxStatuses(ctx context.Context) (TxStatusIterator, error)
}

// LockedVaultReader is a VaultReader with a lock on some or all entries
type LockedVaultReader interface {
VaultReader

// Done releases the lock on the locked resources
Done() error
}

type IsolationLevel int

const (
LevelDefault IsolationLevel = iota
LevelReadUncommitted
LevelReadCommitted
LevelWriteCommitted
LevelRepeatableRead
LevelSnapshot
LevelSerializable
LevelLinearizable
)

type VaultStore interface {
VaultReader
// NewTxLockVaultReader acquires a read lock on a specific transaction.
// While holding this lock, other routines:
// - cannot update the transaction states of the locked transactions (based on the isolation level passed)
// - can read the locked states
NewTxLockVaultReader(ctx context.Context, txID TxID, isolationLevel IsolationLevel) (LockedVaultReader, error)

// NewGlobalLockVaultReader acquires a global exclusive read lock on the vault.
// While holding this lock, other routines:
// - cannot acquire this read lock (exclusive)
// - cannot update any transaction states or statuses
// - can read any transaction
NewGlobalLockVaultReader(ctx context.Context) (LockedVaultReader, error)

// Store stores atomically the transaction statuses, writes and metadata writes
Store(ctx context.Context, txIDs []TxID, writes Writes, metaWrites MetaWrites) error

// SetStatuses sets the status and message for the given transactions
SetStatuses(ctx context.Context, code TxStatusCode, message string, txIDs ...TxID) error
Expand Down
6 changes: 5 additions & 1 deletion platform/fabric/core/generic/committer/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,11 @@ func (c *Committer) ReloadConfigTransactions() error {
if err != nil {
return errors.WithMessagef(err, "failed getting query executor")
}
defer qe.Done()
defer func() {
if err := qe.Done(); err != nil {
logger.Errorf("error closing query executor: %v", err)
}
}()

c.logger.Debugf("looking up the latest config block available")
var sequence uint64 = 0
Expand Down
2 changes: 1 addition & 1 deletion platform/fabric/core/generic/rwset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (h *endorserTransactionHandler) Load(payl *common.Payload, chdr *common.Cha

logger.Debugf("retrieve rws [%s,%s]", h.channel, chdr.TxId)

rws, err := h.v.GetRWSet(context.Background(), chdr.TxId, upe.Results)
rws, err := h.v.NewRWSetFromBytes(context.Background(), chdr.TxId, upe.Results)
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions platform/fabric/core/generic/transaction/transasction.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,15 +315,15 @@ func (t *Transaction) SetRWSet() error {
if err != nil {
return errors.WithMessagef(err, "failed to get rws from proposal response")
}
t.rwset, err = t.channel.Vault().GetRWSet(t.ctx, t.ID(), results)
t.rwset, err = t.channel.Vault().NewRWSetFromBytes(t.ctx, t.ID(), results)
if err != nil {
return errors.WithMessagef(err, "failed to populate rws from proposal response")
}
case len(t.RWSet) != 0:
span.AddEvent("from_rwset")
logger.Debugf("populate rws from rwset")
var err error
t.rwset, err = t.channel.Vault().GetRWSet(t.ctx, t.ID(), t.RWSet)
t.rwset, err = t.channel.Vault().NewRWSetFromBytes(t.ctx, t.ID(), t.RWSet)
if err != nil {
return errors.WithMessagef(err, "failed to populate rws from existing rws")
}
Expand Down
5 changes: 1 addition & 4 deletions platform/fabric/driver/rwset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ type (
)

type RWSetInspector interface {
GetRWSet(ctx context.Context, txID driver.TxID, rwset []byte) (RWSet, error)
NewRWSetFromBytes(ctx context.Context, txID driver.TxID, rwset []byte) (RWSet, error)
InspectRWSet(ctx context.Context, rwsetBytes []byte, namespaces ...driver.Namespace) (RWSet, error)
NewRWSet(ctx context.Context, txID driver.TxID) (RWSet, error)
RWSExists(ctx context.Context, txID driver.TxID) bool
GetExistingRWSet(ctx context.Context, txID driver.TxID) (driver.RWSet, error)
}

type RWSetPayloadHandler interface {
Expand Down
Loading

0 comments on commit f1a0818

Please sign in to comment.