Skip to content

Commit

Permalink
feat: support request level default timeout for common operation (#202)
Browse files Browse the repository at this point in the history
* feat: support request level default timeout for common operation

* upgrade github action lib

* Update AsyncOxiaClientImplTest.java

---------

Co-authored-by: Matteo Merli <[email protected]>
  • Loading branch information
mattisonchao and merlimat authored Jan 16, 2025
1 parent cf1f5ea commit c7e25d6
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
find . -type d -name "*surefire*" -exec cp --parents -R {} test-results/ \;
zip -r test-results.zip test-results
- name: Upload test results
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
if: failure()
with:
name: test-results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.streamnative.oxia.proto.ListResponse;
import io.streamnative.oxia.proto.RangeScanRequest;
import io.streamnative.oxia.proto.RangeScanResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -62,6 +63,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import lombok.NonNull;
Expand Down Expand Up @@ -105,8 +107,8 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
notificationManager,
readBatchManager,
writeBatchManager,
sessionManager);

sessionManager,
config.requestTimeout());
return shardManager.start().thenApply(v -> client);
}

Expand All @@ -118,6 +120,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
private final @NonNull BatchManager readBatchManager;
private final @NonNull BatchManager writeBatchManager;
private final @NonNull SessionManager sessionManager;
private final long requestTimeoutMs;
private volatile boolean closed;

private final Counter counterPutBytes;
Expand Down Expand Up @@ -152,7 +155,8 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
@NonNull NotificationManager notificationManager,
@NonNull BatchManager readBatchManager,
@NonNull BatchManager writeBatchManager,
@NonNull SessionManager sessionManager) {
@NonNull SessionManager sessionManager,
Duration requestTimeout) {
this.clientIdentifier = clientIdentifier;
this.instrumentProvider = instrumentProvider;
this.stubManager = stubManager;
Expand All @@ -162,6 +166,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
this.writeBatchManager = writeBatchManager;
this.sessionManager = sessionManager;
this.scheduledExecutor = scheduledExecutor;
this.requestTimeoutMs = requestTimeout.toMillis();

counterPutBytes =
instrumentProvider.newCounter(
Expand Down Expand Up @@ -288,18 +293,20 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
} catch (RuntimeException e) {
callback = CompletableFuture.failedFuture(e);
}
return callback.whenComplete(
(putResult, throwable) -> {
gaugePendingPutRequests.decrement();
gaugePendingPutBytes.add(-value.length);

if (throwable == null) {
counterPutBytes.add(value.length);
histogramPutLatency.recordSuccess(System.nanoTime() - startTime);
} else {
histogramPutLatency.recordFailure(System.nanoTime() - startTime);
}
});
return callback
.orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS)
.whenComplete(
(putResult, throwable) -> {
gaugePendingPutRequests.decrement();
gaugePendingPutBytes.add(-value.length);

if (throwable == null) {
counterPutBytes.add(value.length);
histogramPutLatency.recordSuccess(System.nanoTime() - startTime);
} else {
histogramPutLatency.recordFailure(System.nanoTime() - startTime);
}
});
}

private CompletableFuture<PutResult> internalPut(
Expand Down Expand Up @@ -355,7 +362,7 @@ private CompletableFuture<PutResult> internalPut(
});
}

return future;
return future.orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -381,15 +388,17 @@ private CompletableFuture<PutResult> internalPut(
} catch (RuntimeException e) {
callback.completeExceptionally(e);
}
return callback.whenComplete(
(putResult, throwable) -> {
gaugePendingDeleteRequests.decrement();
if (throwable == null) {
histogramDeleteLatency.recordSuccess(System.nanoTime() - startTime);
} else {
histogramDeleteLatency.recordFailure(System.nanoTime() - startTime);
}
});
return callback
.orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS)
.whenComplete(
(putResult, throwable) -> {
gaugePendingDeleteRequests.decrement();
if (throwable == null) {
histogramDeleteLatency.recordSuccess(System.nanoTime() - startTime);
} else {
histogramDeleteLatency.recordFailure(System.nanoTime() - startTime);
}
});
}

@Override
Expand Down Expand Up @@ -436,15 +445,17 @@ private CompletableFuture<PutResult> internalPut(
} catch (RuntimeException e) {
callback = CompletableFuture.failedFuture(e);
}
return callback.whenComplete(
(putResult, throwable) -> {
gaugePendingDeleteRangeRequests.decrement();
if (throwable == null) {
histogramDeleteRangeLatency.recordSuccess(System.nanoTime() - startTime);
} else {
histogramDeleteRangeLatency.recordFailure(System.nanoTime() - startTime);
}
});
return callback
.orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS)
.whenComplete(
(putResult, throwable) -> {
gaugePendingDeleteRangeRequests.decrement();
if (throwable == null) {
histogramDeleteRangeLatency.recordSuccess(System.nanoTime() - startTime);
} else {
histogramDeleteRangeLatency.recordFailure(System.nanoTime() - startTime);
}
});
}

@Override
Expand All @@ -464,18 +475,20 @@ private CompletableFuture<PutResult> internalPut(
} catch (RuntimeException e) {
callback.completeExceptionally(e);
}
return callback.whenComplete(
(getResult, throwable) -> {
gaugePendingGetRequests.decrement();
if (throwable == null) {
if (getResult != null) {
counterGetBytes.add(getResult.getValue().length);
}
histogramGetLatency.recordSuccess(System.nanoTime() - startTime);
} else {
histogramGetLatency.recordFailure(System.nanoTime() - startTime);
}
});
return callback
.orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS)
.whenComplete(
(getResult, throwable) -> {
gaugePendingGetRequests.decrement();
if (throwable == null) {
if (getResult != null) {
counterGetBytes.add(getResult.getValue().length);
}
histogramGetLatency.recordSuccess(System.nanoTime() - startTime);
} else {
histogramGetLatency.recordFailure(System.nanoTime() - startTime);
}
});
}

private void internalGet(
Expand Down Expand Up @@ -566,16 +579,18 @@ private void internalGetFloorCeiling(
} catch (Exception e) {
callback = CompletableFuture.failedFuture(e);
}
return callback.whenComplete(
(listResult, throwable) -> {
gaugePendingListRequests.decrement();
if (throwable == null) {
counterListBytes.add(listResult.stream().mapToInt(String::length).sum());
histogramListLatency.recordSuccess(System.nanoTime() - startTime);
} else {
histogramListLatency.recordFailure(System.nanoTime() - startTime);
}
});
return callback
.orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS)
.whenComplete(
(listResult, throwable) -> {
gaugePendingListRequests.decrement();
if (throwable == null) {
counterListBytes.add(listResult.stream().mapToInt(String::length).sum());
histogramListLatency.recordSuccess(System.nanoTime() - startTime);
} else {
histogramListLatency.recordFailure(System.nanoTime() - startTime);
}
});
}

@Override
Expand Down
Loading

0 comments on commit c7e25d6

Please sign in to comment.