diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index cc9c41f103..3427bdad36 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -128,9 +128,11 @@ public final class BulkRetryStrategy { static class BulkOperationRequestResponse { final AccumulatingBulkRequest bulkRequest; final BulkResponse response; - public BulkOperationRequestResponse(final AccumulatingBulkRequest bulkRequest, final BulkResponse response) { + final Exception exception; + public BulkOperationRequestResponse(final AccumulatingBulkRequest bulkRequest, final BulkResponse response, final Exception exception) { this.bulkRequest = bulkRequest; this.response = response; + this.exception = exception; } AccumulatingBulkRequest getBulkRequest() { return bulkRequest; @@ -138,6 +140,9 @@ AccumulatingBulkRequest getBulkRequest() { BulkResponse getResponse() { return response; } + String getExceptionMessage() { + return exception.getMessage(); + } } public BulkRetryStrategy(final RequestFunction, BulkResponse> requestFunction, @@ -194,6 +199,7 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte final Backoff backoff = Backoff.exponential(INITIAL_DELAY_MS, MAXIMUM_DELAY_MS).withMaxAttempts(maxRetries); BulkOperationRequestResponse operationResponse; BulkResponse response = null; + String exceptionMessage = ""; AccumulatingBulkRequest request = bulkRequest; int attempt = 1; do { @@ -201,9 +207,9 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte if (operationResponse != null) { final long delayMillis = backoff.nextDelayMillis(attempt++); request = operationResponse.getBulkRequest(); - response = operationResponse.getResponse(); + exceptionMessage = operationResponse.getExceptionMessage(); if (delayMillis < 0) { - RuntimeException e = new RuntimeException(String.format("Number of retries reached the limit of max retries (configured value %d)", maxRetries)); + RuntimeException e = new RuntimeException(String.format("Index : %s, Number of retries reached the limit of max retries (configured value %d. Last exception message: %s)", maxRetries, exceptionMessage)); handleFailures(request, null, e); break; } @@ -251,13 +257,13 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { if(isItemInError(bulkItemResponse)) { final ErrorCause error = bulkItemResponse.error(); - LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), error != null ? error.reason() : ""); + LOG.warn("index = {} operation = {}, error = {}", bulkItemResponse.index(), bulkItemResponse.operationType(), error != null ? error.reason() : ""); } } } } bulkRequestNumberOfRetries.increment(); - return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse); + return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse, exceptionFromRequest); } else { handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest); } @@ -273,7 +279,7 @@ private void handleFailures(final AccumulatingBulkRequest createBulkReq requestToReissue.addOperation(bulkOperation); } else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { documentsVersionConflictErrors.increment(); - LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason()); + LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkItemResponse.index(), bulkItemResponse.error().reason()); bulkOperation.releaseEventHandle(true); } else { nonRetryableFailures.add(FailedBulkOperation.builder() @@ -368,7 +374,7 @@ private void handleFailures(final AccumulatingBulkRequest