Skip to content

Commit

Permalink
refactor(s3stream): only lock the deltaWAL.append
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Dec 20, 2023
1 parent a27786d commit c9647fc
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,18 +289,19 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
return true;
}
WriteAheadLog.AppendResult appendResult;
Lock lock = confirmOffsetCalculator.addLock();
lock.lock();
try {
try {
StreamRecordBatch streamRecord = request.record;
streamRecord.retain();
appendResult = deltaWAL.append(streamRecord.encoded());
lock.unlock();
Lock lock = confirmOffsetCalculator.addLock();
lock.lock();
try {
appendResult = deltaWAL.append(streamRecord.encoded());
} finally {
lock.unlock();
}
} catch (WriteAheadLog.OverCapacityException e) {
// the WAL write data align with block, 'WAL is full but LogCacheBlock is not full' may happen.
// the read lock must release before write lock https://github.com/AutoMQ/automq-for-kafka/issues/581
lock.unlock();
confirmOffsetCalculator.update();
forceUpload(LogCache.MATCH_ALL_STREAMS);
if (!fromBackoff) {
Expand All @@ -315,7 +316,6 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
request.offset = appendResult.recordOffset();
confirmOffsetCalculator.add(request);
} catch (Throwable e) {
lock.unlock();
LOGGER.error("[UNEXPECTED] append WAL fail", e);
request.cf.completeExceptionally(e);
return false;
Expand Down

0 comments on commit c9647fc

Please sign in to comment.