Skip to content

Commit

Permalink
Added retry logic if client request returns an error code that is con…
Browse files Browse the repository at this point in the history
…figured for retrying

Closes JanusGraph#4408

Added additional tests for coverage

Included example of comma separated values for retry-error-codes config parameter

Simplified retry loop logic

Moved retry configurations into constructor and removed setters

Signed-off-by: Allan Clements <[email protected]>
  • Loading branch information
criminosis committed Apr 26, 2024
1 parent 0d601cf commit d75cfe6
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 12 deletions.
4 changes: 4 additions & 0 deletions docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ Elasticsearch index configuration
| index.[X].elasticsearch.enable_index_names_cache | Enables cache for generated index store names. It is recommended to always enable index store names cache unless you have more then 50000 indexes per index store. | Boolean | true | MASKABLE |
| index.[X].elasticsearch.health-request-timeout | When JanusGraph initializes its ES backend, JanusGraph waits up to this duration for the ES cluster health to reach at least yellow status. This string should be formatted as a natural number followed by the lowercase letter "s", e.g. 3s or 60s. | String | 30s | MASKABLE |
| index.[X].elasticsearch.interface | Interface for connecting to Elasticsearch. TRANSPORT_CLIENT and NODE were previously supported, but now are required to migrate to REST_CLIENT. See the JanusGraph upgrade instructions for more details. | String | REST_CLIENT | MASKABLE |
| index.[X].elasticsearch.retry-error-codes | Comma separated list of Elasticsearch REST client ResponseException error codes to retry. E.g. "408,429" | String[] | | LOCAL |
| index.[X].elasticsearch.retry-initial-wait | Sets the initial retry wait time (in milliseconds) before exponential backoff. | Long | 1 | LOCAL |
| index.[X].elasticsearch.retry-limit | Sets the number of attempts for configured retryable error codes. | Integer | 0 | LOCAL |
| index.[X].elasticsearch.retry-max-wait | Sets the max retry wait time (in milliseconds). | Long | 1000 | LOCAL |
| index.[X].elasticsearch.retry_on_conflict | Specify how many times should the operation be retried when a conflict occurs. | Integer | 0 | MASKABLE |
| index.[X].elasticsearch.scroll-keep-alive | How long (in seconds) elasticsearch should keep alive the scroll context. | Integer | 60 | GLOBAL_OFFLINE |
| index.[X].elasticsearch.setup-max-open-scroll-contexts | Whether JanusGraph should setup max_open_scroll_context to maximum value for the cluster or not. | Boolean | true | MASKABLE |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,26 @@ public class ElasticSearchIndex implements IndexProvider {
"Sets the maximum socket timeout (in milliseconds).", ConfigOption.Type.MASKABLE,
Integer.class, RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS);

public static final ConfigOption<Integer> RETRY_LIMIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-limit",
"Sets the number of attempts for configured retryable error codes.", ConfigOption.Type.LOCAL,
Integer.class, 0);

public static final ConfigOption<Long> RETRY_INITIAL_WAIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-initial-wait",
"Sets the initial retry wait time (in milliseconds) before exponential backoff.",
ConfigOption.Type.LOCAL, Long.class, 1L);

public static final ConfigOption<Long> RETRY_MAX_WAIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-max-wait",
"Sets the max retry wait time (in milliseconds).", ConfigOption.Type.LOCAL,
Long.class, 1000L);

public static final ConfigOption<String[]> RETRY_ERROR_CODES =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-error-codes",
"Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " +
"E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]);

public static final int HOST_PORT_DEFAULT = 9200;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_HOSTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_PORT;
Expand Down Expand Up @@ -73,7 +76,13 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
final int scrollKeepAlive = config.get(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE);
Preconditions.checkArgument(scrollKeepAlive >= 1, "Scroll keep-alive should be greater than or equal to 1");
final boolean useMappingTypesForES7 = config.get(ElasticSearchIndex.USE_MAPPING_FOR_ES7);
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7);
int retryLimit = config.getOrDefault(ElasticSearchIndex.RETRY_LIMIT);
long retryInitialWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_INITIAL_WAIT);
long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
.mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
}
Expand Down Expand Up @@ -104,8 +113,11 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {
return RestClient.builder(hosts);
}

protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7);
protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
retryInitialWaitMs, retryMaxWaitMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -110,14 +112,28 @@ public class RestElasticSearchClient implements ElasticSearchClient {
private Integer retryOnConflict;

private final String retryOnConflictKey;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7) {

private final int retryAttemptLimit;

private final Set<Integer> retryOnErrorCodes;

private final long retryInitialWaitMs;

private final long retryMaxWaitMs;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
this.delegate = delegate;
majorVersion = getMajorVersion();
this.scrollKeepAlive = scrollKeepAlive+"s";
esVersion7 = ElasticMajorVersion.SEVEN.equals(majorVersion);
useMappingTypes = majorVersion.getValue() < 7 || (useMappingTypesForES7 && esVersion7);
retryOnConflictKey = majorVersion.getValue() >= 7 ? "retry_on_conflict" : "_retry_on_conflict";
this.retryAttemptLimit = retryAttemptLimit;
this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
this.retryInitialWaitMs = retryInitialWaitMs;
this.retryMaxWaitMs = retryMaxWaitMs;
}

@Override
Expand Down Expand Up @@ -546,13 +562,35 @@ private Response performRequest(String method, String path, byte[] requestData)
return performRequest(new Request(method, path), requestData);
}

private Response performRequestWithRetry(Request request) throws IOException {
int retryCount = 0;
while (true) {
try {
return delegate.performRequest(request);
} catch (ResponseException e) {
if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) {
throw e;
}
//Wait before trying again
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
}
}
retryCount++;
}
}

private Response performRequest(Request request, byte[] requestData) throws IOException {

final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null;

request.setEntity(entity);

final Response response = delegate.performRequest(request);
final Response response = performRequestWithRetry(request);

if (response.getStatusLine().getStatusCode() >= 400) {
throw new IOException("Error executing request: " + response.getStatusLine().getReasonPhrase());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2024 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.es.rest;

import com.google.common.collect.Sets;
import org.apache.http.StatusLine;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class RestClientRetryTest {
@Mock
private RestClient restClientMock;

@Mock
private ResponseException responseException;

@Mock
private Response response;

@Mock
private StatusLine statusLine;

@Captor
private ArgumentCaptor<Request> requestCaptor;

RestElasticSearchClient createClient(int retryAttemptLimit, Set<Integer> retryErrorCodes) throws IOException {
//Just throw an exception when there's an attempt to look up the ES version during instantiation
when(restClientMock.performRequest(any())).thenThrow(new IOException());

RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false,
retryAttemptLimit, retryErrorCodes, 0, 0);
//There's an initial query to get the ES version we need to accommodate, and then reset for the actual test
Mockito.reset(restClientMock);
return clientUnderTest;
}

@Test
public void testRetryOnConfiguredErrorStatus() throws IOException {
Integer retryCode = 429;
int expectedNumberOfRequestAttempts = 2;
doReturn(retryCode).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
//Just throw an expected exception the second time to confirm the retry occurred
//rather than mock out a parsable response
IOException expectedFinalException = new IOException("Expected");

try (RestElasticSearchClient restClientUnderTest = createClient(expectedNumberOfRequestAttempts - 1,
Sets.newHashSet(retryCode))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
.thenThrow(responseException)
.thenThrow(expectedFinalException);
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception actualException) {
Assertions.assertSame(expectedFinalException, actualException);
}
verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture());
}

@Test
public void testRetriesExhaustedReturnsLastRetryException() throws IOException {
Integer retryCode = 429;
int expectedNumberOfRequestAttempts = 2;
doReturn(retryCode).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
ResponseException initialRetryException = mock(ResponseException.class);
doReturn(response).when(initialRetryException).getResponse();

try (RestElasticSearchClient restClientUnderTest = createClient(expectedNumberOfRequestAttempts - 1,
Sets.newHashSet(retryCode))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
//first throw a different retry exception instance, then make sure it's the latter one
//that was retained and then thrown
.thenThrow(initialRetryException)
.thenThrow(responseException);


restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception e) {
Assertions.assertSame(responseException, e);
}
verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture());
}

@Test
public void testNonRetryErrorCodeException() throws IOException {
doReturn(503).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
try (RestElasticSearchClient restClientUnderTest = createClient(0,
//Other retry error code is configured
Sets.newHashSet(429))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
.thenThrow(responseException);
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception");
} catch (Exception e) {
Assertions.assertSame(responseException, e);
}
verify(restClientMock, times(1)).performRequest(requestCaptor.capture());
}

@Test
public void testNonResponseExceptionErrorThrown() throws IOException {
IOException differentExceptionType = new IOException();
when(restClientMock.performRequest(any()))
.thenThrow(differentExceptionType);
try (RestElasticSearchClient restClientUnderTest = createClient(0, Collections.emptySet())) {
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception");
} catch (Exception e) {
Assertions.assertSame(differentExceptionType, e);
}
verify(restClientMock, times(1)).performRequest(requestCaptor.capture());
}
}
Loading

1 comment on commit d75cfe6

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: d75cfe6 Previous: 0d601cf Ratio
org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete 13060.812343174288 ms/op 13882.523057793705 ms/op 0.94
org.janusgraph.GraphCentricQueryBenchmark.getVertices 898.7254346514046 ms/op 938.7454327039508 ms/op 0.96
org.janusgraph.MgmtOlapJobBenchmark.runClearIndex 216.41343561956523 ms/op 216.6581155155797 ms/op 1.00
org.janusgraph.MgmtOlapJobBenchmark.runReindex 338.0366200345238 ms/op 347.79107886769236 ms/op 0.97
org.janusgraph.JanusGraphSpeedBenchmark.basicCount 213.63426028180274 ms/op 230.14241005780215 ms/op 0.93
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 4734.969248491554 ms/op 4979.698797423356 ms/op 0.95
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps 17066.95763009286 ms/op 16436.099606292057 ms/op 1.04
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch 18477.66787504762 ms/op 19329.0038558103 ms/op 0.96
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching 53717.76240903333 ms/op 56673.909406633335 ms/op 0.95
org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex 8195.151031248442 ms/op 8185.252311468057 ms/op 1.00
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion 370.4809368506361 ms/op 366.2127994291684 ms/op 1.01
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch 4149.503219661898 ms/op 4155.013576386989 ms/op 1.00
org.janusgraph.CQLMultiQueryBenchmark.getNames 8162.812156355291 ms/op 7846.475429399913 ms/op 1.04
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 5661.618940538105 ms/op 5879.818323385454 ms/op 0.96
org.janusgraph.CQLMultiQueryBenchmark.getLabels 7208.119404727526 ms/op 7009.9235684485575 ms/op 1.03
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep 423.9792960434415 ms/op 406.65979183393114 ms/op 1.04
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex 12137.60505813065 ms/op 12538.50282688309 ms/op 0.97
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage 357.6897097125631 ms/op 354.5867459421563 ms/op 1.01
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 14453.598089354924 ms/op 13625.019270800296 ms/op 1.06
org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection 251.6403959273193 ms/op 239.03739323588258 ms/op 1.05
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch 14161.706561144163 ms/op 13396.492433014544 ms/op 1.06
org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames 8365.485376687184 ms/op 7823.291265104974 ms/op 1.07
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps 9194.375511734524 ms/op 9162.429751679501 ms/op 1.00
org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts 8524.871066793352 ms/op 8753.949937235953 ms/op 0.97

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.