Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
stop historical detector; support return AD task in get detector API
Browse files Browse the repository at this point in the history
  • Loading branch information
ylwu-amzn committed Jan 12, 2021
1 parent 19b8f9d commit 57ab219
Show file tree
Hide file tree
Showing 60 changed files with 3,225 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,14 @@
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADCancelTaskAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADCancelTaskTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADTaskProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADTaskProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
Expand All @@ -133,6 +137,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.EntityProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.EntityResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.EntityResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ForwardADTaskAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ForwardADTaskTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction;
Expand Down Expand Up @@ -508,7 +514,16 @@ public Collection<Object> createComponents(
);

adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
adTaskManager = new ADTaskManager(settings, clusterService, client, xContentRegistry, anomalyDetectionIndices);
adTaskManager = new ADTaskManager(
settings,
clusterService,
client,
xContentRegistry,
anomalyDetectionIndices,
nodeFilter,
hashRing,
adTaskCacheManager
);
AnomalyResultBulkIndexHandler anomalyResultBulkIndexHandler = new AnomalyResultBulkIndexHandler(
client,
settings,
Expand Down Expand Up @@ -671,7 +686,10 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(PreviewAnomalyDetectorAction.INSTANCE, PreviewAnomalyDetectorTransportAction.class),
new ActionHandler<>(ADBatchAnomalyResultAction.INSTANCE, ADBatchAnomalyResultTransportAction.class),
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class)
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class),
new ActionHandler<>(ADTaskProfileAction.INSTANCE, ADTaskProfileTransportAction.class),
new ActionHandler<>(ADCancelTaskAction.INSTANCE, ADCancelTaskTransportAction.class),
new ActionHandler<>(ForwardADTaskAction.INSTANCE, ForwardADTaskTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.amazon.opendistroforelasticsearch.ad.model.DetectorState;
import com.amazon.opendistroforelasticsearch.ad.model.InitProgressProfile;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileResponse;
Expand All @@ -77,12 +78,14 @@ public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
private Client client;
private NamedXContentRegistry xContentRegistry;
private DiscoveryNodeFilterer nodeFilter;
private final ADTaskManager adTaskManager;

public AnomalyDetectorProfileRunner(
Client client,
NamedXContentRegistry xContentRegistry,
DiscoveryNodeFilterer nodeFilter,
long requiredSamples
long requiredSamples,
ADTaskManager adTaskManager
) {
super(requiredSamples);
this.client = client;
Expand All @@ -91,6 +94,7 @@ public AnomalyDetectorProfileRunner(
if (requiredSamples <= 0) {
throw new IllegalArgumentException("required samples should be a positive number, but was " + requiredSamples);
}
this.adTaskManager = adTaskManager;
}

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profilesToCollect) {
Expand All @@ -117,7 +121,10 @@ private void calculateTotalResponsesToWait(
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, xContentParser.nextToken(), xContentParser);
AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId);

if (!detector.isRealTimeDetector() && profilesToCollect.contains(DetectorProfileName.AD_TASK)) {
adTaskManager.getLatestADTaskProfile(detectorId, listener);
return;
}
prepareProfile(detector, listener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.common.exception;

public class DuplicateTaskException extends AnomalyDetectionException {

public DuplicateTaskException(String msg) {
super(msg);
this.countedInStats(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ public class CommonErrorMessages {
public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than ";
public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid";
public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for ";
public static String DETECTOR_ALREADY_RUNNING = "Detector is already running";
public static String AD_TASK_ID_MISSING = "AD task ID is missing";
public static String DETECTOR_MISSING = "Detector is missing";
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class CommonName {
public static final String ACTIVE_ENTITIES = "active_entities";
public static final String ENTITY_INFO = "entity_info";
public static final String TOTAL_UPDATES = "total_updates";
public static final String AD_TASK = "ad_task";

// ======================================
// Index mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
String taskType = null;
String checkpointId = null;
AnomalyDetector detector = null;
String parsedTaskId = taskId;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -374,13 +375,16 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
case DETECTOR_FIELD:
detector = AnomalyDetector.parse(parser);
break;
case TASK_ID_FIELD:
parsedTaskId = parser.text();
break;
default:
parser.skipChildren();
break;
}
}
return new Builder()
.taskId(taskId)
.taskId(parsedTaskId)
.lastUpdateTime(lastUpdateTime)
.startedBy(startedBy)
.stoppedBy(stoppedBy)
Expand Down Expand Up @@ -477,6 +481,10 @@ public String getState() {
return state;
}

public void setState(String state) {
this.state = state;
}

public String getDetectorId() {
return detectorId;
}
Expand Down
Loading

0 comments on commit 57ab219

Please sign in to comment.