diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java index 582149ac65..fb40af31e9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java @@ -155,7 +155,7 @@ public void start() { .filter(uploadTime -> time.nanoseconds() - uploadTime > DEFAULT_UPLOAD_WARNING_TIMEOUT) .count(); if (count > 0) { - log.warn("Found {} pending upload tasks exceed 5s.", count); + log.error("Found {} pending upload tasks exceed 5s.", count); } } catch (Throwable ignore) { } @@ -528,7 +528,8 @@ assert record != null; if (throwable instanceof WALFencedException) { List uploadedRecords = uploadMap.remove(firstOffset); Throwable finalThrowable = throwable; - uploadedRecords.forEach(record -> record.future.completeExceptionally(finalThrowable)); + // Release lock and complete future in callback thread. + callbackService.submit(() -> uploadedRecords.forEach(record -> record.future.completeExceptionally(finalThrowable))); } else if (throwable != null) { // Never fail the write task, the under layer storage will retry forever. log.error("[Bug] Failed to write records to S3: {}", firstOffset, throwable);