Skip to content

Commit

Permalink
Add index_types for OTEL logs and metrics #3148 (#3929)
Browse files Browse the repository at this point in the history
* Add ISM policies for logs and metrics

Signed-off-by: Jürgen Walter <[email protected]>

* Add index templates for logs

Signed-off-by: Jürgen Walter <[email protected]>

* Add index templates for metrics

Signed-off-by: Jürgen Walter <[email protected]>

* Add index_types for OTEL logs and metrics

Signed-off-by: Jürgen Walter <[email protected]>

* Test OpenSearch sink

works with log-analytics and metric-analytics
and index type

Signed-off-by: Jürgen Walter <[email protected]>

* Test log and metric index types

Signed-off-by: Jürgen Walter <[email protected]>

* Document log and metrics index_type usage

Signed-off-by: Jürgen Walter <[email protected]>

* Minor: Remove incorrect html tag

Signed-off-by: Jürgen Walter <[email protected]>

* Fix test by adding date_detection false

Fixes testInstantiateSinkMetricsDefaultMetricSink

Alertnative would have been to adjust the test

Signed-off-by: Jürgen Walter <[email protected]>

* Rename test

Signed-off-by: Jürgen Walter <[email protected]>

* Fix metric ism file constants

Signed-off-by: Jürgen Walter <[email protected]>

* Rename index template file for metrics

Signed-off-by: Jürgen Walter <[email protected]>

* Add assertions to tests

Signed-off-by: Jürgen Walter <[email protected]>

* Fix index patterns for logs and metrics

Signed-off-by: Jürgen Walter <[email protected]>

* Update to plugins ISM API

Signed-off-by: Jürgen Walter <[email protected]>

* Add assertion

Signed-off-by: Jürgen Walter <[email protected]>

* Revert "Update to plugins ISM API"

This reverts commit 3fd61af.

Signed-off-by: Jürgen Walter <[email protected]>

* Add fields data prepper writes

Signed-off-by: Jürgen Walter <[email protected]>

* Use field type data prepper writes

Signed-off-by: Jürgen Walter <[email protected]>

---------

Signed-off-by: Jürgen Walter <[email protected]>
  • Loading branch information
juergen-walter authored Oct 30, 2024
1 parent e59917d commit 569e3d1
Show file tree
Hide file tree
Showing 15 changed files with 999 additions and 4 deletions.
43 changes: 41 additions & 2 deletions data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pipeline:

The OpenSearch sink will reserve `otel-v1-apm-span-*` as index pattern and `otel-v1-apm-span` as index alias for record ingestion.

### </a>Service map trace analytics
### Service map trace analytics

```
pipeline:
Expand All @@ -45,6 +45,45 @@ pipeline:

The OpenSearch sink will reserve `otel-v1-apm-service-map` as index for record ingestion.

### Log analytics

```
pipeline:
...
sink:
opensearch:
hosts: ["https://localhost:9200"]
cert: path/to/cert
username: YOUR_USERNAME_HERE
password: YOUR_PASSWORD_HERE
index_type: log-analytics
dlq_file: /your/local/dlq-file
max_retries: 20
bulk_size: 4
```

The OpenSearch sink will reserve `logs-otel-v1-*` as index pattern and `logs-otel-v1` as index alias for record ingestion.

### Metric analytics

```
pipeline:
...
sink:
opensearch:
hosts: ["https://localhost:9200"]
cert: path/to/cert
username: YOUR_USERNAME_HERE
password: YOUR_PASSWORD_HERE
index_type: metric-analytics
dlq_file: /your/local/dlq-file
max_retries: 20
bulk_size: 4
```

The OpenSearch sink will reserve `metrics-otel-v1-*` as index pattern and `metrics-otel-v1` as index alias for record ingestion.


### Amazon OpenSearch Service

The OpenSearch sink can also be configured for an Amazon OpenSearch Service domain. See [security](security.md) for details.
Expand Down Expand Up @@ -93,7 +132,7 @@ Default is null.

- `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.

- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.
- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `metric-analytics`, `log-analytics`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.

- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. For `distribution_version` set to `es6`, default value is `false`, otherwise default value is `true`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public class OpenSearchSinkIT {
private static final String DEFAULT_SERVICE_MAP_FILE = "service-map-1.json";
private static final String INCLUDE_TYPE_NAME_FALSE_URI = "?include_type_name=false";
private static final String TRACE_INGESTION_TEST_DISABLED_REASON = "Trace ingestion is not supported for ES 6";
private static final String LOG_INGESTION_TEST_DISABLED_REASON = "Log ingestion is not supported for ES 6";
private static final String METRIC_INGESTION_TEST_DISABLED_REASON = "Metric ingestion is not supported for ES 6";

private RestClient client;
private SinkContext sinkContext;
Expand Down Expand Up @@ -188,6 +190,7 @@ public void testInstantiateSinkRawSpanDefault() throws IOException {
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
assertThat(indexAlias, equalTo("otel-v1-apm-span"));
Request request = new Request(HttpMethod.HEAD, indexAlias);
Response response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
Expand Down Expand Up @@ -226,6 +229,96 @@ public void testInstantiateSinkRawSpanDefault() throws IOException {
}
}

@Test
@DisabledIf(value = "isES6", disabledReason = LOG_INGESTION_TEST_DISABLED_REASON)
public void testInstantiateSinkLogsDefaultLogSink() throws IOException {
final PluginSetting pluginSetting = generatePluginSetting(IndexType.LOG_ANALYTICS.getValue(), null, null);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.LOG_ANALYTICS);
assertThat(indexAlias, equalTo("logs-otel-v1"));
Request request = new Request(HttpMethod.HEAD, indexAlias);
Response response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
final String index = String.format("%s-000001", indexAlias);
final Map<String, Object> mappings = getIndexMappings(index);
assertThat(mappings, notNullValue());
assertThat((boolean) mappings.get("date_detection"), equalTo(false));
sink.shutdown();

if (isOSBundle()) {
// Check managed index
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(getIndexPolicyId(index), equalTo(IndexConstants.LOGS_ISM_POLICY));
}
);
}

// roll over initial index
request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias));
request.setJsonEntity("{ \"conditions\" : { } }\n");
response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));

// Instantiate sink again
sink = createObjectUnderTest(pluginSetting, true);
// Make sure no new write index *-000001 is created under alias
final String rolloverIndexName = String.format("%s-000002", indexAlias);
request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias");
response = client.performRequest(request);
assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true));
sink.shutdown();

if (isOSBundle()) {
// Check managed index
assertThat(getIndexPolicyId(rolloverIndexName), equalTo(IndexConstants.LOGS_ISM_POLICY));
}
}

@Test
@DisabledIf(value = "isES6", disabledReason = METRIC_INGESTION_TEST_DISABLED_REASON)
public void testInstantiateSinkMetricsDefaultMetricSink() throws IOException {
final PluginSetting pluginSetting = generatePluginSetting(IndexType.METRIC_ANALYTICS.getValue(), null, null);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.METRIC_ANALYTICS);
assertThat(indexAlias, equalTo("metrics-otel-v1"));
Request request = new Request(HttpMethod.HEAD, indexAlias);
Response response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
final String index = String.format("%s-000001", indexAlias);
final Map<String, Object> mappings = getIndexMappings(index);
assertThat(mappings, notNullValue());
assertThat((boolean) mappings.get("date_detection"), equalTo(false));
sink.shutdown();

if (isOSBundle()) {
// Check managed index
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(getIndexPolicyId(index), equalTo(IndexConstants.METRICS_ISM_POLICY));
}
);
}

// roll over initial index
request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias));
request.setJsonEntity("{ \"conditions\" : { } }\n");
response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));

// Instantiate sink again
sink = createObjectUnderTest(pluginSetting, true);
// Make sure no new write index *-000001 is created under alias
final String rolloverIndexName = String.format("%s-000002", indexAlias);
request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias");
response = client.performRequest(request);
assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true));
sink.shutdown();

if (isOSBundle()) {
// Check managed index
assertThat(getIndexPolicyId(rolloverIndexName), equalTo(IndexConstants.METRICS_ISM_POLICY));
}
}

@Test
@DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON)
public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
templateURL = loadExistingTemplate(templateType, IndexConstants.RAW_DEFAULT_TEMPLATE_FILE);
} else if (indexType.equals(IndexType.TRACE_ANALYTICS_SERVICE_MAP)) {
templateURL = loadExistingTemplate(templateType, IndexConstants.SERVICE_MAP_DEFAULT_TEMPLATE_FILE);
} else if (indexType.equals(IndexType.LOG_ANALYTICS)) {
templateURL = loadExistingTemplate(templateType, IndexConstants.LOGS_DEFAULT_TEMPLATE_FILE);
} else if (indexType.equals(IndexType.METRIC_ANALYTICS)) {
templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_DEFAULT_TEMPLATE_FILE);
} else if (templateFile != null) {
if (templateFile.toLowerCase().startsWith(S3_PREFIX)) {
FileReader s3FileReader = new S3FileReader(s3Client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ public class IndexConstants {
public static final String RAW_ISM_POLICY = "raw-span-policy";
public static final String RAW_ISM_FILE_NO_ISM_TEMPLATE = "raw-span-policy-no-ism-template.json";
public static final String RAW_ISM_FILE_WITH_ISM_TEMPLATE = "raw-span-policy-with-ism-template.json";

public static final String LOGS_DEFAULT_TEMPLATE_FILE = "logs-otel-v1-index-template.json";
public static final String LOGS_ISM_POLICY = "logs-policy";
public static final String LOGS_ISM_FILE_NO_ISM_TEMPLATE = "logs-policy-no-ism-template.json";
public static final String LOGS_ISM_FILE_WITH_ISM_TEMPLATE = "logs-policy-with-ism-template.json";

public static final String METRICS_DEFAULT_TEMPLATE_FILE = "metrics-otel-v1-index-template.json";
public static final String METRICS_ISM_POLICY = "metrics-policy";
public static final String METRICS_ISM_FILE_NO_ISM_TEMPLATE = "metrics-policy-no-ism-template.json";
public static final String METRICS_ISM_FILE_WITH_ISM_TEMPLATE = "metrics-policy-with-ism-template.json";

public static final String ISM_ENABLED_SETTING = "opendistro.index_state_management.enabled";
public static final String ISM_POLICY_ID_SETTING = "opendistro.index_state_management.policy_id";
public static final String ISM_ROLLOVER_ALIAS_SETTING = "opendistro.index_state_management.rollover_alias";
Expand All @@ -26,7 +37,9 @@ public class IndexConstants {

static {
// TODO: extract out version number into version enum
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_RAW, "otel-v1-apm-span");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_SERVICE_MAP, "otel-v1-apm-service-map");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_RAW, "otel-v1-apm-span");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.LOG_ANALYTICS, "logs-otel-v1");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS, "metrics-otel-v1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ public final IndexManager getIndexManager(final IndexType indexType,
indexManager = new TraceAnalyticsServiceMapIndexManager(
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
break;
case LOG_ANALYTICS:
indexManager = new LogAnalyticsIndexManager(
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
break;
case METRIC_ANALYTICS:
indexManager = new MetricAnalyticsIndexManager(
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
break;
case MANAGEMENT_DISABLED:
indexManager = new ManagementDisabledIndexManager(
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
Expand Down Expand Up @@ -140,6 +148,42 @@ public TraceAnalyticsServiceMapIndexManager(final RestHighLevelClient restHighLe
}
}

private static class LogAnalyticsIndexManager extends AbstractIndexManager {

public LogAnalyticsIndexManager(final RestHighLevelClient restHighLevelClient,
final OpenSearchClient openSearchClient,
final OpenSearchSinkConfiguration openSearchSinkConfiguration,
final ClusterSettingsParser clusterSettingsParser,
final TemplateStrategy templateStrategy,
final String indexAlias) {
super(restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
this.ismPolicyManagementStrategy = new IsmPolicyManagement(
openSearchClient,
restHighLevelClient,
IndexConstants.LOGS_ISM_POLICY,
IndexConstants.LOGS_ISM_FILE_WITH_ISM_TEMPLATE,
IndexConstants.LOGS_ISM_FILE_NO_ISM_TEMPLATE);
}
}

private static class MetricAnalyticsIndexManager extends AbstractIndexManager {

public MetricAnalyticsIndexManager(final RestHighLevelClient restHighLevelClient,
final OpenSearchClient openSearchClient,
final OpenSearchSinkConfiguration openSearchSinkConfiguration,
final ClusterSettingsParser clusterSettingsParser,
final TemplateStrategy templateStrategy,
final String indexAlias) {
super(restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
this.ismPolicyManagementStrategy = new IsmPolicyManagement(
openSearchClient,
restHighLevelClient,
IndexConstants.METRICS_ISM_POLICY,
IndexConstants.METRICS_ISM_FILE_WITH_ISM_TEMPLATE,
IndexConstants.METRICS_ISM_FILE_NO_ISM_TEMPLATE);
}
}

private class ManagementDisabledIndexManager extends AbstractIndexManager {
protected ManagementDisabledIndexManager(final RestHighLevelClient restHighLevelClient,
final OpenSearchClient openSearchClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
public enum IndexType {
TRACE_ANALYTICS_RAW("trace-analytics-raw"),
TRACE_ANALYTICS_SERVICE_MAP("trace-analytics-service-map"),
LOG_ANALYTICS("log-analytics"),
METRIC_ANALYTICS("metric-analytics"),
CUSTOM("custom"),
MANAGEMENT_DISABLED("management_disabled");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
{
"version": 1,
"template": {
"mappings": {
"date_detection": false,
"dynamic_templates": [
{
"resource_attributes_map": {
"mapping": {
"type": "keyword"
},
"path_match": "resource.attributes.*"
}
},
{
"log_attributes_map": {
"mapping": {
"type": "keyword"
},
"path_match": "log.attributes.*"
}
}
],
"_source": {
"enabled": true
},
"properties": {
"severity": {
"properties": {
"number": {
"type": "long"
},
"text": {
"type": "keyword"
}
}
},
"body": {
"type": "text"
},
"@timestamp": {
"type": "date_nanos"
},
"time": {
"type": "date_nanos"
},
"observedTimestamp": {
"type": "date_nanos"
},
"observedTime": {
"type": "alias",
"path": "observedTimestamp"
},
"traceId": {
"ignore_above": 256,
"type": "keyword"
},
"spanId": {
"ignore_above": 256,
"type": "keyword"
},
"schemaUrl": {
"type": "keyword"
},
"instrumentationScope": {
"properties": {
"name": {
"type": "keyword"
},
"version": {
"type": "keyword"
}
}
},
"event": {
"properties": {
"kind": {
"type": "keyword"
},
"domain": {
"type": "keyword"
},
"category": {
"type": "keyword"
},
"type": {
"type": "keyword"
},
"result": {
"type": "keyword"
},
"exception": {
"properties": {
"message": {
"type": "text"
},
"stacktrace": {
"type": "text"
},
"type": {
"type": "keyword"
}
}
}
}
}
}
}
}
}
Loading

0 comments on commit 569e3d1

Please sign in to comment.