Skip to content

Commit

Permalink
Fix LockBroker gc and write performance (#497)
Browse files Browse the repository at this point in the history
* Fix LockBroker gc and perf

Don't rely on gc to clean up range locks. Also, fix logic that made it possible for an IllegalMonitorStateException
to occur when gc happened.

* add 0.11.5 to cross version tests
  • Loading branch information
jtnelson authored Jan 29, 2023
1 parent 9d15d7b commit 87cf237
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public final class CrossVersionBenchmarkTests {
/**
* The versions to use in all benchmark tests
*/
public static String[] VERSIONS = { "0.11.3", "0.11.4", "latest" };
public static String[] VERSIONS = { "0.11.3", "0.11.4", "0.11.5",
"latest" };

private CrossVersionBenchmarkTests() {/* no-init */}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,16 @@ public Permit writeLock(Token token) {
* {@link RangeReadWriteLock RangeReadWriteLocks} to provide locking for
* abstract ranges.
* </p>
* <p>
* <li>A value of <strong>0</strong> indicates that the {@link Range} is
* unlocked</li>
* <li>A <strong>positive number</strong> value indicates that the
* {@link Range} is write locked</li>
* <li>A <strong>negative number</strong> value indicates that the
* {@link Range} is read locked</li>
* </p>
*/
private final Map<Text, Map<Range<Value>, AtomicInteger>> rangeLocks;
private final Map<Text, Map<Range<Value>, Integer>> rangeLocks;

/**
* Threads that are queued to be {@link LockSupport#unpark(Thread) unparked}
Expand Down Expand Up @@ -385,25 +393,6 @@ void gc() {
locksIt.remove();
}
}
Iterator<Entry<Text, Map<Range<Value>, AtomicInteger>>> rangeKeys = rangeLocks
.entrySet().iterator();
while (rangeKeys.hasNext()) {
Entry<Text, Map<Range<Value>, AtomicInteger>> rangeEntry = rangeKeys
.next();
Map<Range<Value>, AtomicInteger> rangeKeyLocks = rangeEntry
.getValue();
Iterator<Entry<Range<Value>, AtomicInteger>> rangeKeyLocksIt = rangeKeyLocks
.entrySet().iterator();
while (rangeKeyLocksIt.hasNext()) {
Entry<Range<Value>, AtomicInteger> entry = rangeKeyLocksIt
.next();
AtomicInteger state = entry.getValue();
int s = state.get();
if(s == 0 && state.compareAndSet(s, s)) {
rangeKeyLocksIt.remove();
}
}
}
}

/**
Expand Down Expand Up @@ -652,7 +641,7 @@ public Condition newCondition() {
public boolean tryLock() {
Text key = token.getKey();
Operator operator = token.getOperator();
Map<Range<Value>, AtomicInteger> ranges = rangeLocks
Map<Range<Value>, Integer> ranges = rangeLocks
.computeIfAbsent(key, $ -> new ConcurrentHashMap<>());
outer: for (;;) {
/*
Expand All @@ -661,21 +650,20 @@ public boolean tryLock() {
* while checking its state.
*/
for (Range<Value> range : token.ranges()) {
AtomicInteger state = ranges.computeIfAbsent(range,
$ -> new AtomicInteger(0));
int s = state.get();
if(mode == Mode.WRITE && (s >= 1 || s < 0)) {
int state = ranges.computeIfAbsent(range, $ -> 0);
if(mode == Mode.WRITE && (state >= 1 || state < 0)) {
// If the same #range is locked in any state, this
// attempt to WRITE lock is blocked.
if(state.compareAndSet(s, s)) {
if(ranges.replace(range, state, state)) {
return false;
}
else {
continue outer;
}
}
if(mode == Mode.READ && operator == Operator.EQUALS) {
if(s <= 0 && state.compareAndSet(s, s)) {
if(state <= 0
&& ranges.replace(range, state, state)) {
// A EQUAL READ is only blocked if there is a
// concurrent WRITE for the exact value. If that
// isn't the case we can proceed to the next of
Expand All @@ -684,7 +672,8 @@ public boolean tryLock() {
// connected ranges.
continue;
}
else if(s > 0 && state.compareAndSet(s, s)) {
else if(state > 0
&& ranges.replace(range, state, state)) {
return false;
}
else {
Expand All @@ -693,16 +682,15 @@ else if(s > 0 && state.compareAndSet(s, s)) {
}
// As a last resort, check for locked connected ranges
// and determine if this lock is blocked as a result.
Iterator<Entry<Range<Value>, AtomicInteger>> it = ranges
Iterator<Entry<Range<Value>, Integer>> it = ranges
.entrySet().iterator();
while (it.hasNext()) {
Entry<Range<Value>, AtomicInteger> entry = it
.next();
Entry<Range<Value>, Integer> entry = it.next();
Range<Value> locked = entry.getKey();
state = entry.getValue();
s = state.get();
if((s == 0 || (mode == Mode.READ && s < 0))) {
if(state.compareAndSet(s, s)) {
if((state == 0
|| (mode == Mode.READ && state < 0))) {
if(ranges.replace(locked, state, state)) {
// If the #locked range is actually unlocked
// or we are attempting a READ lock and the
// #locked range is also a READ lock, we can
Expand All @@ -716,9 +704,10 @@ else if(s > 0 && state.compareAndSet(s, s)) {
}
else if(Ranges.haveNonEmptyIntersection(locked,
range)
&& ((mode == Mode.READ && s > 0)
|| (mode == Mode.WRITE && s < 0))) {
if(state.compareAndSet(s, s)) {
&& ((mode == Mode.READ && state > 0)
|| (mode == Mode.WRITE
&& state < 0))) {
if(ranges.replace(locked, state, state)) {
return false;
}
else {
Expand All @@ -727,23 +716,30 @@ else if(Ranges.haveNonEmptyIntersection(locked,
}
}
}
List<AtomicInteger> undos = new ArrayList<>(1);
List<Range<Value>> undos = new ArrayList<>(1);
for (Range<Value> range : token.ranges()) {
AtomicInteger state = ranges.computeIfAbsent(range,
$ -> new AtomicInteger(0));
int s = state.get();
if(!state.compareAndSet(s,
s + (mode == Mode.READ ? -1 : 1))) {
int state = ranges.computeIfAbsent(range, $ -> 0);
if(!ranges.replace(range, state,
state + (mode == Mode.READ ? -1 : 1))) {
// The lock has been newly acquired or released
// since we checked the state, so undo any ranges we
// intermediately acquired and tryLock again.
for (AtomicInteger undo : undos) {
undo.addAndGet(mode == Mode.READ ? 1 : -1);
for (Range<Value> undo : undos) {
for (;;) {
int s = ranges.get(undo);
if(ranges.replace(undo, s,
s + (mode == Mode.READ ? 1 : -1))) {
break;
}
else {
continue;
}
}
}
continue outer;
}
else {
undos.add(state);
undos.add(range);
}
}
return true;
Expand Down Expand Up @@ -777,32 +773,43 @@ public boolean tryLock(long time, TimeUnit unit)
@Override
public void unlock() {
Text key = token.getKey();
Map<Range<Value>, AtomicInteger> ranges = rangeLocks
Map<Range<Value>, Integer> ranges = rangeLocks
.computeIfAbsent(key, $ -> new ConcurrentHashMap<>());
outer: for (;;) {
List<AtomicInteger> undos = new ArrayList<>(1);
List<Range<Value>> undos = new ArrayList<>(1);
for (Range<Value> range : token.ranges()) {
AtomicInteger state = ranges.computeIfAbsent(range,
$ -> new AtomicInteger(0));
int s = state.get();
if(s == 0 || (mode == Mode.READ && s > 0)
|| (mode == Mode.WRITE && s < 0)) {
int state = ranges.computeIfAbsent(range, $ -> 0);
if(state == 0 || (mode == Mode.READ && state > 0)
|| (mode == Mode.WRITE && state < 0)) {
throw new IllegalMonitorStateException();
}
if(state.compareAndSet(s,
s + (mode == Mode.READ ? 1 : -1))) {
undos.add(state);
if(ranges.replace(range, state,
state + (mode == Mode.READ ? 1 : -1))) {
undos.add(range);
}
else {
// The lock has been newly acquired or released
// since we checked the state, so try to unlock
// again
for (AtomicInteger undo : undos) {
undo.addAndGet(mode == Mode.READ ? -1 : 1);
for (Range<Value> undo : undos) {
for (;;) {
int s = ranges.get(undo);
if(ranges.replace(undo, s,
s + (mode == Mode.READ ? -1 : 1))) {
break;
}
else {
continue;
}
}
}
continue outer;
}
}
// Remove previously locked ranges that remain unlocked.
for (Range<Value> range : token.ranges()) {
ranges.remove(range, 0);
}
break;
}
Thread t;
Expand All @@ -817,6 +824,11 @@ public void unlock() {

}

/**
* An enum that tracks the locking mode.
*
* @author Jeff Nelson
*/
enum Mode {
READ, WRITE
}
Expand Down Expand Up @@ -896,5 +908,4 @@ private LockReference(Object lock) {
this.count = new AtomicInteger(0);
}
}

}

0 comments on commit 87cf237

Please sign in to comment.