From 94416673cb214c2f2ce6b1687439805af594dcca Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 15 Aug 2024 15:29:17 +0800 Subject: [PATCH] fix(s3stream): fix dead lock issue Signed-off-by: SSpirits --- .../automq/stream/s3/wal/impl/object/RecordAccumulator.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java index 582149ac65..fb40af31e9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java @@ -155,7 +155,7 @@ public void start() { .filter(uploadTime -> time.nanoseconds() - uploadTime > DEFAULT_UPLOAD_WARNING_TIMEOUT) .count(); if (count > 0) { - log.warn("Found {} pending upload tasks exceed 5s.", count); + log.error("Found {} pending upload tasks exceed 5s.", count); } } catch (Throwable ignore) { } @@ -528,7 +528,8 @@ assert record != null; if (throwable instanceof WALFencedException) { List uploadedRecords = uploadMap.remove(firstOffset); Throwable finalThrowable = throwable; - uploadedRecords.forEach(record -> record.future.completeExceptionally(finalThrowable)); + // Release lock and complete future in callback thread. + callbackService.submit(() -> uploadedRecords.forEach(record -> record.future.completeExceptionally(finalThrowable))); } else if (throwable != null) { // Never fail the write task, the under layer storage will retry forever. log.error("[Bug] Failed to write records to S3: {}", firstOffset, throwable);