From 9ff8c7728d2e539f867446220e1e64279d4acdf8 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 5 Oct 2023 11:43:00 -0700 Subject: [PATCH] SearchBackPressure Service Node/Cluster RCA (#437) (#496) * Remove log files and add DCO (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Remove extra files (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Remove styling difference (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Remove unnecessary file changes (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add RCA_Decider (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Extract Heap Usage from SQlitedb (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Extract required searchbp metrics for deciders (signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add SearchBackPressureRCA Metric (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Use SearchBackPressureRCAMetrics to aggregate metrics (signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add the conf file extracted part for SearchBackPressureRcaConfig.java (signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add MinMaxSlidingWindow in OldGenRca (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Rename SearchBackPressureClusterRCA and add it to AnalysisGraph (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add basic UTs for SearchBackPressureRCA cluster/node level (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add unhealthy/healthy stats UTs for SearchBackPressureRCA cluster/node level (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add healthy resource unit UT (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add UT s both shard/task level (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add a new SearchBp Resource Unit (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add UTs to test shard/task level resource include-ness (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Remove styling changes for Version.java (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Add metadata to resourceSummary (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Update to more general framework (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Refactor the MinMaxSlidingWindow and bug fix (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Refactor Heap Stats Metrics Getter(Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Refactor HeapUsed and HeapMax Getters (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Refactor operate() (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Refactor operate() and remove dead comments (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Merged Main (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Merged Main (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * remove trailing space in build.gradle (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * nit javadoc update (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * nit javadoc updates (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * nit javadoc updates (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Remove dead comments (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * update javadoc (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * LOG Level Change (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey * Change from static class to enum (Signed-off-by: Jeffrey Liu ujeffliu@amazon.com) Signed-off-by: CoderJeffrey --------- Signed-off-by: CoderJeffrey (cherry picked from commit 2b970e35b3ff45ec8cf869ffc3d45f8710c06d78) Co-authored-by: Jeffrey Liu --- .../model/MetricsModel.java | 5 + .../configs/SearchBackPressureRcaConfig.java | 148 +++++ .../framework/api/metrics/Searchbp_Stats.java | 18 + .../framework/api/summaries/ResourceUtil.java | 18 + .../rca/framework/core/RcaConf.java | 5 + .../rca/store/OpenSearchAnalysisGraph.java | 32 ++ .../rca/store/rca/OldGenRca.java | 58 +- .../SearchBackPressureClusterRCA.java | 25 + .../SearchBackPressureRCA.java | 520 ++++++++++++++++++ .../model/SearchBackPressureRCAMetric.java | 101 ++++ src/main/proto/inter_node_rpc_service.proto | 6 + .../SearchBackPressureRcaTest.java | 439 +++++++++++++++ 12 files changed, 1374 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/Searchbp_Stats.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/model/SearchBackPressureRCAMetric.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRcaTest.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java b/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java index 5b144ac12..f7c781ac1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java @@ -464,6 +464,11 @@ public class MetricsModel { MetricUnits.MILLISECOND.toString(), AllMetrics.ShardIndexingPressureDimension.values())); + // Search Back Pressure Metrics + allMetricsInitializer.put( + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString(), + new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values())); ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java b/src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java new file mode 100644 index 000000000..a4c81a53b --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java @@ -0,0 +1,148 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.rca.configs; + + +import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf; + +public class SearchBackPressureRcaConfig { + public static final String CONFIG_NAME = "search-back-pressure-rca-policy"; + + /* Metadata fields for thresholds */ + public static final String INCREASE_THRESHOLD_BY_JVM_STR = "increase_jvm"; + public static final String DECREASE_THRESHOLD_BY_JVM_STR = "decrease_jvm"; + + public static final int SLIDING_WINDOW_SIZE_IN_MINS = 1; + + // Interval period in seconds + public static final long DEFAULT_EVALUATION_INTERVAL_IN_S = 60; + + /* interval period to call operate() */ + public static final long EVAL_INTERVAL_IN_S = 5; + + /* Increase Threshold */ + // node max heap usage in last 60 secs is less than 70% + public static final int DEFAULT_MAX_HEAP_INCREASE_THRESHOLD = 70; + private Integer maxHeapIncreasePercentageThreshold; + + // cancellationCount due to heap is more than 50% of all task cancellations in shard level + public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD = 50; + private Integer maxShardHeapCancellationPercentageThreshold; + + // cancellationCount due to heap is more than 50% of all task cancellations in task level + public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD = 50; + private Integer maxTaskHeapCancellationPercentageThreshold; + + /* Decrease Threshold */ + // node min heap usage in last 60 secs is more than 80% + public static final int DEFAULT_MIN_HEAP_DECREASE_THRESHOLD = 80; + private Integer minHeapDecreasePercentageThreshold; + + // cancellationCount due to heap is less than 30% of all task cancellations in shard level + public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD = 30; + private Integer minShardHeapCancellationPercentageThreshold; + + // cancellationCount due to heap is less than 30% of all task cancellations in task level + public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD = 30; + private Integer minTaskHeapCancellationPercentageThreshold; + + public SearchBackPressureRcaConfig(final RcaConf conf) { + // (s) -> s > 0 is the validator, if validated, fields from conf file will be returned, + // else, default value gets returned + maxHeapIncreasePercentageThreshold = + conf.readRcaConfig( + CONFIG_NAME, + SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_INCREASE_FIELD.toString(), + DEFAULT_MAX_HEAP_INCREASE_THRESHOLD, + (s) -> s >= 0 && s <= 100, + Integer.class); + maxShardHeapCancellationPercentageThreshold = + conf.readRcaConfig( + CONFIG_NAME, + SearchBackPressureRcaConfigKeys.MAX_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD + .toString(), + DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD, + (s) -> s >= 0 && s <= 100, + Integer.class); + maxTaskHeapCancellationPercentageThreshold = + conf.readRcaConfig( + CONFIG_NAME, + SearchBackPressureRcaConfigKeys.MAX_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD + .toString(), + DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD, + (s) -> s >= 0 && s <= 100, + Integer.class); + minHeapDecreasePercentageThreshold = + conf.readRcaConfig( + CONFIG_NAME, + SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_DECREASE_FIELD.toString(), + DEFAULT_MIN_HEAP_DECREASE_THRESHOLD, + (s) -> s >= 0 && s <= 100, + Integer.class); + minShardHeapCancellationPercentageThreshold = + conf.readRcaConfig( + CONFIG_NAME, + SearchBackPressureRcaConfigKeys.MIN_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD + .toString(), + DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD, + (s) -> s >= 0 && s <= 100, + Integer.class); + minTaskHeapCancellationPercentageThreshold = + conf.readRcaConfig( + CONFIG_NAME, + SearchBackPressureRcaConfigKeys.MIN_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD + .toString(), + DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD, + (s) -> s >= 0 && s <= 100, + Integer.class); + } + + // Getters for private field + public Integer getMaxHeapIncreasePercentageThreshold() { + return maxHeapIncreasePercentageThreshold; + } + + public Integer getMaxShardHeapCancellationPercentageThreshold() { + return maxShardHeapCancellationPercentageThreshold; + } + + public Integer getMaxTaskHeapCancellationPercentageThreshold() { + return maxTaskHeapCancellationPercentageThreshold; + } + + public Integer getMinHeapDecreasePercentageThreshold() { + return minHeapDecreasePercentageThreshold; + } + + public Integer getMinShardHeapCancellationPercentageThreshold() { + return minShardHeapCancellationPercentageThreshold; + } + + public Integer getMinTaskHeapCancellationPercentageThreshold() { + return minTaskHeapCancellationPercentageThreshold; + } + + // name for the configuration field + public enum SearchBackPressureRcaConfigKeys { + MAX_HEAP_USAGE_INCREASE_FIELD("max-heap-usage-increase"), + MAX_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD("max-shard-heap-cancellation-percentage"), + MAX_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD("max-task-heap-cancellation-percentage"), + MAX_HEAP_USAGE_DECREASE_FIELD("max-heap-usage-decrease"), + MIN_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD("min-shard-heap-cancellation-percentage"), + MIN_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD("min-task-heap-cancellation-percentage"); + + private final String value; + + SearchBackPressureRcaConfigKeys(final String value) { + this.value = value; + } + + @Override + public String toString() { + return this.value; + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/Searchbp_Stats.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/Searchbp_Stats.java new file mode 100644 index 000000000..e655e4edc --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/Searchbp_Stats.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.rca.framework.api.metrics; + + +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; +import org.opensearch.performanceanalyzer.rca.framework.api.Metric; + +public class Searchbp_Stats extends Metric { + public Searchbp_Stats(long evaluationIntervalSeconds) { + super( + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TABLE_NAME.toString(), + evaluationIntervalSeconds); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java index 876cd4ca1..03db4299b 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java @@ -135,6 +135,24 @@ public class ResourceUtil { .setResourceEnum(ResourceEnum.SHARD_REQUEST_CACHE) .setMetricEnum(MetricEnum.CACHE_MAX_SIZE) .build(); + /* + * searchbackpressure related resource + * SEARCHBACKPRESSURE_SHARD resource indicate a searchbackpressure unhealthy resource unit is caused by shard level cancellation + */ + public static final Resource SEARCHBACKPRESSURE_SHARD = + Resource.newBuilder() + .setResourceEnum(ResourceEnum.SEARCHBP) + .setMetricEnum(MetricEnum.SEARCHBP_SHARD) + .build(); + + /* + * SEARCHBACKPRESSURE_TASK resource indicate a searchbackpressure unhealthy resource unit is caused by task level cancellation + */ + public static final Resource SEARCHBACKPRESSURE_TASK = + Resource.newBuilder() + .setResourceEnum(ResourceEnum.SEARCHBP) + .setMetricEnum(MetricEnum.SEARCHBP_TASK) + .build(); /** * Read the resourceType name from the ResourceType object diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/core/RcaConf.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/core/RcaConf.java index 4005e1c15..0ff06f0af 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/core/RcaConf.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/core/RcaConf.java @@ -51,6 +51,7 @@ import org.opensearch.performanceanalyzer.rca.configs.HotShardRcaConfig; import org.opensearch.performanceanalyzer.rca.configs.OldGenContendedRcaConfig; import org.opensearch.performanceanalyzer.rca.configs.QueueRejectionRcaConfig; +import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig; import org.opensearch.performanceanalyzer.rca.configs.ShardRequestCacheRcaConfig; import org.opensearch.performanceanalyzer.rca.framework.api.summaries.bucket.BasicBucketCalculator; import org.opensearch.performanceanalyzer.rca.framework.api.summaries.bucket.BucketCalculator; @@ -232,6 +233,10 @@ public OldGenContendedRcaConfig getOldGenContendedRcaConfig() { return new OldGenContendedRcaConfig(this); } + public SearchBackPressureRcaConfig getSearchBackPressureRcaConfig() { + return new SearchBackPressureRcaConfig(this); + } + public T readRcaConfig( String rcaName, String key, T defaultValue, Class clazz) { return readRcaConfig(rcaName, key, defaultValue, (s) -> true, clazz); diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java index 80763befb..1b11c014e 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java @@ -41,6 +41,7 @@ import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Heap_Max; import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Heap_Used; import org.opensearch.performanceanalyzer.rca.framework.api.metrics.IndexWriter_Memory; +import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Searchbp_Stats; import org.opensearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity; import org.opensearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_RejectedReqs; import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Thread_Blocked_Time; @@ -85,6 +86,8 @@ import org.opensearch.performanceanalyzer.rca.store.rca.jvmsizing.LargeHeapClusterRca; import org.opensearch.performanceanalyzer.rca.store.rca.jvmsizing.OldGenContendedRca; import org.opensearch.performanceanalyzer.rca.store.rca.jvmsizing.OldGenReclamationRca; +import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.SearchBackPressureClusterRCA; +import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.SearchBackPressureRCA; import org.opensearch.performanceanalyzer.rca.store.rca.temperature.NodeTemperatureRca; import org.opensearch.performanceanalyzer.rca.store.rca.temperature.dimension.CpuUtilDimensionTemperatureRca; import org.opensearch.performanceanalyzer.rca.store.rca.temperature.dimension.HeapAllocRateTemperatureRca; @@ -117,6 +120,9 @@ public void construct() { MetricsDB.AVG, AllMetrics.CommonDimension.OPERATION.toString()); + // SearchBackpressure Metric + Metric searchbp_Stats = new Searchbp_Stats(EVALUATION_INTERVAL_SECONDS); + heapUsed.addTag( RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); @@ -141,6 +147,9 @@ public void construct() { threadWaitedTime.addTag( RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); + searchbp_Stats.addTag( + RcaConsts.RcaTagConstants.TAG_LOCUS, + RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); addLeaf(heapUsed); addLeaf(gcEvent); @@ -150,6 +159,7 @@ public void construct() { addLeaf(cpuUtilizationGroupByOperation); addLeaf(threadBlockedTime); addLeaf(threadWaitedTime); + addLeaf(searchbp_Stats); // add node stats metrics List nodeStatsMetrics = constructNodeStatsMetrics(); @@ -433,6 +443,28 @@ public void construct() { shardRequestCacheClusterRca, highHeapUsageClusterRca)); + // Search Back Pressure Service RCA enabled + SearchBackPressureRCA searchBackPressureRCA = + new SearchBackPressureRCA(RCA_PERIOD, heapMax, heapUsed, searchbp_Stats); + searchBackPressureRCA.addTag( + RcaConsts.RcaTagConstants.TAG_LOCUS, + RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); + searchBackPressureRCA.addAllUpstreams(Arrays.asList(heapMax, heapUsed, searchbp_Stats)); + + // Search Back Pressure Service Cluster RCA enabled + SearchBackPressureClusterRCA searchBackPressureClusterRCA = + new SearchBackPressureClusterRCA(RCA_PERIOD, searchBackPressureRCA); + searchBackPressureClusterRCA.addTag( + RcaConsts.RcaTagConstants.TAG_LOCUS, + RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE); + searchBackPressureClusterRCA.addAllUpstreams( + Collections.singletonList(searchBackPressureRCA)); + searchBackPressureClusterRCA.addTag( + RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM, + RcaConsts.RcaTagConstants.LOCUS_DATA_NODE); + + // TODO: Add SearchBackPressure Decider + AdmissionControlDecider admissionControlDecider = buildAdmissionControlDecider(heapUsed, heapMax); diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java index fc3558339..e2056eec8 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java @@ -222,7 +222,7 @@ protected boolean isOldGenCollectorCMS() { return true; } - /** Sliding window to check the minimal olg gen usage within a given time frame */ + /** Sliding window to check the minimal old gen usage within a given time frame */ public static class MinOldGenSlidingWindow extends SlidingWindow { public MinOldGenSlidingWindow(int SLIDING_WINDOW_SIZE_IN_TIMESTAMP, TimeUnit timeUnit) { @@ -250,4 +250,60 @@ public double readMin() { return Double.NaN; } } + + /** + * Sliding window to check the max/min old gen usage within a given time frame + * + * @param isMinSlidingWindow true if the sliding window is for min usage, false for max usage + * Provides a more general framework than MinOldGenSlidingWindow as this sliding window can + * be implemented as minSlidingWindow or maxSlidingWindow depending on the need. + */ + public static class MinMaxSlidingWindow extends SlidingWindow { + boolean isMinSlidingWindow; + + public MinMaxSlidingWindow( + int SLIDING_WINDOW_SIZE_IN_TIMESTAMP, + TimeUnit timeUnit, + boolean isMinSlidingWindow) { + super(SLIDING_WINDOW_SIZE_IN_TIMESTAMP, timeUnit); + this.isMinSlidingWindow = isMinSlidingWindow; + } + + @Override + public void next(SlidingWindowData e) { + if (isMinSlidingWindow) { + // monotonically decreasing sliding window + while (!windowDeque.isEmpty() + && windowDeque.peekFirst().getValue() >= e.getValue()) { + windowDeque.pollFirst(); + } + } else { + // monotonically increasing sliding window + while (!windowDeque.isEmpty() + && windowDeque.peekFirst().getValue() < e.getValue()) { + windowDeque.pollFirst(); + } + } + + windowDeque.addFirst(e); + while (!windowDeque.isEmpty() + && TimeUnit.MILLISECONDS.toSeconds( + e.getTimeStamp() - windowDeque.peekLast().getTimeStamp()) + > SLIDING_WINDOW_SIZE) { + windowDeque.pollLast(); + } + } + + /* + * read last element in the window + * if the sliding window is MinSlidingWindow then returns the min element + * else return the max element in the deque + */ + public double readLastElementInWindow() { + if (!windowDeque.isEmpty()) { + return windowDeque.peekLast().getValue(); + } + return Double.NaN; + } + } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java new file mode 100644 index 000000000..a7b95bada --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure; + + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.rca.framework.api.Rca; +import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import org.opensearch.performanceanalyzer.rca.store.rca.cluster.BaseClusterRca; + +public class SearchBackPressureClusterRCA extends BaseClusterRca { + + public static final String RCA_TABLE_NAME = SearchBackPressureClusterRCA.class.getSimpleName(); + private static final Logger LOG = LogManager.getLogger(SearchBackPressureClusterRCA.class); + + public >> SearchBackPressureClusterRCA( + final int rcaPeriod, final R SearchBackPressureRCA) { + super(rcaPeriod, SearchBackPressureRCA); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java new file mode 100644 index 000000000..d274e6f52 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java @@ -0,0 +1,520 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure; + +import static org.opensearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil.readDataFromSqlResult; +import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_SHARD; +import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_TASK; + +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jooq.Field; +import org.jooq.impl.DSL; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; +import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage; +import org.opensearch.performanceanalyzer.metricsdb.MetricsDB; +import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig; +import org.opensearch.performanceanalyzer.rca.framework.api.Metric; +import org.opensearch.performanceanalyzer.rca.framework.api.Rca; +import org.opensearch.performanceanalyzer.rca.framework.api.Resources; +import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.SlidingWindow; +import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.SlidingWindowData; +import org.opensearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; +import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; +import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import org.opensearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil; +import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf; +import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails; +import org.opensearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; +import org.opensearch.performanceanalyzer.rca.store.rca.OldGenRca.MinMaxSlidingWindow; +import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.model.SearchBackPressureRCAMetric; + +public class SearchBackPressureRCA extends Rca> { + private static final Logger LOG = LogManager.getLogger(SearchBackPressureRCA.class); + private static final double BYTES_TO_GIGABYTES = Math.pow(1024, 3); + private static final long EVAL_INTERVAL_IN_S = SearchBackPressureRcaConfig.EVAL_INTERVAL_IN_S; + private static final double CONVERT_BYTES_TO_MEGABYTES = Math.pow(1024, 2); + + // Key metrics used to determine RCA Flow Unit health status + private final Metric heapUsed; + private final Metric heapMax; + private final Metric searchbp_Stats; + + // default value for heapUsed and heapMax + private static final double DEFAULT_HEAP_VAL = 0.0; + + /* + * threshold to increase heap limits + */ + private long heapUsedIncreaseThreshold; + // shard-level searchbp heap cancellation increase threshold + private long heapShardCancellationIncreaseMaxThreshold; + + // task-level searchbp heap cancellation increase threshold + private long heapTaskCancellationIncreaseMaxThreshold; + + /* + * threshold to decrease heap limits + */ + private long heapUsedDecreaseThreshold; + // shard-level searchbp heap cancellation decrease threshold + private long heapShardCancellationDecreaseMinThreashold; + + // task-level searchbp heap cancellation decrease threshold + private long heapTaskCancellationDecreaseMinThreashold; + + /* + * Sliding Window + * SearchBackPressureRCA keep track the continous performance of 3 key metrics + * TaskJVMCancellationPercent/ShardJVMCancellationPercent/HeapUsagePercent + */ + private final SlidingWindow taskJVMCancellationSlidingWindow; + private final SlidingWindow shardJVMCancellationSlidingWindow; + private final MinMaxSlidingWindow minHeapUsageSlidingWindow; + private final MinMaxSlidingWindow maxHeapUsageSlidingWindow; + + // Sliding Window Interval + private static final int SLIDING_WINDOW_SIZE_IN_MINS = + SearchBackPressureRcaConfig.SLIDING_WINDOW_SIZE_IN_MINS; + private static final int SLIDING_WINDOW_SIZE_IN_SECS = SLIDING_WINDOW_SIZE_IN_MINS * 60; + + // currentIterationNumber to check the samples has been taken, only emit flow units when + // currentIterationNumber equals to + // rcaPeriod + private long currentIterationNumber; + + // Required amount of RCA period this RCA needs to run before sending out a flowunit + private final int rcaPeriod; + + // Current time + protected Clock clock; + + public SearchBackPressureRCA( + final int rcaPeriod, final M heapMax, final M heapUsed, M searchbp_Stats) { + super(EVAL_INTERVAL_IN_S); + this.heapUsed = heapUsed; + this.heapMax = heapMax; + this.rcaPeriod = rcaPeriod; + this.clock = Clock.systemUTC(); + this.searchbp_Stats = searchbp_Stats; + + // threshold for heap usage + this.heapUsedIncreaseThreshold = + SearchBackPressureRcaConfig.DEFAULT_MAX_HEAP_INCREASE_THRESHOLD; + this.heapUsedDecreaseThreshold = + SearchBackPressureRcaConfig.DEFAULT_MIN_HEAP_DECREASE_THRESHOLD; + + /* + * threshold for search back pressure service stats + * currently, only consider the percentage of JVM Usage cancellation count compared to the total cancellation count + */ + this.heapShardCancellationIncreaseMaxThreshold = + SearchBackPressureRcaConfig.DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD; + this.heapTaskCancellationIncreaseMaxThreshold = + SearchBackPressureRcaConfig.DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD; + + this.heapShardCancellationDecreaseMinThreashold = + SearchBackPressureRcaConfig.DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD; + this.heapTaskCancellationDecreaseMinThreashold = + SearchBackPressureRcaConfig.DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD; + + // sliding window for heap usage + this.minHeapUsageSlidingWindow = + new MinMaxSlidingWindow(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, true); + this.maxHeapUsageSlidingWindow = + new MinMaxSlidingWindow(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, false); + + // sliding window for JVM + this.shardJVMCancellationSlidingWindow = + new SlidingWindow<>(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES); + this.taskJVMCancellationSlidingWindow = + new SlidingWindow<>(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES); + + LOG.debug("SearchBackPressureRCA initialized"); + } + + /* + * generateFlowUnitListFromWire() compute the flow units from other hosts in the cluster + * for a given Metric and try to send the subscription requests + * to stale or new hosts in cluster if need be + */ + @Override + public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { + final List flowUnitMessages = + args.getWireHopper().readFromWire(args.getNode()); + final List> flowUnitList = new ArrayList<>(); + LOG.info("rca: Executing fromWire: {}", this.getClass().getSimpleName()); + for (FlowUnitMessage flowUnitMessage : flowUnitMessages) { + flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage)); + } + setFlowUnits(flowUnitList); + } + + /* + * operate() evaluates the current stats against threshold + * Unhealthy Flow Units is a marker that this resource at current instance is not healthy + * Autotune decision would be made by downstream classes + */ + @Override + public ResourceFlowUnit operate() { + currentIterationNumber += 1; + ResourceContext context = null; + long currentTimeMillis = System.currentTimeMillis(); + + // read key metrics into searchBackPressureRCAMetric for easier management + SearchBackPressureRCAMetric searchBackPressureRCAMetric = getSearchBackPressureRCAMetric(); + + // print out oldGenUsed and maxOldGen + LOG.debug( + "SearchBackPressureRCA: oldGenUsed: {} maxOldGen: {}, heapUsedPercentage: {}, searchbpShardCancellationCount: {}, searchbpTaskCancellationCount: {}, searchbpJVMShardCancellationCount: {}, searchbpJVMTaskCancellationCount: {}", + searchBackPressureRCAMetric.getUsedHeap(), + searchBackPressureRCAMetric.getMaxHeap(), + searchBackPressureRCAMetric.getHeapUsagePercent(), + searchBackPressureRCAMetric.getSearchbpShardCancellationCount(), + searchBackPressureRCAMetric.getSearchbpTaskCancellationCount(), + searchBackPressureRCAMetric.getSearchbpJVMShardCancellationCount(), + searchBackPressureRCAMetric.getSearchbpJVMTaskCancellationCount()); + + updateAllSlidingWindows(searchBackPressureRCAMetric, currentTimeMillis); + + LOG.debug("SearchBackPressureRCA currentIterationNumber is {}", currentIterationNumber); + // if currentIterationNumber matches the rca period, emit the flow unit + if (currentIterationNumber == this.rcaPeriod) { + LOG.debug( + "SearchBackPressureRCA currentIterationNumber in rcaPeriod is {}", + currentIterationNumber); + currentTimeMillis = System.currentTimeMillis(); + + // reset currentIterationNumber + currentIterationNumber = 0; + + double maxHeapUsagePercentage = maxHeapUsageSlidingWindow.readLastElementInWindow(); + double minHeapUsagePercentage = minHeapUsageSlidingWindow.readLastElementInWindow(); + double avgShardJVMCancellationPercentage = shardJVMCancellationSlidingWindow.readAvg(); + double avgTaskJVMCancellationPercentage = taskJVMCancellationSlidingWindow.readAvg(); + + LOG.debug( + "SearchBackPressureRCA: maxHeapUsagePercentage: {}, minHeapUsagePercentage: {}, SearchBackPressureRCA: avgShardJVMCancellationPercentage: {}, SearchBackPressureRCA: avgTaskJVMCancellationPercentage: {}", + maxHeapUsagePercentage, + minHeapUsagePercentage, + avgShardJVMCancellationPercentage, + avgTaskJVMCancellationPercentage); + InstanceDetails instanceDetails = getInstanceDetails(); + HotNodeSummary nodeSummary = + new HotNodeSummary( + instanceDetails.getInstanceId(), instanceDetails.getInstanceIp()); + + /* + * 2 cases we send Unhealthy ResourceContext when we need to autotune the threshold + * (increase) node max heap usage in last 60 secs is less than 70% and cancellationCountPercentage due to heap is more than 50% of all task cancellations + * (decrease) node min heap usage in last 60 secs is more than 80% and cancellationCountPercetange due to heap is less than 30% of all task cancellations + */ + boolean maxHeapBelowIncreaseThreshold = + maxHeapUsagePercentage < heapUsedIncreaseThreshold; + boolean minHeapAboveDecreaseThreshold = + minHeapUsagePercentage > heapUsedDecreaseThreshold; + boolean shardHeapCancellationPercentageAboveThreshold = + avgShardJVMCancellationPercentage > heapShardCancellationIncreaseMaxThreshold; + boolean shardHeapCancellationPercentageBelowThreshold = + avgShardJVMCancellationPercentage < heapShardCancellationDecreaseMinThreashold; + boolean taskHeapCancellationPercentageAboveThreshold = + avgTaskJVMCancellationPercentage > heapTaskCancellationIncreaseMaxThreshold; + boolean taskHeapCancellationPercentageBelowThreshold = + avgTaskJVMCancellationPercentage < heapTaskCancellationDecreaseMinThreashold; + + // shard level thresholds + boolean increaseJVMThresholdMetByShard = + maxHeapBelowIncreaseThreshold && shardHeapCancellationPercentageAboveThreshold; + boolean decreaseJVMThresholdMetByShard = + minHeapAboveDecreaseThreshold && shardHeapCancellationPercentageBelowThreshold; + + // task level thresholds + boolean increaseJVMThresholdMetByTask = + maxHeapBelowIncreaseThreshold && taskHeapCancellationPercentageAboveThreshold; + boolean decreaseJVMThresholdMetByTask = + minHeapAboveDecreaseThreshold && taskHeapCancellationPercentageBelowThreshold; + + // Generate a flow unit with an Unhealthy ResourceContext + LOG.debug( + "Increase/Decrease Condition Meet for Shard, maxHeapUsagePercentage: {} is less than threshold: {}, avgShardJVMCancellationPercentage: {} is bigger than heapShardCancellationIncreaseMaxThreshold: {}", + maxHeapUsagePercentage, + heapUsedIncreaseThreshold, + avgShardJVMCancellationPercentage, + heapShardCancellationIncreaseMaxThreshold); + + if (increaseJVMThresholdMetByShard || decreaseJVMThresholdMetByShard) { + context = new ResourceContext(Resources.State.UNHEALTHY); + HotResourceSummary resourceSummary; + // metadata fields indicate the reason for Unhealthy Resource Unit + if (increaseJVMThresholdMetByShard) { + resourceSummary = + new HotResourceSummary( + SEARCHBACKPRESSURE_SHARD, + 0, + 0, + 0, + SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR); + } else { + resourceSummary = + new HotResourceSummary( + SEARCHBACKPRESSURE_SHARD, + 0, + 0, + 0, + SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR); + } + + nodeSummary.appendNestedSummary(resourceSummary); + return new ResourceFlowUnit<>( + currentTimeMillis, + context, + nodeSummary, + !instanceDetails.getIsClusterManager()); + } else if (increaseJVMThresholdMetByTask || decreaseJVMThresholdMetByTask) { + LOG.debug( + "Increase/Decrease Condition Meet for Task, maxHeapUsagePercentage: {} is less than threshold: {}, avgShardJVMCancellationPercentage: {} is bigger than heapShardCancellationIncreaseMaxThreshold: {}", + maxHeapUsagePercentage, + heapUsedIncreaseThreshold, + avgTaskJVMCancellationPercentage, + heapTaskCancellationIncreaseMaxThreshold); + + context = new ResourceContext(Resources.State.UNHEALTHY); + HotResourceSummary resourceSummary; + if (increaseJVMThresholdMetByTask) { + resourceSummary = + new HotResourceSummary( + SEARCHBACKPRESSURE_TASK, + 0, + 0, + 0, + SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR); + } else { + resourceSummary = + new HotResourceSummary( + SEARCHBACKPRESSURE_TASK, + 0, + 0, + 0, + SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR); + } + + nodeSummary.appendNestedSummary(resourceSummary); + return new ResourceFlowUnit<>( + currentTimeMillis, + context, + nodeSummary, + !instanceDetails.getIsClusterManager()); + } else { + // if autotune is not triggered, return healthy state + context = new ResourceContext(Resources.State.HEALTHY); + return new ResourceFlowUnit<>( + currentTimeMillis, + context, + nodeSummary, + !instanceDetails.getIsClusterManager()); + } + + } else { + // Return Empty ResourceFlowUnit if none of the thresholds is met + LOG.debug("Empty FlowUnit returned for SearchbackPressureRCA"); + currentTimeMillis = System.currentTimeMillis(); + return new ResourceFlowUnit<>(currentTimeMillis); + } + } + + /** + * Get the Heap Related Stats (Heap Used and Heap Size in gigabytes) + * + * @param isHeapUsed is true meaning get the value of used heap in gigabytes otherwise, meaning + * get the value of max heap in gigabytes + */ + public double getHeapStats(boolean isHeapUsed) { + double heapStats = DEFAULT_HEAP_VAL; + List heapStatsMetrics; + if (isHeapUsed == true) { + if (heapUsed == null) { + throw new IllegalStateException( + "RCA: " + + this.name() + + "was not configured in the graph to " + + "take heap_Used as a metric. Please check the analysis graph!"); + } + + heapStatsMetrics = heapUsed.getFlowUnits(); + } else { + if (heapMax == null) { + throw new IllegalStateException( + "RCA: " + + this.name() + + "was not configured in the graph to " + + "take heap_Max as a metric. Please check the analysis graph!"); + } + + heapStatsMetrics = heapMax.getFlowUnits(); + } + + for (MetricFlowUnit heapStatsMetric : heapStatsMetrics) { + if (heapStatsMetric.isEmpty()) { + continue; + } + + double ret = + SQLParsingUtil.readDataFromSqlResult( + heapStatsMetric.getData(), + AllMetrics.HeapDimension.MEM_TYPE.getField(), + AllMetrics.GCType.HEAP.toString(), + MetricsDB.MAX); + if (Double.isNaN(ret)) { + LOG.error( + "Failed to parse metric in FlowUnit from {}", + heapUsed.getClass().getName()); + } else { + heapStats = ret / CONVERT_BYTES_TO_MEGABYTES; + } + } + + return heapStats; + } + + private SearchBackPressureRCAMetric getSearchBackPressureRCAMetric() { + // Get Heap Usage related metrics + double prevHeapUsage = getHeapStats(true); + double maxHeapSize = getHeapStats(false); + + // Log prevHeapUsage and maxHeapSize + LOG.debug("prevHeapUsage: {}, maxHeapSize: {}", prevHeapUsage, maxHeapSize); + + // Get SearchBack Pressure related metrics from stats type field + Field searchbp_stats_type_field = + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM + .toString()), + String.class); + + double searchbpShardCancellationCount = + getMetric( + this.searchbp_Stats, + searchbp_stats_type_field, + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString()); + double searchbpTaskCancellationCount = + getMetric( + this.searchbp_Stats, + searchbp_stats_type_field, + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT + .toString()); + double searchbpJVMShardCancellationCount = + getMetric( + this.searchbp_Stats, + searchbp_stats_type_field, + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + double searchbpJVMTaskCancellationCount = + getMetric( + this.searchbp_Stats, + searchbp_stats_type_field, + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + + return new SearchBackPressureRCAMetric( + prevHeapUsage, + maxHeapSize, + searchbpShardCancellationCount, + searchbpTaskCancellationCount, + searchbpJVMShardCancellationCount, + searchbpJVMTaskCancellationCount); + } + + private double getMetric(M metric, Field field, String fieldName) { + if (metric == null) { + throw new IllegalStateException( + "RCA: " + + this.name() + + "was not configured in the graph to " + + "take " + + metric.name() + + " as a metric. Please check the analysis graph!"); + } + + double response = 0.0; + for (MetricFlowUnit flowUnit : metric.getFlowUnits()) { + if (!flowUnit.isEmpty()) { + double metricResponse = + readDataFromSqlResult(flowUnit.getData(), field, fieldName, MetricsDB.MAX); + LOG.debug("Searchbp metricResponse is: {}", metricResponse); + if (!Double.isNaN(metricResponse) && metricResponse >= 0.0) { + response = metricResponse; + } + } + } + LOG.debug("Searchbp response is: {}", response); + return response; + } + + /** + * read threshold values from rca.conf + * + * @param conf RcaConf object + */ + @Override + public void readRcaConf(RcaConf conf) { + final SearchBackPressureRcaConfig config = conf.getSearchBackPressureRcaConfig(); + + // threshold read from config file + this.heapUsedIncreaseThreshold = config.getMaxHeapIncreasePercentageThreshold(); + LOG.debug( + "SearchBackPressureRCA heapUsedIncreaseThreshold is set to {}", + this.heapUsedIncreaseThreshold); + this.heapShardCancellationIncreaseMaxThreshold = + config.getMaxShardHeapCancellationPercentageThreshold(); + this.heapTaskCancellationIncreaseMaxThreshold = + config.getMaxTaskHeapCancellationPercentageThreshold(); + this.heapUsedDecreaseThreshold = config.getMinHeapDecreasePercentageThreshold(); + this.heapShardCancellationDecreaseMinThreashold = + config.getMinShardHeapCancellationPercentageThreshold(); + this.heapTaskCancellationDecreaseMinThreashold = + config.getMinTaskHeapCancellationPercentageThreshold(); + } + + /* + * Update Stats for all Sliding Windows + */ + private void updateAllSlidingWindows( + SearchBackPressureRCAMetric searchBackPressureRCAMetric, long currentTimeMillis) { + double prevheapUsagePercentage = searchBackPressureRCAMetric.getHeapUsagePercent(); + if (!Double.isNaN(prevheapUsagePercentage)) { + minHeapUsageSlidingWindow.next( + new SlidingWindowData(currentTimeMillis, prevheapUsagePercentage)); + maxHeapUsageSlidingWindow.next( + new SlidingWindowData(currentTimeMillis, prevheapUsagePercentage)); + } + + double shardJVMCancellationPercentage = + searchBackPressureRCAMetric.getShardJVMCancellationPercent(); + if (!Double.isNaN(shardJVMCancellationPercentage)) { + shardJVMCancellationSlidingWindow.next( + new SlidingWindowData(currentTimeMillis, shardJVMCancellationPercentage)); + } + + double taskJVMCancellationPercentage = + searchBackPressureRCAMetric.getTaskJVMCancellationPercent(); + if (!Double.isNaN(taskJVMCancellationPercentage)) { + taskJVMCancellationSlidingWindow.next( + new SlidingWindowData(currentTimeMillis, taskJVMCancellationPercentage)); + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/model/SearchBackPressureRCAMetric.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/model/SearchBackPressureRCAMetric.java new file mode 100644 index 000000000..ef74e1763 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/model/SearchBackPressureRCAMetric.java @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.model; + +public class SearchBackPressureRCAMetric { + private final double usedHeap; + private final double maxHeap; + private final double searchbpShardCancellationCount; + private final double searchbpTaskCancellationCount; + private final double searchbpJVMShardCancellationCount; + private final double searchbpJVMTaskCancellationCount; + + public SearchBackPressureRCAMetric( + double usedHeap, + double maxHeap, + double searchbpShardCancellationCount, + double searchbpTaskCancellationCount, + double searchbpJVMShardCancellationCount, + double searchbpJVMTaskCancellationCount) { + this.usedHeap = usedHeap; + this.maxHeap = maxHeap; + this.searchbpShardCancellationCount = searchbpShardCancellationCount; + this.searchbpTaskCancellationCount = searchbpTaskCancellationCount; + this.searchbpJVMShardCancellationCount = searchbpJVMShardCancellationCount; + this.searchbpJVMTaskCancellationCount = searchbpJVMTaskCancellationCount; + } + + public double getUsedHeap() { + return usedHeap; + } + + public double getMaxHeap() { + return maxHeap; + } + + public double getSearchbpShardCancellationCount() { + return searchbpShardCancellationCount; + } + + public double getSearchbpTaskCancellationCount() { + return searchbpTaskCancellationCount; + } + + public double getSearchbpJVMShardCancellationCount() { + return searchbpJVMShardCancellationCount; + } + + public double getSearchbpJVMTaskCancellationCount() { + return searchbpJVMTaskCancellationCount; + } + + public double getHeapUsagePercent() { + if (this.getMaxHeap() == 0) { + return 0; + } + return 100 * this.getUsedHeap() / this.getMaxHeap(); + } + + public double getShardJVMCancellationPercent() { + if (this.getSearchbpShardCancellationCount() == 0) { + return 0; + } + return 100 + * this.getSearchbpJVMShardCancellationCount() + / this.getSearchbpShardCancellationCount(); + } + + public double getTaskJVMCancellationPercent() { + if (this.getSearchbpTaskCancellationCount() == 0) { + return 0; + } + return 100 + * this.getSearchbpJVMTaskCancellationCount() + / this.getSearchbpTaskCancellationCount(); + } + + public boolean hasValues() { + return this.getUsedHeap() != 0 && this.getMaxHeap() != 0; + } + + @Override + public String toString() { + return "HeapMetric{" + + "usedHeap=" + + usedHeap + + ", maxHeap=" + + maxHeap + + ", searchbpShardCancellationCount=" + + searchbpShardCancellationCount + + ", searchbpTaskCancellationCount=" + + searchbpTaskCancellationCount + + ", searchbpJVMShardCancellationCount=" + + searchbpJVMShardCancellationCount + + ", searchbpJVMTaskCancellationCount=" + + searchbpJVMTaskCancellationCount + + '}'; + } +} diff --git a/src/main/proto/inter_node_rpc_service.proto b/src/main/proto/inter_node_rpc_service.proto index fe5c864c9..89ea45d93 100644 --- a/src/main/proto/inter_node_rpc_service.proto +++ b/src/main/proto/inter_node_rpc_service.proto @@ -77,6 +77,9 @@ enum ResourceEnum { // Heap HEAP = 20 [(additional_fields).name = "heap"]; + // Search Back Pressure + SEARCHBP = 21 [(additional_fields).name = "search back pressure"]; + } enum MetricEnum { @@ -106,6 +109,9 @@ enum MetricEnum { OLD_GEN_USAGE_AFTER_FULL_GC = 31 [(additional_fields).name = "full gc", (additional_fields).description = "old gen usage after full gc in mb"]; // GC FULL_GC = 32 [(additional_fields).name = "full gc", (additional_fields).description = "full gc pause time in ms"]; + // Searchbp + SEARCHBP_SHARD = 33 [(additional_fields).name = "searchbackpressure shard", (additional_fields).description = "default value to indicate an unhealthy resource unit is from shard-level cancellation"]; + SEARCHBP_TASK = 34 [(additional_fields).name = "searchbackpressure task", (additional_fields).description = "default value to indicate an unhealthy resource unit is from task-level cancellation"]; } /* diff --git a/src/test/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRcaTest.java b/src/test/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRcaTest.java new file mode 100644 index 000000000..f371064e9 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRcaTest.java @@ -0,0 +1,439 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; +import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_SHARD; +import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_TASK; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.IntStream; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; +import org.opensearch.performanceanalyzer.metricsdb.MetricsDB; +import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig; +import org.opensearch.performanceanalyzer.rca.framework.api.Metric; +import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; +import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import org.opensearch.performanceanalyzer.rca.framework.api.metrics.MetricTestHelper; +import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; + +public class SearchBackPressureRcaTest { + // Mock Metrics + @Mock private Metric mockHeapUsed; + + @Mock private Metric mockHeapMax; + + @Mock private Metric mockGcType; + + @Mock private Metric mockSearchbpStats; + + // every 5s operate() gets initiated + private static final int RCA_PERIOD = 5; + + private SearchBackPressureRCA testRca; + private MetricTestHelper metricTestHelper; + private static final double DEFAULT_MAX_HEAP_SIZE = 4294967296.0; + + // mock heap metric columns + private final List heapTableColumns = + Arrays.asList( + AllMetrics.HeapDimension.MEM_TYPE.toString(), + MetricsDB.SUM, + MetricsDB.AVG, + MetricsDB.MIN, + MetricsDB.MAX); + + // mock search back pressure metric columns + private final List searchbpTableColumns = + Arrays.asList( + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM.toString(), + MetricsDB.SUM, + MetricsDB.AVG, + MetricsDB.MIN, + MetricsDB.MAX); + + // dummy field to create a mock gcType Metric + private static final String CMS_COLLECTOR = "ConcurrentMarkSweep"; + + /* + * initialization before running any test + * + */ + @Before + public void setup() throws Exception { + initMocks(this); + this.metricTestHelper = new MetricTestHelper(RCA_PERIOD); + setupMockHeapMetric(mockHeapUsed, 80.0); + setupMockHeapMetric(mockHeapMax, 100.0); + + // set up SearchBp_Stats table + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 7.0); + + this.testRca = + new SearchBackPressureRCA(RCA_PERIOD, mockHeapMax, mockHeapUsed, mockSearchbpStats); + } + + /* + * Test SearchBackPressure RCA returns empty resourceFlowUnit if counter is less than the rcaPeriod + */ + @Test + public void testSearchBpGetResourceContextLessRcaPeriod() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.8); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 7.0); + + ResourceFlowUnit flowUnit = testRca.operate(); + + // counter = 1 + // counter needs to equal to RcaPeriod (5 in this case) to get nonempty resourceflowunit + assertTrue(flowUnit.isEmpty()); + } + + /* + * Test SearchBackPressure RCA returns nonempty resourceFlowUnit if counter equals to rcaPeriod + */ + @Test + public void testSearchBpGetResourceContextEqualRcaPeriod() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.8); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 7.0); + IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); + + ResourceFlowUnit flowUnit = testRca.operate(); + + // counter = RCA_PERIOD + // counter needs to equal to RcaPeriod (5 in this case) to get nonempty resourceflowunit + assertFalse(flowUnit.isEmpty()); + } + + /* + * Test SearchBackPressure RCA returns healthy nonempty flow units if the settings does not trigger autotune + * Meeting None of Increasing or Decreasing Threshold for both shard/task level + */ + @Test + public void testSearchBpGetHealthyFlowUnit() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.8); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 7.0); + IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); + + ResourceFlowUnit flowUnit = testRca.operate(); + assertFalse(flowUnit.isEmpty()); + assertTrue(flowUnit.getResourceContext().isHealthy()); + } + + /* + * Test SearchBackPressure RCA returns unhealthy nonempty flow units if the settings does trigger autotune by increasing threshold + * Increasing threshold: + * node max heap usage in last 60 secs is less than 70% + * cancellationCount due to heap is more than 50% of all task cancellations (Shard-Level) + */ + @Test + public void testSearchBpGetUnHealthyFlowUnitByShardIncreaseThreshold() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.3); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 4.0); + IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); + + ResourceFlowUnit flowUnit = testRca.operate(); + assertFalse(flowUnit.isEmpty()); + assertFalse(flowUnit.getResourceContext().isHealthy()); + } + + /* + * Test SearchBackPressure RCA returns unhealthy nonempty flow units if the settings does trigger autotune by increasing threshold + * Increasing threshold: + * node max heap usage in last 60 secs is less than 70% + * cancellationCount due to heap is more than 50% of all task cancellations (Task-Level). + */ + @Test + public void testSearchBpGetUnHealthyFlowUnitByTaskIncreaseThreshold() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.3); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 4.0, 8.0); + IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); + + ResourceFlowUnit flowUnit = testRca.operate(); + assertFalse(flowUnit.isEmpty()); + assertFalse(flowUnit.getResourceContext().isHealthy()); + } + + /* + * Test SearchBackPressure RCA returns unhealthy nonempty flow units if the settings does trigger autotune by decreasing threshold + * decreasing threshold: + * node min heap usage in last 60 secs is more than 80% + * cancellationCount due to heap is less than 30% of all task cancellations (Shard-Level) + */ + @Test + public void testSearchBpGetUnHealthyFlowUnitByShardDecreaseThreshold() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.9); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 2.0, 8.0); + IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); + + ResourceFlowUnit flowUnit = testRca.operate(); + assertFalse(flowUnit.isEmpty()); + assertFalse(flowUnit.getResourceContext().isHealthy()); + } + + /* + * Test SearchBackPressure RCA returns unhealthy nonempty flow units if the settings does trigger autotune by decreasing threshold + * decreasing threshold: + * node min heap usage in last 60 secs is more than 80% + * cancellationCount due to heap is less than 30% of all task cancellations (Task-Level) + */ + @Test + public void testSearchBpGetUnHealthyFlowUnitByTaskDecreaseThreshold() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.9); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 2.0); + IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); + + ResourceFlowUnit flowUnit = testRca.operate(); + assertFalse(flowUnit.isEmpty()); + assertFalse(flowUnit.getResourceContext().isHealthy()); + } + + /* + * Test SearchBackPressure RCA returns unhealthy nonempty flow units with a HotResourceSummary of SEARCHBACKPRESSURE_SHARD Resource + * indicating the autotune (unhealthy resource unit) is caused by meeting the threshold in shard-level in decrease threshold + * decreasing threshold: + * node min heap usage in last 60 secs is more than 80% + * cancellationCount due to heap is less than 30% of all task cancellations (Shard-Level) + */ + @Test + public void testSearchBpGetUnHealthyFlowUnitInShardLevelByDecreaseThreshold() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.95); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 2.0, 8.0); + IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); + + ResourceFlowUnit flowUnit = testRca.operate(); + assertFalse(flowUnit.isEmpty()); + assertFalse(flowUnit.getResourceContext().isHealthy()); + + HotNodeSummary hotNodeSummary = flowUnit.getSummary(); + List hotResourceSummaries = hotNodeSummary.getHotResourceSummaryList(); + boolean found_shard_resource = + hotResourceSummaries.stream() + .anyMatch( + hotResourceSummary -> + (hotResourceSummary.getResource() + == SEARCHBACKPRESSURE_SHARD) + && (hotResourceSummary.getMetaData() + == SearchBackPressureRcaConfig + .DECREASE_THRESHOLD_BY_JVM_STR)); + + assertTrue(found_shard_resource); + } + + /* + * Test SearchBackPressure RCA returns unhealthy nonempty flow units with a HotResourceSummary of SEARCHBACKPRESSURE_SHARD Resource + * indicating the autotune (unhealthy resource unit) is caused by meeting the threshold in shard-level in increase threshold + * Increasing threshold: + * node max heap usage in last 60 secs is less than 70% + * cancellationCount due to heap is more than 50% of all task cancellations (Shard-Level) + */ + @Test + public void testSearchBpGetUnHealthyFlowUnitInShardLevelByIncreaseThreshold() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.5); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 2.0); + IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); + + ResourceFlowUnit flowUnit = testRca.operate(); + assertFalse(flowUnit.isEmpty()); + assertFalse(flowUnit.getResourceContext().isHealthy()); + + HotNodeSummary hotNodeSummary = flowUnit.getSummary(); + List hotResourceSummaries = hotNodeSummary.getHotResourceSummaryList(); + boolean found_shard_resource_and_increase_metadata = + hotResourceSummaries.stream() + .anyMatch( + hotResourceSummary -> + (hotResourceSummary.getResource() + == SEARCHBACKPRESSURE_SHARD) + && (hotResourceSummary.getMetaData() + == SearchBackPressureRcaConfig + .INCREASE_THRESHOLD_BY_JVM_STR)); + + assertTrue(found_shard_resource_and_increase_metadata); + } + + /* + * Test SearchBackPressure RCA returns unhealthy nonempty flow units with a HotResourceSummary of SEARCHBACKPRESSURE_SHARD Resource + * indicating the autotune (unhealthy resource unit) is caused by meeting the threshold in task-level + * decreasing threshold: + * node min heap usage in last 60 secs is more than 80% + * cancellationCount due to heap is less than 30% of all task cancellations (Task-Level) + */ + @Test + public void testSearchBpGetUnHealthyFlowUnitInTaskLevelByDecreaseThreshold() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.9); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 2.0); + IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); + + ResourceFlowUnit flowUnit = testRca.operate(); + assertFalse(flowUnit.isEmpty()); + assertFalse(flowUnit.getResourceContext().isHealthy()); + + HotNodeSummary hotNodeSummary = flowUnit.getSummary(); + List hotResourceSummaries = hotNodeSummary.getHotResourceSummaryList(); + boolean found_task_resource = + hotResourceSummaries.stream() + .anyMatch( + hotResourceSummary -> + (hotResourceSummary.getResource() + == SEARCHBACKPRESSURE_TASK) + && (hotResourceSummary.getMetaData() + == SearchBackPressureRcaConfig + .DECREASE_THRESHOLD_BY_JVM_STR)); + + assertTrue(found_task_resource); + } + + /* + * Test SearchBackPressure RCA returns unhealthy nonempty flow units with a HotResourceSummary of SEARCHBACKPRESSURE_SHARD Resource + * indicating the autotune (unhealthy resource unit) is caused by meeting the threshold in shard-level + * Increasing threshold: + * node max heap usage in last 60 secs is less than 70% + * cancellationCount due to heap is more than 50% of all task cancellations (Task-Level) + */ + @Test + public void testSearchBpGetUnHealthyFlowUnitInTaskLevelByIncreaseThreshold() { + setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); + setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.5); + setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 2.0, 8.0); + IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); + + ResourceFlowUnit flowUnit = testRca.operate(); + assertFalse(flowUnit.isEmpty()); + assertFalse(flowUnit.getResourceContext().isHealthy()); + + HotNodeSummary hotNodeSummary = flowUnit.getSummary(); + List hotResourceSummaries = hotNodeSummary.getHotResourceSummaryList(); + boolean found_task_resource = + hotResourceSummaries.stream() + .anyMatch( + hotResourceSummary -> + (hotResourceSummary.getResource() + == SEARCHBACKPRESSURE_TASK) + && (hotResourceSummary.getMetaData() + == SearchBackPressureRcaConfig + .INCREASE_THRESHOLD_BY_JVM_STR)); + + assertTrue(found_task_resource); + } + + private void setupMockHeapMetric(final Metric metric, final double val) { + String valString = Double.toString(val); + List data = + Arrays.asList( + AllMetrics.GCType.HEAP.toString(), + valString, + valString, + valString, + valString); + when(metric.getFlowUnits()) + .thenReturn( + Collections.singletonList( + new MetricFlowUnit( + 0, + metricTestHelper.createTestResult( + heapTableColumns, data)))); + } + + private void setupMockSearchbpStats( + final Metric metric, + final double searchbpShardCancellationCount, + final double searchbpTaskCancellationCount, + final double searchbpJVMShardCancellationCount, + final double searchbpJVMTaskCancellationCount) { + String searchbpShardCancellationCountStr = Double.toString(searchbpShardCancellationCount); + String searchbpTaskCancellationCountStr = Double.toString(searchbpTaskCancellationCount); + String searchbpJVMShardCancellationCountStr = + Double.toString(searchbpJVMShardCancellationCount); + String searchbpJVMTaskCancellationCountStr = + Double.toString(searchbpJVMTaskCancellationCount); + + // add searchbpShardCancellationCountStr row + List searchbpShardCancellationCountRow = + Arrays.asList( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString(), + searchbpShardCancellationCountStr, + searchbpShardCancellationCountStr, + searchbpShardCancellationCountStr, + searchbpShardCancellationCountStr); + + // add searchbpTaskCancellationCountStr row + List searchbpTaskCancellationCountRow = + Arrays.asList( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT + .toString(), + searchbpTaskCancellationCountStr, + searchbpTaskCancellationCountStr, + searchbpTaskCancellationCountStr, + searchbpTaskCancellationCountStr); + + // add searchbpJVMShardCancellationCountStr row + List searchbpJVMShardCancellationCountRow = + Arrays.asList( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString(), + searchbpJVMShardCancellationCountStr, + searchbpJVMShardCancellationCountStr, + searchbpJVMShardCancellationCountStr, + searchbpJVMShardCancellationCountStr); + + // add searchbpJVMTaskCancellationCountStr row + List searchbpJVMTaskCancellationCountRow = + Arrays.asList( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString(), + searchbpJVMTaskCancellationCountStr, + searchbpJVMTaskCancellationCountStr, + searchbpJVMTaskCancellationCountStr, + searchbpJVMTaskCancellationCountStr); + + List flowUnits = + Arrays.asList( + new MetricFlowUnit( + 0, + metricTestHelper.createTestResult( + searchbpTableColumns, searchbpShardCancellationCountRow)), + new MetricFlowUnit( + 0, + metricTestHelper.createTestResult( + searchbpTableColumns, searchbpTaskCancellationCountRow)), + new MetricFlowUnit( + 0, + metricTestHelper.createTestResult( + searchbpTableColumns, + searchbpJVMShardCancellationCountRow)), + new MetricFlowUnit( + 0, + metricTestHelper.createTestResult( + searchbpTableColumns, + searchbpJVMTaskCancellationCountRow))); + + when(metric.getFlowUnits()).thenReturn(flowUnits); + } +}