Skip to content

Commit

Permalink
[fix][ml] Fix deadlock in PendingReadsManager (#23958)
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Feb 11, 2025
1 parent 40a3b38 commit 367faef
Showing 1 changed file with 42 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,73 +211,95 @@ private FindPendingReadOutcome findPendingRead(PendingReadKey key, ConcurrentMap
private class PendingRead {
final PendingReadKey key;
final ConcurrentMap<PendingReadKey, PendingRead> ledgerCache;
final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
boolean completed = false;
final List<ReadEntriesCallbackWithContext> listeners = new ArrayList<>(1);
PendingReadState state = PendingReadState.INITIALISED;

enum PendingReadState {
INITIALISED,
ATTACHED,
COMPLETED
}

public PendingRead(PendingReadKey key,
ConcurrentMap<PendingReadKey, PendingRead> ledgerCache) {
this.key = key;
this.ledgerCache = ledgerCache;
}

public void attach(CompletableFuture<List<EntryImpl>> handle) {
public synchronized void attach(CompletableFuture<List<EntryImpl>> handle) {
if (state != PendingReadState.INITIALISED) {
// this shouldn't ever happen. this is here to prevent misuse in future changes
throw new IllegalStateException("Unexpected state " + state + " for PendingRead for key " + key);
}
state = PendingReadState.ATTACHED;
handle.whenComplete((entriesToReturn, error) -> {
// execute in the completing thread
completeAndRemoveFromCache();
// execute in the completing thread and return a copy of the listeners
List<ReadEntriesCallbackWithContext> callbacks = completeAndRemoveFromCache();
// execute the callbacks in the managed ledger executor
rangeEntryCache.getManagedLedger().getExecutor().execute(() -> {
if (error != null) {
readEntriesFailed(error);
readEntriesFailed(callbacks, error);
} else {
readEntriesComplete(entriesToReturn);
readEntriesComplete(callbacks, entriesToReturn);
}
});
});
}

private synchronized void completeAndRemoveFromCache() {
completed = true;
synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,
Object ctx, long startEntry, long endEntry) {
if (state == PendingReadState.COMPLETED) {
return false;
}
listeners.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
return true;
}

private synchronized List<ReadEntriesCallbackWithContext> completeAndRemoveFromCache() {
state = PendingReadState.COMPLETED;
// When the read has completed, remove the instance from the ledgerCache map
// so that new reads will go to a new instance.
// this is required because we are going to do refcount management
// on the results of the callback
ledgerCache.remove(key, this);
// return a copy of the listeners
return List.copyOf(listeners);
}

private synchronized void readEntriesComplete(List<EntryImpl> entriesToReturn) {
// this method isn't synchronized since that could lead to deadlocks
private void readEntriesComplete(List<ReadEntriesCallbackWithContext> callbacks,
List<EntryImpl> entriesToReturn) {
if (callbacks.size() == 1) {
ReadEntriesCallbackWithContext first = callbacks.get(0);
if (first.startEntry == key.startEntry
&& first.endEntry == key.endEntry) {
// perfect match, no copy, this is the most common case
first.callback.readEntriesComplete((List) entriesToReturn,
first.ctx);
first.callback.readEntriesComplete((List) entriesToReturn, first.ctx);
} else {
first.callback.readEntriesComplete(
keepEntries(entriesToReturn, first.startEntry, first.endEntry),
first.ctx);
keepEntries(entriesToReturn, first.startEntry, first.endEntry), first.ctx);
}
} else {
for (ReadEntriesCallbackWithContext callback : callbacks) {
callback.callback.readEntriesComplete(
copyEntries(entriesToReturn, callback.startEntry, callback.endEntry),
callback.ctx);
copyEntries(entriesToReturn, callback.startEntry, callback.endEntry), callback.ctx);
}
for (EntryImpl entry : entriesToReturn) {
entry.release();
}
}
}

private synchronized void readEntriesFailed(Throwable error) {
// this method isn't synchronized since that could lead to deadlocks
private void readEntriesFailed(List<ReadEntriesCallbackWithContext> callbacks, Throwable error) {
for (ReadEntriesCallbackWithContext callback : callbacks) {
ManagedLedgerException mlException = createManagedLedgerException(error);
callback.callback.readEntriesFailed(mlException, callback.ctx);
}
}

private List<Entry> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry));
private static List<Entry> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry + 1));
for (EntryImpl entry : list) {
long entryId = entry.getEntryId();
if (startEntry <= entryId && entryId <= endEntry) {
Expand All @@ -289,7 +311,7 @@ private List<Entry> keepEntries(List<EntryImpl> list, long startEntry, long endE
return result;
}

private List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long startEntry, long endEntry) {
private static List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long startEntry, long endEntry) {
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry + 1));
for (EntryImpl entry : entriesToReturn) {
long entryId = entry.getEntryId();
Expand All @@ -300,15 +322,6 @@ private List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long startEntry
}
return result;
}

synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,
Object ctx, long startEntry, long endEntry) {
if (completed) {
return false;
}
callbacks.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
return true;
}
}


Expand Down

0 comments on commit 367faef

Please sign in to comment.