Skip to content

Commit

Permalink
WIP Send RetryInfo on OTel Timeouts
Browse files Browse the repository at this point in the history
DataPrepper sends `RESOURCE_EXHAUSTED` gRPC responses
whenever a buffer is full or a circuit breaker is active. These statuses do
not contain retry info. In the OpenTelemetry protocol, this implies a
non-retryable error, which leads to dropped messages, e.g. in the OTel
collector. To apply proper back pressure in these scenarios, retry info is
added to the status.

Signed-off-by: Karsten Schnitter <[email protected]>
  • Loading branch information
KarstenSchnitter committed Sep 18, 2024
1 parent a22c9cb commit 7dd334b
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

package org.opensearch.dataprepper;

import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.rpc.RetryInfo;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.common.grpc.GoogleGrpcExceptionHandlerFunction;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
import io.grpc.Status;
Expand All @@ -24,7 +27,7 @@

import java.util.concurrent.TimeoutException;

public class GrpcRequestExceptionHandler implements GrpcStatusFunction {
public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFunction {
private static final Logger LOG = LoggerFactory.getLogger(GrpcRequestExceptionHandler.class);
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";

Expand All @@ -46,44 +49,52 @@ public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) {
}

@Override
public @Nullable Status apply(final RequestContext context, final Throwable exception, final Metadata metadata) {
final Throwable exceptionCause = exception instanceof BufferWriteException ? exception.getCause() : exception;

public com.google.rpc.@Nullable Status applyStatusProto(RequestContext ctx, Throwable throwable,
Metadata metadata) {
final Throwable exceptionCause = throwable instanceof BufferWriteException ? throwable.getCause() : throwable;
return handleExceptions(exceptionCause);
}

private Status handleExceptions(final Throwable e) {
private com.google.rpc.Status handleExceptions(final Throwable e) {
String message = e.getMessage();
if (e instanceof RequestTimeoutException || e instanceof TimeoutException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
return createStatus(e, Status.Code.RESOURCE_EXHAUSTED);
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
return createStatus(e, Status.Code.RESOURCE_EXHAUSTED);
} else if (e instanceof BadRequestException) {
badRequestsCounter.increment();
return createStatus(e, Status.INVALID_ARGUMENT);
return createStatus(e, Status.Code.INVALID_ARGUMENT);
} else if ((e instanceof StatusRuntimeException) && (message.contains("Invalid protobuf byte sequence") || message.contains("Can't decode compressed frame"))) {
badRequestsCounter.increment();
return createStatus(e, Status.INVALID_ARGUMENT);
return createStatus(e, Status.Code.INVALID_ARGUMENT);
} else if (e instanceof RequestCancelledException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.CANCELLED);
return createStatus(e, Status.Code.CANCELLED);
}

internalServerErrorCounter.increment();
LOG.error("Unexpected exception handling gRPC request", e);
return createStatus(e, Status.INTERNAL);
return createStatus(e, Status.Code.INTERNAL);
}

private Status createStatus(final Throwable e, final Status status) {
final String message;
private com.google.rpc.Status createStatus(final Throwable e, final Status.Code code) {
com.google.rpc.Status.Builder builder = com.google.rpc.Status.newBuilder().setCode(code.value());
if (e instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
builder.setMessage(ARMERIA_REQUEST_TIMEOUT_MESSAGE);
} else {
message = e.getMessage() == null ? status.getCode().name() : e.getMessage();
builder.setMessage(e.getMessage() == null ? code.name() :e.getMessage());
}
if (code == Status.Code.RESOURCE_EXHAUSTED) {
builder.addDetails(Any.pack(createRetryInfo()));
}
return builder.build();
}

return status.withDescription(message);
// TODO: Implement logic for the response retry delay to be sent with the retry info
/**
 * Builds the {@link RetryInfo} detail carrying the delay a client should wait before retrying.
 * The delay is currently a fixed 100 milliseconds (0 seconds plus 100,000,000 nanoseconds).
 *
 * @return a {@link RetryInfo} whose retry delay is 100 milliseconds
 */
private RetryInfo createRetryInfo() {
    final Duration retryDelay = Duration.newBuilder()
            .setNanos(100_000_000)
            .setSeconds(0L)
            .build();
    final RetryInfo.Builder retryInfo = RetryInfo.newBuilder();
    retryInfo.setRetryDelay(retryDelay);
    return retryInfo.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.RetryInfo;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
Expand All @@ -13,6 +15,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.exceptions.BadRequestException;
Expand All @@ -22,9 +27,11 @@
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static com.linecorp.armeria.internal.common.grpc.MetadataUtil.GRPC_STATUS_DETAILS_BIN_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -55,6 +62,9 @@ public class GrpcRequestExceptionHandlerTest {
@Mock
private Metadata metadata;

@Captor
private ArgumentCaptor<com.google.rpc.Status> status;

private GrpcRequestExceptionHandler grpcRequestExceptionHandler;

@BeforeEach
Expand Down Expand Up @@ -99,6 +109,22 @@ public void testHandleTimeoutException() {
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestTimeoutsCounter, times(2)).increment();

// TODO: Adjust to retry delay logic
verify(metadata, times(2)).put(ArgumentMatchers.eq(GRPC_STATUS_DETAILS_BIN_KEY), status.capture());
assertThat(status.getValue().getDetailsCount(), equalTo(1));

status.getAllValues().stream().map(com.google.rpc.Status::getDetailsList).flatMap(List::stream).map(e -> {
try {
return e.unpack(
RetryInfo.class);
} catch (InvalidProtocolBufferException ex) {
throw new AssertionError("unxepected status detail item",ex);
}
}).forEach(info -> {
assertThat(info.getRetryDelay().getSeconds(), equalTo(0L));
assertThat(info.getRetryDelay().getNanos(), equalTo(100_000_000));
});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void start(Buffer<Record<Object>> buffer) {
.builder()
.useClientTimeoutHeader(false)
.useBlockingTaskExecutor(true)
.exceptionMapping(requestExceptionHandler);
.exceptionHandler(requestExceptionHandler);

final MethodDescriptor<ExportLogsServiceRequest, ExportLogsServiceResponse> methodDescriptor = LogsServiceGrpc.getExportMethod();
final String oTelLogsSourcePath = oTelLogsSourceConfig.getPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
Expand Down Expand Up @@ -55,6 +54,7 @@
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig;
import org.opensearch.dataprepper.metrics.MetricNames;
Expand Down Expand Up @@ -521,7 +521,8 @@ void start_with_Health_configured_includes_HealthCheck_service() throws IOExcept
grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.exceptionHandler(any(
GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder);

when(server.stop()).thenReturn(completableFuture);
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
Expand Down Expand Up @@ -563,7 +564,8 @@ void start_without_Health_configured_does_not_include_HealthCheck_service() thro
grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.exceptionHandler(any(
GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder);

when(server.stop()).thenReturn(completableFuture);
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ public void start(Buffer<Record<? extends Metric>> buffer) {
final GrpcServiceBuilder grpcServiceBuilder = GrpcService
.builder()
.useClientTimeoutHeader(false)
.useBlockingTaskExecutor(true)
.exceptionMapping(requestExceptionHandler);
.useBlockingTaskExecutor(true).exceptionHandler(requestExceptionHandler);

final MethodDescriptor<ExportMetricsServiceRequest, ExportMetricsServiceResponse> methodDescriptor = MetricsServiceGrpc.getExportMethod();
final String oTelMetricsSourcePath = oTelMetricsSourceConfig.getPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
Expand Down Expand Up @@ -61,6 +60,7 @@
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig;
import org.opensearch.dataprepper.metrics.MetricNames;
Expand Down Expand Up @@ -197,7 +197,8 @@ public void beforeEach() {
lenient().when(grpcServiceBuilder.addService(any(BindableService.class))).thenReturn(grpcServiceBuilder);
lenient().when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder);
lenient().when(grpcServiceBuilder.useBlockingTaskExecutor(anyBoolean())).thenReturn(grpcServiceBuilder);
lenient().when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder);
lenient().when(grpcServiceBuilder.exceptionHandler(any(
GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder);
lenient().when(grpcServiceBuilder.build()).thenReturn(grpcService);
MetricsTestUtil.initMetrics();
pluginMetrics = PluginMetrics.fromNames("otel_metrics", "pipeline");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void start(Buffer<Record<Object>> buffer) {
.builder()
.useClientTimeoutHeader(false)
.useBlockingTaskExecutor(true)
.exceptionMapping(requestExceptionHandler);
.exceptionHandler(requestExceptionHandler);

final MethodDescriptor<ExportTraceServiceRequest, ExportTraceServiceResponse> methodDescriptor = TraceServiceGrpc.getExportMethod();
final String oTelTraceSourcePath = oTelTraceSourceConfig.getPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
Expand Down Expand Up @@ -55,6 +54,7 @@
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig;
import org.opensearch.dataprepper.metrics.MetricNames;
Expand Down Expand Up @@ -202,7 +202,8 @@ void beforeEach() {
lenient().when(grpcServiceBuilder.addService(any(BindableService.class))).thenReturn(grpcServiceBuilder);
lenient().when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder);
lenient().when(grpcServiceBuilder.useBlockingTaskExecutor(anyBoolean())).thenReturn(grpcServiceBuilder);
lenient().when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder);
lenient().when(grpcServiceBuilder.exceptionHandler(any(
GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder);
lenient().when(grpcServiceBuilder.build()).thenReturn(grpcService);

lenient().when(authenticationProvider.getHttpAuthenticationService()).thenCallRealMethod();
Expand Down

0 comments on commit 7dd334b

Please sign in to comment.