Commit
- Reference the Elasticsearch documentation in the bulk chunker size limit config option
- Clean up the logic that populates the ES bulk request path
- Submit the bulk request items that fit within the configured limit, then throw for overly large items (see the configuration sketch below)

Signed-off-by: Allan Clements <[email protected]>
criminosis committed Jun 25, 2024
1 parent d255396 commit bb48e63
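For orientation, here is a minimal sketch of how a deployment might set the option this commit documents, in a JanusGraph properties file. The index name `search` and the 50 MB value are illustrative assumptions, not values taken from this commit:

```properties
# Hypothetical janusgraph.properties snippet; only bulk-chunk-size-limit-bytes
# is the option touched by this commit.
index.search.backend=elasticsearch
index.search.hostname=127.0.0.1
# Keep this at or below http.max_content_length on the Elasticsearch servers.
# Single bulk items larger than this limit now throw only after the smaller
# items in the batch have been submitted.
index.search.elasticsearch.bulk-chunk-size-limit-bytes=50000000
```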
Showing 4 changed files with 87 additions and 22 deletions.
2 changes: 1 addition & 1 deletion docs/configs/janusgraph-cfg.md
@@ -150,7 +150,7 @@ Elasticsearch index configuration

| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. If a single bulk item exceeds this limit an exception will be thrown after the smaller bulk items are submitted. Ensure that this limit is always less than or equal to the configured limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html). | Integer | 100000000 | LOCAL |
| index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
| index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
| index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
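The new `bulk-chunk-size-limit-bytes` description above ties the limit to Elasticsearch's `http.max_content_length`. As a hedged illustration, the matching server-side setting lives in `elasticsearch.yml`; `100mb` is Elasticsearch's documented default rather than something this commit changes:

```yaml
# elasticsearch.yml sketch: bulk-chunk-size-limit-bytes must stay at or below
# this value, otherwise Elasticsearch rejects the bulk request outright.
http.max_content_length: 100mb
```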
@@ -327,7 +327,11 @@ public class ElasticSearchIndex implements IndexProvider {
public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
"The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
"chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
"chunked to this size. If a single bulk item exceeds this limit an exception will be thrown after the " +
"smaller bulk items are submitted. Ensure that this limit is always less than or equal to the configured " +
"limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the " +
"[Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html).",
ConfigOption.Type.LOCAL, Integer.class, 100_000_000);

public static final int HOST_PORT_DEFAULT = 9200;

@@ -14,6 +14,7 @@

package org.janusgraph.diskstorage.es.rest;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
@@ -54,6 +55,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -392,11 +394,13 @@ public void clearStore(String indexName, String storeName) throws IOException {
}
}

private class RequestBytes {
@VisibleForTesting
class RequestBytes {
final byte [] requestBytes;
final byte [] requestSource;

private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
@VisibleForTesting
RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
Map<String, Object> requestData = new HashMap<>();
if (useMappingTypes) {
requestData.put("_index", request.getIndex());
@@ -419,7 +423,8 @@ private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingE
}
}

private int getSerializedSize() {
@VisibleForTesting
int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
@@ -445,15 +450,15 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests,
request.writeTo(outputStream);
}

final StringBuilder builder = new StringBuilder();
final StringBuilder bulkRequestQueryParameters = new StringBuilder();
if (ingestPipeline != null) {
APPEND_OP.apply(builder).append("pipeline=").append(ingestPipeline);
APPEND_OP.apply(bulkRequestQueryParameters).append("pipeline=").append(ingestPipeline);
}
if (bulkRefreshEnabled) {
APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
}
builder.insert(0, REQUEST_SEPARATOR + "_bulk");
return Pair.with(builder.toString(), outputStream.toByteArray());
final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
return Pair.with(bulkRequestPath, outputStream.toByteArray());
}

private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
@@ -476,7 +481,8 @@ private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMuta
return errors;
}

private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
@VisibleForTesting
class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
//the specified limit

@@ -485,18 +491,33 @@ private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
private final PeekingIterator<RequestBytes> requestIterator;
private final int[] exceptionallyLargeRequests;

private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
@VisibleForTesting
BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
List<Integer> requestSizesThatWereTooLarge = new ArrayList<>();
for (ElasticSearchMutation request : requests) {
serializedRequests.add(new RequestBytes(request));
RequestBytes requestBytes = new RequestBytes(request);
int requestSerializedSize = requestBytes.getSerializedSize();
if (requestSerializedSize <= bulkChunkSerializedLimitBytes) {
//Only keep items that we can actually send in memory
serializedRequests.add(requestBytes);
} else {
requestSizesThatWereTooLarge.add(requestSerializedSize);
}
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
//Condense request sizes that are too large into an int array to remove Boxed & List memory overhead
this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.isEmpty() ? null :
requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray();
}

@Override
public boolean hasNext() {
return requestIterator.hasNext();
//Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted
//This allows next() to throw after all well sized requests have been chunked for submission
return requestIterator.hasNext() || exceptionallyLargeRequests != null;
}

@Override
@@ -505,20 +526,21 @@ public List<RequestBytes> next() {
int chunkSerializedTotal = 0;
while (requestIterator.hasNext()) {
RequestBytes peeked = requestIterator.peek();
int requestSerializedSize = peeked.getSerializedSize();
if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
chunkSerializedTotal += requestSerializedSize;
chunkSerializedTotal += peeked.getSerializedSize();
if (chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
serializedRequests.add(requestIterator.next());
} else if (requestSerializedSize > bulkChunkSerializedLimitBytes) {
//we've encountered an element we cannot send to Elasticsearch given the configured limit
throw new IllegalArgumentException(String.format(
"Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s",
bulkChunkSerializedLimitBytes, requestSerializedSize));
} else {
//Adding this element would exceed the limit, so return the chunk
return serializedRequests;
}
}
//Check if we should throw an exception for items that were exceptionally large and therefore undeliverable.
//This is only done after all items that could be sent have been sent
if (serializedRequests.isEmpty() && this.exceptionallyLargeRequests != null) {
throw new IllegalArgumentException(String.format(
"Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) %s",
bulkChunkSerializedLimitBytes, Arrays.toString(this.exceptionallyLargeRequests)));
}
//All remaining requests fit in this chunk
return serializedRequests;
}
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

import static org.mockito.ArgumentMatchers.any;
@@ -105,6 +106,44 @@ public void testSplittingOfLargeBulkItems() throws IOException {
}
}

@Test
public void testThrowingForOverlyLargeBulkItemOnlyAfterSmallerItemsAreChunked() throws IOException {
int bulkLimit = 1_000_000;
StringBuilder overlyLargePayloadBuilder = new StringBuilder();
IntStream.range(0, bulkLimit * 10).forEach(value -> overlyLargePayloadBuilder.append("a"));
String overlyLargePayload = overlyLargePayloadBuilder.toString();
ElasticSearchMutation overlyLargeMutation = ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id2",
Collections.singletonMap("someKey", overlyLargePayload));
List<ElasticSearchMutation> bulkItems = Arrays.asList(
ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id1",
Collections.singletonMap("someKey", "small_payload1")),
overlyLargeMutation,
ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id3",
Collections.singletonMap("someKey", "small_payload2"))
);

try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) {
RestElasticSearchClient.BulkRequestChunker chunkerUnderTest = restClientUnderTest.new BulkRequestChunker(bulkItems);
int overlyLargeRequestExpectedSize = restClientUnderTest.new RequestBytes(overlyLargeMutation).getSerializedSize();

//The chunker should chunk this request first as a list of the 2 smaller items
List<RestElasticSearchClient.RequestBytes> smallItemsChunk = chunkerUnderTest.next();
Assertions.assertEquals(2, smallItemsChunk.size());

//Then the chunker should still return true for hasNext()
Assertions.assertTrue(chunkerUnderTest.hasNext());

//Then the next call for next() should throw to report the exceptionally large item
IllegalArgumentException thrownException = Assertions.assertThrows(IllegalArgumentException.class, chunkerUnderTest::next,
"Should have thrown due to bulk request item being too large");

String expectedExceptionMessage = String.format("Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) [%s]",
bulkLimit, overlyLargeRequestExpectedSize);

Assertions.assertEquals(expectedExceptionMessage, thrownException.getMessage());
}
}

@Test
public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException {
int bulkLimit = 800;

1 comment on commit bb48e63

@github-actions


Benchmark

| Benchmark suite | Current: bb48e63 | Previous: d255396 | Ratio |
| ---- | ---- | ---- | ---- |
| org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete | 12698.508254050557 ms/op | 15017.170187305353 ms/op | 0.85 |
| org.janusgraph.GraphCentricQueryBenchmark.getVertices | 907.3763148705536 ms/op | 961.779849406255 ms/op | 0.94 |
| org.janusgraph.MgmtOlapJobBenchmark.runClearIndex | 215.39148166666664 ms/op | 217.56629310797098 ms/op | 0.99 |
| org.janusgraph.MgmtOlapJobBenchmark.runReindex | 343.5108976694047 ms/op | 361.57722250333336 ms/op | 0.95 |
| org.janusgraph.JanusGraphSpeedBenchmark.basicCount | 200.39858062462673 ms/op | 299.2003695664086 ms/op | 0.67 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 4905.405478102953 ms/op | 6071.426485185737 ms/op | 0.81 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps | 16567.88122769223 ms/op | 19975.681407350366 ms/op | 0.83 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch | 19225.226661422726 ms/op | 23952.72953915508 ms/op | 0.80 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching | 56353.8804716 ms/op | 68171.69029556667 ms/op | 0.83 |
| org.janusgraph.CQLMultiQueryDropBenchmark.dropVertices | 1563.1737935455142 ms/op | 1816.9581586647128 ms/op | 0.86 |
| org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex | 8126.2207515714435 ms/op | 9443.527656875849 ms/op | 0.86 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion | 384.54646019644537 ms/op | 428.072660988753 ms/op | 0.90 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch | 4265.696879206127 ms/op | 5586.392145922853 ms/op | 0.76 |
| org.janusgraph.CQLMultiQueryBenchmark.getNames | 8158.814215260108 ms/op | 11370.815512474728 ms/op | 0.72 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 5564.848503908332 ms/op | 7631.645434302394 ms/op | 0.73 |
| org.janusgraph.CQLMultiQueryBenchmark.getLabels | 6893.764861539951 ms/op | 8902.651176663585 ms/op | 0.77 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep | 426.738591296529 ms/op | 501.7901263066493 ms/op | 0.85 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex | 12823.230933340394 ms/op | 15968.194510484615 ms/op | 0.80 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage | 350.2759184152866 ms/op | 423.02341747485923 ms/op | 0.83 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 13604.706302256785 ms/op | 17368.035613674252 ms/op | 0.78 |
| org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection | 250.44073867666893 ms/op | 282.14623086325463 ms/op | 0.89 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch | 14248.328336844022 ms/op | 20111.529798068364 ms/op | 0.71 |
| org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames | 8145.719964876012 ms/op | 10801.221835707169 ms/op | 0.75 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps | 9018.680465993577 ms/op | 10771.838655087315 ms/op | 0.84 |
| org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts | 8470.377540812293 ms/op | 10574.69229554027 ms/op | 0.80 |

This comment was automatically generated by workflow using github-action-benchmark.
