diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md index dd506a9582..ff08908267 100644 --- a/docs/configs/janusgraph-cfg.md +++ b/docs/configs/janusgraph-cfg.md @@ -156,6 +156,10 @@ Elasticsearch index configuration | index.[X].elasticsearch.enable_index_names_cache | Enables cache for generated index store names. It is recommended to always enable index store names cache unless you have more then 50000 indexes per index store. | Boolean | true | MASKABLE | | index.[X].elasticsearch.health-request-timeout | When JanusGraph initializes its ES backend, JanusGraph waits up to this duration for the ES cluster health to reach at least yellow status. This string should be formatted as a natural number followed by the lowercase letter "s", e.g. 3s or 60s. | String | 30s | MASKABLE | | index.[X].elasticsearch.interface | Interface for connecting to Elasticsearch. TRANSPORT_CLIENT and NODE were previously supported, but now are required to migrate to REST_CLIENT. See the JanusGraph upgrade instructions for more details. | String | REST_CLIENT | MASKABLE | +| index.[X].elasticsearch.retry-error-codes | Comma separated list of Elasticsearch REST client ResponseException error codes to retry. E.g. "408,429" | String[] | | LOCAL | +| index.[X].elasticsearch.retry-initial-wait | Sets the initial retry wait time (in milliseconds) before exponential backoff. | Long | 1 | LOCAL | +| index.[X].elasticsearch.retry-limit | Sets the number of attempts for configured retryable error codes. | Integer | 0 | LOCAL | +| index.[X].elasticsearch.retry-max-wait | Sets the max retry wait time (in milliseconds). | Long | 1000 | LOCAL | | index.[X].elasticsearch.retry_on_conflict | Specify how many times should the operation be retried when a conflict occurs. | Integer | 0 | MASKABLE | | index.[X].elasticsearch.scroll-keep-alive | How long (in seconds) elasticsearch should keep alive the scroll context. | Integer | 60 | GLOBAL_OFFLINE | | index.[X].elasticsearch.setup-max-open-scroll-contexts | Whether JanusGraph should setup max_open_scroll_context to maximum value for the cluster or not. | Boolean | true | MASKABLE | diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java index 549865af08..7479979761 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java @@ -304,6 +304,26 @@ public class ElasticSearchIndex implements IndexProvider { "Sets the maximum socket timeout (in milliseconds).", ConfigOption.Type.MASKABLE, Integer.class, RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS); + public static final ConfigOption RETRY_LIMIT = + new ConfigOption<>(ELASTICSEARCH_NS, "retry-limit", + "Sets the number of attempts for configured retryable error codes.", ConfigOption.Type.LOCAL, + Integer.class, 0); + + public static final ConfigOption RETRY_INITIAL_WAIT = + new ConfigOption<>(ELASTICSEARCH_NS, "retry-initial-wait", + "Sets the initial retry wait time (in milliseconds) before exponential backoff.", + ConfigOption.Type.LOCAL, Long.class, 1L); + + public static final ConfigOption RETRY_MAX_WAIT = + new ConfigOption<>(ELASTICSEARCH_NS, "retry-max-wait", + "Sets the max retry wait time (in milliseconds).", ConfigOption.Type.LOCAL, + Long.class, 1000L); + + public static final ConfigOption RETRY_ERROR_CODES = + new ConfigOption<>(ELASTICSEARCH_NS, "retry-error-codes", + "Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " + + "E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]); + public static final int HOST_PORT_DEFAULT = 9200; /** diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java index 457756546b..bbfac23d6b 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java @@ -39,8 +39,11 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_HOSTS; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_PORT; @@ -73,7 +76,13 @@ public ElasticSearchClient connect(Configuration config) throws IOException { final int scrollKeepAlive = config.get(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE); Preconditions.checkArgument(scrollKeepAlive >= 1, "Scroll keep-alive should be greater than or equal to 1"); final boolean useMappingTypesForES7 = config.get(ElasticSearchIndex.USE_MAPPING_FOR_ES7); - final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7); + int retryLimit = config.getOrDefault(ElasticSearchIndex.RETRY_LIMIT); + long retryInitialWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_INITIAL_WAIT); + long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT); + Set errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES)) + .mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet()); + final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, + retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs); if (config.has(ElasticSearchIndex.BULK_REFRESH)) { client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH)); } @@ -104,8 +113,11 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) { return RestClient.builder(hosts); } - protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7) { - return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7); + protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7, + int retryAttemptLimit, Set retryOnErrorCodes, long retryInitialWaitMs, + long retryMaxWaitMs) { + return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes, + retryInitialWaitMs, retryMaxWaitMs); } /** diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java index 1101e1185c..64e577724e 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java @@ -47,10 +47,12 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -110,14 +112,28 @@ public class RestElasticSearchClient implements ElasticSearchClient { private Integer retryOnConflict; private final String retryOnConflictKey; - - public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7) { + + private final int retryAttemptLimit; + + private final Set retryOnErrorCodes; + + private final long retryInitialWaitMs; + + private final long retryMaxWaitMs; + +public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7, + int retryAttemptLimit, Set retryOnErrorCodes, long retryInitialWaitMs, + long retryMaxWaitMs) { this.delegate = delegate; majorVersion = getMajorVersion(); this.scrollKeepAlive = scrollKeepAlive+"s"; esVersion7 = ElasticMajorVersion.SEVEN.equals(majorVersion); useMappingTypes = majorVersion.getValue() < 7 || (useMappingTypesForES7 && esVersion7); retryOnConflictKey = majorVersion.getValue() >= 7 ? "retry_on_conflict" : "_retry_on_conflict"; + this.retryAttemptLimit = retryAttemptLimit; + this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes); + this.retryInitialWaitMs = retryInitialWaitMs; + this.retryMaxWaitMs = retryMaxWaitMs; } @Override @@ -546,13 +562,35 @@ private Response performRequest(String method, String path, byte[] requestData) return performRequest(new Request(method, path), requestData); } + private Response performRequestWithRetry(Request request) throws IOException { + int retryCount = 0; + while (true) { + try { + return delegate.performRequest(request); + } catch (ResponseException e) { + if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) { + throw e; + } + //Wait before trying again + long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs); + log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit); + try { + Thread.sleep(waitDurationMs); + } catch (InterruptedException interruptedException) { + throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException); + } + } + retryCount++; + } + } + private Response performRequest(Request request, byte[] requestData) throws IOException { final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null; request.setEntity(entity); - final Response response = delegate.performRequest(request); + final Response response = performRequestWithRetry(request); if (response.getStatusLine().getStatusCode() >= 400) { throw new IOException("Error executing request: " + response.getStatusLine().getReasonPhrase()); diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java new file mode 100644 index 0000000000..f49f675c65 --- /dev/null +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java @@ -0,0 +1,156 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.es.rest; + +import com.google.common.collect.Sets; +import org.apache.http.StatusLine; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.Collections; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class RestClientRetryTest { + @Mock + private RestClient restClientMock; + + @Mock + private ResponseException responseException; + + @Mock + private Response response; + + @Mock + private StatusLine statusLine; + + @Captor + private ArgumentCaptor requestCaptor; + + RestElasticSearchClient createClient(int retryAttemptLimit, Set retryErrorCodes) throws IOException { + //Just throw an exception when there's an attempt to look up the ES version during instantiation + when(restClientMock.performRequest(any())).thenThrow(new IOException()); + + RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false, + retryAttemptLimit, retryErrorCodes, 0, 0); + //There's an initial query to get the ES version we need to accommodate, and then reset for the actual test + Mockito.reset(restClientMock); + return clientUnderTest; + } + + @Test + public void testRetryOnConfiguredErrorStatus() throws IOException { + Integer retryCode = 429; + int expectedNumberOfRequestAttempts = 2; + doReturn(retryCode).when(statusLine).getStatusCode(); + doReturn(statusLine).when(response).getStatusLine(); + doReturn(response).when(responseException).getResponse(); + //Just throw an expected exception the second time to confirm the retry occurred + //rather than mock out a parsable response + IOException expectedFinalException = new IOException("Expected"); + + try (RestElasticSearchClient restClientUnderTest = createClient(expectedNumberOfRequestAttempts - 1, + Sets.newHashSet(retryCode))) { + //prime the restClientMock again after it's reset after creation + when(restClientMock.performRequest(any())) + .thenThrow(responseException) + .thenThrow(expectedFinalException); + restClientUnderTest.bulkRequest(Collections.emptyList(), null); + Assertions.fail("Should have thrown the expected exception after retry"); + } catch (Exception actualException) { + Assertions.assertSame(expectedFinalException, actualException); + } + verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture()); + } + + @Test + public void testRetriesExhaustedReturnsLastRetryException() throws IOException { + Integer retryCode = 429; + int expectedNumberOfRequestAttempts = 2; + doReturn(retryCode).when(statusLine).getStatusCode(); + doReturn(statusLine).when(response).getStatusLine(); + doReturn(response).when(responseException).getResponse(); + ResponseException initialRetryException = mock(ResponseException.class); + doReturn(response).when(initialRetryException).getResponse(); + + try (RestElasticSearchClient restClientUnderTest = createClient(expectedNumberOfRequestAttempts - 1, + Sets.newHashSet(retryCode))) { + //prime the restClientMock again after it's reset after creation + when(restClientMock.performRequest(any())) + //first throw a different retry exception instance, then make sure it's the latter one + //that was retained and then thrown + .thenThrow(initialRetryException) + .thenThrow(responseException); + + + restClientUnderTest.bulkRequest(Collections.emptyList(), null); + Assertions.fail("Should have thrown the expected exception after retry"); + } catch (Exception e) { + Assertions.assertSame(responseException, e); + } + verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture()); + } + + @Test + public void testNonRetryErrorCodeException() throws IOException { + doReturn(503).when(statusLine).getStatusCode(); + doReturn(statusLine).when(response).getStatusLine(); + doReturn(response).when(responseException).getResponse(); + try (RestElasticSearchClient restClientUnderTest = createClient(0, + //Other retry error code is configured + Sets.newHashSet(429))) { + //prime the restClientMock again after it's reset after creation + when(restClientMock.performRequest(any())) + .thenThrow(responseException); + restClientUnderTest.bulkRequest(Collections.emptyList(), null); + Assertions.fail("Should have thrown the expected exception"); + } catch (Exception e) { + Assertions.assertSame(responseException, e); + } + verify(restClientMock, times(1)).performRequest(requestCaptor.capture()); + } + + @Test + public void testNonResponseExceptionErrorThrown() throws IOException { + IOException differentExceptionType = new IOException(); + when(restClientMock.performRequest(any())) + .thenThrow(differentExceptionType); + try (RestElasticSearchClient restClientUnderTest = createClient(0, Collections.emptySet())) { + restClientUnderTest.bulkRequest(Collections.emptyList(), null); + Assertions.fail("Should have thrown the expected exception"); + } catch (Exception e) { + Assertions.assertSame(differentExceptionType, e); + } + verify(restClientMock, times(1)).performRequest(requestCaptor.capture()); + } +} diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java index 7b2c8755d8..97c876ed7c 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java @@ -53,13 +53,18 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.net.ssl.SSLContext; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyInt; @@ -92,6 +97,12 @@ public class RestClientSetupTest { private static final Integer RETRY_ON_CONFLICT = ElasticSearchIndex.RETRY_ON_CONFLICT.getDefaultValue(); + private static final Integer RETRY_LIMIT = ElasticSearchIndex.RETRY_LIMIT.getDefaultValue(); + + private static final Long RETRY_INITIAL_WAIT = ElasticSearchIndex.RETRY_INITIAL_WAIT.getDefaultValue(); + + private static final Long RETRY_MAX_WAIT = ElasticSearchIndex.RETRY_MAX_WAIT.getDefaultValue(); + private static final AtomicInteger instanceCount = new AtomicInteger(); @Captor @@ -100,6 +111,18 @@ public class RestClientSetupTest { @Captor ArgumentCaptor scrollKACaptor; + @Captor + ArgumentCaptor retryAttemptLimitCaptor; + + @Captor + ArgumentCaptor retryInitialWaitCaptor; + + @Captor + ArgumentCaptor retryMaxWaitCaptor; + + @Captor + ArgumentCaptor> retryErrorCodesCaptor; + @Spy private RestClientSetup restClientSetup = new RestClientSetup(); @@ -134,7 +157,8 @@ private ElasticSearchClient baseConfigTest(Map extraConfigValues doReturn(restClientBuilderMock). when(restClientSetup).getRestClientBuilder(any()); doReturn(restElasticSearchClientMock).when(restClientSetup). - getElasticSearchClient(any(RestClient.class), anyInt(), anyBoolean()); + getElasticSearchClient(any(RestClient.class), anyInt(), anyBoolean(), + anyInt(), anySet(), anyLong(), anyLong()); return restClientSetup.connect(config.restrictTo(INDEX_NAME)); } @@ -165,7 +189,8 @@ public void testConnectBasicHttpConfigurationSingleHost() throws Exception { assertEquals(SCHEME_HTTP, host0.getSchemeName()); assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host0.getPort()); - verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean()); + verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(), + anyInt(), anySet(), anyLong(), anyLong()); assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(), scrollKACaptor.getValue().intValue()); @@ -191,7 +216,9 @@ public void testConnectBasicHttpConfigurationMultiHost() throws Exception { assertEquals(SCHEME_HTTP, host1.getSchemeName()); assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host1.getPort()); - verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean()); + verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(), + retryAttemptLimitCaptor.capture(), retryErrorCodesCaptor.capture(), retryInitialWaitCaptor.capture(), + retryMaxWaitCaptor.capture()); assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(), scrollKACaptor.getValue().intValue()); @@ -207,6 +234,10 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception { put("index." + INDEX_NAME + ".elasticsearch.scroll-keep-alive", String.valueOf(ES_SCROLL_KA)). put("index." + INDEX_NAME + ".elasticsearch.bulk-refresh", ES_BULK_REFRESH). put("index." + INDEX_NAME + ".elasticsearch.retry_on_conflict", String.valueOf(RETRY_ON_CONFLICT)). + put("index." + INDEX_NAME + ".elasticsearch.retry-limit", String.valueOf(RETRY_LIMIT)). + put("index." + INDEX_NAME + ".elasticsearch.retry-initial-wait", String.valueOf(RETRY_INITIAL_WAIT)). + put("index." + INDEX_NAME + ".elasticsearch.retry-max-wait", String.valueOf(RETRY_MAX_WAIT)). + put("index." + INDEX_NAME + ".elasticsearch.retry-error-codes", "408,429"). build()); assertNotNull(hostsConfigured); @@ -217,13 +248,22 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception { assertEquals(SCHEME_HTTP, host0.getSchemeName()); assertEquals(ES_PORT, host0.getPort()); - verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean()); + verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(), + retryAttemptLimitCaptor.capture(), retryErrorCodesCaptor.capture(), retryInitialWaitCaptor.capture(), + retryMaxWaitCaptor.capture()); assertEquals(ES_SCROLL_KA, scrollKACaptor.getValue().intValue()); + assertEquals(RETRY_LIMIT, + retryAttemptLimitCaptor.getValue().intValue()); + assertEquals(Stream.of(408, 429).collect(Collectors.toSet()), + retryErrorCodesCaptor.getValue()); + assertEquals(RETRY_INITIAL_WAIT, + retryInitialWaitCaptor.getValue().longValue()); + assertEquals(RETRY_MAX_WAIT, + retryMaxWaitCaptor.getValue().longValue()); verify(restElasticSearchClientMock).setBulkRefresh(eq(ES_BULK_REFRESH)); verify(restElasticSearchClientMock).setRetryOnConflict(eq(RETRY_ON_CONFLICT)); - } @Test @@ -242,7 +282,8 @@ public void testConnectBasicHttpsConfigurationSingleHost() throws Exception { assertEquals(SCHEME_HTTPS, host0.getSchemeName()); assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host0.getPort()); - verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean()); + verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(), + anyInt(), anySet(), anyLong(), anyLong()); assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(), scrollKACaptor.getValue().intValue());