Commit 352075b: clean up
Signed-off-by: Jackie Han <[email protected]>
jackiehanyang committed Jan 15, 2025
1 parent 41d5949 commit 352075b
Showing 8 changed files with 152 additions and 85 deletions.
gradle.properties: 2 changes (1 addition & 1 deletion)
@@ -27,4 +27,4 @@ systemProp.org.gradle.warning.mode=fail
systemProp.jdk.tls.client.protocols=TLSv1.2

# jvm args for faster test execution by default
- systemProp.tests.jvm.argline=-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m
+ systemProp.tests.jvm.argline=-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m
gradle/wrapper/gradle-wrapper.properties: 2 changes (1 addition & 1 deletion)
@@ -4,4 +4,4 @@ distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
- zipStorePath=wrapper/dists
+ zipStorePath=wrapper/dists
@@ -19,7 +19,6 @@
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_RESULTS_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.CHECKPOINT_INDEX_MAPPING_FILE;
- import static org.opensearch.ad.settings.AnomalyDetectorSettings.FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE;

import java.io.IOException;
import java.util.EnumMap;
@@ -273,6 +273,11 @@ protected static String getMappings(String mappingFileRelativePath) throws IOExc
return Resources.toString(url, Charsets.UTF_8);
}

+ public static String getScripts(String scriptFileRelativePath) throws IOException {
+ URL url = IndexManagement.class.getClassLoader().getResource(scriptFileRelativePath);
+ return Resources.toString(url, Charsets.UTF_8);
+ }

protected void choosePrimaryShards(CreateIndexRequest request, boolean hiddenIndex) {
request
.settings(
@@ -1019,26 +1024,23 @@ public void initFlattenedResultIndex(String indexName, ActionListener<CreateInde
logger.info("Initializing flattened result index: {}", indexName);

CreateIndexRequest request = new CreateIndexRequest(indexName)
- .mapping(getFlattenedResultIndexMappings(), XContentType.JSON)
- .settings(settings);
+ .mapping(getFlattenedResultIndexMappings(), XContentType.JSON)
+ .settings(settings);
choosePrimaryShards(request, false);

- adminClient.indices().create(request, ActionListener.wrap(
- response -> {
- if (response.isAcknowledged()) {
- logger.info("Successfully created flattened result index: {}", indexName);
- actionListener.onResponse(response);
- } else {
- String errorMsg = "Index creation not acknowledged for index: " + indexName;
- logger.error(errorMsg);
- actionListener.onFailure(new IllegalStateException(errorMsg));
- }
- },
- exception -> {
- logger.error("Failed to create flattened result index: {}", indexName, exception);
- actionListener.onFailure(exception);
- }
- ));
+ adminClient.indices().create(request, ActionListener.wrap(response -> {
+ if (response.isAcknowledged()) {
+ logger.info("Successfully created flattened result index: {}", indexName);
+ actionListener.onResponse(response);
+ } else {
+ String errorMsg = "Index creation not acknowledged for index: " + indexName;
+ logger.error(errorMsg);
+ actionListener.onFailure(new IllegalStateException(errorMsg));
+ }
+ }, exception -> {
+ logger.error("Failed to create flattened result index: {}", indexName, exception);
+ actionListener.onFailure(exception);
+ }));
}

/**
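The getScripts helper added above mirrors the existing getMappings: it resolves a classpath resource and reads it into a String as UTF-8. A minimal usage sketch follows, assuming the plugin jar is on the classpath; the wrapper class and main method are illustrative only, not part of the plugin:

    import java.io.IOException;

    import org.opensearch.timeseries.indices.IndexManagement;
    import org.opensearch.timeseries.settings.TimeSeriesSettings;

    public class FlattenScriptLoaderSketch {
        public static void main(String[] args) throws IOException {
            // Reads scripts/flatten-custom-result-index-painless.txt from the plugin classpath.
            // Note: like getMappings, this throws a NullPointerException if the resource is
            // missing, because getResource returns null in that case.
            String painless = IndexManagement.getScripts(TimeSeriesSettings.FLATTEN_CUSTOM_RESULT_INDEX_PAINLESS);
            System.out.println(painless);
        }
    }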
@@ -8,6 +8,7 @@
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.constant.CommonMessages.CATEGORICAL_FIELD_TYPE_ERR_MSG;
import static org.opensearch.timeseries.constant.CommonMessages.TIMESTAMP_VALIDATION_FAILED;
+ import static org.opensearch.timeseries.indices.IndexManagement.getScripts;
import static org.opensearch.timeseries.util.ParseUtils.parseAggregators;
import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
import static org.opensearch.timeseries.util.RestHandlerUtils.isExceptionCausedByInvalidQuery;
@@ -25,14 +26,18 @@
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.ingest.DeletePipelineRequest;
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
import org.opensearch.client.Client;
@@ -41,11 +46,14 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
@@ -68,6 +76,7 @@
import org.opensearch.timeseries.model.TimeSeriesTask;
import org.opensearch.timeseries.model.ValidationAspect;
import org.opensearch.timeseries.model.ValidationIssueType;
+ import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.task.TaskCacheManager;
import org.opensearch.timeseries.task.TaskManager;
import org.opensearch.timeseries.util.*;
@@ -454,39 +463,35 @@ private void handlePutRequest(boolean indexingDryRun, ActionListener<T> listener
}

private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listener) {
- createConfig(indexingDryRun, ActionListener.wrap(
- createConfigResponse -> {
- if (shouldHandleFlattening(indexingDryRun, createConfigResponse)) {
- IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) createConfigResponse;
- String detectorId = response.getId();
- String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase();
- String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase();
-
- timeSeriesIndices.initFlattenedResultIndex(indexName, ActionListener.wrap(
- initResponse -> setupIngestPipeline(detectorId, ActionListener.wrap(
- pipelineResponse -> {
- updateResultIndexSetting(pipelineId, indexName, ActionListener.wrap(
- updateResponse -> listener.onResponse(createConfigResponse),
- listener::onFailure
- ));
- },
- listener::onFailure
- )),
- listener::onFailure
- ));
- } else {
- listener.onResponse(createConfigResponse);
- }
- },
- listener::onFailure
- ));
+ createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
+ if (shouldHandleFlattening(indexingDryRun, createConfigResponse)) {
+ IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) createConfigResponse;
+ String detectorId = response.getId();
+ String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase();
+ String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase();
+
+ timeSeriesIndices
+ .initFlattenedResultIndex(
+ indexName,
+ ActionListener.wrap(initResponse -> setupIngestPipeline(detectorId, ActionListener.wrap(pipelineResponse -> {
+ updateResultIndexSetting(
+ pipelineId,
+ indexName,
+ ActionListener.wrap(updateResponse -> listener.onResponse(createConfigResponse), listener::onFailure)
+ );
+ }, listener::onFailure)), listener::onFailure)
+ );
+ } else {
+ listener.onResponse(createConfigResponse);
+ }
+ }, listener::onFailure));
}

private boolean shouldHandleFlattening(boolean indexingDryRun, Object createConfigResponse) {
return !indexingDryRun
- && config.getCustomResultIndexOrAlias() != null
- && config.getFlattenResultIndexMapping()
- && createConfigResponse instanceof IndexAnomalyDetectorResponse;
+ && config.getCustomResultIndexOrAlias() != null
+ && config.getFlattenResultIndexMapping()
+ && createConfigResponse instanceof IndexAnomalyDetectorResponse;
}

protected void setupIngestPipeline(String detectorId, ActionListener<T> listener) {
@@ -498,22 +503,19 @@ protected void setupIngestPipeline(String detectorId, ActionListener<T> listener

PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);

- client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(
- response -> {
- if (response.isAcknowledged()) {
- logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
- listener.onResponse(null);
- } else {
- String errorMessage = "Ingest pipeline creation was not acknowledged for pipelineId: " + pipelineId;
- logger.error(errorMessage);
- listener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
- }
- },
- exception -> {
- logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
- listener.onFailure(exception);
- }
- ));
+ client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(response -> {
+ if (response.isAcknowledged()) {
+ logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
+ listener.onResponse(null);
+ } else {
+ String errorMessage = "Ingest pipeline creation was not acknowledged for pipelineId: " + pipelineId;
+ logger.error(errorMessage);
+ listener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
+ }
+ }, exception -> {
+ logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
+ listener.onFailure(exception);
+ }));

} catch (IOException e) {
logger.error("Exception while building ingest pipeline definition for pipeline ID: {}", pipelineId, e);
@@ -555,22 +557,19 @@ protected void updateResultIndexSetting(String pipelineId, String flattenedResul

updateSettingsRequest.settings(settingsBuilder);

- client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(
- response -> {
- if (response.isAcknowledged()) {
- logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndex, pipelineId);
- listener.onResponse(null);
- } else {
- String errorMsg = "Settings update not acknowledged for index: " + flattenedResultIndex;
- logger.error(errorMsg);
- listener.onFailure(new OpenSearchStatusException(errorMsg, RestStatus.INTERNAL_SERVER_ERROR));
- }
- },
- exception -> {
- logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndex, pipelineId, exception);
- listener.onFailure(exception);
- }
- ));
+ client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> {
+ if (response.isAcknowledged()) {
+ logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndex, pipelineId);
+ listener.onResponse(null);
+ } else {
+ String errorMsg = "Settings update not acknowledged for index: " + flattenedResultIndex;
+ logger.error(errorMsg);
+ listener.onFailure(new OpenSearchStatusException(errorMsg, RestStatus.INTERNAL_SERVER_ERROR));
+ }
+ }, exception -> {
+ logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndex, pipelineId, exception);
+ listener.onFailure(exception);
+ }));
}

private void handleFlattenResultIndexMappingUpdate(ActionListener<T> listener) {
@@ -611,7 +610,6 @@ public void onFailure(Exception e) {
}
}
});
- >>>>>>> 2a322387 (add a feature that flattens custom result index when enabled)
}
}

@@ -1018,7 +1016,7 @@ public void onFailure(Exception e) {
});
}

- protected void onCreateMappingsResponse(CreateIndexResponse response, boolean indexingDryRun, ActionListener<T> listener) throws IOException {
+ protected void onCreateMappingsResponse(CreateIndexResponse response, boolean indexingDryRun, ActionListener<T> listener) {
if (response.isAcknowledged()) {
logger.info("Created {} with mappings.", CommonName.CONFIG_INDEX);
prepareConfigIndexing(indexingDryRun, listener);
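For context on the updateResultIndexSetting hunk above: the contents of settingsBuilder are not visible in this diff, so the exact keys are an assumption. Binding the ingest pipeline through the index.default_pipeline setting is the conventional way to route every write to the flattened index through the flattening processor; the sketch below shows that binding, with the class and method names purely illustrative:

    import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
    import org.opensearch.action.support.master.AcknowledgedResponse;
    import org.opensearch.client.Client;
    import org.opensearch.common.settings.Settings;
    import org.opensearch.core.action.ActionListener;

    public final class PipelineBindingSketch {
        private PipelineBindingSketch() {}

        // Binds an ingest pipeline as the index default, so every document written
        // to the flattened result index is run through the flattening processor.
        // The setting key "index.default_pipeline" is assumed; it is the standard
        // OpenSearch mechanism, not something shown in this hunk.
        public static void bindDefaultPipeline(
            Client client,
            String flattenedResultIndex,
            String pipelineId,
            ActionListener<AcknowledgedResponse> listener
        ) {
            UpdateSettingsRequest request = new UpdateSettingsRequest(flattenedResultIndex)
                .settings(Settings.builder().put("index.default_pipeline", pipelineId));
            client.admin().indices().updateSettings(request, listener);
        }
    }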
@@ -289,4 +289,6 @@ public class TimeSeriesSettings {

// max entities to track per detector
public static final int MAX_TRACKING_ENTITIES = 1000000;

+ public static final String FLATTEN_CUSTOM_RESULT_INDEX_PAINLESS = "scripts/flatten-custom-result-index-painless.txt";
}
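The new FLATTEN_CUSTOM_RESULT_INDEX_PAINLESS constant points at the script file added at the end of this commit. The sketch below shows one way the pipeline definition could be assembled from it. Only the pipeline id scheme, the script path, and the PutPipelineRequest constructor appear in this diff; the processor JSON shape (a single painless script processor) and the class and method names are assumptions:

    import java.io.IOException;

    import org.opensearch.action.ingest.PutPipelineRequest;
    import org.opensearch.common.xcontent.XContentFactory;
    import org.opensearch.common.xcontent.XContentType;
    import org.opensearch.core.common.bytes.BytesReference;
    import org.opensearch.core.xcontent.XContentBuilder;
    import org.opensearch.timeseries.indices.IndexManagement;
    import org.opensearch.timeseries.settings.TimeSeriesSettings;

    public final class FlattenPipelineSketch {
        private FlattenPipelineSketch() {}

        // Builds a PutPipelineRequest whose only processor is the flattening
        // painless script, loaded from the plugin classpath via getScripts.
        public static PutPipelineRequest buildRequest(String detectorId) throws IOException {
            String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase();
            String painless = IndexManagement.getScripts(TimeSeriesSettings.FLATTEN_CUSTOM_RESULT_INDEX_PAINLESS);
            XContentBuilder builder = XContentFactory.jsonBuilder()
                .startObject()
                .field("description", "Flattens nested anomaly result fields for detector " + detectorId)
                .startArray("processors")
                .startObject()
                .startObject("script")
                .field("lang", "painless")
                .field("source", painless)
                .endObject()
                .endObject()
                .endArray()
                .endObject();
            return new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), XContentType.JSON);
        }
    }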
@@ -26,7 +26,6 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
- import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.Writeable;
@@ -0,0 +1,67 @@
// Create a map to store the relationship between feature_id and feature_name from feature_data
def featureNameMap = [:];

// Populate the map from feature_data
if (ctx.containsKey('feature_data') && ctx.feature_data != null) {
for (int i = 0; i < ctx.feature_data.length; i++) {
def feature = ctx.feature_data[i];
if (feature != null && feature.containsKey('feature_id') && feature.containsKey('feature_name')) {
featureNameMap[feature.feature_id] = feature.feature_name;
ctx['feature_data_' + feature.feature_name] = feature.data; // Flatten feature_data as before
}
}
}

// Flatten nested entity field
if (ctx.containsKey('entity') && ctx.entity != null) {
for (int i = 0; i < ctx.entity.length; i++) {
def entity = ctx.entity[i];
if (entity != null && entity.containsKey('name') && entity.containsKey('value')) {
ctx['entity_' + entity.name] = entity.value;
}
}
}

// Flatten nested relevant_attribution field
if (ctx.containsKey('relevant_attribution') && ctx.relevant_attribution != null) {
for (int i = 0; i < ctx.relevant_attribution.length; i++) {
def attribution = ctx.relevant_attribution[i];
if (attribution != null && attribution.containsKey('feature_id') && attribution.containsKey('data')) {
def featureName = featureNameMap[attribution.feature_id];
if (featureName != null) {
ctx['relevant_attribution_' + featureName] = attribution.data;
}
}
}
}

// Flatten nested expected_values field
if (ctx.containsKey('expected_values') && ctx.expected_values != null) {
for (int i = 0; i < ctx.expected_values.length; i++) {
def expected = ctx.expected_values[i];
if (expected != null && expected.containsKey('value_list') && expected.value_list != null) {
for (int j = 0; j < expected.value_list.length; j++) {
def value = expected.value_list[j];
if (value != null && value.containsKey('feature_id') && value.containsKey('data')) {
def featureName = featureNameMap[value.feature_id];
if (featureName != null) {
ctx['expected_values_' + featureName] = value.data;
}
}
}
}
}
}

// Flatten nested past_values field
if (ctx.containsKey('past_values') && ctx.past_values != null) {
for (int i = 0; i < ctx.past_values.length; i++) {
def pastValue = ctx.past_values[i];
if (pastValue != null && pastValue.containsKey('feature_id') && pastValue.containsKey('data')) {
def featureName = featureNameMap[pastValue.feature_id];
if (featureName != null) {
ctx['past_value_' + featureName] = pastValue.data;
}
}
}
}

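For a concrete sense of what this new script emits, consider a hypothetical anomaly result document (field names follow the script; the ids and values are made up for illustration):

    {
      "feature_data": [ { "feature_id": "f1", "feature_name": "cpu", "data": 0.92 } ],
      "entity": [ { "name": "host", "value": "server-1" } ],
      "relevant_attribution": [ { "feature_id": "f1", "data": 0.75 } ],
      "past_values": [ { "feature_id": "f1", "data": 0.41 } ]
    }

After the pipeline runs, the script adds flattened top-level fields alongside the originals:

    "feature_data_cpu": 0.92,
    "entity_host": "server-1",
    "relevant_attribution_cpu": 0.75,
    "past_value_cpu": 0.41

Note that past values land under the singular past_value_ prefix, while the other families keep a prefix derived from their source field names.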