From 673026ac07cd5fb0d286f9a180fd3d3b635177a0 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Thu, 24 Oct 2024 16:24:20 -0500 Subject: [PATCH 1/2] fix(metrics): modify bulk metric for document missing --- .../elasticsearch/update/BulkListener.java | 18 ++++++++++--- .../update/BulkListenerTest.java | 27 +++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java index 485b95192389e..3c5c5f4b35008 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java @@ -11,10 +11,12 @@ import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.support.WriteRequest; +import org.opensearch.index.engine.DocumentMissingException; @Slf4j public class BulkListener implements BulkProcessor.Listener { private static final Map INSTANCES = new HashMap<>(); + private static final String DOCUMENT_MISSING = "document_missing"; public static BulkListener getInstance() { return INSTANCES.computeIfAbsent(null, BulkListener::new); @@ -84,19 +86,27 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) private static void incrementMetrics(BulkResponse response) { Arrays.stream(response.getItems()) - .map(req -> buildMetricName(req.getOpType(), req.status().name())) + .map( + req -> + buildMetricName(req.getOpType(), req.status().name(), req.getFailure().getCause())) .forEach(metricName -> MetricUtils.counter(BulkListener.class, metricName).inc()); } private static void incrementMetrics(BulkRequest request, Throwable failure) { request.requests().stream() - .map(req -> buildMetricName(req.opType(), "exception")) + .map(req -> buildMetricName(req.opType(), "exception", failure)) .forEach( metricName -> MetricUtils.exceptionCounter(BulkListener.class, metricName, failure)); } - private static String buildMetricName(DocWriteRequest.OpType opType, String status) { - return opType.getLowercase() + MetricUtils.DELIMITER + status.toLowerCase(); + private static String buildMetricName( + DocWriteRequest.OpType opType, String status, Throwable cause) { + StringBuilder sb = new StringBuilder(); + sb.append(opType.getLowercase()).append(MetricUtils.DELIMITER).append(status.toLowerCase()); + if (cause instanceof DocumentMissingException) { + sb.append(MetricUtils.DELIMITER).append(DOCUMENT_MISSING); + } + return sb.toString(); } public static String buildBulkRequestSummary(BulkRequest request) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java b/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java index e13c2d9fd1005..b61f7b40110fe 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java @@ -36,4 +36,31 @@ public void testDefaultPolicy() { test.beforeBulk(0L, mockRequest2); verify(mockRequest2, times(1)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); } + + @Test + public void testException() { + BulkListener test = BulkListener.getInstance(); + + BulkRequest mockRequest = Mockito.mock(BulkRequest.class); + BulkResponse mockResponse = Mockito.mock(BulkResponse.class); + BulkItemResponse bulkItemResponse = Mockito.mock(BulkItemResponse.class); + BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] {bulkItemResponse}; + BulkItemResponse.Failure mockFailure = Mockito.mock(BulkItemResponse.Failure.class); + DocumentMissingException mockException = Mockito.mock(DocumentMissingException.class); + Mockito.when(mockResponse.hasFailures()).thenReturn(true); + Mockito.when(mockResponse.getItems()).thenReturn(bulkItemResponses); + Mockito.when(bulkItemResponse.getFailure()).thenReturn(mockFailure); + Mockito.when(mockFailure.getCause()).thenReturn(mockException); + Mockito.when(bulkItemResponse.getOpType()).thenReturn(DocWriteRequest.OpType.UPDATE); + Mockito.when(bulkItemResponse.status()).thenReturn(RestStatus.NOT_FOUND); + Mockito.when(mockResponse.getTook()).thenReturn(new TimeValue(1L)); + test.afterBulk(0L, mockRequest, mockResponse); + String metricName = + DocWriteRequest.OpType.UPDATE.getLowercase() + + "_" + + RestStatus.NOT_FOUND.name().toLowerCase() + + "_" + + "document_missing"; + assertEquals(MetricUtils.counter(BulkListener.class, metricName).getCount(), 1L); + } } From bc4676b341f942008663c284dd6a2e3aefcb7f99 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Thu, 24 Oct 2024 17:44:31 -0500 Subject: [PATCH 2/2] missing imports --- .../metadata/elasticsearch/update/BulkListenerTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java b/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java index b61f7b40110fe..48f72d3380bf1 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java @@ -8,9 +8,16 @@ import static org.testng.Assert.assertNotNull; import com.linkedin.metadata.search.elasticsearch.update.BulkListener; +import com.linkedin.metadata.utils.metrics.MetricUtils; import org.mockito.Mockito; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.support.WriteRequest; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.engine.DocumentMissingException; import org.testng.annotations.Test; public class BulkListenerTest {