From 7b2b2f1b7617dd642bfbaa3d068b05f2653f70c5 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 14 Aug 2024 14:05:17 +0800 Subject: [PATCH 1/5] feat(s3stream): optimize the execution sequence to avoid deep dependencies --- .../src/main/java/com/automq/stream/s3/S3Stream.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index f08439f875..3fb77249fa 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -149,6 +149,7 @@ public CompletableFuture append(AppendContext context, RecordBatch long startTimeNanos = System.nanoTime(); readLock.lock(); try { + CompletableFuture result = new CompletableFuture<>(); CompletableFuture cf = exec(() -> { if (networkInboundLimiter != null) { networkInboundLimiter.consume(ThrottleStrategy.BYPASS, recordBatch.rawPayload().remaining()); @@ -166,8 +167,14 @@ public CompletableFuture append(AppendContext context, RecordBatch StreamOperationStats.getInstance().appendStreamLatency.record(TimerUtil.durationElapsedAs(startTimeNanos, TimeUnit.NANOSECONDS)); pendingAppends.remove(cf); pendingAppendTimestamps.pop(); + + if (ex != null) { + result.completeExceptionally(ex); + } else { + result.complete(nil); + } }); - return cf; + return result; } finally { readLock.unlock(); } From 93fbc8e90ee84edbd99b1c0e7c2a56546882de91 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 14 Aug 2024 14:14:37 +0800 Subject: [PATCH 2/5] feat(s3stream): optimize the execution sequence to avoid deep dependencies --- s3stream/src/main/java/com/automq/stream/s3/S3Stream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 3fb77249fa..0302aa2a47 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -163,7 +163,7 @@ public CompletableFuture append(AppendContext context, RecordBatch }, LOGGER, "append"); pendingAppends.add(cf); pendingAppendTimestamps.push(startTimeNanos); - cf.whenComplete((nil, ex) -> { + cf.whenComplete((res, ex) -> { StreamOperationStats.getInstance().appendStreamLatency.record(TimerUtil.durationElapsedAs(startTimeNanos, TimeUnit.NANOSECONDS)); pendingAppends.remove(cf); pendingAppendTimestamps.pop(); @@ -171,7 +171,7 @@ public CompletableFuture append(AppendContext context, RecordBatch if (ex != null) { result.completeExceptionally(ex); } else { - result.complete(nil); + result.complete(res); } }); return result; From 02e02b56563ef3f6376941bdf2a379b98ac8f058 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 14 Aug 2024 17:46:16 +0800 Subject: [PATCH 3/5] feat(s3stream): optimize the execution sequence --- .../src/main/java/com/automq/stream/s3/S3Stream.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 0302aa2a47..8022c9b2b3 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -163,18 +163,12 @@ public CompletableFuture append(AppendContext context, RecordBatch }, LOGGER, "append"); pendingAppends.add(cf); pendingAppendTimestamps.push(startTimeNanos); - cf.whenComplete((res, ex) -> { + + return cf.whenComplete((res, ex) -> { StreamOperationStats.getInstance().appendStreamLatency.record(TimerUtil.durationElapsedAs(startTimeNanos, TimeUnit.NANOSECONDS)); pendingAppends.remove(cf); pendingAppendTimestamps.pop(); - - if (ex != null) { - result.completeExceptionally(ex); - } else { - result.complete(res); - } }); - return result; } finally { readLock.unlock(); } From 6c4751b65dc283ab9807c26a29b1e6e4e368e9af Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 14 Aug 2024 17:47:31 +0800 Subject: [PATCH 4/5] feat(s3stream): optimize the execution sequence --- s3stream/src/main/java/com/automq/stream/s3/S3Stream.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 8022c9b2b3..ddffa10e82 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -163,8 +163,7 @@ public CompletableFuture append(AppendContext context, RecordBatch }, LOGGER, "append"); pendingAppends.add(cf); pendingAppendTimestamps.push(startTimeNanos); - - return cf.whenComplete((res, ex) -> { + return cf.whenComplete((nil, ex) -> { StreamOperationStats.getInstance().appendStreamLatency.record(TimerUtil.durationElapsedAs(startTimeNanos, TimeUnit.NANOSECONDS)); pendingAppends.remove(cf); pendingAppendTimestamps.pop(); From daa8e855b081e779c3db3b6e3029ecd73560791f Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Thu, 15 Aug 2024 08:38:10 +0800 Subject: [PATCH 5/5] feat(s3stream): optimize the execution sequence --- s3stream/src/main/java/com/automq/stream/s3/S3Stream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index ddffa10e82..168542a4a5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -149,7 +149,6 @@ public CompletableFuture append(AppendContext context, RecordBatch long startTimeNanos = System.nanoTime(); readLock.lock(); try { - CompletableFuture result = new CompletableFuture<>(); CompletableFuture cf = exec(() -> { if (networkInboundLimiter != null) { networkInboundLimiter.consume(ThrottleStrategy.BYPASS, recordBatch.rawPayload().remaining());