Skip to content

Commit

Permalink
Set pipeline for bulk request in OpenSearch sink
Browse files Browse the repository at this point in the history
Signed-off-by: miguel-vila <[email protected]>
  • Loading branch information
miguel-vila committed Sep 20, 2024
1 parent aaef847 commit 25f7244
Showing 1 changed file with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,13 @@ private void doInitializeInternal() throws IOException {
final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled();
if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) {
final int maxLocalCompressionsForEstimation = openSearchSinkConfig.getIndexConfiguration().getMaxLocalCompressionsForEstimation();
bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias), bulkSize, maxLocalCompressionsForEstimation);
bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(baseBulkRequestBuilder(requireAlias), bulkSize, maxLocalCompressionsForEstimation);
} else if (isEstimateBulkSizeUsingCompression) {
LOG.warn("Estimate bulk request size using compression was enabled but request compression is disabled. " +
"Estimating bulk request size without compression.");
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias));
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(baseBulkRequestBuilder(requireAlias));
} else {
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias));
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(baseBulkRequestBuilder(requireAlias));
}

final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries();
Expand Down Expand Up @@ -662,4 +662,10 @@ private boolean isUsingDocumentFilters() {
(sinkContext.getExcludeKeys() != null && !sinkContext.getExcludeKeys().isEmpty()) ||
sinkContext.getTagsTargetKey() != null;
}

private BulkRequest.Builder baseBulkRequestBuilder(boolean requireAlias) {
BulkRequest.Builder base = new BulkRequest.Builder().requireAlias(requireAlias);
Optional.ofNullable(this.pipeline).ifPresent(base::pipeline);
return base;
}
}

0 comments on commit 25f7244

Please sign in to comment.