Skip to content

Commit

Permalink
kvserver: latching changes for replicated shared locks
Browse files Browse the repository at this point in the history
Two locking requests from the same transaction that are trying to
acquire replicated shared locks need to be isolated from one another.
They don't need to be isolated against shared locking requests from
other transactions and unreplicated shared lock attempts from the same
transaction.

To achieve these semantics, we introduce a per-transaction range local
key that all replicated shared locking requests declare non-MVCC write
latches over.

Closes #109668

Release note: None
  • Loading branch information
arulajmani committed Sep 28, 2023
1 parent 6e49f7f commit 6fa4870
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 8 deletions.
5 changes: 5 additions & 0 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ var (
// AbortSpan protects a transaction from re-reading its own intents
// after it's been aborted.
LocalAbortSpanSuffix = []byte("abc-")
// LocalReplicatedSharedLocksTransactionLatchingKeySuffix specifies the key
// suffix ("rsl" = replicated shared locks) for all replicated shared lock
// attempts, per transaction. The detail about the transaction is the
// transaction id.
LocalReplicatedSharedLocksTransactionLatchingKeySuffix = roachpb.RKey("rsl-")
// localRangeFrozenStatusSuffix is DEPRECATED and remains to prevent reuse.
localRangeFrozenStatusSuffix = []byte("fzn-")
// LocalRangeGCThresholdSuffix is the suffix for the GC threshold. It keeps
Expand Down
13 changes: 7 additions & 6 deletions pkg/keys/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,13 @@ var _ = [...]interface{}{
// range as a whole. Though they are replicated, they are unaddressable.
// Typical examples are MVCC stats and the abort span. They all share
// `LocalRangeIDPrefix` and `LocalRangeIDReplicatedInfix`.
AbortSpanKey, // "abc-"
RangeGCThresholdKey, // "lgc-"
RangeAppliedStateKey, // "rask"
RangeLeaseKey, // "rll-"
RangePriorReadSummaryKey, // "rprs"
RangeVersionKey, // "rver"
AbortSpanKey, // "abc-"
ReplicatedSharedLocksTransactionLatchingKey, // "rsl-"
RangeGCThresholdKey, // "lgc-"
RangeAppliedStateKey, // "rask"
RangeLeaseKey, // "rll-"
RangePriorReadSummaryKey, // "rprs"
RangeVersionKey, // "rver"

// 2. Unreplicated range-ID local keys: These contain metadata that
// pertain to just one replica of a range. They are unreplicated and
Expand Down
19 changes: 19 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ func AbortSpanKey(rangeID roachpb.RangeID, txnID uuid.UUID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).AbortSpanKey(txnID)
}

// ReplicatedSharedLocksTransactionLatchingKey returns a range-local key, based
// on the provided range ID and transaction ID, that all replicated shared
// locking requests from the specified transaction should use to serialize on
// latches.
func ReplicatedSharedLocksTransactionLatchingKey(
rangeID roachpb.RangeID, txnID uuid.UUID,
) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).ReplicatedSharedLocksTransactionLatchingKey(txnID)
}

// DecodeAbortSpanKey decodes the provided AbortSpan entry,
// returning the transaction ID.
func DecodeAbortSpanKey(key roachpb.Key, dest []byte) (uuid.UUID, error) {
Expand Down Expand Up @@ -1066,6 +1076,15 @@ func (b RangeIDPrefixBuf) AbortSpanKey(txnID uuid.UUID) roachpb.Key {
return encoding.EncodeBytesAscending(key, txnID.GetBytes())
}

// ReplicatedSharedLocksTransactionLatchingKey returns a range-local key, by
// range ID, for a key on which all replicated shared locking requests from a
// specific transaction should serialize on latches. The per-transaction bit is
// achieved by encoding the supplied transaction ID into the key.
func (b RangeIDPrefixBuf) ReplicatedSharedLocksTransactionLatchingKey(txnID uuid.UUID) roachpb.Key {
key := append(b.replicatedPrefix(), LocalReplicatedSharedLocksTransactionLatchingKeySuffix...)
return encoding.EncodeBytesAscending(key, txnID.GetBytes())
}

// RangeAppliedStateKey returns a system-local key for the range applied state key.
// See comment on RangeAppliedStateKey function.
func (b RangeIDPrefixBuf) RangeAppliedStateKey() roachpb.Key {
Expand Down
20 changes: 20 additions & 0 deletions pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ var (
psFunc func(rangeID roachpb.RangeID, input string) (string, roachpb.Key)
}{
{name: "AbortSpan", suffix: LocalAbortSpanSuffix, ppFunc: abortSpanKeyPrint, psFunc: abortSpanKeyParse},
{name: "ReplicatedSharedLocksTransactionLatch",
suffix: LocalReplicatedSharedLocksTransactionLatchingKeySuffix,
ppFunc: replicatedSharedLocksTransactionLatchingKeyPrint,
},
{name: "RangeTombstone", suffix: LocalRangeTombstoneSuffix},
{name: "RaftHardState", suffix: LocalRaftHardStateSuffix},
{name: "RangeAppliedState", suffix: LocalRangeAppliedStateSuffix},
Expand Down Expand Up @@ -567,6 +571,22 @@ func abortSpanKeyPrint(buf *redact.StringBuilder, key roachpb.Key) {
buf.Printf("/%q", txnID)
}

func replicatedSharedLocksTransactionLatchingKeyPrint(buf *redact.StringBuilder, key roachpb.Key) {
_, id, err := encoding.DecodeBytesAscending([]byte(key), nil)
if err != nil {
buf.Printf("/%q/err:%v", key, err)
return
}

txnID, err := uuid.FromBytes(id)
if err != nil {
buf.Printf("/%q/err:%v", key, err)
return
}

buf.Printf("/%q", txnID)
}

func print(buf *redact.StringBuilder, _ []encoding.Direction, key roachpb.Key) {
buf.Printf("/%q", []byte(key))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func TestPrettyPrint(t *testing.T) {
{keys.StoreLossOfQuorumRecoveryCleanupActionsKey(), "/Local/Store/lossOfQuorumRecovery/cleanup", revertSupportUnknown},

{keys.AbortSpanKey(roachpb.RangeID(1000001), txnID), fmt.Sprintf(`/Local/RangeID/1000001/r/AbortSpan/%q`, txnID), revertSupportUnknown},
{keys.ReplicatedSharedLocksTransactionLatchingKey(roachpb.RangeID(1000001), txnID), fmt.Sprintf(`/Local/RangeID/1000001/r/ReplicatedSharedLocksTransactionLatch/%q`, txnID), revertSupportUnknown},
{keys.RangeAppliedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeAppliedState", revertSupportUnknown},
{keys.RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftTruncatedState", revertSupportUnknown},
{keys.RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease", revertSupportUnknown},
Expand Down
14 changes: 12 additions & 2 deletions pkg/kv/kvserver/batcheval/declare.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func DefaultDeclareKeys(
// ensures that the commands are fully isolated from conflicting transactions
// when it evaluated.
func DefaultDeclareIsolatedKeys(
_ ImmutableRangeState,
rs ImmutableRangeState,
header *kvpb.Header,
req kvpb.Request,
latchSpans *spanset.SpanSet,
Expand Down Expand Up @@ -92,7 +92,8 @@ func DefaultDeclareIsolatedKeys(
// Get the correct lock strength to use for {lock,latch} spans if we're
// dealing with locking read requests.
if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok {
str, _ = readOnlyReq.KeyLocking()
var dur lock.Durability
str, dur = readOnlyReq.KeyLocking()
switch str {
case lock.None:
panic(errors.AssertionFailedf("unexpected non-locking read handling"))
Expand All @@ -109,6 +110,15 @@ func DefaultDeclareIsolatedKeys(
// from concurrent writers operating at lower timestamps, a shared-locking
// read extends this protection to all timestamps.
timestamp = hlc.MaxTimestamp
if dur == lock.Replicated && header.Txn != nil {
// Concurrent replicated shared lock attempts by the same transaction
// need to be isolated from one another. We acquire a write latch on
// a per-transaction local key to achieve this. See
// https://github.com/cockroachdb/cockroach/issues/109668.
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: keys.ReplicatedSharedLocksTransactionLatchingKey(rs.GetRangeID(), header.Txn.ID),
})
}
case lock.Exclusive:
// Reads that acquire exclusive locks acquire write latches at the
// request's timestamp. This isolates them from all concurrent writes,
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/concurrency/datadriven_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func scanUserPriority(t *testing.T, d *datadriven.TestData) roachpb.UserPriority
func scanLockDurability(t *testing.T, d *datadriven.TestData) lock.Durability {
var durS string
d.ScanArgs(t, "dur", &durS)
return getLockDurability(t, d, durS)
}

func getLockDurability(t *testing.T, d *datadriven.TestData, durS string) lock.Durability {
switch durS {
case "r":
return lock.Replicated
Expand Down Expand Up @@ -177,13 +181,21 @@ func scanSingleRequest(
}
return concurrency.GetStrength(t, d, s)
}
maybeGetDur := func() lock.Durability {
s, ok := fields["dur"]
if !ok {
return lock.Unreplicated
}
return getLockDurability(t, d, s)
}

switch cmd {
case "get":
var r kvpb.GetRequest
r.Sequence = maybeGetSeq()
r.Key = roachpb.Key(mustGetField("key"))
r.KeyLockingStrength = maybeGetStr()
r.KeyLockingDurability = maybeGetDur()
return &r

case "scan":
Expand All @@ -194,6 +206,7 @@ func scanSingleRequest(
r.EndKey = roachpb.Key(v)
}
r.KeyLockingStrength = maybeGetStr()
r.KeyLockingDurability = maybeGetDur()
return &r

case "put":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,78 @@ finish req=req33
finish req=req34
----
[-] finish req34: finishing request

# ------------------------------------------------------------------------------
# Ensure concurrent replicated shared locking requests by the same transaction
# conflict on latches. Also ensure concurrent replicated shared lock attempts
# by different transactions do not.
# ------------------------------------------------------------------------------

new-request name=req35 txn=txn2 ts=11,1
get key=c str=shared dur=r
----

sequence req=req35
----
[35] sequence req35: sequencing request
[35] sequence req35: acquiring latches
[35] sequence req35: scanning lock table for conflicting locks
[35] sequence req35: sequencing complete, returned guard

new-request name=req36 txn=txn2 ts=11,1
scan key=a endkey=f str=shared dur=r
----

sequence req=req36
----
[36] sequence req36: sequencing request
[36] sequence req36: acquiring latches
[36] sequence req36: waiting to acquire write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0, held by write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0
[36] sequence req36: blocked on select in spanlatch.(*Manager).waitForSignal

new-request name=req37 txn=txn1 ts=11,1
get key=c str=shared dur=r
----

sequence req=req37
----
[37] sequence req37: sequencing request
[37] sequence req37: acquiring latches
[37] sequence req37: scanning lock table for conflicting locks
[37] sequence req37: sequencing complete, returned guard


# Unreplicated shared locking request from txn2. Shouldn't conflict on latches.
new-request name=req38 txn=txn2 ts=11,1
get key=c str=shared dur=u
----

sequence req=req38
----
[38] sequence req38: sequencing request
[38] sequence req38: acquiring latches
[38] sequence req38: scanning lock table for conflicting locks
[38] sequence req38: sequencing complete, returned guard

debug-latch-manager
----
write count: 3
read count: 4

finish req=req35
----
[-] finish req35: finishing request
[36] sequence req36: scanning lock table for conflicting locks
[36] sequence req36: sequencing complete, returned guard

finish req=req36
----
[-] finish req36: finishing request

finish req=req37
----
[-] finish req37: finishing request

finish req=req38
----
[-] finish req38: finishing request

0 comments on commit 6fa4870

Please sign in to comment.