From de7e8d4c7d3791969dbe3f2b419a01ebed669d3d Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 29 Nov 2023 22:15:14 +0000 Subject: [PATCH] add field based rules support in correlation engine Signed-off-by: Subhobrata Dey --- .../correlation/JoinEngine.java | 85 +++- .../model/CorrelationQuery.java | 30 +- .../model/CorrelationRule.java | 25 +- .../securityanalytics/TestHelpers.java | 392 +++++++++++++++++- .../CorrelationEngineRestApiIT.java | 321 +++++++++++++- 5 files changed, 808 insertions(+), 45 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java index 5e4bb6629..55bbda9a8 100644 --- a/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java +++ b/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java @@ -4,7 +4,8 @@ */ package org.opensearch.securityanalytics.correlation; -import kotlin.Pair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.join.ScoreMode; @@ -15,6 +16,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.routing.Preference; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentType; import org.opensearch.commons.alerting.action.PublishFindingsRequest; @@ -37,11 +39,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; @@ -127,6 +131,7 @@ public void onFailure(Exception e) { private void getValidDocuments(String detectorType, List indices, List correlationRules, List relatedDocIds) { MultiSearchRequest mSearchRequest = new MultiSearchRequest(); List validCorrelationRules = new ArrayList<>(); + List validFields = new ArrayList<>(); for (CorrelationRule rule: correlationRules) { Optional query = rule.getCorrelationQueries().stream() @@ -134,18 +139,26 @@ private void getValidDocuments(String detectorType, List indices, List indices, List filteredCorrelationRules = new ArrayList<>(); + List> filteredCorrelationRules = new ArrayList<>(); int idx = 0; for (MultiSearchResponse.Item response : responses) { @@ -165,14 +178,17 @@ public void onResponse(MultiSearchResponse items) { } if (response.getResponse().getHits().getTotalHits().value > 0L) { - filteredCorrelationRules.add(validCorrelationRules.get(idx)); + filteredCorrelationRules.add(Triple.of(validCorrelationRules.get(idx), + response.getResponse().getHits().getHits(), validFields.get(idx))); } ++idx; } Map> categoryToQueriesMap = new HashMap<>(); - for (CorrelationRule rule: filteredCorrelationRules) { - List queries = rule.getCorrelationQueries(); + Map categoryToTimeWindowMap = new HashMap<>(); + for (Triple rule: filteredCorrelationRules) { + List queries = rule.getLeft().getCorrelationQueries(); + Long timeWindow = rule.getLeft().getCorrTimeWindow(); for (CorrelationQuery query: queries) { List correlationQueries; @@ -181,12 +197,36 @@ public void onResponse(MultiSearchResponse items) { } else { correlationQueries = new ArrayList<>(); } - correlationQueries.add(query); + if (categoryToTimeWindowMap.containsKey(query.getCategory())) { + categoryToTimeWindowMap.put(query.getCategory(), Math.max(timeWindow, categoryToTimeWindowMap.get(query.getCategory()))); + } else { + categoryToTimeWindowMap.put(query.getCategory(), timeWindow); + } + + if (query.getField() == null) { + correlationQueries.add(query); + } else { + SearchHit[] hits = rule.getMiddle(); + StringBuilder qb = new StringBuilder(query.getField()).append(":("); + for (int i = 0; i < hits.length; ++i) { + String value = hits[i].field(rule.getRight()).getValue(); + qb.append(value); + if (i < hits.length-1) { + qb.append(" OR "); + } else { + qb.append(")"); + } + } + if (query.getQuery() != null) { + qb.append(" AND ").append(query.getQuery()); + } + correlationQueries.add(new CorrelationQuery(query.getIndex(), qb.toString(), query.getCategory(), null)); + } categoryToQueriesMap.put(query.getCategory(), correlationQueries); } } - searchFindingsByTimestamp(detectorType, categoryToQueriesMap, - filteredCorrelationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList())); + searchFindingsByTimestamp(detectorType, categoryToQueriesMap, categoryToTimeWindowMap, + filteredCorrelationRules.stream().map(Triple::getLeft).map(CorrelationRule::getId).collect(Collectors.toList())); } @Override @@ -203,15 +243,15 @@ public void onFailure(Exception e) { * this method searches for parent findings given the log category & correlation time window & collects all related docs * for them. */ - private void searchFindingsByTimestamp(String detectorType, Map> categoryToQueriesMap, List correlationRules) { + private void searchFindingsByTimestamp(String detectorType, Map> categoryToQueriesMap, Map categoryToTimeWindowMap, List correlationRules) { long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli(); MultiSearchRequest mSearchRequest = new MultiSearchRequest(); List>> categoryToQueriesPairs = new ArrayList<>(); for (Map.Entry> categoryToQueries: categoryToQueriesMap.entrySet()) { RangeQueryBuilder queryBuilder = QueryBuilders.rangeQuery("timestamp") - .gte(findingTimestamp - corrTimeWindow) - .lte(findingTimestamp + corrTimeWindow); + .gte(findingTimestamp - categoryToTimeWindowMap.get(categoryToQueries.getKey())) + .lte(findingTimestamp + categoryToTimeWindowMap.get(categoryToQueries.getKey())); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(queryBuilder); @@ -222,7 +262,7 @@ private void searchFindingsByTimestamp(String detectorType, Map(categoryToQueries.getKey(), categoryToQueries.getValue())); + categoryToQueriesPairs.add(Pair.of(categoryToQueries.getKey(), categoryToQueries.getValue())); } if (!mSearchRequest.requests().isEmpty()) { @@ -246,17 +286,17 @@ public void onResponse(MultiSearchResponse items) { .map(Object::toString).collect(Collectors.toList())); } - List correlationQueries = categoryToQueriesPairs.get(idx).getSecond(); + List correlationQueries = categoryToQueriesPairs.get(idx).getValue(); List indices = correlationQueries.stream().map(CorrelationQuery::getIndex).collect(Collectors.toList()); List queries = correlationQueries.stream().map(CorrelationQuery::getQuery).collect(Collectors.toList()); - relatedDocsMap.put(categoryToQueriesPairs.get(idx).getFirst(), + relatedDocsMap.put(categoryToQueriesPairs.get(idx).getKey(), new DocSearchCriteria( indices, queries, relatedDocIds)); ++idx; } - searchDocsWithFilterKeys(detectorType, relatedDocsMap, correlationRules); + searchDocsWithFilterKeys(detectorType, relatedDocsMap, categoryToTimeWindowMap, correlationRules); } @Override @@ -272,7 +312,7 @@ public void onFailure(Exception e) { /** * Given the related docs from parent findings, this method filters only those related docs which match parent join criteria. */ - private void searchDocsWithFilterKeys(String detectorType, Map relatedDocsMap, List correlationRules) { + private void searchDocsWithFilterKeys(String detectorType, Map relatedDocsMap, Map categoryToTimeWindowMap, List correlationRules) { MultiSearchRequest mSearchRequest = new MultiSearchRequest(); List categories = new ArrayList<>(); @@ -283,6 +323,7 @@ private void searchDocsWithFilterKeys(String detectorType, Map> filteredRelatedDocIds, List correlationRules) { + private void getCorrelatedFindings(String detectorType, Map> filteredRelatedDocIds, Map categoryToTimeWindowMap, List correlationRules) { long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli(); MultiSearchRequest mSearchRequest = new MultiSearchRequest(); List categories = new ArrayList<>(); @@ -344,8 +385,8 @@ private void getCorrelatedFindings(String detectorType, Map for (Map.Entry> relatedDocIds: filteredRelatedDocIds.entrySet()) { BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() .filter(QueryBuilders.rangeQuery("timestamp") - .gte(findingTimestamp - corrTimeWindow) - .lte(findingTimestamp + corrTimeWindow)) + .gte(findingTimestamp - categoryToTimeWindowMap.get(relatedDocIds.getKey())) + .lte(findingTimestamp + categoryToTimeWindowMap.get(relatedDocIds.getKey()))) .must(QueryBuilders.termsQuery("correlated_doc_ids", relatedDocIds.getValue())); if (relatedDocIds.getKey().equals(detectorType)) { diff --git a/src/main/java/org/opensearch/securityanalytics/model/CorrelationQuery.java b/src/main/java/org/opensearch/securityanalytics/model/CorrelationQuery.java index d2940405c..bec989b8d 100644 --- a/src/main/java/org/opensearch/securityanalytics/model/CorrelationQuery.java +++ b/src/main/java/org/opensearch/securityanalytics/model/CorrelationQuery.java @@ -22,33 +22,45 @@ public class CorrelationQuery implements Writeable, ToXContentObject { private static final String QUERY = "query"; private static final String CATEGORY = "category"; + private static final String FIELD = "field"; + private String index; private String query; private String category; - public CorrelationQuery(String index, String query, String category) { + private String field; + + public CorrelationQuery(String index, String query, String category, String field) { this.index = index; this.query = query; this.category = category; + this.field = field; } public CorrelationQuery(StreamInput sin) throws IOException { - this(sin.readString(), sin.readString(), sin.readString()); + this(sin.readString(), sin.readOptionalString(), sin.readString(), sin.readOptionalString()); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(index); - out.writeString(query); + out.writeOptionalString(query); out.writeString(category); + out.writeOptionalString(field); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(INDEX, index).field(QUERY, query).field(CATEGORY, category); + builder.field(INDEX, index).field(CATEGORY, category); + if (query != null) { + builder.field(QUERY, query); + } + if (field != null) { + builder.field(FIELD, field); + } return builder.endObject(); } @@ -56,6 +68,7 @@ public static CorrelationQuery parse(XContentParser xcp) throws IOException { String index = null; String query = null; String category = null; + String field = null; XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp); while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -72,11 +85,14 @@ public static CorrelationQuery parse(XContentParser xcp) throws IOException { case CATEGORY: category = xcp.text(); break; + case FIELD: + field = xcp.text(); + break; default: xcp.skipChildren(); } } - return new CorrelationQuery(index, query, category); + return new CorrelationQuery(index, query, category, field); } public static CorrelationQuery readFrom(StreamInput sin) throws IOException { @@ -94,4 +110,8 @@ public String getQuery() { public String getCategory() { return category; } + + public String getField() { + return field; + } } \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/model/CorrelationRule.java b/src/main/java/org/opensearch/securityanalytics/model/CorrelationRule.java index f29213e14..46c30f0f0 100644 --- a/src/main/java/org/opensearch/securityanalytics/model/CorrelationRule.java +++ b/src/main/java/org/opensearch/securityanalytics/model/CorrelationRule.java @@ -28,6 +28,7 @@ public class CorrelationRule implements Writeable, ToXContentObject { public static final String NO_ID = ""; public static final Long NO_VERSION = 1L; private static final String CORRELATION_QUERIES = "correlate"; + private static final String CORRELATION_TIME_WINDOW = "time_window"; private String id; @@ -37,15 +38,18 @@ public class CorrelationRule implements Writeable, ToXContentObject { private List correlationQueries; - public CorrelationRule(String id, Long version, String name, List correlationQueries) { + private Long corrTimeWindow; + + public CorrelationRule(String id, Long version, String name, List correlationQueries, Long corrTimeWindow) { this.id = id != null ? id : NO_ID; this.version = version != null ? version : NO_VERSION; this.name = name; this.correlationQueries = correlationQueries; + this.corrTimeWindow = corrTimeWindow != null? corrTimeWindow: 300000L; } public CorrelationRule(StreamInput sin) throws IOException { - this(sin.readString(), sin.readLong(), sin.readString(), sin.readList(CorrelationQuery::readFrom)); + this(sin.readString(), sin.readLong(), sin.readString(), sin.readList(CorrelationQuery::readFrom), sin.readLong()); } @Override @@ -57,6 +61,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws CorrelationQuery[] correlationQueries = new CorrelationQuery[] {}; correlationQueries = this.correlationQueries.toArray(correlationQueries); builder.field(CORRELATION_QUERIES, correlationQueries); + builder.field(CORRELATION_TIME_WINDOW, corrTimeWindow); return builder.endObject(); } @@ -69,6 +74,7 @@ public void writeTo(StreamOutput out) throws IOException { for (CorrelationQuery query : correlationQueries) { query.writeTo(out); } + out.writeLong(corrTimeWindow); } public static CorrelationRule parse(XContentParser xcp, String id, Long version) throws IOException { @@ -81,6 +87,7 @@ public static CorrelationRule parse(XContentParser xcp, String id, Long version) String name = null; List correlationQueries = new ArrayList<>(); + Long corrTimeWindow = null; XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp); while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -98,11 +105,14 @@ public static CorrelationRule parse(XContentParser xcp, String id, Long version) correlationQueries.add(query); } break; + case CORRELATION_TIME_WINDOW: + corrTimeWindow = xcp.longValue(); + break; default: xcp.skipChildren(); } } - return new CorrelationRule(id, version, name, correlationQueries); + return new CorrelationRule(id, version, name, correlationQueries, corrTimeWindow); } public static CorrelationRule readFrom(StreamInput sin) throws IOException { @@ -137,6 +147,10 @@ public List getCorrelationQueries() { return correlationQueries; } + public Long getCorrTimeWindow() { + return corrTimeWindow; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -145,11 +159,12 @@ public boolean equals(Object o) { return id.equals(that.id) && version.equals(that.version) && name.equals(that.name) - && correlationQueries.equals(that.correlationQueries); + && correlationQueries.equals(that.correlationQueries) + && corrTimeWindow.equals(that.corrTimeWindow); } @Override public int hashCode() { - return Objects.hash(id, version, name, correlationQueries); + return Objects.hash(id, version, name, correlationQueries, corrTimeWindow); } } \ No newline at end of file diff --git a/src/test/java/org/opensearch/securityanalytics/TestHelpers.java b/src/test/java/org/opensearch/securityanalytics/TestHelpers.java index db857eb9c..a8c45be9f 100644 --- a/src/test/java/org/opensearch/securityanalytics/TestHelpers.java +++ b/src/test/java/org/opensearch/securityanalytics/TestHelpers.java @@ -175,9 +175,9 @@ public static CorrelationRule randomCorrelationRule(String name) { name = name.isEmpty()? ">": name; return new CorrelationRule(CorrelationRule.NO_ID, CorrelationRule.NO_VERSION, name, List.of( - new CorrelationQuery("vpc_flow1", "dstaddr:192.168.1.*", "network"), - new CorrelationQuery("ad_logs1", "azure.platformlogs.result_type:50126", "ad_ldap") - )); + new CorrelationQuery("vpc_flow1", "dstaddr:192.168.1.*", "network", null), + new CorrelationQuery("ad_logs1", "azure.platformlogs.result_type:50126", "ad_ldap", null) + ), 300000L); } public static String randomRule() { @@ -209,6 +209,29 @@ public static String randomRule() { "level: high"; } + public static String randomRuleForCorrelations(String value) { + return "id: 5f92fff9-82e2-48ab-8fc1-8b133556a551\n" + + "logsource:\n" + + " product: cloudtrail\n" + + "title: AWS User Created\n" + + "description: AWS User Created\n" + + "tags:\n" + + " - attack.test1\n" + + "falsepositives:\n" + + " - Legit User Account Administration\n" + + "level: high\n" + + "date: 2022/01/01\n" + + "status: experimental\n" + + "references:\n" + + " - 'https://github.com/RhinoSecurityLabs/AWS-IAM-Privilege-Escalation'\n" + + "author: toffeebr33k\n" + + "detection:\n" + + " condition: selection_source\n" + + " selection_source:\n" + + " EventName:\n" + + " - " + value; + } + public static String randomRuleForCustomLogType() { return "title: Remote Encrypting File System Abuse\n" + "id: 5f92fff9-82e2-48eb-8fc1-8b133556a551\n" + @@ -787,7 +810,7 @@ public static String windowsIndexMapping() { " \"type\": \"date\"\n" + " },\n" + " \"EventType\": {\n" + - " \"type\": \"integer\"\n" + + " \"type\": \"keyword\"\n" + " },\n" + " \"ExecutionProcessID\": {\n" + " \"type\": \"long\"\n" + @@ -1306,6 +1329,7 @@ public static String randomDoc() { "\"Severity\":\"INFO\",\n" + "\"EventID\":22,\n" + "\"SourceName\":\"Microsoft-Windows-Sysmon\",\n" + + "\"SourceIp\":\"1.2.3.4\",\n" + "\"ProviderGuid\":\"{5770385F-C22A-43E0-BF4C-06F5698FFBD9}\",\n" + "\"Version\":5,\n" + "\"TaskValue\":22,\n" + @@ -1356,6 +1380,60 @@ public static String randomAdLdapDoc() { "}"; } + public static String randomCloudtrailDoc(String user, String event) { + return "{\n" + + " \"eventVersion\": \"1.08\",\n" + + " \"userIdentity\": {\n" + + " \"type\": \"IAMUser\",\n" + + " \"principalId\": \"AIDA6ON6E4XEGITEXAMPLE\",\n" + + " \"arn\": \"arn:aws:iam::888888888888:user/Mary\",\n" + + " \"accountId\": \"888888888888\",\n" + + " \"accessKeyId\": \"AKIAIOSFODNN7EXAMPLE\",\n" + + " \"userName\": \"Mary\",\n" + + " \"sessionContext\": {\n" + + " \"sessionIssuer\": {},\n" + + " \"webIdFederationData\": {},\n" + + " \"attributes\": {\n" + + " \"creationDate\": \"2023-07-19T21:11:57Z\",\n" + + " \"mfaAuthenticated\": \"false\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"eventTime\": \"2023-07-19T21:25:09Z\",\n" + + " \"eventSource\": \"iam.amazonaws.com\",\n" + + " \"EventName\": \"" + event + "\",\n" + + " \"awsRegion\": \"us-east-1\",\n" + + " \"sourceIPAddress\": \"192.0.2.0\",\n" + + " \"AccountName\": \"" + user + "\",\n" + + " \"userAgent\": \"aws-cli/2.13.5 Python/3.11.4 Linux/4.14.255-314-253.539.amzn2.x86_64 exec-env/CloudShell exe/x86_64.amzn.2 prompt/off command/iam.create-user\",\n" + + " \"requestParameters\": {\n" + + " \"userName\": \"" + user + "\"\n" + + " },\n" + + " \"responseElements\": {\n" + + " \"user\": {\n" + + " \"path\": \"/\",\n" + + " \"arn\": \"arn:aws:iam::888888888888:user/Richard\",\n" + + " \"userId\": \"AIDA6ON6E4XEP7EXAMPLE\",\n" + + " \"createDate\": \"Jul 19, 2023 9:25:09 PM\",\n" + + " \"userName\": \"Richard\"\n" + + " }\n" + + " },\n" + + " \"requestID\": \"2d528c76-329e-410b-9516-EXAMPLE565dc\",\n" + + " \"eventID\": \"ba0801a1-87ec-4d26-be87-EXAMPLE75bbb\",\n" + + " \"readOnly\": false,\n" + + " \"eventType\": \"AwsApiCall\",\n" + + " \"managementEvent\": true,\n" + + " \"recipientAccountId\": \"888888888888\",\n" + + " \"eventCategory\": \"Management\",\n" + + " \"tlsDetails\": {\n" + + " \"tlsVersion\": \"TLSv1.2\",\n" + + " \"cipherSuite\": \"ECDHE-RSA-AES128-GCM-SHA256\",\n" + + " \"clientProvidedHostHeader\": \"iam.amazonaws.com\"\n" + + " },\n" + + " \"sessionCredentialFromConsole\": \"true\"\n" + + "}"; + } + public static String randomAppLogDoc() { return "{\n" + " \"endpoint\": \"/customer_records.txt\",\n" + @@ -1386,6 +1464,312 @@ public static String adLdapLogMappings() { " }"; } + public static String cloudtrailMappings() { + return "\"properties\": {\n" + + " \"Records\": {\n" + + " \"properties\": {\n" + + " \"awsRegion\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"eventCategory\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"eventID\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"eventName\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"eventSource\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"eventTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"eventType\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"eventVersion\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"managementEvent\": {\n" + + " \"type\": \"boolean\"\n" + + " },\n" + + " \"readOnly\": {\n" + + " \"type\": \"boolean\"\n" + + " },\n" + + " \"recipientAccountId\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"requestID\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"requestParameters\": {\n" + + " \"properties\": {\n" + + " \"userName\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " },\n" + + " \"responseElements\": {\n" + + " \"properties\": {\n" + + " \"user\": {\n" + + " \"properties\": {\n" + + " \"arn\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"createDate\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"path\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"userId\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"userName\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " },\n" + + " \"sessionCredentialFromConsole\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"sourceIPAddress\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"tlsDetails\": {\n" + + " \"properties\": {\n" + + " \"cipherSuite\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clientProvidedHostHeader\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"tlsVersion\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " },\n" + + " \"userAgent\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"userIdentity\": {\n" + + " \"properties\": {\n" + + " \"accessKeyId\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"accountId\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"arn\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"principalId\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"sessionContext\": {\n" + + " \"properties\": {\n" + + " \"attributes\": {\n" + + " \"properties\": {\n" + + " \"creationDate\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"mfaAuthenticated\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " },\n" + + " \"sessionIssuer\": {\n" + + " \"type\": \"object\"\n" + + " },\n" + + " \"webIdFederationData\": {\n" + + " \"type\": \"object\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"type\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"userName\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }}"; + } + public static String s3AccessLogMappings() { return " \"properties\": {" + " \"aws.cloudtrail.eventSource\": {" + diff --git a/src/test/java/org/opensearch/securityanalytics/correlation/CorrelationEngineRestApiIT.java b/src/test/java/org/opensearch/securityanalytics/correlation/CorrelationEngineRestApiIT.java index 1a001311d..cb816851c 100644 --- a/src/test/java/org/opensearch/securityanalytics/correlation/CorrelationEngineRestApiIT.java +++ b/src/test/java/org/opensearch/securityanalytics/correlation/CorrelationEngineRestApiIT.java @@ -4,6 +4,8 @@ */ package org.opensearch.securityanalytics.correlation; +import org.apache.http.entity.StringEntity; +import org.apache.http.message.BasicHeader; import org.junit.Assert; import org.opensearch.client.Request; import org.opensearch.client.Response; @@ -81,7 +83,7 @@ public void testBasicCorrelationEngineWorkflow() throws IOException, Interrupted String finding = ((List>) getFindingsBody.get("findings")).get(0).get("id").toString(); List> correlatedFindings = searchCorrelatedFindings(finding, "test_windows", 300000L, 10); - Assert.assertEquals(2, correlatedFindings.size()); + Assert.assertEquals(1, correlatedFindings.size()); Assert.assertTrue(correlatedFindings.get(0).get("rules") instanceof List); for (var correlatedFinding: correlatedFindings) { @@ -128,6 +130,282 @@ public void testListCorrelationsWorkflow() throws IOException, InterruptedExcept Assert.assertEquals(1, results.size()); } + public void testBasicCorrelationEngineWorkflowWithFieldBasedRules() throws IOException, InterruptedException { + Long startTime = System.currentTimeMillis(); + String index = createTestIndex("cloudtrail", cloudtrailMappings()); + // Execute CreateMappingsAction to add alias mapping for index + Request createMappingRequest = new Request("POST", SecurityAnalyticsPlugin.MAPPER_BASE_URI); + // both req params and req body are supported + createMappingRequest.setJsonEntity( + "{\n" + + " \"index_name\": \"" + index + "\",\n" + + " \"rule_topic\": \"cloudtrail\",\n" + + " \"partial\": true,\n" + + " \"alias_mappings\": {\n" + + " \"properties\": {\n" + + " \"aws.cloudtrail.event_name\": {\n" + + " \"path\": \"Records.eventName\",\n" + + " \"type\": \"alias\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}" + ); + + Response response = client().performRequest(createMappingRequest); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + + String rule1 = randomRuleForCorrelations("CreateUser"); + Response createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.RULE_BASE_URI, Collections.singletonMap("category", "cloudtrail"), + new StringEntity(rule1), new BasicHeader("Content-Type", "application/json")); + Assert.assertEquals("Create rule failed", RestStatus.CREATED, restStatus(createResponse)); + Map responseBody = asMap(createResponse); + String createdId1 = responseBody.get("_id").toString(); + + String rule2 = randomRuleForCorrelations("DeleteUser"); + createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.RULE_BASE_URI, Collections.singletonMap("category", "cloudtrail"), + new StringEntity(rule2), new BasicHeader("Content-Type", "application/json")); + Assert.assertEquals("Create rule failed", RestStatus.CREATED, restStatus(createResponse)); + responseBody = asMap(createResponse); + String createdId2 = responseBody.get("_id").toString(); + + createCloudtrailFieldBasedRule(index, "requestParameters.userName", null); + + Detector cloudtrailDetector = randomDetectorWithInputsAndTriggersAndType(List.of(new DetectorInput("cloudtrail detector for security analytics", List.of(index), + List.of(new DetectorRule(createdId1), new DetectorRule(createdId2)), + List.of())), + List.of(new DetectorTrigger(null, "test-trigger", "1", List.of("cloudtrail"), List.of(), List.of(), List.of(), List.of())), "cloudtrail"); + + createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.DETECTOR_BASE_URI, Collections.emptyMap(), toHttpEntity(cloudtrailDetector)); + Assert.assertEquals("Create detector failed", RestStatus.CREATED, restStatus(createResponse)); + + responseBody = asMap(createResponse); + + String createdId = responseBody.get("_id").toString(); + + String request = "{\n" + + " \"query\" : {\n" + + " \"match\":{\n" + + " \"_id\": \"" + createdId + "\"\n" + + " }\n" + + " }\n" + + "}"; + List hits = executeSearch(Detector.DETECTORS_INDEX, request); + SearchHit hit = hits.get(0); + + String monitorId = ((List) ((Map) hit.getSourceAsMap().get("detector")).get("monitor_id")).get(0); + + indexDoc(index, "1", randomCloudtrailDoc("Richard", "CreateUser")); + executeAlertingMonitor(monitorId, Collections.emptyMap()); + Thread.sleep(1000); + indexDoc(index, "4", randomCloudtrailDoc("deysubho", "CreateUser")); + executeAlertingMonitor(monitorId, Collections.emptyMap()); + Thread.sleep(1000); + + indexDoc(index, "2", randomCloudtrailDoc("Richard", "DeleteUser")); + executeAlertingMonitor(monitorId, Collections.emptyMap()); + + // Call GetFindings API + Map params = new HashMap<>(); + params.put("detectorType", "cloudtrail"); + Response getFindingsResponse = makeRequest(client(), "GET", SecurityAnalyticsPlugin.FINDINGS_BASE_URI + "/_search", params, null); + Map getFindingsBody = entityAsMap(getFindingsResponse); + + Thread.sleep(5000); + + int count = 0; + while (true) { + try { + Long endTime = System.currentTimeMillis(); + Request restRequest = new Request("GET", "/_plugins/_security_analytics/correlations?start_timestamp=" + startTime + "&end_timestamp=" + endTime); + response = client().performRequest(restRequest); + + Map responseMap = entityAsMap(response); + List results = (List) responseMap.get("findings"); + if (results.size() == 1) { + Assert.assertTrue(true); + break; + } + } catch (Exception ex) { + // suppress ex + } + ++count; + Thread.sleep(5000); + if (count >= 12) { + Assert.assertTrue(false); + break; + } + } + } + + public void testBasicCorrelationEngineWorkflowWithFieldBasedRulesOnMultipleLogTypes() throws IOException, InterruptedException { + + LogIndices indices = new LogIndices(); + indices.windowsIndex = createTestIndex(randomIndex(), windowsIndexMapping()); + indices.vpcFlowsIndex = createTestIndex("vpc_flow", vpcFlowMappings()); + + String vpcFlowMonitorId = createVpcFlowDetector(indices.vpcFlowsIndex); + String testWindowsMonitorId = createTestWindowsDetector(indices.windowsIndex); + + String ruleId = createNetworkToWindowsFieldBasedRule(indices); + + indexDoc(indices.windowsIndex, "2", randomDoc()); + Response executeResponse = executeAlertingMonitor(testWindowsMonitorId, Collections.emptyMap()); + Map executeResults = entityAsMap(executeResponse); + int noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(5, noOfSigmaRuleMatches); + + indexDoc(indices.vpcFlowsIndex, "1", randomVpcFlowDoc()); + executeResponse = executeAlertingMonitor(vpcFlowMonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(1, noOfSigmaRuleMatches); + Thread.sleep(5000); + + // Call GetFindings API + Map params = new HashMap<>(); + params.put("detectorType", "test_windows"); + Response getFindingsResponse = makeRequest(client(), "GET", SecurityAnalyticsPlugin.FINDINGS_BASE_URI + "/_search", params, null); + Map getFindingsBody = entityAsMap(getFindingsResponse); + String finding = ((List>) getFindingsBody.get("findings")).get(0).get("id").toString(); + + int count = 0; + while (true) { + try { + List> correlatedFindings = searchCorrelatedFindings(finding, "test_windows", 300000L, 10); + if (correlatedFindings.size() == 1) { + Assert.assertTrue(true); + + Assert.assertTrue(correlatedFindings.get(0).get("rules") instanceof List); + + for (var correlatedFinding: correlatedFindings) { + if (correlatedFinding.get("detector_type").equals("network")) { + Assert.assertEquals(1, ((List) correlatedFinding.get("rules")).size()); + Assert.assertTrue(((List) correlatedFinding.get("rules")).contains(ruleId)); + } + } + break; + } + } catch (Exception ex) { + // suppress ex + } + ++count; + Thread.sleep(5000); + if (count >= 12) { + Assert.assertTrue(false); + break; + } + } + } + + public void testBasicCorrelationEngineWorkflowWithFieldBasedRulesAndDynamicTimeWindow() throws IOException, InterruptedException { + Long startTime = System.currentTimeMillis(); + String index = createTestIndex("cloudtrail", cloudtrailMappings()); + // Execute CreateMappingsAction to add alias mapping for index + Request createMappingRequest = new Request("POST", SecurityAnalyticsPlugin.MAPPER_BASE_URI); + // both req params and req body are supported + createMappingRequest.setJsonEntity( + "{\n" + + " \"index_name\": \"" + index + "\",\n" + + " \"rule_topic\": \"cloudtrail\",\n" + + " \"partial\": true,\n" + + " \"alias_mappings\": {\n" + + " \"properties\": {\n" + + " \"aws.cloudtrail.event_name\": {\n" + + " \"path\": \"Records.eventName\",\n" + + " \"type\": \"alias\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}" + ); + + Response response = client().performRequest(createMappingRequest); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + + String rule1 = randomRuleForCorrelations("CreateUser"); + Response createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.RULE_BASE_URI, Collections.singletonMap("category", "cloudtrail"), + new StringEntity(rule1), new BasicHeader("Content-Type", "application/json")); + Assert.assertEquals("Create rule failed", RestStatus.CREATED, restStatus(createResponse)); + Map responseBody = asMap(createResponse); + String createdId1 = responseBody.get("_id").toString(); + + String rule2 = randomRuleForCorrelations("DeleteUser"); + createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.RULE_BASE_URI, Collections.singletonMap("category", "cloudtrail"), + new StringEntity(rule2), new BasicHeader("Content-Type", "application/json")); + Assert.assertEquals("Create rule failed", RestStatus.CREATED, restStatus(createResponse)); + responseBody = asMap(createResponse); + String createdId2 = responseBody.get("_id").toString(); + + createCloudtrailFieldBasedRule(index, "requestParameters.userName", 5000L); + + Detector cloudtrailDetector = randomDetectorWithInputsAndTriggersAndType(List.of(new DetectorInput("cloudtrail detector for security analytics", List.of(index), + List.of(new DetectorRule(createdId1), new DetectorRule(createdId2)), + List.of())), + List.of(new DetectorTrigger(null, "test-trigger", "1", List.of("cloudtrail"), List.of(), List.of(), List.of(), List.of())), "cloudtrail"); + + createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.DETECTOR_BASE_URI, Collections.emptyMap(), toHttpEntity(cloudtrailDetector)); + Assert.assertEquals("Create detector failed", RestStatus.CREATED, restStatus(createResponse)); + + responseBody = asMap(createResponse); + + String createdId = responseBody.get("_id").toString(); + + String request = "{\n" + + " \"query\" : {\n" + + " \"match\":{\n" + + " \"_id\": \"" + createdId + "\"\n" + + " }\n" + + " }\n" + + "}"; + List hits = executeSearch(Detector.DETECTORS_INDEX, request); + SearchHit hit = hits.get(0); + + String monitorId = ((List) ((Map) hit.getSourceAsMap().get("detector")).get("monitor_id")).get(0); + + indexDoc(index, "1", randomCloudtrailDoc("Richard", "CreateUser")); + executeAlertingMonitor(monitorId, Collections.emptyMap()); + Thread.sleep(30000); + indexDoc(index, "4", randomCloudtrailDoc("deysubho", "CreateUser")); + executeAlertingMonitor(monitorId, Collections.emptyMap()); + Thread.sleep(1000); + + indexDoc(index, "2", randomCloudtrailDoc("Richard", "DeleteUser")); + executeAlertingMonitor(monitorId, Collections.emptyMap()); + + // Call GetFindings API + Map params = new HashMap<>(); + params.put("detectorType", "cloudtrail"); + Response getFindingsResponse = makeRequest(client(), "GET", SecurityAnalyticsPlugin.FINDINGS_BASE_URI + "/_search", params, null); + Map getFindingsBody = entityAsMap(getFindingsResponse); + + Thread.sleep(5000); + + int count = 0; + while (true) { + try { + Long endTime = System.currentTimeMillis(); + Request restRequest = new Request("GET", "/_plugins/_security_analytics/correlations?start_timestamp=" + startTime + "&end_timestamp=" + endTime); + response = client().performRequest(restRequest); + + Map responseMap = entityAsMap(response); + List results = (List) responseMap.get("findings"); + if (results.size() == 1) { + Assert.assertTrue(true); + break; + } + } catch (Exception ex) { + // suppress ex + } + ++count; + Thread.sleep(5000); + if (count >= 2) { + break; + } + } + Assert.assertEquals(2, count); + } + private LogIndices createIndices() throws IOException { LogIndices indices = new LogIndices(); indices.adLdapLogsIndex = createTestIndex("ad_logs", adLdapLogMappings()); @@ -138,12 +416,25 @@ private LogIndices createIndices() throws IOException { return indices; } + private String createNetworkToWindowsFieldBasedRule(LogIndices indices) throws IOException { + CorrelationQuery query1 = new CorrelationQuery(indices.vpcFlowsIndex, null, "network", "srcaddr"); + CorrelationQuery query4 = new CorrelationQuery(indices.windowsIndex, null, "test_windows", "SourceIp"); + + CorrelationRule rule = new CorrelationRule(CorrelationRule.NO_ID, CorrelationRule.NO_VERSION, "network to windows", List.of(query1, query4), 300000L); + Request request = new Request("POST", "/_plugins/_security_analytics/correlation/rules"); + request.setJsonEntity(toJsonString(rule)); + Response response = client().performRequest(request); + + Assert.assertEquals(201, response.getStatusLine().getStatusCode()); + return entityAsMap(response).get("_id").toString(); + } + private String createNetworkToAdLdapToWindowsRule(LogIndices indices) throws IOException { - CorrelationQuery query1 = new CorrelationQuery(indices.vpcFlowsIndex, "dstaddr:4.5.6.7", "network"); - CorrelationQuery query2 = new CorrelationQuery(indices.adLdapLogsIndex, "ResultType:50126", "ad_ldap"); - CorrelationQuery query4 = new CorrelationQuery(indices.windowsIndex, "Domain:NTAUTHORI*", "test_windows"); + CorrelationQuery query1 = new CorrelationQuery(indices.vpcFlowsIndex, "dstaddr:4.5.6.7", "network", null); + CorrelationQuery query2 = new CorrelationQuery(indices.adLdapLogsIndex, "ResultType:50126", "ad_ldap", null); + CorrelationQuery query4 = new CorrelationQuery(indices.windowsIndex, "Domain:NTAUTHORI*", "test_windows", null); - CorrelationRule rule = new CorrelationRule(CorrelationRule.NO_ID, CorrelationRule.NO_VERSION, "network to ad_ldap to windows", List.of(query1, query2, query4)); + CorrelationRule rule = new CorrelationRule(CorrelationRule.NO_ID, CorrelationRule.NO_VERSION, "network to ad_ldap to windows", List.of(query1, query2, query4), 300000L); Request request = new Request("POST", "/_plugins/_security_analytics/correlation/rules"); request.setJsonEntity(toJsonString(rule)); Response response = client().performRequest(request); @@ -153,11 +444,11 @@ private String createNetworkToAdLdapToWindowsRule(LogIndices indices) throws IOE } private String createWindowsToAppLogsToS3LogsRule(LogIndices indices) throws IOException { - CorrelationQuery query1 = new CorrelationQuery(indices.windowsIndex, "HostName:EC2AMAZ*", "test_windows"); - CorrelationQuery query2 = new CorrelationQuery(indices.appLogsIndex, "endpoint:\\/customer_records.txt", "others_application"); - CorrelationQuery query4 = new CorrelationQuery(indices.s3AccessLogsIndex, "aws.cloudtrail.eventName:ReplicateObject", "s3"); + CorrelationQuery query1 = new CorrelationQuery(indices.windowsIndex, "HostName:EC2AMAZ*", "test_windows", null); + CorrelationQuery query2 = new CorrelationQuery(indices.appLogsIndex, "endpoint:\\/customer_records.txt", "others_application", null); + CorrelationQuery query4 = new CorrelationQuery(indices.s3AccessLogsIndex, "aws.cloudtrail.eventName:ReplicateObject", "s3", null); - CorrelationRule rule = new CorrelationRule(CorrelationRule.NO_ID, CorrelationRule.NO_VERSION, "windows to app_logs to s3 logs", List.of(query1, query2, query4)); + CorrelationRule rule = new CorrelationRule(CorrelationRule.NO_ID, CorrelationRule.NO_VERSION, "windows to app_logs to s3 logs", List.of(query1, query2, query4), 300000L); Request request = new Request("POST", "/_plugins/_security_analytics/correlation/rules"); request.setJsonEntity(toJsonString(rule)); Response response = client().performRequest(request); @@ -166,6 +457,18 @@ private String createWindowsToAppLogsToS3LogsRule(LogIndices indices) throws IOE return entityAsMap(response).get("_id").toString(); } + private String createCloudtrailFieldBasedRule(String index, String field, Long timeWindow) throws IOException { + CorrelationQuery query1 = new CorrelationQuery(index, "EventName:CreateUser", "cloudtrail", field); + CorrelationQuery query2 = new CorrelationQuery(index, "EventName:DeleteUser", "cloudtrail", field); + + CorrelationRule rule = new CorrelationRule(CorrelationRule.NO_ID, CorrelationRule.NO_VERSION, "cloudtrail field based", List.of(query1, query2), timeWindow); + Request request = new Request("POST", "/_plugins/_security_analytics/correlation/rules"); + request.setJsonEntity(toJsonString(rule)); + Response response = client().performRequest(request); + Assert.assertEquals(201, response.getStatusLine().getStatusCode()); + return entityAsMap(response).get("_id").toString(); + } + @SuppressWarnings("unchecked") private String createVpcFlowDetector(String indexName) throws IOException { Detector vpcFlowDetector = randomDetectorWithInputsAndTriggersAndType(List.of(new DetectorInput("vpc flow detector for security analytics", List.of(indexName), List.of(),