Skip to content

Commit

Permalink
WIP Add RequestInfo Delay Calculator
Browse files Browse the repository at this point in the history
Implementation of exponential backoff. Idea is to start with a minimum
delay on the first time-out or circuit breaker activation. If the next such
event happens within twice the last delay after the previous event, double
the delay until a maximum delay is reached. Use the maximum delay from
then on, until a sufficiently long period (maximum delay) without an event
happens. Than the delay is reset to minimum.

TODO: Make minimum and maximum delay configurable.
Signed-off-by: Karsten Schnitter <[email protected]>
  • Loading branch information
KarstenSchnitter committed Mar 20, 2024
1 parent 1392056 commit 55b91da
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package org.opensearch.dataprepper;

import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.rpc.RetryInfo;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GoogleGrpcExceptionHandlerFunction;
Expand All @@ -23,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFunction {
Expand All @@ -38,12 +37,14 @@ public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFu
private final Counter badRequestsCounter;
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;
private final GrpcRetryInfoCalculator retryInfoCalculator;

public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
retryInfoCalculator = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(2));
}

@Override
Expand Down Expand Up @@ -81,14 +82,8 @@ private com.google.rpc.Status createStatus(final Throwable e, final Status.Code
builder.setMessage(e.getMessage() == null ? code.name() :e.getMessage());
}
if (code == Status.Code.RESOURCE_EXHAUSTED) {
builder.addDetails(Any.pack(createRetryInfo()));
builder.addDetails(Any.pack(retryInfoCalculator.createRetryInfo()));
}
return builder.build();
}

// TODO: Implement logic for the responsse retry delay to be sent with the retry info
private RetryInfo createRetryInfo() {
Duration.Builder duration = Duration.newBuilder().setSeconds(0L).setNanos(100_000_000);
return RetryInfo.newBuilder().setRetryDelay(duration).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.dataprepper;

import com.google.rpc.RetryInfo;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

class GrpcRetryInfoCalculator {

private final Duration minimumDelay;
private final Duration maximumDelay;

private final AtomicReference<Instant> lastTimeCalled;
private final AtomicReference<Duration> nextDelay;

GrpcRetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) {
this.minimumDelay = minimumDelay;
this.maximumDelay = maximumDelay;
this.lastTimeCalled = new AtomicReference<>(Instant.now());
this.nextDelay = new AtomicReference<>(minimumDelay);
}

private static RetryInfo createProtoResult(Duration delay) {
return RetryInfo.newBuilder().setRetryDelay(mapDuration(delay)).build();
}

private static Duration minDuration(Duration left, Duration right) {
return left.compareTo(right) <= 0 ? left : right;
}

private static com.google.protobuf.Duration.Builder mapDuration(Duration duration) {
return com.google.protobuf.Duration.newBuilder().setSeconds(duration.getSeconds()).setNanos(duration.getNano());
}

RetryInfo createRetryInfo() {
Instant now = Instant.now();
// Is the last time we got called longer ago than the next delay?
if (lastTimeCalled.getAndSet(now).isBefore(now.minus(nextDelay.get()))) {
// Use minimum delay and reset the saved delay
nextDelay.set(minimumDelay);
return createProtoResult(minimumDelay);
}
Duration delay = nextDelay.getAndUpdate(d -> minDuration(maximumDelay, d.multipliedBy(2)));
return createProtoResult(delay);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.dataprepper;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Any;
import com.google.rpc.RetryInfo;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.server.RequestTimeoutException;
Expand All @@ -27,13 +27,14 @@
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static com.linecorp.armeria.internal.common.grpc.MetadataUtil.GRPC_STATUS_DETAILS_BIN_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -110,21 +111,11 @@ public void testHandleTimeoutException() {

verify(requestTimeoutsCounter, times(2)).increment();

// TODO: Adjust to retry delay logic
verify(metadata, times(2)).put(ArgumentMatchers.eq(GRPC_STATUS_DETAILS_BIN_KEY), status.capture());
assertThat(status.getValue().getDetailsCount(), equalTo(1));

status.getAllValues().stream().map(com.google.rpc.Status::getDetailsList).flatMap(List::stream).map(e -> {
try {
return e.unpack(
RetryInfo.class);
} catch (InvalidProtocolBufferException ex) {
throw new AssertionError("unxepected status detail item",ex);
}
}).forEach(info -> {
assertThat(info.getRetryDelay().getSeconds(), equalTo(0L));
assertThat(info.getRetryDelay().getNanos(), equalTo(100_000_000));
});
for (com.google.rpc.Status currentStatus: status.getAllValues()) {
Optional<Any> retryInfo = currentStatus.getDetailsList().stream().filter(d -> d.is(RetryInfo.class)).findFirst();
assertTrue(retryInfo.isPresent(), "No RetryInfo at status:\n" + currentStatus.toString());
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.opensearch.dataprepper;

import com.google.rpc.RetryInfo;
import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

public class GrpcRetryInfoCalculatorTest {

@Test
public void testMinimumDelayOnFirstCall() {
RetryInfo retryInfo = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(1)).createRetryInfo();

assertThat(retryInfo.getRetryDelay().getNanos(), equalTo(100_000_000));
assertThat(retryInfo.getRetryDelay().getSeconds(), equalTo(0L));
}

@Test
public void testExponentialBackoff() {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(10));
RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
RetryInfo retryInfo3 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(1L));
assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(2L));
assertThat(retryInfo3.getRetryDelay().getSeconds(), equalTo(4L));
}

@Test
public void testUsesMaximumAsLongestDelay() {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(2));
RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
RetryInfo retryInfo3 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(1L));
assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(2L));
assertThat(retryInfo3.getRetryDelay().getSeconds(), equalTo(2L));
}

@Test
public void testResetAfterDelayWearsOff() throws InterruptedException {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofNanos(1_000_000), Duration.ofSeconds(1));
RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
Thread.sleep(5);
RetryInfo retryInfo3 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getNanos(), equalTo(1_000_000));
assertThat(retryInfo2.getRetryDelay().getNanos(), equalTo(2_000_000));
assertThat(retryInfo3.getRetryDelay().getNanos(), equalTo(1_000_000));
}
}

0 comments on commit 55b91da

Please sign in to comment.