Skip to content

Commit

Permalink
Deprecate document_id_field and add support for document_id with form…
Browse files Browse the repository at this point in the history
…atting (#3153)

Deprecate document_id_field and add support for document_id with formatting

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Aug 16, 2023
1 parent 1f0ad76 commit 1ede4b6
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private final long flushTimeout;
private final IndexType indexType;
private final String documentIdField;
private final String documentId;
private final String routingField;
private final String action;
private final String documentRootKey;
Expand All @@ -117,7 +118,6 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private PluginSetting pluginSetting;
private final SinkContext sinkContext;
private final ExpressionEvaluator expressionEvaluator;
private final boolean isDocumentIdAnExpression;

private FailedBulkOperationConverter failedBulkOperationConverter;

Expand Down Expand Up @@ -145,7 +145,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.flushTimeout = openSearchSinkConfig.getIndexConfiguration().getFlushTimeout();
this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType();
this.documentIdField = openSearchSinkConfig.getIndexConfiguration().getDocumentIdField();
this.isDocumentIdAnExpression = expressionEvaluator.isValidExpressionStatement(documentIdField);
this.documentId = openSearchSinkConfig.getIndexConfiguration().getDocumentId();
this.routingField = openSearchSinkConfig.getIndexConfiguration().getRoutingField();
this.action = openSearchSinkConfig.getIndexConfiguration().getAction();
this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey();
Expand Down Expand Up @@ -337,14 +337,14 @@ private SerializedJson getDocument(final Event event) {

String docId = null;

if (isDocumentIdAnExpression) {
if (Objects.nonNull(documentIdField)) {
docId = event.get(documentIdField, String.class);
} else if (Objects.nonNull(documentId)) {
try {
docId = (String) expressionEvaluator.evaluate(documentIdField, event);
} catch (final ExpressionEvaluationException e) {
LOG.error("Unable to construct document_id_field from expression {}, the document_id will be generated by OpenSearch", documentIdField);
docId = event.formatString(documentId, expressionEvaluator);
} catch (final ExpressionEvaluationException | EventKeyNotFoundException e) {
LOG.error("Unable to construct document_id with format {}, the document_id will be generated by OpenSearch", documentId, e);
}
} else if (Objects.nonNull(documentIdField)) {
docId = event.get(documentIdField, String.class);
}

String routing = (routingField != null) ? event.get(routingField, String.class) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.EnumUtils;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3FileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.services.s3.S3Client;

Expand All @@ -23,12 +26,15 @@
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public class IndexConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(IndexConfiguration.class);

public static final String SETTINGS = "settings";
public static final String INDEX_ALIAS = "index";
public static final String INDEX_TYPE = "index_type";
Expand All @@ -41,6 +47,7 @@ public class IndexConfiguration {
public static final String MAX_LOCAL_COMPRESSIONS_FOR_ESTIMATION = "max_local_compressions_for_estimation";
public static final String FLUSH_TIMEOUT = "flush_timeout";
public static final String DOCUMENT_ID_FIELD = "document_id_field";
public static final String DOCUMENT_ID = "document_id";
public static final String ROUTING_FIELD = "routing_field";
public static final String ISM_POLICY_FILE = "ism_policy_file";
public static final long DEFAULT_BULK_SIZE = 5L;
Expand All @@ -61,6 +68,7 @@ public class IndexConfiguration {
private final String indexAlias;
private final Map<String, Object> indexTemplate;
private final String documentIdField;
private final String documentId;
private final String routingField;
private final long bulkSize;
private final boolean estimateBulkSizeUsingCompression;
Expand Down Expand Up @@ -119,12 +127,14 @@ private IndexConfiguration(final Builder builder) {
this.routingField = builder.routingField;

String documentIdField = builder.documentIdField;
String documentId = builder.documentId;
if (indexType.equals(IndexType.TRACE_ANALYTICS_RAW)) {
documentIdField = "spanId";
documentId = "${spanId}";
} else if (indexType.equals(IndexType.TRACE_ANALYTICS_SERVICE_MAP)) {
documentIdField = "hashId";
documentId = "${hashId}";
}
this.documentIdField = documentIdField;
this.documentId = documentId;
this.ismPolicyFile = builder.ismPolicyFile;
this.action = builder.action;
this.documentRootKey = builder.documentRootKey;
Expand Down Expand Up @@ -180,10 +190,21 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti

final long flushTimeout = pluginSetting.getLongOrDefault(FLUSH_TIMEOUT, DEFAULT_FLUSH_TIMEOUT);
builder = builder.withFlushTimeout(flushTimeout);
final String documentId = pluginSetting.getStringOrDefault(DOCUMENT_ID_FIELD, null);
if (documentId != null) {
builder = builder.withDocumentIdField(documentId);
final String documentIdField = pluginSetting.getStringOrDefault(DOCUMENT_ID_FIELD, null);
final String documentId = pluginSetting.getStringOrDefault(DOCUMENT_ID, null);


if (Objects.nonNull(documentIdField) && Objects.nonNull(documentId)) {
throw new InvalidPluginConfigurationException("Both document_id_field and document_id cannot be used at the same time. It is preferred to only use document_id as document_id_field is deprecated.");
}

if (documentIdField != null) {
LOG.warn("document_id_field is deprecated in favor of document_id, and support for document_id_field will be removed in a future major version release.");
builder = builder.withDocumentIdField(documentIdField);
} else if (documentId != null) {
builder = builder.withDocumentId(documentId);
}

final String routingField = pluginSetting.getStringOrDefault(ROUTING_FIELD, null);
if (routingField != null) {
builder = builder.withRoutingField(routingField);
Expand Down Expand Up @@ -242,6 +263,8 @@ public String getDocumentIdField() {
return documentIdField;
}

public String getDocumentId() { return documentId; }

public String getRoutingField() {
return routingField;
}
Expand Down Expand Up @@ -349,6 +372,7 @@ public static class Builder {
private int numReplicas;
private String routingField;
private String documentIdField;
private String documentId;
private long bulkSize = DEFAULT_BULK_SIZE;
private boolean estimateBulkSizeUsingCompression = DEFAULT_ESTIMATE_BULK_SIZE_USING_COMPRESSION;
private int maxLocalCompressionsForEstimation = DEFAULT_MAX_LOCAL_COMPRESSIONS_FOR_ESTIMATION;
Expand Down Expand Up @@ -391,11 +415,17 @@ public Builder withTemplateFile(final String templateFile) {
}

public Builder withDocumentIdField(final String documentIdField) {
checkNotNull(documentIdField, "documentId field cannot be null");
checkNotNull(documentIdField, "document_id_field cannot be null");
this.documentIdField = documentIdField;
return this;
}

public Builder withDocumentId(final String documentId) {
checkNotNull(documentId, "document_id cannot be null");
this.documentId = documentId;
return this;
}

public Builder withRoutingField(final String routingField) {
this.routingField = routingField;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public void testReadIndexConfig_RawIndexType() {
assertEquals(60_000L, indexConfiguration.getFlushTimeout());
assertEquals(false, indexConfiguration.isEstimateBulkSizeUsingCompression());
assertEquals(2, indexConfiguration.getMaxLocalCompressionsForEstimation());
assertEquals("spanId", indexConfiguration.getDocumentIdField());
assertEquals("${spanId}", indexConfiguration.getDocumentId());
}

@Test
Expand All @@ -312,7 +312,7 @@ public void testReadIndexConfig_ServiceMapIndexType() {
assertEquals(60_000L, indexConfiguration.getFlushTimeout());
assertEquals(false, indexConfiguration.isEstimateBulkSizeUsingCompression());
assertEquals(2, indexConfiguration.getMaxLocalCompressionsForEstimation());
assertEquals("hashId", indexConfiguration.getDocumentIdField());
assertEquals("${hashId}", indexConfiguration.getDocumentId());
}

@Test
Expand All @@ -335,7 +335,7 @@ public void testReadIndexConfigCustom() {
assertEquals(testFlushTimeout, indexConfiguration.getFlushTimeout());
assertEquals(true, indexConfiguration.isEstimateBulkSizeUsingCompression());
assertEquals(5, indexConfiguration.getMaxLocalCompressionsForEstimation());
assertEquals(testIdField, indexConfiguration.getDocumentIdField());
assertEquals(testIdField, indexConfiguration.getDocumentId());
}

@Test
Expand All @@ -356,7 +356,7 @@ public void testReadIndexConfig_ExplicitCustomIndexType() {
assertFalse(indexConfiguration.getIndexTemplate().isEmpty());
assertEquals(testBulkSize, indexConfiguration.getBulkSize());
assertEquals(testFlushTimeout, indexConfiguration.getFlushTimeout());
assertEquals(testIdField, indexConfiguration.getDocumentIdField());
assertEquals(testIdField, indexConfiguration.getDocumentId());
}

@Test
Expand Down Expand Up @@ -458,7 +458,7 @@ private PluginSetting getPluginSetting(Map<String, Object> metadata) {
}

private Map<String, Object> initializeConfigMetaData(
String indexType, String indexAlias, String templateFilePath, Long bulkSize, Long flushTimeout, String documentIdField) {
String indexType, String indexAlias, String templateFilePath, Long bulkSize, Long flushTimeout, String documentId) {
final Map<String, Object> metadata = new HashMap<>();
if (indexType != null) {
metadata.put(IndexConfiguration.INDEX_TYPE, indexType);
Expand All @@ -475,8 +475,8 @@ private Map<String, Object> initializeConfigMetaData(
if (flushTimeout != null) {
metadata.put(IndexConfiguration.FLUSH_TIMEOUT, flushTimeout);
}
if (documentIdField != null) {
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, documentIdField);
if (documentId != null) {
metadata.put(IndexConfiguration.DOCUMENT_ID, documentId);
}
return metadata;
}
Expand Down

0 comments on commit 1ede4b6

Please sign in to comment.