diff --git a/pom.xml b/pom.xml index b5fb6d359..50f251825 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 32.0.1-jre 2.0.9 2.2 - 0.6.2-SNAPSHOT + 0.6.3-SNAPSHOT 23.5.26 diff --git a/s3stream/pom.xml b/s3stream/pom.xml index 90fcb2aec..9fb089ae9 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -22,7 +22,7 @@ 4.0.0 com.automq.elasticstream s3stream - 0.6.2-SNAPSHOT + 0.6.3-SNAPSHOT 5.5.0 5.10.0 diff --git a/s3stream/src/main/java/com/automq/stream/api/exceptions/FastReadFailFastException.java b/s3stream/src/main/java/com/automq/stream/api/exceptions/FastReadFailFastException.java index 567b28e14..83afbe013 100644 --- a/s3stream/src/main/java/com/automq/stream/api/exceptions/FastReadFailFastException.java +++ b/s3stream/src/main/java/com/automq/stream/api/exceptions/FastReadFailFastException.java @@ -22,6 +22,6 @@ */ public class FastReadFailFastException extends StreamClientException { public FastReadFailFastException() { - super(ErrorCode.FAST_READ_FAIL_FAST, ""); + super(ErrorCode.FAST_READ_FAIL_FAST, "", false); } } diff --git a/s3stream/src/main/java/com/automq/stream/api/exceptions/StreamClientException.java b/s3stream/src/main/java/com/automq/stream/api/exceptions/StreamClientException.java index 815265d39..ad51756bb 100644 --- a/s3stream/src/main/java/com/automq/stream/api/exceptions/StreamClientException.java +++ b/s3stream/src/main/java/com/automq/stream/api/exceptions/StreamClientException.java @@ -32,6 +32,11 @@ public StreamClientException(int code, String str, Throwable e) { this.code = code; } + public StreamClientException(int code, String str, boolean writableStackTrace) { + super("code: " + code + ", " + str, null, false, writableStackTrace); + this.code = code; + } + public int getCode() { return this.code; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index df47b70f8..9c41df1f9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -26,6 +26,7 @@ import com.automq.stream.api.RecordBatch; import com.automq.stream.api.RecordBatchWithContext; import com.automq.stream.api.Stream; +import com.automq.stream.api.exceptions.FastReadFailFastException; import com.automq.stream.api.exceptions.StreamClientException; import com.automq.stream.s3.cache.CacheAccessType; import com.automq.stream.s3.metrics.TimerUtil; @@ -190,7 +191,10 @@ public CompletableFuture fetch(long startOffset, long endOffset, in cf.whenComplete((rs, ex) -> { OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (ex != null) { - LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex); + Throwable cause = FutureUtil.cause(ex); + if (!(cause instanceof FastReadFailFastException)) { + LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex); + } } else if (networkOutboundLimiter != null) { long totalSize = rs.recordBatchList().stream().mapToLong(record -> record.rawPayload().remaining()).sum(); networkOutboundLimiter.forceConsume(totalSize);