Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

stop historical detector; support return AD task in get detector API #359

Merged
merged 6 commits into from
Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -61,7 +62,7 @@
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -115,10 +116,14 @@
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADCancelTaskAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADCancelTaskTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADTaskProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADTaskProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
Expand All @@ -133,6 +138,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.EntityProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.EntityResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.EntityResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ForwardADTaskAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ForwardADTaskTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction;
Expand Down Expand Up @@ -508,7 +515,16 @@ public Collection<Object> createComponents(
);

adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
adTaskManager = new ADTaskManager(settings, clusterService, client, xContentRegistry, anomalyDetectionIndices);
adTaskManager = new ADTaskManager(
settings,
clusterService,
client,
xContentRegistry,
anomalyDetectionIndices,
nodeFilter,
hashRing,
adTaskCacheManager
);
AnomalyResultBulkIndexHandler anomalyResultBulkIndexHandler = new AnomalyResultBulkIndexHandler(
client,
settings,
Expand Down Expand Up @@ -579,18 +595,18 @@ protected Clock getClock() {
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return ImmutableList
.of(
new FixedExecutorBuilder(
settings,
new ScalingExecutorBuilder(
AD_THREAD_POOL_NAME,
1,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 4),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
TimeValue.timeValueMinutes(10),
AD_THREAD_POOL_PREFIX + AD_THREAD_POOL_NAME
),
new FixedExecutorBuilder(
settings,
new ScalingExecutorBuilder(
AD_BATCH_TASK_THREAD_POOL_NAME,
1,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 8),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
TimeValue.timeValueMinutes(10),
AD_THREAD_POOL_PREFIX + AD_BATCH_TASK_THREAD_POOL_NAME
)
);
Expand Down Expand Up @@ -671,7 +687,10 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(PreviewAnomalyDetectorAction.INSTANCE, PreviewAnomalyDetectorTransportAction.class),
new ActionHandler<>(ADBatchAnomalyResultAction.INSTANCE, ADBatchAnomalyResultTransportAction.class),
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class)
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class),
kaituo marked this conversation as resolved.
Show resolved Hide resolved
new ActionHandler<>(ADTaskProfileAction.INSTANCE, ADTaskProfileTransportAction.class),
new ActionHandler<>(ADCancelTaskAction.INSTANCE, ADCancelTaskTransportAction.class),
new ActionHandler<>(ForwardADTaskAction.INSTANCE, ForwardADTaskTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.amazon.opendistroforelasticsearch.ad.model.DetectorState;
import com.amazon.opendistroforelasticsearch.ad.model.InitProgressProfile;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileResponse;
Expand All @@ -77,12 +78,14 @@ public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
private Client client;
private NamedXContentRegistry xContentRegistry;
private DiscoveryNodeFilterer nodeFilter;
private final ADTaskManager adTaskManager;

public AnomalyDetectorProfileRunner(
Client client,
NamedXContentRegistry xContentRegistry,
DiscoveryNodeFilterer nodeFilter,
long requiredSamples
long requiredSamples,
ADTaskManager adTaskManager
) {
super(requiredSamples);
this.client = client;
Expand All @@ -91,6 +94,7 @@ public AnomalyDetectorProfileRunner(
if (requiredSamples <= 0) {
throw new IllegalArgumentException("required samples should be a positive number, but was " + requiredSamples);
}
this.adTaskManager = adTaskManager;
}

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profilesToCollect) {
Expand All @@ -117,7 +121,10 @@ private void calculateTotalResponsesToWait(
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, xContentParser.nextToken(), xContentParser);
AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId);

if (!detector.isRealTimeDetector() && profilesToCollect.contains(DetectorProfileName.AD_TASK)) {
adTaskManager.getLatestADTaskProfile(detectorId, listener);
return;
}
prepareProfile(detector, listener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2021? Have you figured out how to configure spotless for the year in copyright header?

Copy link
Contributor Author

@ylwu-amzn ylwu-amzn Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check this comment #355 (comment) , paste the comment here

./gradlew spotlessApply will reset copyright year as 2020 as configured in file spotless.license.java.

To make this PR clean, will send out a separate PR to update this license file and apply to all files. Will replace 2020 with $YEAR, then spotless can fill as current year automatically.

*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.common.exception;

public class DuplicateTaskException extends AnomalyDetectionException {

public DuplicateTaskException(String msg) {
super(msg);
this.countedInStats(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ public class CommonErrorMessages {
public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than ";
public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid";
public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for ";
public static String DETECTOR_ALREADY_RUNNING = "Detector is already running";
public static String AD_TASK_ID_MISSING = "AD task ID is missing";
public static String DETECTOR_MISSING = "Detector is missing";
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class CommonName {
public static final String ACTIVE_ENTITIES = "active_entities";
public static final String ENTITY_INFO = "entity_info";
public static final String TOTAL_UPDATES = "total_updates";
public static final String AD_TASK = "ad_task";

// ======================================
// Index mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
String taskType = null;
String checkpointId = null;
AnomalyDetector detector = null;
String parsedTaskId = taskId;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -374,13 +375,16 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
case DETECTOR_FIELD:
detector = AnomalyDetector.parse(parser);
break;
case TASK_ID_FIELD:
parsedTaskId = parser.text();
break;
default:
parser.skipChildren();
break;
}
}
return new Builder()
.taskId(taskId)
.taskId(parsedTaskId)
.lastUpdateTime(lastUpdateTime)
.startedBy(startedBy)
.stoppedBy(stoppedBy)
Expand Down Expand Up @@ -477,6 +481,10 @@ public String getState() {
return state;
}

public void setState(String state) {
this.state = state;
}

public String getDetectorId() {
return detectorId;
}
Expand Down
Loading