diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 2248ba669a..27f687aba2 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -261,13 +261,13 @@ private void doInitializeInternal() throws IOException { final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled(); if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) { final int maxLocalCompressionsForEstimation = openSearchSinkConfig.getIndexConfiguration().getMaxLocalCompressionsForEstimation(); - bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias), bulkSize, maxLocalCompressionsForEstimation); + bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(baseBulkRequestBuilder(requireAlias), bulkSize, maxLocalCompressionsForEstimation); } else if (isEstimateBulkSizeUsingCompression) { LOG.warn("Estimate bulk request size using compression was enabled but request compression is disabled. " + "Estimating bulk request size without compression."); - bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias)); + bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(baseBulkRequestBuilder(requireAlias)); } else { - bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias)); + bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(baseBulkRequestBuilder(requireAlias)); } final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries(); @@ -662,4 +662,10 @@ private boolean isUsingDocumentFilters() { (sinkContext.getExcludeKeys() != null && !sinkContext.getExcludeKeys().isEmpty()) || sinkContext.getTagsTargetKey() != null; } + + private BulkRequest.Builder baseBulkRequestBuilder(boolean requireAlias) { + BulkRequest.Builder base = new BulkRequest.Builder().requireAlias(requireAlias); + Optional.ofNullable(this.pipeline).ifPresent(base::pipeline); + return base; + } }