From 81b07ac68b1f8f61c3e0c648d6cef02a1cfa08e0 Mon Sep 17 00:00:00 2001
From: Allan Clements
Date: Thu, 30 May 2024 16:01:51 -0500
Subject: [PATCH] Implemented bulk retry test & splitting up of bulk requests
 that exceed the configured size limit

---
 docs/configs/janusgraph-cfg.md                |   1 +
 .../diskstorage/es/ElasticSearchIndex.java    |   5 +
 .../diskstorage/es/rest/RestClientSetup.java  |   7 +-
 .../es/rest/RestElasticSearchClient.java      | 149 ++++++++++++++----
 .../es/rest/RestClientBulkRequestsTest.java   | 106 +++++++++++++
 .../es/rest/RestClientRetryTest.java          |  76 ++++++++-
 .../es/rest/RestClientSetupTest.java          |  15 +-
 7 files changed, 311 insertions(+), 48 deletions(-)
 create mode 100644 janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java

diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md
index ff089082670..f5fd722f773 100644
--- a/docs/configs/janusgraph-cfg.md
+++ b/docs/configs/janusgraph-cfg.md
@@ -150,6 +150,7 @@ Elasticsearch index configuration

 | Name | Description | Datatype | Default Value | Mutability |
 | ---- | ---- | ---- | ---- | ---- |
+| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
 | index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
 | index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
 | index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |

diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
index 74799797611..e357fd20add 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
@@ -324,6 +324,11 @@ public class ElasticSearchIndex implements IndexProvider {
             "Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " +
             "E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]);

+    public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
+        new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
+            "The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
+            "chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
+
     public static final int HOST_PORT_DEFAULT = 9200;

     /**
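For readers wiring this up, a minimal sketch of setting the new option through the standard JanusGraph configuration builder. The index name `search`, the `inmemory` storage backend, and the 10mb value are illustrative assumptions, not part of this patch:

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class BulkChunkConfigExample {
    public static void main(String[] args) {
        // Lower the chunk limit to 10mb, e.g. for a cluster whose
        // http.max_content_length has been reduced below the 100mb ES default.
        JanusGraph graph = JanusGraphFactory.build()
            .set("storage.backend", "inmemory")
            .set("index.search.backend", "elasticsearch")
            .set("index.search.hostname", "127.0.0.1")
            .set("index.search.elasticsearch.bulk-chunk-size-limit-bytes", 10_000_000)
            .open();
        graph.close();
    }
}
```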
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java
index bbfac23d6be..355bb7cc447 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java
@@ -81,8 +81,9 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
         long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
         Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
             .mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
+        int bulkChunkLimitBytes = config.getOrDefault(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES);
         final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
-            retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
+            retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs, bulkChunkLimitBytes);
         if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
             client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
         }
@@ -115,9 +116,9 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {
     protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive,
                                                              boolean useMappingTypesForES7,
                                                              int retryAttemptLimit, Set<Integer> retryOnErrorCodes,
                                                              long retryInitialWaitMs,
-                                                             long retryMaxWaitMs) {
+                                                             long retryMaxWaitMs, int bulkChunkSerializedLimit) {
         return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
-            retryInitialWaitMs, retryMaxWaitMs);
+            retryInitialWaitMs, retryMaxWaitMs, bulkChunkSerializedLimit);
     }

     /**
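The new limit is threaded through the protected `getElasticSearchClient(...)` factory method rather than read directly inside `RestElasticSearchClient`, because that method doubles as a test seam: `RestClientSetupTest` (further below) stubs it with Mockito and captures each argument. A hand-rolled sketch of the same seam; the class name and the println are illustrative additions, while the signature mirrors this patch:

```java
package org.janusgraph.diskstorage.es.rest;

import org.elasticsearch.client.RestClient;

import java.util.Set;

// Hypothetical subclass demonstrating the factory seam introduced by this patch.
public class LoggingRestClientSetup extends RestClientSetup {
    @Override
    protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive,
                                                             boolean useMappingTypesForES7,
                                                             int retryAttemptLimit, Set<Integer> retryOnErrorCodes,
                                                             long retryInitialWaitMs, long retryMaxWaitMs,
                                                             int bulkChunkSerializedLimit) {
        // Observe the configured limit before delegating to the real factory.
        System.out.println("bulk chunk limit: " + bulkChunkSerializedLimit + " bytes");
        return super.getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit,
            retryOnErrorCodes, retryInitialWaitMs, retryMaxWaitMs, bulkChunkSerializedLimit);
    }
}
```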
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
index 44c0ee307f0..dc770f027e3 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
@@ -16,11 +16,14 @@

 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
 import org.apache.http.HttpEntity;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.tinkerpop.shaded.jackson.annotation.JsonIgnoreProperties;
 import org.apache.tinkerpop.shaded.jackson.core.JsonParseException;
+import org.apache.tinkerpop.shaded.jackson.core.JsonProcessingException;
 import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonMappingException;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -48,10 +51,12 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -124,9 +129,11 @@ public class RestElasticSearchClient implements ElasticSearchClient {

     private final long retryMaxWaitMs;

+    private final int bulkChunkSerializedLimitBytes;
+
     public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
                                    int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
-                                   long retryMaxWaitMs) {
+                                   long retryMaxWaitMs, int bulkChunkSerializedLimitBytes) {
         this.delegate = delegate;
         majorVersion = getMajorVersion();
         this.scrollKeepAlive = scrollKeepAlive+"s";
@@ -137,6 +144,7 @@ public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean
         this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
         this.retryInitialWaitMs = retryInitialWaitMs;
         this.retryMaxWaitMs = retryMaxWaitMs;
+        this.bulkChunkSerializedLimitBytes = bulkChunkSerializedLimitBytes;
     }

     @Override
@@ -383,9 +391,11 @@ public void clearStore(String indexName, String storeName) throws IOException {
         }
     }

-    private Pair<String, byte[]> buildBulkRequestInput(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
-        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        for (final ElasticSearchMutation request : requests) {
+    private class RequestBytes {
+        final byte [] requestBytes;
+        final byte [] requestSource;
+
+        private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
             Map<String, Object> requestData = new HashMap<>();
             if (useMappingTypes) {
                 requestData.put("_index", request.getIndex());
@@ -400,15 +410,39 @@ private Pair<String, byte[]> buildBulkRequestInput(List<ElasticSearchMutation> r
                 requestData.put(retryOnConflictKey, retryOnConflict);
             }

-            outputStream.write(mapWriter.writeValueAsBytes(
-                ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData))
-            );
-            outputStream.write(NEW_LINE_BYTES);
+            this.requestBytes = mapWriter.writeValueAsBytes(ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData));
             if (request.getSource() != null) {
-                outputStream.write(mapWriter.writeValueAsBytes(request.getSource()));
+                this.requestSource = mapWriter.writeValueAsBytes(request.getSource());
+            } else {
+                this.requestSource = null;
+            }
+        }
+
+        private int getSerializedSize() {
+            int serializedSize = this.requestBytes.length;
+            serializedSize += 1; //For follow-up NEW_LINE_BYTES
+            if (this.requestSource != null) {
+                serializedSize += this.requestSource.length;
+                serializedSize += 1; //For follow-up NEW_LINE_BYTES
+            }
+            return serializedSize;
+        }
+
+        private void writeTo(OutputStream outputStream) throws IOException {
+            outputStream.write(this.requestBytes);
+            outputStream.write(NEW_LINE_BYTES);
+            if (this.requestSource != null) {
+                outputStream.write(requestSource);
                 outputStream.write(NEW_LINE_BYTES);
             }
         }
+    }
+
+    private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        for (final RequestBytes request : requests) {
+            request.writeTo(outputStream);
+        }

         final StringBuilder builder = new StringBuilder();
         if (ingestPipeline != null) {
@@ -421,13 +455,13 @@ private Pair<String, byte[]> buildBulkRequestInput(List<ElasticSearchMutation> r
         return Pair.with(builder.toString(), outputStream.toByteArray());
     }

-    private List<Triplet<Object, Integer, ElasticSearchMutation>> pairErrorsWithSubmittedMutation(
+    private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
         //Bulk API is documented to return bulk item responses in the same order of submission
         //https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body
         //As such we only need to retry elements that failed
         final List<Map<String, RestBulkResponse.RestBulkItemResponse>> bulkResponseItems,
-        final List<ElasticSearchMutation> submittedBulkRequestItems) {
-        final List<Triplet<Object, Integer, ElasticSearchMutation>> errors = new ArrayList<>(bulkResponseItems.size());
+        final List<RequestBytes> submittedBulkRequestItems) {
+        final List<Triplet<Object, Integer, RequestBytes>> errors = new ArrayList<>(bulkResponseItems.size());
         for (int itemIndex = 0; itemIndex < bulkResponseItems.size(); itemIndex++) {
             Collection<RestBulkResponse.RestBulkItemResponse> bulkResponseItem = bulkResponseItems.get(itemIndex).values();
             if (bulkResponseItem.size() > 1) {
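Context for `getSerializedSize()`: the bulk body is NDJSON, an action line followed by an optional source line, each terminated with a newline, which is why the method adds one byte per written line. A rough, illustrative rendering of one `index` item (field values assumed, byte counts approximate):

```java
import java.nio.charset.StandardCharsets;

public class BulkItemSizeSketch {
    public static void main(String[] args) {
        // Action line then source line, each newline-terminated, per the
        // Elasticsearch bulk NDJSON format that buildBulkRequestInput assembles.
        byte[] action = "{\"index\":{\"_index\":\"some_index\",\"_id\":\"doc1\"}}"
            .getBytes(StandardCharsets.UTF_8);
        byte[] source = "{\"someKey\":\"someValue\"}".getBytes(StandardCharsets.UTF_8);
        // Mirrors RequestBytes#getSerializedSize: +1 per line for NEW_LINE_BYTES.
        int serializedSize = action.length + 1 + source.length + 1;
        System.out.println("one bulk item occupies " + serializedSize + " bytes");
    }
}
```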
@@ -441,32 +475,77 @@ private List<Triplet<Object, Integer, ElasticSearchMutation>> pairErrorsWithSubm
         return errors;
     }

+    private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
+        //By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
+        //the specified limit
+
+        //https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-desc
+        //There is no "correct" number of actions to perform in a single bulk request. Experiment with different
+        // settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
+        // size of a HTTP request to 100mb by default
+        private final PeekingIterator<RequestBytes> requestIterator;
+
+        private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
+            List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
+            for (ElasticSearchMutation request : requests) {
+                serializedRequests.add(new RequestBytes(request));
+            }
+            this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
+        }
+
+        @Override
+        public boolean hasNext() {
+            return requestIterator.hasNext();
+        }
+
+        @Override
+        public List<RequestBytes> next() {
+            List<RequestBytes> serializedRequests = new ArrayList<>();
+            int chunkSerializedTotal = 0;
+            while (requestIterator.hasNext()) {
+                RequestBytes peeked = requestIterator.peek();
+                if (peeked.getSerializedSize() + chunkSerializedTotal < bulkChunkSerializedLimitBytes) {
+                    chunkSerializedTotal += peeked.getSerializedSize();
+                    serializedRequests.add(requestIterator.next());
+                } else {
+                    //Adding this element would exceed the limit, so return the chunk
+                    return serializedRequests;
+                }
+            }
+            //All remaining requests fit in this chunk
+            return serializedRequests;
+        }
+    }
+
     @Override
-    public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
-        List<ElasticSearchMutation> requestsToSend = requests;
-        int retryCount = 0;
-        while (true) {
-            final Pair<String, byte[]> bulkRequestInput = buildBulkRequestInput(requestsToSend, ingestPipeline);
-            final Response response = performRequest(REQUEST_TYPE_POST, bulkRequestInput.getValue0(), bulkRequestInput.getValue1());
-            try (final InputStream inputStream = response.getEntity().getContent()) {
-                final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
-                List<Triplet<Object, Integer, ElasticSearchMutation>> bulkItemsThatFailed = pairErrorsWithSubmittedMutation(bulkResponse.getItems(), requestsToSend);
-                if (!bulkItemsThatFailed.isEmpty()) {
-                    //Only retry the bulk request if *all* the bulk response item error codes are retry error codes
-                    final Set<Integer> errorCodes = bulkItemsThatFailed.stream().map(Triplet::getValue1).collect(Collectors.toSet());
-                    if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) {
-                        //Build up the next request batch, of only the failed mutations
-                        requestsToSend = bulkItemsThatFailed.stream().map(Triplet::getValue2).collect(Collectors.toList());
-                        performRetryWait(retryCount);
-                        retryCount++;
-                    } else {
-                        final List<Object> errorItems = bulkItemsThatFailed.stream().map(Triplet::getValue0).collect(Collectors.toList());
-                        errorItems.forEach(error -> log.error("Failed to execute ES query: {}", error));
-                        throw new IOException("Failure(s) in Elasticsearch bulk request: " + errorItems);
-                    }
-                } else {
-                    //The entire bulk request was successful, leave the loop
-                    break;
+    public void bulkRequest(final List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
+        BulkRequestChunker bulkRequestChunker = new BulkRequestChunker(requests);
+        while (bulkRequestChunker.hasNext()) {
+            List<RequestBytes> bulkRequestChunk = bulkRequestChunker.next();
+            int retryCount = 0;
+            while (true) {
+                final Pair<String, byte[]> bulkRequestInput = buildBulkRequestInput(bulkRequestChunk, ingestPipeline);
+                final Response response = performRequest(REQUEST_TYPE_POST, bulkRequestInput.getValue0(), bulkRequestInput.getValue1());
+                try (final InputStream inputStream = response.getEntity().getContent()) {
+                    final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
+                    List<Triplet<Object, Integer, RequestBytes>> bulkItemsThatFailed = pairErrorsWithSubmittedMutation(bulkResponse.getItems(), bulkRequestChunk);
+                    if (!bulkItemsThatFailed.isEmpty()) {
+                        //Only retry the bulk request if *all* the bulk response item error codes are retry error codes
+                        final Set<Integer> errorCodes = bulkItemsThatFailed.stream().map(Triplet::getValue1).collect(Collectors.toSet());
+                        if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) {
+                            //Build up the next request batch, of only the failed mutations
+                            bulkRequestChunk = bulkItemsThatFailed.stream().map(Triplet::getValue2).collect(Collectors.toList());
+                            performRetryWait(retryCount);
+                            retryCount++;
+                        } else {
+                            final List<Object> errorItems = bulkItemsThatFailed.stream().map(Triplet::getValue0).collect(Collectors.toList());
+                            errorItems.forEach(error -> log.error("Failed to execute ES query: {}", error));
+                            throw new IOException("Failure(s) in Elasticsearch bulk request: " + errorItems);
+                        }
+                    } else {
+                        //The entire bulk request was successful, leave the loop
+                        break;
+                    }
                 }
             }
         }
     }
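One edge case worth flagging in `BulkRequestChunker.next()`: if a single serialized request alone meets or exceeds `bulkChunkSerializedLimitBytes`, the `else` branch fires while `serializedRequests` is still empty, so an empty chunk is returned and the oversized item is never consumed. A defensive pre-check along these lines, hypothetical and not part of this patch, would surface the problem eagerly:

```java
// Hypothetical guard for the BulkRequestChunker constructor: reject any single
// mutation that can never fit in a chunk, rather than emitting an empty chunk.
for (RequestBytes serializedRequest : serializedRequests) {
    if (serializedRequest.getSerializedSize() >= bulkChunkSerializedLimitBytes) {
        throw new IllegalArgumentException("Single bulk item is " + serializedRequest.getSerializedSize() +
            " bytes, exceeding the configured bulk-chunk-size-limit-bytes of " + bulkChunkSerializedLimitBytes);
    }
}
```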
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
new file mode 100644
index 00000000000..ce7dfd1991d
--- /dev/null
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
@@ -0,0 +1,106 @@
+// Copyright 2024 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.es.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.janusgraph.diskstorage.es.ElasticSearchMutation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class RestClientBulkRequestsTest {
+    @Mock
+    private RestClient restClientMock;
+
+    @Mock
+    private Response response;
+
+    @Mock
+    private StatusLine statusLine;
+
+    @Captor
+    private ArgumentCaptor<Request> requestCaptor;
+
+    RestElasticSearchClient createClient(int bulkChunkSerializedLimitBytes) throws IOException {
+        //Just throw an exception when there's an attempt to look up the ES version during instantiation
+        when(restClientMock.performRequest(any())).thenThrow(new IOException());
+
+        RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false,
+            0, Collections.emptySet(), 0, 0, bulkChunkSerializedLimitBytes);
+        //There's an initial query to get the ES version we need to accommodate, and then reset for the actual test
+        Mockito.reset(restClientMock);
+        return clientUnderTest;
+    }
+
+    @Test
+    public void testSplittingOfLargeBulkItems() throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        when(statusLine.getStatusCode()).thenReturn(200);
+
+        //In both cases return a "success"
+        RestBulkResponse singletonBulkItemResponseSuccess = new RestBulkResponse();
+        singletonBulkItemResponseSuccess.setItems(
+            Collections.singletonList(Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse())));
+        byte [] singletonBulkItemResponseSuccessBytes = mapper.writeValueAsBytes(singletonBulkItemResponseSuccess);
+        HttpEntity singletonBulkItemHttpEntityMock = mock(HttpEntity.class);
+        when(singletonBulkItemHttpEntityMock.getContent())
+            .thenReturn(new ByteArrayInputStream(singletonBulkItemResponseSuccessBytes))
+            //Have to setup a second input stream because it will have been consumed by the first pass
+            .thenReturn(new ByteArrayInputStream(singletonBulkItemResponseSuccessBytes));
+        when(response.getEntity()).thenReturn(singletonBulkItemHttpEntityMock);
+        when(response.getStatusLine()).thenReturn(statusLine);
+
+        int bulkLimit = 800;
+        try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) {
+            //prime the restClientMock again after it's reset after creation
+            when(restClientMock.performRequest(any())).thenReturn(response).thenReturn(response);
+            StringBuilder payloadBuilder = new StringBuilder();
+            IntStream.range(0, bulkLimit - 100).forEach(value -> payloadBuilder.append("a"));
+            String largePayload = payloadBuilder.toString();
+            restClientUnderTest.bulkRequest(Arrays.asList(
+                //There should be enough characters in the payload that they can't both be sent in a single bulk call
+                ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id1",
+                    Collections.singletonMap("someKey", largePayload)),
+                ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id2",
+                    Collections.singletonMap("someKey", largePayload))
+            ), null);
+            //Verify that despite only calling bulkRequest() once, we had 2 calls to the underlying rest client's
+            //perform request (due to the mutations being split across 2 calls)
+            verify(restClientMock, times(2)).performRequest(requestCaptor.capture());
+        }
+    }
+}
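The arithmetic that makes this test split into exactly two calls: with a chunk limit of 800 bytes and a 700-character payload, each serialized item (action line, source line, two newlines) lands just under the limit, so a chunk can hold one item but not two. A back-of-envelope sketch; the exact action-line metadata depends on mapping-type handling, so the byte counts are approximate assumptions:

```java
import java.nio.charset.StandardCharsets;

public class ChunkSplitMath {
    public static void main(String[] args) {
        int bulkLimit = 800; // matches the test's chunk limit
        StringBuilder payloadBuilder = new StringBuilder();
        for (int i = 0; i < bulkLimit - 100; i++) payloadBuilder.append("a"); // 700 chars, as in the test
        // Approximate wire form of one bulk item.
        byte[] action = "{\"index\":{\"_index\":\"some_index\",\"_type\":\"some_type\",\"_id\":\"some_doc_id1\"}}"
            .getBytes(StandardCharsets.UTF_8);
        byte[] source = ("{\"someKey\":\"" + payloadBuilder + "\"}").getBytes(StandardCharsets.UTF_8);
        int itemSize = action.length + 1 + source.length + 1;
        // ~790 bytes: one item fits under 800, but two cannot share a chunk.
        System.out.println(itemSize + " bytes per item vs limit of " + bulkLimit);
    }
}
```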
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java
index f49f675c651..9c9e1019772 100644
--- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java
@@ -14,12 +14,15 @@

 package org.janusgraph.diskstorage.es.rest;

+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
+import org.apache.http.HttpEntity;
 import org.apache.http.StatusLine;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
+import org.janusgraph.diskstorage.es.ElasticSearchMutation;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -29,9 +32,13 @@
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;

+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
@@ -62,12 +69,63 @@ RestElasticSearchClient createClient(int retryAttemptLimit, Set<Integer> retryEr
         when(restClientMock.performRequest(any())).thenThrow(new IOException());

         RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false,
-            retryAttemptLimit, retryErrorCodes, 0, 0);
+            retryAttemptLimit, retryErrorCodes, 0, 0, 100_000_000);
         //There's an initial query to get the ES version we need to accommodate, and then reset for the actual test
         Mockito.reset(restClientMock);
         return clientUnderTest;
     }

+    @Test
+    public void testRetryOfIndividuallyFailedBulkItems() throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        int retryErrorCode = 429;
+        //A bulk response will still return a success despite an underlying item having an error
+        when(statusLine.getStatusCode()).thenReturn(200);
+
+        //The initial bulk request will have one element that failed in its response
+        RestBulkResponse.RestBulkItemResponse initialRequestFailedItem = new RestBulkResponse.RestBulkItemResponse();
+        initialRequestFailedItem.setError("An Error");
+        initialRequestFailedItem.setStatus(retryErrorCode);
+        RestBulkResponse initialBulkResponse = new RestBulkResponse();
+        initialBulkResponse.setItems(
+            Stream.of(
+                Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse()),
+                Collections.singletonMap("index", initialRequestFailedItem),
+                Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse())
+            ).collect(Collectors.toList())
+        );
+        HttpEntity initialHttpEntityMock = mock(HttpEntity.class);
+        when(initialHttpEntityMock.getContent()).thenReturn(new ByteArrayInputStream(
+            mapper.writeValueAsBytes(initialBulkResponse)));
+        Response initialResponseMock = mock(Response.class);
+        when(initialResponseMock.getEntity()).thenReturn(initialHttpEntityMock);
+        when(initialResponseMock.getStatusLine()).thenReturn(statusLine);
+
+        //The retry should then only have a single item that succeeded
+        RestBulkResponse retriedBulkResponse = new RestBulkResponse();
+        retriedBulkResponse.setItems(
+            Collections.singletonList(Collections.singletonMap("index", new RestBulkResponse.RestBulkItemResponse())));
+        HttpEntity retriedHttpEntityMock = mock(HttpEntity.class);
+        when(retriedHttpEntityMock.getContent()).thenReturn(new ByteArrayInputStream(
+            mapper.writeValueAsBytes(retriedBulkResponse)));
+        Response retriedResponseMock = mock(Response.class);
+        when(retriedResponseMock.getEntity()).thenReturn(retriedHttpEntityMock);
+        when(retriedResponseMock.getStatusLine()).thenReturn(statusLine);
+
+        try (RestElasticSearchClient restClientUnderTest = createClient(1, Sets.newHashSet(retryErrorCode))) {
+            //prime the restClientMock again after it's reset after creation
+            when(restClientMock.performRequest(any())).thenReturn(initialResponseMock).thenReturn(retriedResponseMock);
+            restClientUnderTest.bulkRequest(Arrays.asList(
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id1"),
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id2"),
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id3")
+            ), null);
+            //Verify that despite only calling bulkRequest once, we had 2 calls to the underlying rest client's
+            //perform request (due to the retried failure)
+            verify(restClientMock, times(2)).performRequest(requestCaptor.capture());
+        }
+    }
+
     @Test
     public void testRetryOnConfiguredErrorStatus() throws IOException {
         Integer retryCode = 429;
@@ -85,7 +143,9 @@ public void testRetryOnConfiguredErrorStatus() throws IOException {
             when(restClientMock.performRequest(any()))
                 .thenThrow(responseException)
                 .thenThrow(expectedFinalException);
-            restClientUnderTest.bulkRequest(Collections.emptyList(), null);
+            restClientUnderTest.bulkRequest(Collections.singletonList(
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+                null);
             Assertions.fail("Should have thrown the expected exception after retry");
         } catch (Exception actualException) {
             Assertions.assertSame(expectedFinalException, actualException);
@@ -113,7 +173,9 @@ public void testRetriesExhaustedReturnsLastRetryException() throws IOException {
                 .thenThrow(responseException);

-            restClientUnderTest.bulkRequest(Collections.emptyList(), null);
+            restClientUnderTest.bulkRequest(Collections.singletonList(
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+                null);
             Assertions.fail("Should have thrown the expected exception after retry");
         } catch (Exception e) {
             Assertions.assertSame(responseException, e);
@@ -132,7 +194,9 @@ public void testNonRetryErrorCodeException() throws IOException {
             //prime the restClientMock again after it's reset after creation
             when(restClientMock.performRequest(any()))
                 .thenThrow(responseException);
-            restClientUnderTest.bulkRequest(Collections.emptyList(), null);
+            restClientUnderTest.bulkRequest(Collections.singletonList(
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+                null);
             Assertions.fail("Should have thrown the expected exception");
         } catch (Exception e) {
             Assertions.assertSame(responseException, e);
@@ -146,7 +210,9 @@ public void testNonResponseExceptionErrorThrown() throws IOException {
         when(restClientMock.performRequest(any()))
             .thenThrow(differentExceptionType);
         try (RestElasticSearchClient restClientUnderTest = createClient(0, Collections.emptySet())) {
-            restClientUnderTest.bulkRequest(Collections.emptyList(), null);
+            restClientUnderTest.bulkRequest(Collections.singletonList(
+                ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
+                null);
             Assertions.fail("Should have thrown the expected exception");
         } catch (Exception e) {
             Assertions.assertSame(differentExceptionType, e);
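The switch from `Collections.emptyList()` to a singleton delete mutation in these four tests is forced by the chunking change: an empty mutation list now yields zero chunks, so `bulkRequest` returns without ever touching the REST client, and the stubbed exceptions would never surface. Sketched contrast, using the same names as the tests above:

```java
// Before this patch: bulkRequest(emptyList) still issued one (empty) bulk POST,
// so a stubbed performRequest exception was thrown. After this patch it is a no-op:
restClientUnderTest.bulkRequest(Collections.emptyList(), null); // 0 chunks -> 0 HTTP calls

// Hence each retry test now submits at least one real mutation:
restClientUnderTest.bulkRequest(Collections.singletonList(
    ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")), null);
```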
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java
index 97c876ed7cc..0f8d6fb17e1 100644
--- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java
@@ -123,6 +123,9 @@ public class RestClientSetupTest {
     @Captor
     ArgumentCaptor<Set<Integer>> retryErrorCodesCaptor;

+    @Captor
+    ArgumentCaptor<Integer> bulkChunkSerializedLimitCaptor;
+
     @Spy
     private RestClientSetup restClientSetup = new RestClientSetup();

@@ -158,7 +161,7 @@ private ElasticSearchClient baseConfigTest(Map<String, String> extraConfigValues
             when(restClientSetup).getRestClientBuilder(any());
         doReturn(restElasticSearchClientMock).when(restClientSetup).
             getElasticSearchClient(any(RestClient.class), anyInt(), anyBoolean(),
-                anyInt(), anySet(), anyLong(), anyLong());
+                anyInt(), anySet(), anyLong(), anyLong(), anyInt());

         return restClientSetup.connect(config.restrictTo(INDEX_NAME));
     }
@@ -190,7 +193,7 @@ public void testConnectBasicHttpConfigurationSingleHost() throws Exception {
         assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host0.getPort());

         verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
-            anyInt(), anySet(), anyLong(), anyLong());
+            anyInt(), anySet(), anyLong(), anyLong(), anyInt());
         assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(),
             scrollKACaptor.getValue().intValue());

@@ -218,7 +221,7 @@ public void testConnectBasicHttpConfigurationMultiHost() throws Exception {

         verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
             retryAttemptLimitCaptor.capture(), retryErrorCodesCaptor.capture(), retryInitialWaitCaptor.capture(),
-            retryMaxWaitCaptor.capture());
+            retryMaxWaitCaptor.capture(), bulkChunkSerializedLimitCaptor.capture());
         assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(),
             scrollKACaptor.getValue().intValue());

@@ -238,6 +241,7 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {
                 put("index." + INDEX_NAME + ".elasticsearch.retry-initial-wait", String.valueOf(RETRY_INITIAL_WAIT)).
                 put("index." + INDEX_NAME + ".elasticsearch.retry-max-wait", String.valueOf(RETRY_MAX_WAIT)).
                 put("index." + INDEX_NAME + ".elasticsearch.retry-error-codes", "408,429").
+                put("index." + INDEX_NAME + ".elasticsearch.bulk-chunk-size-limit-bytes", "1000000").
                 build());

         assertNotNull(hostsConfigured);
@@ -250,7 +254,7 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {

         verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
             retryAttemptLimitCaptor.capture(), retryErrorCodesCaptor.capture(), retryInitialWaitCaptor.capture(),
-            retryMaxWaitCaptor.capture());
+            retryMaxWaitCaptor.capture(), bulkChunkSerializedLimitCaptor.capture());
         assertEquals(ES_SCROLL_KA,
             scrollKACaptor.getValue().intValue());
         assertEquals(RETRY_LIMIT,
@@ -261,6 +265,7 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {
             retryInitialWaitCaptor.getValue().longValue());
         assertEquals(RETRY_MAX_WAIT,
             retryMaxWaitCaptor.getValue().longValue());
+        assertEquals(1_000_000, bulkChunkSerializedLimitCaptor.getValue().intValue());

         verify(restElasticSearchClientMock).setBulkRefresh(eq(ES_BULK_REFRESH));
         verify(restElasticSearchClientMock).setRetryOnConflict(eq(RETRY_ON_CONFLICT));
@@ -283,7 +288,7 @@ public void testConnectBasicHttpsConfigurationSingleHost() throws Exception {
         assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host0.getPort());

         verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
-            anyInt(), anySet(), anyLong(), anyLong());
+            anyInt(), anySet(), anyLong(), anyLong(), anyInt());
         assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(),
             scrollKACaptor.getValue().intValue());