diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index ccd430c982..04147619b7 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -101,6 +101,7 @@ public class OpenSearchSink extends AbstractSink> { private final long flushTimeout; private final IndexType indexType; private final String documentIdField; + private final String documentId; private final String routingField; private final String action; private final String documentRootKey; @@ -117,7 +118,6 @@ public class OpenSearchSink extends AbstractSink> { private PluginSetting pluginSetting; private final SinkContext sinkContext; private final ExpressionEvaluator expressionEvaluator; - private final boolean isDocumentIdAnExpression; private FailedBulkOperationConverter failedBulkOperationConverter; @@ -145,7 +145,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.flushTimeout = openSearchSinkConfig.getIndexConfiguration().getFlushTimeout(); this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType(); this.documentIdField = openSearchSinkConfig.getIndexConfiguration().getDocumentIdField(); - this.isDocumentIdAnExpression = expressionEvaluator.isValidExpressionStatement(documentIdField); + this.documentId = openSearchSinkConfig.getIndexConfiguration().getDocumentId(); this.routingField = openSearchSinkConfig.getIndexConfiguration().getRoutingField(); this.action = openSearchSinkConfig.getIndexConfiguration().getAction(); this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey(); @@ -337,14 +337,14 @@ private SerializedJson getDocument(final Event event) { String docId = null; - if (isDocumentIdAnExpression) { + if (Objects.nonNull(documentIdField)) { + docId = event.get(documentIdField, String.class); + } else if (Objects.nonNull(documentId)) { try { - docId = (String) expressionEvaluator.evaluate(documentIdField, event); - } catch (final ExpressionEvaluationException e) { - LOG.error("Unable to construct document_id_field from expression {}, the document_id will be generated by OpenSearch", documentIdField); + docId = event.formatString(documentId, expressionEvaluator); + } catch (final ExpressionEvaluationException | EventKeyNotFoundException e) { + LOG.error("Unable to construct document_id with format {}, the document_id will be generated by OpenSearch", documentId, e); } - } else if (Objects.nonNull(documentIdField)) { - docId = event.get(documentIdField, String.class); } String routing = (routingField != null) ? event.get(routingField, String.class) : null; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index ccc2e1c951..e23fdd4e26 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -9,11 +9,14 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.EnumUtils; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3FileReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.services.s3.S3Client; @@ -23,12 +26,15 @@ import java.net.URL; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; public class IndexConfiguration { + private static final Logger LOG = LoggerFactory.getLogger(IndexConfiguration.class); + public static final String SETTINGS = "settings"; public static final String INDEX_ALIAS = "index"; public static final String INDEX_TYPE = "index_type"; @@ -41,6 +47,7 @@ public class IndexConfiguration { public static final String MAX_LOCAL_COMPRESSIONS_FOR_ESTIMATION = "max_local_compressions_for_estimation"; public static final String FLUSH_TIMEOUT = "flush_timeout"; public static final String DOCUMENT_ID_FIELD = "document_id_field"; + public static final String DOCUMENT_ID = "document_id"; public static final String ROUTING_FIELD = "routing_field"; public static final String ISM_POLICY_FILE = "ism_policy_file"; public static final long DEFAULT_BULK_SIZE = 5L; @@ -61,6 +68,7 @@ public class IndexConfiguration { private final String indexAlias; private final Map indexTemplate; private final String documentIdField; + private final String documentId; private final String routingField; private final long bulkSize; private final boolean estimateBulkSizeUsingCompression; @@ -119,12 +127,14 @@ private IndexConfiguration(final Builder builder) { this.routingField = builder.routingField; String documentIdField = builder.documentIdField; + String documentId = builder.documentId; if (indexType.equals(IndexType.TRACE_ANALYTICS_RAW)) { - documentIdField = "spanId"; + documentId = "${spanId}"; } else if (indexType.equals(IndexType.TRACE_ANALYTICS_SERVICE_MAP)) { - documentIdField = "hashId"; + documentId = "${hashId}"; } this.documentIdField = documentIdField; + this.documentId = documentId; this.ismPolicyFile = builder.ismPolicyFile; this.action = builder.action; this.documentRootKey = builder.documentRootKey; @@ -180,10 +190,21 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti final long flushTimeout = pluginSetting.getLongOrDefault(FLUSH_TIMEOUT, DEFAULT_FLUSH_TIMEOUT); builder = builder.withFlushTimeout(flushTimeout); - final String documentId = pluginSetting.getStringOrDefault(DOCUMENT_ID_FIELD, null); - if (documentId != null) { - builder = builder.withDocumentIdField(documentId); + final String documentIdField = pluginSetting.getStringOrDefault(DOCUMENT_ID_FIELD, null); + final String documentId = pluginSetting.getStringOrDefault(DOCUMENT_ID, null); + + + if (Objects.nonNull(documentIdField) && Objects.nonNull(documentId)) { + throw new InvalidPluginConfigurationException("Both document_id_field and document_id cannot be used at the same time. It is preferred to only use document_id as document_id_field is deprecated."); + } + + if (documentIdField != null) { + LOG.warn("document_id_field is deprecated in favor of document_id, and support for document_id_field will be removed in a future major version release."); + builder = builder.withDocumentIdField(documentIdField); + } else if (documentId != null) { + builder = builder.withDocumentId(documentId); } + final String routingField = pluginSetting.getStringOrDefault(ROUTING_FIELD, null); if (routingField != null) { builder = builder.withRoutingField(routingField); @@ -242,6 +263,8 @@ public String getDocumentIdField() { return documentIdField; } + public String getDocumentId() { return documentId; } + public String getRoutingField() { return routingField; } @@ -349,6 +372,7 @@ public static class Builder { private int numReplicas; private String routingField; private String documentIdField; + private String documentId; private long bulkSize = DEFAULT_BULK_SIZE; private boolean estimateBulkSizeUsingCompression = DEFAULT_ESTIMATE_BULK_SIZE_USING_COMPRESSION; private int maxLocalCompressionsForEstimation = DEFAULT_MAX_LOCAL_COMPRESSIONS_FOR_ESTIMATION; @@ -391,11 +415,17 @@ public Builder withTemplateFile(final String templateFile) { } public Builder withDocumentIdField(final String documentIdField) { - checkNotNull(documentIdField, "documentId field cannot be null"); + checkNotNull(documentIdField, "document_id_field cannot be null"); this.documentIdField = documentIdField; return this; } + public Builder withDocumentId(final String documentId) { + checkNotNull(documentId, "document_id cannot be null"); + this.documentId = documentId; + return this; + } + public Builder withRoutingField(final String routingField) { this.routingField = routingField; return this; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java index 64810bf1f2..cda9476743 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java @@ -286,7 +286,7 @@ public void testReadIndexConfig_RawIndexType() { assertEquals(60_000L, indexConfiguration.getFlushTimeout()); assertEquals(false, indexConfiguration.isEstimateBulkSizeUsingCompression()); assertEquals(2, indexConfiguration.getMaxLocalCompressionsForEstimation()); - assertEquals("spanId", indexConfiguration.getDocumentIdField()); + assertEquals("${spanId}", indexConfiguration.getDocumentId()); } @Test @@ -312,7 +312,7 @@ public void testReadIndexConfig_ServiceMapIndexType() { assertEquals(60_000L, indexConfiguration.getFlushTimeout()); assertEquals(false, indexConfiguration.isEstimateBulkSizeUsingCompression()); assertEquals(2, indexConfiguration.getMaxLocalCompressionsForEstimation()); - assertEquals("hashId", indexConfiguration.getDocumentIdField()); + assertEquals("${hashId}", indexConfiguration.getDocumentId()); } @Test @@ -335,7 +335,7 @@ public void testReadIndexConfigCustom() { assertEquals(testFlushTimeout, indexConfiguration.getFlushTimeout()); assertEquals(true, indexConfiguration.isEstimateBulkSizeUsingCompression()); assertEquals(5, indexConfiguration.getMaxLocalCompressionsForEstimation()); - assertEquals(testIdField, indexConfiguration.getDocumentIdField()); + assertEquals(testIdField, indexConfiguration.getDocumentId()); } @Test @@ -356,7 +356,7 @@ public void testReadIndexConfig_ExplicitCustomIndexType() { assertFalse(indexConfiguration.getIndexTemplate().isEmpty()); assertEquals(testBulkSize, indexConfiguration.getBulkSize()); assertEquals(testFlushTimeout, indexConfiguration.getFlushTimeout()); - assertEquals(testIdField, indexConfiguration.getDocumentIdField()); + assertEquals(testIdField, indexConfiguration.getDocumentId()); } @Test @@ -458,7 +458,7 @@ private PluginSetting getPluginSetting(Map metadata) { } private Map initializeConfigMetaData( - String indexType, String indexAlias, String templateFilePath, Long bulkSize, Long flushTimeout, String documentIdField) { + String indexType, String indexAlias, String templateFilePath, Long bulkSize, Long flushTimeout, String documentId) { final Map metadata = new HashMap<>(); if (indexType != null) { metadata.put(IndexConfiguration.INDEX_TYPE, indexType); @@ -475,8 +475,8 @@ private Map initializeConfigMetaData( if (flushTimeout != null) { metadata.put(IndexConfiguration.FLUSH_TIMEOUT, flushTimeout); } - if (documentIdField != null) { - metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, documentIdField); + if (documentId != null) { + metadata.put(IndexConfiguration.DOCUMENT_ID, documentId); } return metadata; }