Skip to content

Commit

Permalink
Allow detectors to be stopped if underlying workflow is deleted. Don'…
Browse files Browse the repository at this point in the history
…t allow them to then be started/editted (#810)

* Allow detectors to be stopped if underlying workflow is deleted. Don't allow them to then be started

Signed-off-by: Chase Engelbrecht <[email protected]>

* Add copyright headers

Signed-off-by: Chase Engelbrecht <[email protected]>

---------

Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Apr 3, 2024
1 parent 68f37d3 commit 3547aa4
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ public void setAlertsHistoryIndexPattern(String alertsHistoryIndexPattern) {
this.alertsHistoryIndexPattern = alertsHistoryIndexPattern;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

Check warning on line 577 in src/main/java/org/opensearch/securityanalytics/model/Detector.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/model/Detector.java#L576-L577

Added lines #L576 - L577 were not covered by tests

public void setEnabledTime(Instant enabledTime) {
this.enabledTime = enabledTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.ExceptionChecker;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.ThrowableCheckingPredicates;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -61,6 +63,11 @@
public class TransportDeleteDetectorAction extends HandledTransportAction<DeleteDetectorRequest, DeleteDetectorResponse> {

private static final Logger log = LogManager.getLogger(TransportDeleteDetectorAction.class);
private static final List<ThrowableCheckingPredicates> ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS = List.of(

Check warning on line 66 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L66

Added line #L66 was not covered by tests
ThrowableCheckingPredicates.MONITOR_NOT_FOUND,
ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND,
ThrowableCheckingPredicates.ALERTING_CONFIG_INDEX_NOT_FOUND
);

private final Client client;

Expand All @@ -83,9 +90,13 @@ public class TransportDeleteDetectorAction extends HandledTransportAction<Delete

private final DetectorIndices detectorIndices;

private final ExceptionChecker exceptionChecker;

@Inject
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices, ClusterService clusterService,
Settings settings) {
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client,
ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices,
DetectorIndices detectorIndices, ClusterService clusterService, Settings settings,
ExceptionChecker exceptionChecker) {
super(DeleteDetectorAction.NAME, transportService, actionFilters, DeleteDetectorRequest::new);
this.client = client;
this.ruleTopicIndices = ruleTopicIndices;
Expand All @@ -100,6 +111,7 @@ public TransportDeleteDetectorAction(TransportService transportService, IndexTem

this.enabledWorkflowUsage = SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE.get(this.settings);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
this.exceptionChecker = exceptionChecker;

Check warning on line 114 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L114

Added line #L114 was not covered by tests
}

@Override
Expand Down Expand Up @@ -204,7 +216,8 @@ public void onResponse(Collection<DeleteMonitorResponse> responses) {

@Override
public void onFailure(Exception e) {
if (isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(e, detector.getId())) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(e, ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS)) {
logAcceptableEntityMissingException(e, detector.getId());

Check warning on line 220 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L220

Added line #L220 was not covered by tests
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
} else {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detector.getId()), e);
Expand Down Expand Up @@ -243,7 +256,8 @@ private void deleteWorkflow(Detector detector, ActionListener<AcknowledgedRespon

private void handleDeleteWorkflowFailure(final String detectorId, final Exception deleteWorkflowException,
final ActionListener<AcknowledgedResponse> actionListener) {
if (isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(deleteWorkflowException, detectorId)) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(deleteWorkflowException, ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS)) {
logAcceptableEntityMissingException(deleteWorkflowException, detectorId);

Check warning on line 260 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L260

Added line #L260 was not covered by tests
actionListener.onResponse(new AcknowledgedResponse(true));
} else {
actionListener.onFailure(deleteWorkflowException);
Expand Down Expand Up @@ -305,39 +319,12 @@ private void finishHim(String detectorId, Exception t) {
}
}));
}

private boolean isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(
Exception ex,
String detectorId
) {
// grouped action listener listens on mutliple listeners but throws only one exception. If multiple
// listeners fail the other exceptions are added as suppressed exceptions to the first failure.
int len = ex.getSuppressed().length;
for (int i = 0; i <= len; i++) {
Throwable e = i == len ? ex : ex.getSuppressed()[i];
if (isMonitorNotFoundException(e) || isWorkflowNotFoundException(e) || isAlertingConfigIndexNotFoundException(e)) {
log.error(
String.format(Locale.ROOT, "Workflow, monitor, or jobs index already deleted." +
" Proceeding with detector %s deletion", detectorId),
e);
} else {
return false;
}
}
return true;
}
}

private boolean isMonitorNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Monitor(.*) is not found(.*)");
}

private boolean isWorkflowNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Workflow(.*) not found(.*)");
}

private boolean isAlertingConfigIndexNotFoundException(final Throwable e) {
return e.getMessage().contains("Configured indices are not found: [.opendistro-alerting-config]");
private void logAcceptableEntityMissingException(final Exception e, final String detectorId) {
final String errorMsg = String.format(Locale.ROOT, "Workflow, monitor, or jobs index already deleted." +

Check warning on line 325 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L325

Added line #L325 was not covered by tests
" Proceeding with detector %s deletion", detectorId);
log.error(errorMsg, e);

Check warning on line 327 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L327

Added line #L327 was not covered by tests
}

private void setEnabledWorkflowUsage(boolean enabledWorkflowUsage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.DetectorUtils;
import org.opensearch.securityanalytics.util.ExceptionChecker;
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleIndices;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.ThrowableCheckingPredicates;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -156,6 +158,8 @@ public class TransportIndexDetectorAction extends HandledTransportAction<IndexDe
private final MonitorService monitorService;
private final IndexNameExpressionResolver indexNameExpressionResolver;

private final ExceptionChecker exceptionChecker;

private final TimeValue indexTimeout;
@Inject
public TransportIndexDetectorAction(TransportService transportService,
Expand All @@ -170,7 +174,8 @@ public TransportIndexDetectorAction(TransportService transportService,
Settings settings,
NamedWriteableRegistry namedWriteableRegistry,
LogTypeService logTypeService,
IndexNameExpressionResolver indexNameExpressionResolver) {
IndexNameExpressionResolver indexNameExpressionResolver,
ExceptionChecker exceptionChecker) {
super(IndexDetectorAction.NAME, transportService, actionFilters, IndexDetectorRequest::new);
this.client = client;
this.xContentRegistry = xContentRegistry;
Expand All @@ -192,6 +197,7 @@ public TransportIndexDetectorAction(TransportService transportService,

this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.FILTER_BY_BACKEND_ROLES, this::setFilterByEnabled);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
this.exceptionChecker = exceptionChecker;

Check warning on line 200 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L200

Added line #L200 was not covered by tests
}

@Override
Expand Down Expand Up @@ -617,8 +623,7 @@ public void onResponse(IndexWorkflowResponse workflowResponse) {
}
@Override
public void onFailure(Exception e) {
log.error("Failed to update the workflow");
listener.onFailure(e);
handleUpsertWorkflowFailure(e, listener, detector, monitorsToBeDeleted, refreshPolicy, updatedMonitors);

Check warning on line 626 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L626

Added line #L626 was not covered by tests
}
});
}
Expand Down Expand Up @@ -675,6 +680,25 @@ private IndexMonitorRequest createDocLevelMonitorRequest(List<Pair<String, Rule>
return new IndexMonitorRequest(monitorId, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, refreshPolicy, restMethod, monitor, null);
}

private void handleUpsertWorkflowFailure(final Exception e, final ActionListener<List<IndexMonitorResponse>> listener,
final Detector detector, final List<String> monitorsToBeDeleted,
final RefreshPolicy refreshPolicy, final List<IndexMonitorResponse> updatedMonitors) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND))) {
if (detector.getEnabled()) {
final String errorMessage = String.format("Underlying workflow associated with detector %s not found. " +
"Delete and recreate the detector to restore functionality.", detector.getName());
log.error(errorMessage);
listener.onFailure(new SecurityAnalyticsException(errorMessage, RestStatus.BAD_REQUEST, e));
} else {
log.error("Underlying workflow associated with detector {} not found. Proceeding to disable detector.", detector.getName());
deleteMonitorStep(monitorsToBeDeleted, refreshPolicy, updatedMonitors, listener);

Check warning on line 694 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L688-L694

Added lines #L688 - L694 were not covered by tests
}
} else {
log.error("Failed to update the workflow");
listener.onFailure(e);

Check warning on line 698 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L697-L698

Added lines #L697 - L698 were not covered by tests
}
}

Check warning on line 700 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L700

Added line #L700 was not covered by tests

/**
* Creates doc level monitor which generates per document alerts for the findings of the bucket level delegate monitors in a workflow.
* This monitor has match all query applied to generate the alerts per each finding doc.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.util;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ExceptionChecker {

public boolean doesGroupedActionListenerExceptionMatch(final Exception ex, final List<ThrowableCheckingPredicates> exceptionMatchers) {
// grouped action listener listens on multiple listeners but throws only one exception. If multiple
// listeners fail the other exceptions are added as suppressed exceptions to the first failure.
return Stream.concat(Arrays.stream(ex.getSuppressed()), Stream.of(ex))
.allMatch(throwable -> doesExceptionMatch(throwable, exceptionMatchers));
}

private boolean doesExceptionMatch(final Throwable throwable, final List<ThrowableCheckingPredicates> exceptionMatchers) {
return exceptionMatchers.stream()
.map(ThrowableCheckingPredicates::getMatcherPredicate)
.anyMatch(matcher -> matcher.test(throwable));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.util;

import java.util.function.Predicate;

public enum ThrowableCheckingPredicates {
MONITOR_NOT_FOUND(ThrowableCheckingPredicates::isMonitorNotFoundException),
WORKFLOW_NOT_FOUND(ThrowableCheckingPredicates::isWorkflowNotFoundException),
ALERTING_CONFIG_INDEX_NOT_FOUND(ThrowableCheckingPredicates::isAlertingConfigIndexNotFoundException);

private final Predicate<Throwable> matcherPredicate;
ThrowableCheckingPredicates(final Predicate<Throwable> matcherPredicate) {
this.matcherPredicate = matcherPredicate;
}

public Predicate<Throwable> getMatcherPredicate() {
return this.matcherPredicate;
}

private static boolean isMonitorNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Monitor(.*) is not found(.*)");
}

public static boolean isWorkflowNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Workflow(.*) not found(.*)");
}

public static boolean isAlertingConfigIndexNotFoundException(final Throwable e) {
return e.getMessage().contains("Configured indices are not found: [.opendistro-alerting-config]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,50 @@ public void testUpdateADetectorWithIndexNotExists() throws IOException {
}
}

public void testDisableEnableADetectorWithWorkflowNotExists() throws IOException {
final String index = createTestIndex(randomIndex(), windowsIndexMapping());

// Execute CreateMappingsAction to add alias mapping for index
final Request createMappingRequest = new Request("POST", SecurityAnalyticsPlugin.MAPPER_BASE_URI);
// both req params and req body are supported
createMappingRequest.setJsonEntity(
"{ \"index_name\":\"" + index + "\"," +
" \"rule_topic\":\"" + randomDetectorType() + "\", " +
" \"partial\":true" +
"}"
);

final Response createMappingResponse = client().performRequest(createMappingRequest);
assertEquals(HttpStatus.SC_OK, createMappingResponse.getStatusLine().getStatusCode());

final Detector detector = randomDetector(getRandomPrePackagedRules());
final Response createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.DETECTOR_BASE_URI, Collections.emptyMap(), toHttpEntity(detector));
Assert.assertEquals("Create detector failed", RestStatus.CREATED, restStatus(createResponse));

final Map<String, Object> createResponseAsMap = asMap(createResponse);
final String detectorId = createResponseAsMap.get("_id").toString();

final Map<String, Object> detectorSourceAsMap = getDetectorSourceAsMap(detectorId);
final String workflowId = ((List<String>) detectorSourceAsMap.get("workflow_ids")).get(0);

final Response deleteWorkflowResponse = deleteAlertingWorkflow(workflowId);
assertEquals(200, deleteWorkflowResponse.getStatusLine().getStatusCode());
entityAsMap(deleteWorkflowResponse);

detector.setEnabled(false);
Response updateResponse = makeRequest(client(), "PUT", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId, Collections.emptyMap(), toHttpEntity(detector));
Assert.assertEquals(200, updateResponse.getStatusLine().getStatusCode());

try {
detector.setEnabled(true);
makeRequest(client(), "PUT", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId, Collections.emptyMap(), toHttpEntity(detector));
} catch (ResponseException ex) {
Assert.assertEquals(400, ex.getResponse().getStatusLine().getStatusCode());
Assert.assertEquals(true, ex.getMessage().contains(String.format("Underlying workflow associated with detector %s not found. " +
"Delete and recreate the detector to restore functionality.", detector.getName())));
}
}

@SuppressWarnings("unchecked")
public void testDeletingADetector_single_ruleTopicIndex() throws IOException {
String index = createTestIndex(randomIndex(), windowsIndexMapping());
Expand Down
Loading

0 comments on commit 3547aa4

Please sign in to comment.