Skip to content

Commit

Permalink
Fixes the Gauge instrument to be created only once (#756)
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gaganjuneja and Gagan Juneja authored Oct 31, 2024
1 parent 12edf69 commit 6c5b932
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Closeable;
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.apache.commons.lang3.reflect.FieldUtils;
Expand All @@ -34,10 +35,12 @@

public class RTFCacheConfigMetricsCollector extends PerformanceAnalyzerMetricsCollector
implements TelemetryCollector {
private MetricsRegistry metricsRegistry;
private static final Logger LOG = LogManager.getLogger(RTFCacheConfigMetricsCollector.class);
private PerformanceAnalyzerController performanceAnalyzerController;
private ConfigOverridesWrapper configOverridesWrapper;
private Closeable fieldDataCacheGauge;
private Closeable requestCacheGauge;
private boolean metricsInitialised;

public RTFCacheConfigMetricsCollector(
PerformanceAnalyzerController performanceAnalyzerController,
Expand All @@ -56,11 +59,11 @@ public RTFCacheConfigMetricsCollector(
public void collectMetrics(long l) {
if (performanceAnalyzerController.isCollectorDisabled(
configOverridesWrapper, getCollectorName())) {
closeOpenGaugeObservablesIfAny();
LOG.info("RTFCacheConfigMetricsCollector is disabled. Skipping collection.");
return;
}

metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry == null) {
LOG.error("could not get the instance of MetricsRegistry class");
return;
Expand All @@ -71,33 +74,59 @@ configOverridesWrapper, getCollectorName())) {
LOG.error("could not get the instance of indicesService class");
return;
}

LOG.debug("Executing collect metrics for RTFCacheConfigMetricsCollector");
CacheMaxSizeStatus fieldDataCacheMaxSizeStatus =
AccessController.doPrivileged(
(PrivilegedAction<CacheMaxSizeStatus>)
() -> {
try {
Cache fieldDataCache =
indicesService
.getIndicesFieldDataCache()
.getCache();
long fieldDataMaxSize =
(Long)
FieldUtils.readField(
fieldDataCache,
CACHE_MAX_WEIGHT,
true);
return new CacheMaxSizeStatus(
FIELD_DATA_CACHE.toString(), fieldDataMaxSize);
} catch (Exception e) {
LOG.debug(
"Error occurred while fetching fieldDataCacheMaxSizeStatus: "
+ e.getMessage());
return null;
}
});
initialiseMetricsIfNeeded(metricsRegistry, indicesService);
}

private void initialiseMetricsIfNeeded(
MetricsRegistry metricsRegistry, IndicesService indicesService) {
if (!metricsInitialised) {
fieldDataCacheGauge =
metricsRegistry.createGauge(
RTFMetrics.CacheConfigValue.Constants.CACHE_MAX_SIZE_VALUE,
"Cache Max Size metrics",
RTFMetrics.MetricUnits.BYTE.toString(),
() -> getFieldCacheMaxSizeStatus(indicesService),
Tags.create()
.addTag(
RTFMetrics.CacheConfigDimension.Constants.TYPE_VALUE,
FIELD_DATA_CACHE.toString()));
requestCacheGauge =
metricsRegistry.createGauge(
RTFMetrics.CacheConfigValue.Constants.CACHE_MAX_SIZE_VALUE,
"Cache Max Size metrics",
RTFMetrics.MetricUnits.BYTE.toString(),
() -> getRequestCacheMaxSizeStatus(indicesService),
Tags.create()
.addTag(
RTFMetrics.CacheConfigDimension.Constants.TYPE_VALUE,
SHARD_REQUEST_CACHE.toString()));
metricsInitialised = true;
}
}

private void closeOpenGaugeObservablesIfAny() {
if (fieldDataCacheGauge != null) {
try {
fieldDataCacheGauge.close();
} catch (Exception e) {
LOG.error("Unable to close the fieldDataCacheGauge observable");
} finally {
fieldDataCacheGauge = null;
}
}
if (requestCacheGauge != null) {
try {
requestCacheGauge.close();
} catch (Exception e) {
LOG.error("Unable to close the fieldDataCacheGauge observable");
} finally {
requestCacheGauge = null;
}
}
}

private double getRequestCacheMaxSizeStatus(IndicesService indicesService) {
CacheMaxSizeStatus shardRequestCacheMaxSizeStatus =
AccessController.doPrivileged(
(PrivilegedAction<CacheMaxSizeStatus>)
Expand Down Expand Up @@ -132,28 +161,45 @@ configOverridesWrapper, getCollectorName())) {
return null;
}
});

if (fieldDataCacheMaxSizeStatus != null
&& fieldDataCacheMaxSizeStatus.getCacheMaxSize() > 0) {
recordMetrics(fieldDataCacheMaxSizeStatus);
}

if (shardRequestCacheMaxSizeStatus != null
&& shardRequestCacheMaxSizeStatus.getCacheMaxSize() > 0) {
recordMetrics(shardRequestCacheMaxSizeStatus);
return shardRequestCacheMaxSizeStatus.getCacheMaxSize();
} else {
return 0.0;
}
}

private void recordMetrics(CacheMaxSizeStatus cacheMaxSizeStatus) {
metricsRegistry.createGauge(
RTFMetrics.CacheConfigValue.Constants.CACHE_MAX_SIZE_VALUE,
"Cache Max Size metrics",
RTFMetrics.MetricUnits.BYTE.toString(),
() -> (double) cacheMaxSizeStatus.getCacheMaxSize(),
Tags.create()
.addTag(
RTFMetrics.CacheConfigDimension.Constants.TYPE_VALUE,
cacheMaxSizeStatus.getCacheType()));
private static double getFieldCacheMaxSizeStatus(IndicesService indicesService) {
CacheMaxSizeStatus fieldDataCacheMaxSizeStatus =
AccessController.doPrivileged(
(PrivilegedAction<CacheMaxSizeStatus>)
() -> {
try {
Cache fieldDataCache =
indicesService
.getIndicesFieldDataCache()
.getCache();
long fieldDataMaxSize =
(Long)
FieldUtils.readField(
fieldDataCache,
CACHE_MAX_WEIGHT,
true);
return new CacheMaxSizeStatus(
FIELD_DATA_CACHE.toString(), fieldDataMaxSize);
} catch (Exception e) {
LOG.debug(
"Error occurred while fetching fieldDataCacheMaxSizeStatus: "
+ e.getMessage());
return null;
}
});
if (fieldDataCacheMaxSizeStatus != null
&& fieldDataCacheMaxSizeStatus.getCacheMaxSize() > 0) {
return fieldDataCacheMaxSizeStatus.getCacheMaxSize();
} else {
return 0.0;
}
}

static class CacheMaxSizeStatus extends MetricStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.opensearch.performanceanalyzer.collectors.telemetry;

import java.io.Closeable;
import java.lang.management.MemoryUsage;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -38,6 +40,7 @@ public class RTFHeapMetricsCollector extends PerformanceAnalyzerMetricsCollector
private boolean metricsInitialised;
private PerformanceAnalyzerController performanceAnalyzerController;
private ConfigOverridesWrapper configOverridesWrapper;
private Map<String, Closeable> memTypeToGaugeObservableMap;

public RTFHeapMetricsCollector(
PerformanceAnalyzerController performanceAnalyzerController,
Expand All @@ -50,13 +53,15 @@ public RTFHeapMetricsCollector(
this.metricsInitialised = false;
this.performanceAnalyzerController = performanceAnalyzerController;
this.configOverridesWrapper = configOverridesWrapper;
this.memTypeToGaugeObservableMap = new HashMap<>();
}

@Override
public void collectMetrics(long startTime) {
if (performanceAnalyzerController.isCollectorDisabled(
configOverridesWrapper, getCollectorName())) {
LOG.info("RTFDisksCollector is disabled. Skipping collection.");
closeOpenGaugeObservablesIfAny();
return;
}

Expand All @@ -72,6 +77,21 @@ configOverridesWrapper, getCollectorName())) {
recordMetrics();
}

private void closeOpenGaugeObservablesIfAny() {
for (String key : memTypeToGaugeObservableMap.keySet()) {
if (memTypeToGaugeObservableMap.containsKey(key)) {
try {
Closeable observableGauge = memTypeToGaugeObservableMap.remove(key);
if (observableGauge != null) {
observableGauge.close();
}
} catch (Exception e) {
LOG.error("Unable to close the observable gauge for key {}", key);
}
}
}
}

private void initialiseMetricsIfNeeded() {
if (metricsInitialised == false) {
gcCollectionEventMetrics =
Expand All @@ -91,6 +111,7 @@ private void initialiseMetricsIfNeeded() {
RTFMetrics.HeapValue.Constants.USED_VALUE,
"GC Heap Used PA Metrics",
RTFMetrics.MetricUnits.BYTE.toString());

metricsInitialised = true;
}
}
Expand Down Expand Up @@ -119,12 +140,31 @@ private void recordMetrics() {
heapUsedMetrics.record(
memoryUsage.getUsed(),
Tags.create().addTag(memTypeAttributeKey, entry.getKey()));
metricsRegistry.createGauge(
RTFMetrics.HeapValue.Constants.MAX_VALUE,
"Heap Max PA metrics",
"",
() -> (double) memoryUsage.getMax(),
Tags.create().addTag(memTypeAttributeKey, entry.getKey()));
createGaugeInstanceIfNotAvailable(entry.getKey());
}
}

private void createGaugeInstanceIfNotAvailable(String key) {
if (!memTypeToGaugeObservableMap.containsKey(key)) {
LOG.info("Gauge doesn't exist for the mem type {}", key);
Closeable observableGauge =
metricsRegistry.createGauge(
RTFMetrics.HeapValue.Constants.MAX_VALUE,
"Heap Max PA metrics",
"",
() -> getValue(key),
Tags.create().addTag(memTypeAttributeKey, key));
memTypeToGaugeObservableMap.put(key, observableGauge);
}
}

private double getValue(String key) {
Map<String, Supplier<MemoryUsage>> memoryUsageSuppliers =
HeapMetrics.getMemoryUsageSuppliers();
MemoryUsage memoryUsage = null;
if (memoryUsageSuppliers.get(key) != null) {
memoryUsage = memoryUsageSuppliers.get(key).get();
}
return memoryUsage != null ? memoryUsage.getMax() : 0.0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

Expand All @@ -29,13 +30,15 @@ public class RTFCacheConfigMetricsCollectorTests extends OpenSearchSingleNodeTes
private static final String TEST_INDEX = "test";
private RTFCacheConfigMetricsCollector rtfCacheConfigMetricsCollector;
private static MetricsRegistry metricsRegistry;
private static MetricsRegistry metricsRegistry1;
private long startTimeInMills = 1153721339;

@Before
public void init() {
MetricsConfiguration.CONFIG_MAP.put(
RTFCacheConfigMetricsCollector.class, MetricsConfiguration.cdefault);
metricsRegistry = mock(MetricsRegistry.class);
metricsRegistry1 = mock(MetricsRegistry.class);
OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
OpenSearchResources.INSTANCE.setIndicesService(indicesService);
Expand All @@ -58,4 +61,17 @@ public void testCollectMetrics() throws IOException {
verify(metricsRegistry, atLeastOnce())
.createGauge(anyString(), anyString(), anyString(), any(), any());
}

@Test
public void testCollectMetricsRepeated() throws IOException {
createIndex(TEST_INDEX);
rtfCacheConfigMetricsCollector.collectMetrics(startTimeInMills);
verify(metricsRegistry, atLeastOnce())
.createGauge(anyString(), anyString(), anyString(), any(), any());

OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry1);
rtfCacheConfigMetricsCollector.collectMetrics(startTimeInMills);
verify(metricsRegistry1, never())
.createGauge(anyString(), anyString(), anyString(), any(), any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class RTFHeapMetricsCollectorTests extends CollectorTestBase {
private RTFHeapMetricsCollector rtfHeapMetricsCollector;

private static MetricsRegistry metricsRegistry;
private static MetricsRegistry metricsRegistry1;
private static Histogram gcCollectionEventHistogram;
private static Histogram gcCollectionTimeHistogram;
private static Histogram heapUsedHistogram;
Expand All @@ -35,6 +36,7 @@ public void init() {
RTFHeapMetricsCollector.class, MetricsConfiguration.cdefault);

metricsRegistry = mock(MetricsRegistry.class);
metricsRegistry1 = mock(MetricsRegistry.class);
gcCollectionEventHistogram = mock(Histogram.class);
gcCollectionTimeHistogram = mock(Histogram.class);
heapUsedHistogram = mock(Histogram.class);
Expand Down Expand Up @@ -65,4 +67,20 @@ public void testCollectMetrics() throws IOException {
verify(metricsRegistry, atLeastOnce())
.createGauge(anyString(), anyString(), anyString(), any(), any());
}

@Test
public void testCollectMetricsRepeated() throws IOException {

rtfHeapMetricsCollector.collectMetrics(System.currentTimeMillis());
verify(heapUsedHistogram, atLeastOnce()).record(anyDouble(), any());
verify(gcCollectionTimeHistogram, atLeastOnce()).record(anyDouble(), any());
verify(gcCollectionEventHistogram, atLeastOnce()).record(anyDouble(), any());
verify(metricsRegistry, atLeastOnce())
.createGauge(anyString(), anyString(), anyString(), any(), any());

OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry1);
rtfHeapMetricsCollector.collectMetrics(System.currentTimeMillis());
verify(metricsRegistry1, never())
.createGauge(anyString(), anyString(), anyString(), any(), any());
}
}

0 comments on commit 6c5b932

Please sign in to comment.