diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java index 048172bea1..109868f698 100644 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java @@ -5,9 +5,12 @@ package org.opensearch.dataprepper; +import com.google.protobuf.Any; +import com.google.protobuf.Duration; +import com.google.rpc.RetryInfo; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.common.grpc.GrpcStatusFunction; +import com.linecorp.armeria.common.grpc.GoogleGrpcExceptionHandlerFunction; import com.linecorp.armeria.server.RequestTimeoutException; import io.grpc.Metadata; import io.grpc.Status; @@ -24,7 +27,7 @@ import java.util.concurrent.TimeoutException; -public class GrpcRequestExceptionHandler implements GrpcStatusFunction { +public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFunction { private static final Logger LOG = LoggerFactory.getLogger(GrpcRequestExceptionHandler.class); static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full."; @@ -46,44 +49,52 @@ public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) { } @Override - public @Nullable Status apply(final RequestContext context, final Throwable exception, final Metadata metadata) { - final Throwable exceptionCause = exception instanceof BufferWriteException ? exception.getCause() : exception; - + public com.google.rpc.@Nullable Status applyStatusProto(RequestContext ctx, Throwable throwable, + Metadata metadata) { + final Throwable exceptionCause = throwable instanceof BufferWriteException ? throwable.getCause() : throwable; return handleExceptions(exceptionCause); } - private Status handleExceptions(final Throwable e) { + private com.google.rpc.Status handleExceptions(final Throwable e) { String message = e.getMessage(); if (e instanceof RequestTimeoutException || e instanceof TimeoutException) { requestTimeoutsCounter.increment(); - return createStatus(e, Status.RESOURCE_EXHAUSTED); + return createStatus(e, Status.Code.RESOURCE_EXHAUSTED); } else if (e instanceof SizeOverflowException) { requestsTooLargeCounter.increment(); - return createStatus(e, Status.RESOURCE_EXHAUSTED); + return createStatus(e, Status.Code.RESOURCE_EXHAUSTED); } else if (e instanceof BadRequestException) { badRequestsCounter.increment(); - return createStatus(e, Status.INVALID_ARGUMENT); + return createStatus(e, Status.Code.INVALID_ARGUMENT); } else if ((e instanceof StatusRuntimeException) && (message.contains("Invalid protobuf byte sequence") || message.contains("Can't decode compressed frame"))) { badRequestsCounter.increment(); - return createStatus(e, Status.INVALID_ARGUMENT); + return createStatus(e, Status.Code.INVALID_ARGUMENT); } else if (e instanceof RequestCancelledException) { requestTimeoutsCounter.increment(); - return createStatus(e, Status.CANCELLED); + return createStatus(e, Status.Code.CANCELLED); } internalServerErrorCounter.increment(); LOG.error("Unexpected exception handling gRPC request", e); - return createStatus(e, Status.INTERNAL); + return createStatus(e, Status.Code.INTERNAL); } - private Status createStatus(final Throwable e, final Status status) { - final String message; + private com.google.rpc.Status createStatus(final Throwable e, final Status.Code code) { + com.google.rpc.Status.Builder builder = com.google.rpc.Status.newBuilder().setCode(code.value()); if (e instanceof RequestTimeoutException) { - message = ARMERIA_REQUEST_TIMEOUT_MESSAGE; + builder.setMessage(ARMERIA_REQUEST_TIMEOUT_MESSAGE); } else { - message = e.getMessage() == null ? status.getCode().name() : e.getMessage(); + builder.setMessage(e.getMessage() == null ? code.name() :e.getMessage()); + } + if (code == Status.Code.RESOURCE_EXHAUSTED) { + builder.addDetails(Any.pack(createRetryInfo())); } + return builder.build(); + } - return status.withDescription(message); + // TODO: Implement logic for the response retry delay to be sent with the retry info + private RetryInfo createRetryInfo() { + Duration.Builder duration = Duration.newBuilder().setSeconds(0L).setNanos(100_000_000); + return RetryInfo.newBuilder().setRetryDelay(duration).build(); } } diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java index 7100891d3a..a23c8f3a20 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.rpc.RetryInfo; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.server.RequestTimeoutException; import io.grpc.Metadata; @@ -13,6 +15,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.exceptions.BadRequestException; @@ -22,9 +27,11 @@ import org.opensearch.dataprepper.model.buffer.SizeOverflowException; import java.io.IOException; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeoutException; +import static com.linecorp.armeria.internal.common.grpc.MetadataUtil.GRPC_STATUS_DETAILS_BIN_KEY; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.times; @@ -55,6 +62,9 @@ public class GrpcRequestExceptionHandlerTest { @Mock private Metadata metadata; + @Captor + private ArgumentCaptor status; + private GrpcRequestExceptionHandler grpcRequestExceptionHandler; @BeforeEach @@ -99,6 +109,22 @@ public void testHandleTimeoutException() { assertThat(messageStatus.getDescription(), equalTo(exceptionMessage)); verify(requestTimeoutsCounter, times(2)).increment(); + + // TODO: Adjust to retry delay logic + verify(metadata, times(2)).put(ArgumentMatchers.eq(GRPC_STATUS_DETAILS_BIN_KEY), status.capture()); + assertThat(status.getValue().getDetailsCount(), equalTo(1)); + + status.getAllValues().stream().map(com.google.rpc.Status::getDetailsList).flatMap(List::stream).map(e -> { + try { + return e.unpack( + RetryInfo.class); + } catch (InvalidProtocolBufferException ex) { + throw new AssertionError("unxepected status detail item",ex); + } + }).forEach(info -> { + assertThat(info.getRetryDelay().getSeconds(), equalTo(0L)); + assertThat(info.getRetryDelay().getNanos(), equalTo(100_000_000)); + }); } @Test diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java index 5f5428489f..b37406e773 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java @@ -110,7 +110,7 @@ public void start(Buffer> buffer) { .builder() .useClientTimeoutHeader(false) .useBlockingTaskExecutor(true) - .exceptionMapping(requestExceptionHandler); + .exceptionHandler(requestExceptionHandler); final MethodDescriptor methodDescriptor = LogsServiceGrpc.getExportMethod(); final String oTelLogsSourcePath = oTelLogsSourceConfig.getPath(); diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java index 29d3536259..2ce4daba91 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java @@ -19,7 +19,6 @@ import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.SessionProtocol; -import com.linecorp.armeria.common.grpc.GrpcStatusFunction; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; @@ -55,6 +54,7 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.GrpcRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; import org.opensearch.dataprepper.metrics.MetricNames; @@ -521,7 +521,8 @@ void start_with_Health_configured_includes_HealthCheck_service() throws IOExcept grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder); when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder); when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); - when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.exceptionHandler(any( + GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder); when(server.stop()).thenReturn(completableFuture); final Path certFilePath = Path.of("data/certificate/test_cert.crt"); @@ -563,7 +564,8 @@ void start_without_Health_configured_does_not_include_HealthCheck_service() thro grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder); when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder); when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); - when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.exceptionHandler(any( + GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder); when(server.stop()).thenReturn(completableFuture); final Path certFilePath = Path.of("data/certificate/test_cert.crt"); diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index 43e2e1f92d..200d4e87f2 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -112,8 +112,7 @@ public void start(Buffer> buffer) { final GrpcServiceBuilder grpcServiceBuilder = GrpcService .builder() .useClientTimeoutHeader(false) - .useBlockingTaskExecutor(true) - .exceptionMapping(requestExceptionHandler); + .useBlockingTaskExecutor(true).exceptionHandler(requestExceptionHandler); final MethodDescriptor methodDescriptor = MetricsServiceGrpc.getExportMethod(); final String oTelMetricsSourcePath = oTelMetricsSourceConfig.getPath(); diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index d03f84f07a..9972a81de8 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -20,7 +20,6 @@ import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.SessionProtocol; -import com.linecorp.armeria.common.grpc.GrpcStatusFunction; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; @@ -61,6 +60,7 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.GrpcRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; import org.opensearch.dataprepper.metrics.MetricNames; @@ -197,7 +197,8 @@ public void beforeEach() { lenient().when(grpcServiceBuilder.addService(any(BindableService.class))).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.useBlockingTaskExecutor(anyBoolean())).thenReturn(grpcServiceBuilder); - lenient().when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder); + lenient().when(grpcServiceBuilder.exceptionHandler(any( + GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.build()).thenReturn(grpcService); MetricsTestUtil.initMetrics(); pluginMetrics = PluginMetrics.fromNames("otel_metrics", "pipeline"); diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java index 077e4bc879..a4f3c1f8fb 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java @@ -113,7 +113,7 @@ public void start(Buffer> buffer) { .builder() .useClientTimeoutHeader(false) .useBlockingTaskExecutor(true) - .exceptionMapping(requestExceptionHandler); + .exceptionHandler(requestExceptionHandler); final MethodDescriptor methodDescriptor = TraceServiceGrpc.getExportMethod(); final String oTelTraceSourcePath = oTelTraceSourceConfig.getPath(); diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java index 9873cc6611..3dfd938657 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java @@ -20,7 +20,6 @@ import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.SessionProtocol; -import com.linecorp.armeria.common.grpc.GrpcStatusFunction; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; @@ -55,6 +54,7 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.GrpcRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; import org.opensearch.dataprepper.metrics.MetricNames; @@ -202,7 +202,8 @@ void beforeEach() { lenient().when(grpcServiceBuilder.addService(any(BindableService.class))).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.useBlockingTaskExecutor(anyBoolean())).thenReturn(grpcServiceBuilder); - lenient().when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder); + lenient().when(grpcServiceBuilder.exceptionHandler(any( + GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.build()).thenReturn(grpcService); lenient().when(authenticationProvider.getHttpAuthenticationService()).thenCallRealMethod();