Skip to content

Commit

Permalink
OpenSearchSink - Enhance logs to include index name and last exceptio…
Browse files Browse the repository at this point in the history
…n information

Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Aug 16, 2024
1 parent 0d3ca76 commit f53475b
Showing 1 changed file with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,21 @@ public final class BulkRetryStrategy {
static class BulkOperationRequestResponse {
final AccumulatingBulkRequest bulkRequest;
final BulkResponse response;
public BulkOperationRequestResponse(final AccumulatingBulkRequest bulkRequest, final BulkResponse response) {
final Exception exception;
public BulkOperationRequestResponse(final AccumulatingBulkRequest bulkRequest, final BulkResponse response, final Exception exception) {
this.bulkRequest = bulkRequest;
this.response = response;
this.exception = exception;
}
AccumulatingBulkRequest getBulkRequest() {
return bulkRequest;
}
BulkResponse getResponse() {
return response;
}
String getExceptionMessage() {
return exception.getMessage();
}
}

public BulkRetryStrategy(final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction,
Expand Down Expand Up @@ -194,16 +199,17 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte
final Backoff backoff = Backoff.exponential(INITIAL_DELAY_MS, MAXIMUM_DELAY_MS).withMaxAttempts(maxRetries);
BulkOperationRequestResponse operationResponse;
BulkResponse response = null;
String exceptionMessage = "";
AccumulatingBulkRequest request = bulkRequest;
int attempt = 1;
do {
operationResponse = handleRetry(request, response, attempt);
if (operationResponse != null) {
final long delayMillis = backoff.nextDelayMillis(attempt++);
request = operationResponse.getBulkRequest();
response = operationResponse.getResponse();
exceptionMessage = operationResponse.getExceptionMessage();
if (delayMillis < 0) {
RuntimeException e = new RuntimeException(String.format("Number of retries reached the limit of max retries (configured value %d)", maxRetries));
RuntimeException e = new RuntimeException(String.format("Index : %s, Number of retries reached the limit of max retries (configured value %d. Last exception message: %s)", maxRetries, exceptionMessage));
handleFailures(request, null, e);
break;
}
Expand Down Expand Up @@ -251,13 +257,13 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if(isItemInError(bulkItemResponse)) {
final ErrorCause error = bulkItemResponse.error();
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), error != null ? error.reason() : "");
LOG.warn("index = {} operation = {}, error = {}", bulkItemResponse.index(), bulkItemResponse.operationType(), error != null ? error.reason() : "");
}
}
}
}
bulkRequestNumberOfRetries.increment();
return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse);
return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse, exceptionFromRequest);
} else {
handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest);
}
Expand All @@ -273,7 +279,7 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
if (error != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(error.type())) {
continue;
}
LOG.warn("operation = {}, status = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.status(), error != null ? error.reason() : "");
LOG.warn("index = {}, operation = {}, status = {}, error = {}", bulkItemResponse.index(), bulkItemResponse.operationType(), bulkItemResponse.status(), error != null ? error.reason() : "");
}
}
handleFailures(bulkRequest, bulkResponse.items());
Expand Down Expand Up @@ -332,7 +338,7 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
requestToReissue.addOperation(bulkOperation);
} else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkItemResponse.index(), bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
} else {
nonRetryableFailures.add(FailedBulkOperation.builder()
Expand Down Expand Up @@ -368,7 +374,7 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
if (isItemInError(bulkItemResponse)) {
if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkOperation.getIndex(), bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
} else {
failures.add(FailedBulkOperation.builder()
Expand Down

0 comments on commit f53475b

Please sign in to comment.