Commit 81b07ac

Implemented bulk retry test & implemented splitting up a bulk request if large enough

criminosis committed May 30, 2024
1 parent 5c2b132 commit 81b07ac
Showing 7 changed files with 311 additions and 48 deletions.
1 change: 1 addition & 0 deletions docs/configs/janusgraph-cfg.md
@@ -150,6 +150,7 @@ Elasticsearch index configuration

| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
| index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
| index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
| index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
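
As a usage sketch (not part of this commit), the new option would be set like any other index option in a JanusGraph properties file; the index name "search" and the 10 MB value below are illustrative assumptions:

    # graph.properties (hypothetical): cap each bulk request chunk at ~10 MB
    index.search.backend=elasticsearch
    index.search.elasticsearch.bulk-chunk-size-limit-bytes=10000000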
@@ -324,6 +324,11 @@ public class ElasticSearchIndex implements IndexProvider {
"Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " +
"E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]);

public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
"The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
"chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);

public static final int HOST_PORT_DEFAULT = 9200;

/**
@@ -81,8 +81,9 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
.mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
int bulkChunkLimitBytes = config.getOrDefault(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES);
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs, bulkChunkLimitBytes);
if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
}
@@ -115,9 +116,9 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {

protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
long retryMaxWaitMs, int bulkChunkSerializedLimit) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
retryInitialWaitMs, retryMaxWaitMs);
retryInitialWaitMs, retryMaxWaitMs, bulkChunkSerializedLimit);
}
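
Because getElasticSearchClient is a protected factory method, a subclass can swap in a differently configured client, which is presumably how the new chunking behavior can be exercised in tests. A minimal sketch under assumptions: the enclosing class is taken to be JanusGraph's org.janusgraph.diskstorage.es.rest.RestClientSetup, and the subclass name and 1 KB limit are hypothetical:

    // Hypothetical test helper (not part of this commit): forces a tiny chunk limit
    // so the chunking path triggers even for small mutation batches
    import java.util.Set;
    import org.elasticsearch.client.RestClient;

    public class TinyChunkRestClientSetup extends RestClientSetup {
        @Override
        protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive,
                boolean useMappingTypesForES7, int retryAttemptLimit, Set<Integer> retryOnErrorCodes,
                long retryInitialWaitMs, long retryMaxWaitMs, int bulkChunkSerializedLimit) {
            // Ignore the configured limit and substitute 1 KB
            return super.getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
                    retryAttemptLimit, retryOnErrorCodes, retryInitialWaitMs, retryMaxWaitMs, 1_024);
        }
    }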

/**
@@ -16,11 +16,14 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.tinkerpop.shaded.jackson.annotation.JsonIgnoreProperties;
import org.apache.tinkerpop.shaded.jackson.core.JsonParseException;
import org.apache.tinkerpop.shaded.jackson.core.JsonProcessingException;
import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
import org.apache.tinkerpop.shaded.jackson.databind.JsonMappingException;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -48,10 +51,12 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -124,9 +129,11 @@ public class RestElasticSearchClient implements ElasticSearchClient {

private final long retryMaxWaitMs;

private final int bulkChunkSerializedLimitBytes;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
long retryMaxWaitMs, int bulkChunkSerializedLimitBytes) {
this.delegate = delegate;
majorVersion = getMajorVersion();
this.scrollKeepAlive = scrollKeepAlive+"s";
@@ -137,6 +144,7 @@ public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean
this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
this.retryInitialWaitMs = retryInitialWaitMs;
this.retryMaxWaitMs = retryMaxWaitMs;
this.bulkChunkSerializedLimitBytes = bulkChunkSerializedLimitBytes;
}

@Override
@@ -383,9 +391,11 @@ public void clearStore(String indexName, String storeName) throws IOException {
}
}

private Pair<String, byte[]> buildBulkRequestInput(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (final ElasticSearchMutation request : requests) {
private class RequestBytes {
final byte [] requestBytes;
final byte [] requestSource;

private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
Map<String, Object> requestData = new HashMap<>();
if (useMappingTypes) {
requestData.put("_index", request.getIndex());
@@ -400,15 +410,39 @@ private Pair<String, byte[]> buildBulkRequestInput(List<ElasticSearchMutation> r
requestData.put(retryOnConflictKey, retryOnConflict);
}

outputStream.write(mapWriter.writeValueAsBytes(
ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData))
);
outputStream.write(NEW_LINE_BYTES);
this.requestBytes = mapWriter.writeValueAsBytes(ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData));
if (request.getSource() != null) {
outputStream.write(mapWriter.writeValueAsBytes(request.getSource()));
this.requestSource = mapWriter.writeValueAsBytes(request.getSource());
} else {
this.requestSource = null;
}
}

private int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize += 1; //For the follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
serializedSize += this.requestSource.length;
serializedSize += 1; //For the follow-up NEW_LINE_BYTES
}
return serializedSize;
}

private void writeTo(OutputStream outputStream) throws IOException {
outputStream.write(this.requestBytes);
outputStream.write(NEW_LINE_BYTES);
if (this.requestSource != null) {
outputStream.write(requestSource);
outputStream.write(NEW_LINE_BYTES);
}
}
}
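
Each RequestBytes thus holds one pre-serialized NDJSON entry of a _bulk body: an action line, a newline, and, when a source document is present, a source line with its own trailing newline; getSerializedSize() counts exactly those bytes. A hypothetical two-mutation payload (index name, ids, and fields invented for illustration) would serialize as:

    {"index":{"_index":"janusgraph_mixed","_id":"4128"}}
    {"name":"saturn","age":10000}
    {"delete":{"_index":"janusgraph_mixed","_id":"4216"}}

The delete action carries no source document, so it contributes only its action line, which is why requestSource is nullable.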

private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (final RequestBytes request : requests) {
request.writeTo(outputStream);
}

final StringBuilder builder = new StringBuilder();
if (ingestPipeline != null) {
Expand All @@ -421,13 +455,13 @@ private Pair<String, byte[]> buildBulkRequestInput(List<ElasticSearchMutation> r
return Pair.with(builder.toString(), outputStream.toByteArray());
}

private List<Triplet<Object, Integer, ElasticSearchMutation>> pairErrorsWithSubmittedMutation(
private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
//Bulk API is documented to return bulk item responses in the same order of submission
//https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body
//As such we only need to retry elements that failed
final List<Map<String, RestBulkResponse.RestBulkItemResponse>> bulkResponseItems,
final List<ElasticSearchMutation> submittedBulkRequestItems) {
final List<Triplet<Object, Integer, ElasticSearchMutation>> errors = new ArrayList<>(bulkResponseItems.size());
final List<RequestBytes> submittedBulkRequestItems) {
final List<Triplet<Object, Integer, RequestBytes>> errors = new ArrayList<>(bulkResponseItems.size());
for (int itemIndex = 0; itemIndex < bulkResponseItems.size(); itemIndex++) {
Collection<RestBulkResponse.RestBulkItemResponse> bulkResponseItem = bulkResponseItems.get(itemIndex).values();
if (bulkResponseItem.size() > 1) {
@@ -441,32 +475,77 @@ private List<Triplet<Object, Integer, ElasticSearchMutation>> pairErrorsWithSubm
return errors;
}

private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//By default, Elasticsearch limits HTTP requests to 100mb, so chunk a given batch of requests
//so that each chunk stays under the configured limit

//https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-desc
//There is no "correct" number of actions to perform in a single bulk request. Experiment with different
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
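//Worked example (hypothetical sizes): with the default 100,000,000 byte limit, serialized
//requests of 60MB, 50MB, and 30MB are chunked greedily as [60MB] then [50MB, 30MB]; the
//50MB request does not fit alongside the 60MB one, so it starts the next chunk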
private final PeekingIterator<RequestBytes> requestIterator;

private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
for (ElasticSearchMutation request : requests) {
serializedRequests.add(new RequestBytes(request));
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
}

@Override
public boolean hasNext() {
return requestIterator.hasNext();
}

@Override
public List<RequestBytes> next() {
List<RequestBytes> serializedRequests = new ArrayList<>();
int chunkSerializedTotal = 0;
while (requestIterator.hasNext()) {
RequestBytes peeked = requestIterator.peek();
int peekedSize = peeked.getSerializedSize();
if (peekedSize + chunkSerializedTotal < bulkChunkSerializedLimitBytes) {
    chunkSerializedTotal += peekedSize;
    serializedRequests.add(requestIterator.next());
} else if (serializedRequests.isEmpty()) {
    //Guard against a single request that alone exceeds the limit: without this branch it
    //would never be consumed and hasNext() would loop forever; emit it as its own chunk
    //and let Elasticsearch reject the oversized payload
    serializedRequests.add(requestIterator.next());
    return serializedRequests;
} else {
    //Adding this element would exceed the limit, so return the chunk
    return serializedRequests;
}
}
//All remaining requests fit in this chunk
return serializedRequests;
}
}

@Override
public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
List<ElasticSearchMutation> requestsToSend = requests;
int retryCount = 0;
while (true) {
final Pair<String, byte[]> bulkRequestInput = buildBulkRequestInput(requestsToSend, ingestPipeline);
final Response response = performRequest(REQUEST_TYPE_POST, bulkRequestInput.getValue0(), bulkRequestInput.getValue1());
try (final InputStream inputStream = response.getEntity().getContent()) {
final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
List<Triplet<Object, Integer, ElasticSearchMutation>> bulkItemsThatFailed = pairErrorsWithSubmittedMutation(bulkResponse.getItems(), requestsToSend);
if (!bulkItemsThatFailed.isEmpty()) {
//Only retry the bulk request if *all* the bulk response item error codes are retry error codes
final Set<Integer> errorCodes = bulkItemsThatFailed.stream().map(Triplet::getValue1).collect(Collectors.toSet());
if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) {
//Build up the next request batch, of only the failed mutations
requestsToSend = bulkItemsThatFailed.stream().map(Triplet::getValue2).collect(Collectors.toList());
performRetryWait(retryCount);
retryCount++;
public void bulkRequest(final List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
BulkRequestChunker bulkRequestChunker = new BulkRequestChunker(requests);
while (bulkRequestChunker.hasNext()) {
List<RequestBytes> bulkRequestChunk = bulkRequestChunker.next();
int retryCount = 0;
while (true) {
final Pair<String, byte[]> bulkRequestInput = buildBulkRequestInput(bulkRequestChunk, ingestPipeline);
final Response response = performRequest(REQUEST_TYPE_POST, bulkRequestInput.getValue0(), bulkRequestInput.getValue1());
try (final InputStream inputStream = response.getEntity().getContent()) {
final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
List<Triplet<Object, Integer, RequestBytes>> bulkItemsThatFailed = pairErrorsWithSubmittedMutation(bulkResponse.getItems(), bulkRequestChunk);
if (!bulkItemsThatFailed.isEmpty()) {
//Only retry the bulk request if *all* the bulk response item error codes are retry error codes
final Set<Integer> errorCodes = bulkItemsThatFailed.stream().map(Triplet::getValue1).collect(Collectors.toSet());
if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) {
//Build up the next request batch, of only the failed mutations
bulkRequestChunk = bulkItemsThatFailed.stream().map(Triplet::getValue2).collect(Collectors.toList());
performRetryWait(retryCount);
retryCount++;
} else {
final List<Object> errorItems = bulkItemsThatFailed.stream().map(Triplet::getValue0).collect(Collectors.toList());
errorItems.forEach(error -> log.error("Failed to execute ES query: {}", error));
throw new IOException("Failure(s) in Elasticsearch bulk request: " + errorItems);
}
} else {
final List<Object> errorItems = bulkItemsThatFailed.stream().map(Triplet::getValue0).collect(Collectors.toList());
errorItems.forEach(error -> log.error("Failed to execute ES query: {}", error));
throw new IOException("Failure(s) in Elasticsearch bulk request: " + errorItems);
//The entire bulk request was successful, leave the loop
break;
}
} else {
//The entire bulk request was successful, leave the loop
break;
}
}
}
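
The retry gate above is all-or-nothing per chunk: a failed chunk is resubmitted (containing only its failed items) while attempts remain and every observed error code is retryable; otherwise the whole bulk request fails with an IOException. A standalone sketch of that predicate, with illustrative codes (the helper name is invented, not part of this commit):

    // Hypothetical distillation of the retry decision inside bulkRequest
    static boolean shouldRetryChunk(Set<Integer> retryOnErrorCodes, Set<Integer> observedErrorCodes,
                                    int retryCount, int retryAttemptLimit) {
        // Retry only while attempts remain AND every failed item returned a retryable code
        return retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(observedErrorCodes);
    }

    // With retryOnErrorCodes = {408, 429}:
    //   observed {429}      -> retried; only the failed mutations are resent
    //   observed {429, 400} -> not retried; all failures are logged and reported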

1 comment on commit 81b07ac

@github-actions
Benchmark

| Benchmark suite | Current: 81b07ac | Previous: 487e10c | Ratio |
| ---- | ---- | ---- | ---- |
| org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete | 12996.809192937533 ms/op | 12689.439979944173 ms/op | 1.02 |
| org.janusgraph.GraphCentricQueryBenchmark.getVertices | 927.2585284062714 ms/op | 925.013828296019 ms/op | 1.00 |
| org.janusgraph.MgmtOlapJobBenchmark.runClearIndex | 217.0543131134058 ms/op | 216.46225680797102 ms/op | 1.00 |
| org.janusgraph.MgmtOlapJobBenchmark.runReindex | 344.7253195078846 ms/op | 340.16287398523804 ms/op | 1.01 |
| org.janusgraph.JanusGraphSpeedBenchmark.basicCount | 216.24349519202963 ms/op | 217.50069746084958 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 4626.2025866088625 ms/op | 5053.275978996042 ms/op | 0.92 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps | 16456.866194069808 ms/op | 16275.762675504468 ms/op | 1.01 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch | 19865.518750940406 ms/op | 19161.81803811333 ms/op | 1.04 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching | 57160.32049226668 ms/op | 54807.3874225 ms/op | 1.04 |
| org.janusgraph.CQLMultiQueryDropBenchmark.dropVertices | 1531.4247519764333 ms/op | 1525.6592376795456 ms/op | 1.00 |
| org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex | 7930.418827566936 ms/op | 8025.759033073809 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion | 382.19500969643025 ms/op | 376.13373544806296 ms/op | 1.02 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch | 4060.6175052615413 ms/op | 4086.6631890165872 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryBenchmark.getNames | 8184.366128346179 ms/op | 8416.939881828739 ms/op | 0.97 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 5915.738632355534 ms/op | 5611.851519805606 ms/op | 1.05 |
| org.janusgraph.CQLMultiQueryBenchmark.getLabels | 6964.380682508596 ms/op | 6828.816333819391 ms/op | 1.02 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep | 427.5512408738886 ms/op | 434.35062048578123 ms/op | 0.98 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex | 11869.97008625678 ms/op | 11980.571977528334 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage | 350.9726223875369 ms/op | 355.802058751578 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 14540.850235371456 ms/op | 14729.77276273078 ms/op | 0.99 |
| org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection | 243.09292580915317 ms/op | 243.61035626593375 ms/op | 1.00 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch | 14330.428514166288 ms/op | 13656.572724210222 ms/op | 1.05 |
| org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames | 8009.458073944366 ms/op | 8290.3478513332 ms/op | 0.97 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps | 8817.899736002206 ms/op | 9070.803732347935 ms/op | 0.97 |
| org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts | 8281.071263446525 ms/op | 8637.306367440338 ms/op | 0.96 |

This comment was automatically generated by workflow using github-action-benchmark.
