Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
stop historical detector; support return AD task in get detector API (#…
Browse files Browse the repository at this point in the history
…359)

* stop historical detector; support return AD task in get detector API

* change to scaling executor

* remove detector cache; handle node crash

* add more comments/logs; tune method name
  • Loading branch information
ylwu-amzn authored Jan 20, 2021
1 parent 19b8f9d commit 74230c5
Show file tree
Hide file tree
Showing 67 changed files with 3,766 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -61,7 +62,7 @@
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -115,10 +116,14 @@
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADCancelTaskAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADCancelTaskTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADTaskProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADTaskProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
Expand All @@ -133,6 +138,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.EntityProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.EntityResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.EntityResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ForwardADTaskAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ForwardADTaskTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction;
Expand Down Expand Up @@ -508,7 +515,16 @@ public Collection<Object> createComponents(
);

adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
adTaskManager = new ADTaskManager(settings, clusterService, client, xContentRegistry, anomalyDetectionIndices);
adTaskManager = new ADTaskManager(
settings,
clusterService,
client,
xContentRegistry,
anomalyDetectionIndices,
nodeFilter,
hashRing,
adTaskCacheManager
);
AnomalyResultBulkIndexHandler anomalyResultBulkIndexHandler = new AnomalyResultBulkIndexHandler(
client,
settings,
Expand Down Expand Up @@ -579,18 +595,18 @@ protected Clock getClock() {
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return ImmutableList
.of(
new FixedExecutorBuilder(
settings,
new ScalingExecutorBuilder(
AD_THREAD_POOL_NAME,
1,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 4),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
TimeValue.timeValueMinutes(10),
AD_THREAD_POOL_PREFIX + AD_THREAD_POOL_NAME
),
new FixedExecutorBuilder(
settings,
new ScalingExecutorBuilder(
AD_BATCH_TASK_THREAD_POOL_NAME,
1,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 8),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
TimeValue.timeValueMinutes(10),
AD_THREAD_POOL_PREFIX + AD_BATCH_TASK_THREAD_POOL_NAME
)
);
Expand Down Expand Up @@ -671,7 +687,10 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(PreviewAnomalyDetectorAction.INSTANCE, PreviewAnomalyDetectorTransportAction.class),
new ActionHandler<>(ADBatchAnomalyResultAction.INSTANCE, ADBatchAnomalyResultTransportAction.class),
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class)
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class),
new ActionHandler<>(ADTaskProfileAction.INSTANCE, ADTaskProfileTransportAction.class),
new ActionHandler<>(ADCancelTaskAction.INSTANCE, ADCancelTaskTransportAction.class),
new ActionHandler<>(ForwardADTaskAction.INSTANCE, ForwardADTaskTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalCardinality;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.transport.TransportService;

import com.amazon.opendistroforelasticsearch.ad.common.exception.ResourceNotFoundException;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
Expand All @@ -62,6 +63,7 @@
import com.amazon.opendistroforelasticsearch.ad.model.DetectorState;
import com.amazon.opendistroforelasticsearch.ad.model.InitProgressProfile;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileResponse;
Expand All @@ -77,12 +79,16 @@ public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
private Client client;
private NamedXContentRegistry xContentRegistry;
private DiscoveryNodeFilterer nodeFilter;
private final TransportService transportService;
private final ADTaskManager adTaskManager;

public AnomalyDetectorProfileRunner(
Client client,
NamedXContentRegistry xContentRegistry,
DiscoveryNodeFilterer nodeFilter,
long requiredSamples
long requiredSamples,
TransportService transportService,
ADTaskManager adTaskManager
) {
super(requiredSamples);
this.client = client;
Expand All @@ -91,6 +97,8 @@ public AnomalyDetectorProfileRunner(
if (requiredSamples <= 0) {
throw new IllegalArgumentException("required samples should be a positive number, but was " + requiredSamples);
}
this.transportService = transportService;
this.adTaskManager = adTaskManager;
}

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profilesToCollect) {
Expand All @@ -117,7 +125,10 @@ private void calculateTotalResponsesToWait(
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, xContentParser.nextToken(), xContentParser);
AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId);

if (!detector.isRealTimeDetector() && profilesToCollect.contains(DetectorProfileName.AD_TASK)) {
adTaskManager.getLatestADTaskProfile(detectorId, transportService, listener);
return;
}
prepareProfile(detector, listener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public long estimateModelSize(AnomalyDetector detector, int numberOfTrees) {
* @param numSamples The number of samples in RCF
* @return estimated model size in bytes
*/
private long estimateModelSize(int dimension, int numberOfTrees, int numSamples) {
public long estimateModelSize(int dimension, int numberOfTrees, int numSamples) {
long totalSamples = (long) numberOfTrees * (long) numSamples;
long rcfSize = totalSamples * (40 * dimension + 132);
long samplerSize = totalSamples * 36;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.common.exception;

/**
 * An {@link AnomalyDetectionException} raised, per its name, when a task is a duplicate.
 *
 * <p>The constructor calls {@code countedInStats(false)} on the new instance.
 * NOTE(review): presumably this keeps the exception out of the plugin's failure
 * statistics; confirm against {@code AnomalyDetectionException}.
 */
public class DuplicateTaskException extends AnomalyDetectionException {

    /**
     * Creates the exception with the given detail message.
     *
     * @param msg detail message forwarded to the superclass constructor
     */
    public DuplicateTaskException(String msg) {
        super(msg);
        countedInStats(false);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ public class CommonErrorMessages {
public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than ";
public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid";
public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for ";
public static String DETECTOR_IS_RUNNING = "Detector is already running";
public static String DETECTOR_MISSING = "Detector is missing";
public static String AD_TASK_ACTION_MISSING = "AD task action is missing";
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public class CommonName {
public static final String ACTIVE_ENTITIES = "active_entities";
public static final String ENTITY_INFO = "entity_info";
public static final String TOTAL_UPDATES = "total_updates";
public static final String AD_TASK = "ad_task";
public static final String AD_TASK_REMOTE = "ad_task_remote";
public static final String CANCEL_TASK = "cancel_task";

// ======================================
// Index mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class ADTask implements ToXContentObject, Writeable {
public static final String IS_LATEST_FIELD = "is_latest";
public static final String TASK_TYPE_FIELD = "task_type";
public static final String CHECKPOINT_ID_FIELD = "checkpoint_id";
public static final String COORDINATING_NODE_FIELD = "coordinating_node";
public static final String WORKER_NODE_FIELD = "worker_node";
public static final String DETECTOR_FIELD = "detector";

private String taskId = null;
Expand All @@ -70,6 +72,9 @@ public class ADTask implements ToXContentObject, Writeable {
private String checkpointId = null;
private AnomalyDetector detector = null;

private String coordinatingNode = null;
private String workerNode = null;

private ADTask() {}

public ADTask(StreamInput input) throws IOException {
Expand All @@ -93,6 +98,8 @@ public ADTask(StreamInput input) throws IOException {
this.lastUpdateTime = input.readOptionalInstant();
this.startedBy = input.readOptionalString();
this.stoppedBy = input.readOptionalString();
this.coordinatingNode = input.readOptionalString();
this.workerNode = input.readOptionalString();
}

@Override
Expand All @@ -118,6 +125,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalInstant(lastUpdateTime);
out.writeOptionalString(startedBy);
out.writeOptionalString(stoppedBy);
out.writeOptionalString(coordinatingNode);
out.writeOptionalString(workerNode);
}

public static Builder builder() {
Expand All @@ -141,6 +150,8 @@ public static class Builder {
private Instant lastUpdateTime = null;
private String startedBy = null;
private String stoppedBy = null;
private String coordinatingNode = null;
private String workerNode = null;

public Builder() {}

Expand Down Expand Up @@ -224,6 +235,16 @@ public Builder detector(AnomalyDetector detector) {
return this;
}

/**
 * Sets the coordinating node value that {@code build()} copies onto the task.
 *
 * @param coordinatingNode value to store; may be null (the builder's default)
 *        NOTE(review): presumably a cluster node id; confirm against callers
 * @return this builder, to allow call chaining
 */
public Builder coordinatingNode(String coordinatingNode) {
this.coordinatingNode = coordinatingNode;
return this;
}

/**
 * Sets the worker node value that {@code build()} copies onto the task.
 *
 * @param workerNode value to store; may be null (the builder's default)
 *        NOTE(review): presumably a cluster node id; confirm against callers
 * @return this builder, to allow call chaining
 */
public Builder workerNode(String workerNode) {
this.workerNode = workerNode;
return this;
}

public ADTask build() {
ADTask adTask = new ADTask();
adTask.taskId = this.taskId;
Expand All @@ -242,6 +263,8 @@ public ADTask build() {
adTask.detector = this.detector;
adTask.startedBy = this.startedBy;
adTask.stoppedBy = this.stoppedBy;
adTask.coordinatingNode = this.coordinatingNode;
adTask.workerNode = this.workerNode;

return adTask;
}
Expand Down Expand Up @@ -296,6 +319,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (checkpointId != null) {
xContentBuilder.field(CHECKPOINT_ID_FIELD, checkpointId);
}
if (coordinatingNode != null) {
xContentBuilder.field(COORDINATING_NODE_FIELD, coordinatingNode);
}
if (workerNode != null) {
xContentBuilder.field(WORKER_NODE_FIELD, workerNode);
}
if (detector != null) {
xContentBuilder.field(DETECTOR_FIELD, detector);
}
Expand All @@ -322,6 +351,9 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
String taskType = null;
String checkpointId = null;
AnomalyDetector detector = null;
String parsedTaskId = taskId;
String coordinatingNode = null;
String workerNode = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -374,13 +406,44 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
case DETECTOR_FIELD:
detector = AnomalyDetector.parse(parser);
break;
case TASK_ID_FIELD:
parsedTaskId = parser.text();
break;
case COORDINATING_NODE_FIELD:
coordinatingNode = parser.text();
break;
case WORKER_NODE_FIELD:
workerNode = parser.text();
break;
default:
parser.skipChildren();
break;
}
}
AnomalyDetector anomalyDetector = detector == null
? null
: new AnomalyDetector(
detectorId,
detector.getVersion(),
detector.getName(),
detector.getDescription(),
detector.getTimeField(),
detector.getIndices(),
detector.getFeatureAttributes(),
detector.getFilterQuery(),
detector.getDetectionInterval(),
detector.getWindowDelay(),
detector.getShingleSize(),
detector.getUiMetadata(),
detector.getSchemaVersion(),
detector.getLastUpdateTime(),
detector.getCategoryField(),
detector.getUser(),
detector.getDetectorType(),
detector.getDetectionDateRange()
);
return new Builder()
.taskId(taskId)
.taskId(parsedTaskId)
.lastUpdateTime(lastUpdateTime)
.startedBy(startedBy)
.stoppedBy(stoppedBy)
Expand All @@ -395,7 +458,9 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
.isLatest(isLatest)
.taskType(taskType)
.checkpointId(checkpointId)
.detector(detector)
.coordinatingNode(coordinatingNode)
.workerNode(workerNode)
.detector(anomalyDetector)
.build();
}

Expand All @@ -422,6 +487,8 @@ public boolean equals(Object o) {
&& Objects.equal(getLatest(), that.getLatest())
&& Objects.equal(getTaskType(), that.getTaskType())
&& Objects.equal(getCheckpointId(), that.getCheckpointId())
&& Objects.equal(getCoordinatingNode(), that.getCoordinatingNode())
&& Objects.equal(getWorkerNode(), that.getWorkerNode())
&& Objects.equal(getDetector(), that.getDetector());
}

Expand All @@ -445,6 +512,8 @@ public int hashCode() {
isLatest,
taskType,
checkpointId,
coordinatingNode,
workerNode,
detector
);
}
Expand Down Expand Up @@ -477,6 +546,10 @@ public String getState() {
return state;
}

/**
 * Replaces this task's state with the given value.
 *
 * @param state new state value; stored as-is without validation
 */
public void setState(String state) {
this.state = state;
}

public String getDetectorId() {
return detectorId;
}
Expand Down Expand Up @@ -516,4 +589,13 @@ public String getCheckpointId() {
public AnomalyDetector getDetector() {
return detector;
}

/**
 * Returns the coordinating node value held by this task.
 *
 * @return the stored value, or null if none was set
 */
public String getCoordinatingNode() {
return coordinatingNode;
}

/**
 * Returns the worker node value held by this task.
 *
 * @return the stored value, or null if none was set
 */
public String getWorkerNode() {
return workerNode;
}

}
Loading

0 comments on commit 74230c5

Please sign in to comment.