Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus configurable queries #2127

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/prometheus-query.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# This file contains all the custom prometheus queries
# This is an example property file for Kafka Cruise Control configurable prometheus queries.


# =======================================
# This must define all the queries for the different RawMetricType needed.
# based on the schema <RAW_METRIC_TYPE>=query

BROKER_CPU_UTIL=1 - avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[2m]))
# ..
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus;

import com.linkedin.cruisecontrol.common.CruiseControlConfigurable;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.PrometheusMetricSampler.PROMETHEUS_QUERY_FILE_CONFIG;

/**
* Configurable prometheus query supplier. This needs a configuration file to specify the different
* prometheus metrics.
* <p>
* See {@link PrometheusQuerySupplier}
*/
public class ConfigurablePrometheusQuerySupplier implements CruiseControlConfigurable, PrometheusQuerySupplier {
private static final Map<RawMetricType, String> TYPE_TO_QUERY = new HashMap<>();

@Override
public Map<RawMetricType, String> get() {
return TYPE_TO_QUERY;
}

@Override
public void configure(Map<String, ?> configs) {
String configFileName = validateNotNull((String) configs.get(PROMETHEUS_QUERY_FILE_CONFIG),
"Prometheus configuration file is missing.");

internalParse(configFileName);
}

/**
* Parse the config file to fill in the map
* @param configFileName the name of the input config file
*/
private void internalParse(String configFileName) {
Properties props = new Properties();
try (InputStream propStream = new FileInputStream(configFileName)) {
props.load(propStream);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}

// load each entry of the properties into the internal map
props.forEach((key, value) -> loadEntry((String) key, (String) value));
}

private void loadEntry(String rawMetricTypeName, String query) {
// will throw a IllegalArgumentException if the name is unknown in the RawMetricType enum
TYPE_TO_QUERY.put(RawMetricType.valueOf(rawMetricTypeName), query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ public class PrometheusMetricSampler extends AbstractMetricSampler {

// Config name visible to tests
static final String PROMETHEUS_QUERY_SUPPLIER_CONFIG = "prometheus.query.supplier";
static final String PROMETHEUS_QUERY_FILE_CONFIG = "prometheus.query.file";

private static final Class<?> DEFAULT_PROMETHEUS_QUERY_SUPPLIER = DefaultPrometheusQuerySupplier.class;
private static final Class<?> PROMETHEUS_QUERY_FILE_SUPPLIER = ConfigurablePrometheusQuerySupplier.class;

private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricSampler.class);

Expand Down Expand Up @@ -120,15 +123,21 @@ private void configurePrometheusAdapter(Map<String, ?> configs) {
}

private void configureQueryMap(Map<String, ?> configs) {
String prometheusQueryFileName = (String) configs.get(PROMETHEUS_QUERY_FILE_CONFIG);
String prometheusQuerySupplierClassName = (String) configs.get(PROMETHEUS_QUERY_SUPPLIER_CONFIG);
Class<?> prometheusQuerySupplierClass = DEFAULT_PROMETHEUS_QUERY_SUPPLIER;
if (prometheusQuerySupplierClassName != null) {
prometheusQuerySupplierClass = (Class<?>) ConfigDef.parseType(PROMETHEUS_QUERY_SUPPLIER_CONFIG,
prometheusQuerySupplierClassName, CLASS);
if (!PrometheusQuerySupplier.class.isAssignableFrom(prometheusQuerySupplierClass)) {
throw new ConfigException(String.format(
"Invalid %s is provided to prometheus metric sampler, provided %s",
PROMETHEUS_QUERY_SUPPLIER_CONFIG, prometheusQuerySupplierClass));
// prometheus configuration file is first over other custom or default query supplier class
if (null != prometheusQueryFileName) {
prometheusQuerySupplierClass = PROMETHEUS_QUERY_FILE_SUPPLIER;
} else {
if (prometheusQuerySupplierClassName != null) {
prometheusQuerySupplierClass = (Class<?>) ConfigDef.parseType(PROMETHEUS_QUERY_SUPPLIER_CONFIG,
prometheusQuerySupplierClassName, CLASS);
if (!PrometheusQuerySupplier.class.isAssignableFrom(prometheusQuerySupplierClass)) {
throw new ConfigException(String.format(
"Invalid %s is provided to prometheus metric sampler, provided %s",
PROMETHEUS_QUERY_SUPPLIER_CONFIG, prometheusQuerySupplierClass));
}
}
}
PrometheusQuerySupplier prometheusQuerySupplier = KafkaCruiseControlConfigUtils.getConfiguredInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,44 @@ public void testGetSamplesPrometheusQuerySupplierInvalidClass() throws Exception
_prometheusMetricSampler.configure(config);
}

@Test
public void testGetSamplesCustomPrometheusQueryFile() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "http://kafka-cluster-1.org:9090");
addCapacityConfig(config);
config.put(PROMETHEUS_QUERY_FILE_CONFIG, this.getClass().getClassLoader().getResource("prometheusQueriesTest.properties").getFile());
_prometheusMetricSampler.configure(config);

MetricSamplerOptions metricSamplerOptions = buildMetricSamplerOptions(TEST_TOPIC);
_prometheusMetricSampler._prometheusAdapter = _prometheusAdapter;

expect(_prometheusAdapter.queryMetric(eq(TestQuerySupplier.TEST_QUERY), anyLong(), anyLong()))
.andReturn(buildBrokerResults());
replay(_prometheusAdapter);

_prometheusMetricSampler.getSamples(metricSamplerOptions);

verify(_prometheusAdapter);
}

@Test(expected = IllegalArgumentException.class)
public void testGetSamplesCustomPrometheusQueryFileNotFoundFile() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "http://kafka-cluster-1.org:9090");
addCapacityConfig(config);
config.put(PROMETHEUS_QUERY_FILE_CONFIG, "/a/b/file.properties");
_prometheusMetricSampler.configure(config);
}

@Test(expected = IllegalArgumentException.class)
public void testGetSamplesCustomPrometheusQueryFileUnknownMetricType() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "http://kafka-cluster-1.org:9090");
addCapacityConfig(config);
config.put(PROMETHEUS_QUERY_FILE_CONFIG, this.getClass().getClassLoader().getResource("prometheusQueriesTestFailing.properties").getFile());
_prometheusMetricSampler.configure(config);
}

private static MetricSamplerOptions buildMetricSamplerOptions(String topic) {

return new MetricSamplerOptions(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALL_TOPIC_BYTES_IN=test_query
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALL_TOPIC_BYTES_IN=test_query
Unknown="I don't know"
1 change: 1 addition & 0 deletions docs/wiki/User Guide/Configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ We are still trying to improve cruise control. And following are some configurat
| prometheus.server.endpoint | String | Y | | The HTTP endpoint of the Prometheus server which is to be used as a source for sampling metrics. |
| prometheus.query.resolution.step.ms | Integer | N | 60,000 | The resolution of the Prometheus query made to the server. If this is set to 30 seconds for a 2 minutes query interval, the query returns with 4 values, which are then aggregated into the metric sample. |
| prometheus.query.supplier | Class | N | com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.DefaultPrometheusQuerySupplier | The class that supplies the Prometheus queries corresponding to Kafka raw metrics. If there are no customizations done when configuring Prometheus node exporter, the default class should work fine. |
| prometheus.query.file | String | N | | The configuration file supplying the custom Prometheus queries corresponding to Kafka raw metrics. Takes precedence over prometheus.query.supplier. |
| prometheus.broker.metrics.scraping.frequency.seconds | Integer | N | 60 | The scraping frequency with which Prometheus scrapes metrics from Kafka brokers. This value is used by DefaultPrometheusQuerySupplier to construct the iRate query that is used to get broker cpu metrics. |
### KafkaSampleStore configurations
| Name | Type | Required? | Default Value | Description |
Expand Down