Metrics via API, per-cluster Prometheus Config, OCP monitoring support (#1155)

* Metrics via API, per-cluster Prometheus Config, OCP monitoring support
* Add unit/integration tests
* Minor UI update for missing metrics
* Clean up CRD metrics types and add validation rule, tests
* Resolve Sonar issues
* Docs, examples, clean-up remaining use of metrics environment variable
* PR feedback: better arg name for `uniqueNames`, rm unnecessary `Named`

---------

Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar authored Nov 14, 2024
1 parent 0be5bde commit 81c6706
Showing 82 changed files with 2,216 additions and 1,002 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/playwright-tests.yml
@@ -250,7 +250,7 @@ jobs:
kubectl get all,subscriptions,csv,operatorgroups,installplans -n operators -o yaml > ./resources/operators.yaml
kubectl logs -n operators -l app.kubernetes.io/name=console-operator --all-containers=true --tail -1 > ./resources/console-operator-logs.txt
kubectl get all -n $TARGET_NAMESPACE -o yaml > ./resources/$TARGET_NAMESPACE.yaml
- kubectl logs -n ${TARGET_NAMESPACE} -l app.kubernetes.io/instance=example-console-deployment --all-containers=true > ./resources/$TARGET_NAMESPACE-console-logs.txt
+ kubectl logs -n ${TARGET_NAMESPACE} -l app.kubernetes.io/instance=example-console-deployment --all-containers=true --tail -1 > ./resources/$TARGET_NAMESPACE-console-logs.txt
- name: Archive Resource Backup
uses: actions/upload-artifact@v4
1 change: 0 additions & 1 deletion .github/workflows/release.yml
@@ -108,7 +108,6 @@ jobs:
run: |
npm ci --omit=dev
export BACKEND_URL=http://example
- export CONSOLE_METRICS_PROMETHEUS_URL=http://example
export NEXTAUTH_SECRET=examplesecret
export LOG_LEVEL=info
export CONSOLE_MODE=read-only
1 change: 0 additions & 1 deletion .github/workflows/snapshot.yml
@@ -120,7 +120,6 @@ jobs:
run: |
npm ci --omit=dev
export BACKEND_URL=http://example
- export CONSOLE_METRICS_PROMETHEUS_URL=http://example
export NEXTAUTH_SECRET=examplesecret
export LOG_LEVEL=info
export CONSOLE_MODE=read-only
3 changes: 0 additions & 3 deletions Makefile
@@ -19,7 +19,6 @@ ARCH ?= linux/amd64
SKIP_RANGE ?= ">=1.0.0 <1.0.3"

CONSOLE_UI_NEXTAUTH_SECRET ?= $(shell openssl rand -base64 32)
- CONSOLE_METRICS_PROMETHEUS_URL ?=

container-image-api:
mvn package -am -pl api -Pcontainer-image -DskipTests -Dquarkus.container-image.image=$(CONSOLE_API_IMAGE)
@@ -43,7 +42,6 @@ container-image-ui:
cd ui && \
npm ci --omit=dev && \
export BACKEND_URL=http://example && \
- export CONSOLE_METRICS_PROMETHEUS_URL=http://example && \
export NEXTAUTH_SECRET=examplesecret && \
export LOG_LEVEL=info && \
export CONSOLE_MODE=read-only && \
@@ -65,7 +63,6 @@ compose-up:
echo "CONSOLE_API_KUBERNETES_API_SERVER_URL=$(CONSOLE_API_KUBERNETES_API_SERVER_URL)" >> compose-runtime.env
echo "CONSOLE_UI_IMAGE=$(CONSOLE_UI_IMAGE)" >> compose-runtime.env
echo "CONSOLE_UI_NEXTAUTH_SECRET=$(CONSOLE_UI_NEXTAUTH_SECRET)" >> compose-runtime.env
echo "CONSOLE_METRICS_PROMETHEUS_URL=$(CONSOLE_METRICS_PROMETHEUS_URL)" >> compose-runtime.env
$(CONTAINER_RUNTIME) compose --env-file compose-runtime.env up -d

compose-down:
16 changes: 13 additions & 3 deletions README.md
@@ -50,7 +50,11 @@ To ensure the console has the necessary access to function, a minimum level of a
3. `READ`, `DESCRIBE` for all `GROUP` resources

#### Prometheus
- Prometheus is an optional dependency of the console if cluster metrics are to be displayed. The operator currently installs a private Prometheus instance for each `Console` instance. However, when installing a single console deployment, Prometheus must be either installed separately or provided via a URL reference. This will be addressed below in the section dealing with creating a console via a `Deployment`.
+ Prometheus is an optional dependency of the console if cluster metrics are to be displayed. The console supports gathering metrics in several ways.
+
+ - OpenShift-managed Prometheus instances. Monitoring of user-defined projects must be enabled in OpenShift.
+ - User-supplied Prometheus instances
+ - Private Prometheus instance for each `Console`. The operator creates a managed Prometheus deployment for use only by the console.

### Deploy the operator with OLM
The preferred way to deploy the console is using the Operator Lifecycle Manager, or OLM. The sample install files in `install/operator-olm` will install the operator with cluster-wide scope. This means that `Console` instances may be created in any namespace. If you wish to limit the scope of the operator, the `OperatorGroup` resource may be modified to specify only the namespace that should be watched by the operator.
@@ -68,11 +72,18 @@ kind: Console
metadata:
  name: example
spec:
-  hostname: example-console.apps-crc.testing # Hostname where the console will be accessed via HTTPS
+  hostname: example-console.cloud.example.com # Hostname where the console will be accessed via HTTPS
+  metricsSources:
+    # A `standalone` Prometheus instance must already exist and be accessible from the console Pod
+    - name: custom-prometheus
+      type: standalone
+      url: https://custom-prometheus.cloud.example.com
+      # Prometheus API authentication may also be provided
  kafkaClusters:
    - name: console-kafka # Name of the `Kafka` CR representing the cluster
      namespace: kafka # Namespace of the `Kafka` CR representing the cluster
      listener: secure # Listener on the `Kafka` CR to connect from the console
+      metricsSource: custom-prometheus
      properties:
        values: [] # Array of name/value for properties to be used for connections
                   # made to this cluster
@@ -114,7 +125,6 @@ Running the console locally requires configuration of any Apache Kafka<sup>®</sup>
```
CONSOLE_API_SERVICE_ACCOUNT_TOKEN=<TOKEN>
CONSOLE_API_KUBERNETES_API_SERVER_URL=https://my-kubernetes-api.example.com:6443
- CONSOLE_METRICS_PROMETHEUS_URL=http://console-prometheus.<your cluster base domain>
```
The service account token may be obtained using the `kubectl create token` command. For example, to create a service account named "console-server" with the correct permissions and a token that expires in 1 year ([yq](https://github.com/mikefarah/yq/releases) required):
```shell
8 changes: 4 additions & 4 deletions api/pom.xml
@@ -399,10 +399,10 @@
<quarkus.profile>build</quarkus.profile>
<quarkus.docker.dockerfile-jvm-path>src/main/docker/Dockerfile</quarkus.docker.dockerfile-jvm-path>
<quarkus.container-image.build>true</quarkus.container-image.build>
- <quarkus.container-image.registry>${docker.registry}</quarkus.container-image.registry>
- <quarkus.container-image.group>${docker.group}</quarkus.container-image.group>
- <quarkus.container-image.tag>${docker.tag}</quarkus.container-image.tag>
- <quarkus.container-image.push>${docker.push}</quarkus.container-image.push>
+ <quarkus.container-image.registry>${container-image.registry}</quarkus.container-image.registry>
+ <quarkus.container-image.group>${container-image.group}</quarkus.container-image.group>
+ <quarkus.container-image.tag>${container-image.tag}</quarkus.container-image.tag>
+ <quarkus.container-image.push>${container-image.push}</quarkus.container-image.push>
</properties>
<dependencies>
<dependency>
ClientFactory.java
@@ -54,6 +54,7 @@
import org.jboss.logging.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;
+ import com.github.streamshub.console.api.service.MetricsService;
import com.github.streamshub.console.api.support.Holder;
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
@@ -154,6 +155,9 @@ public class ClientFactory {
@Named("kafkaAdminFilter")
UnaryOperator<Admin> kafkaAdminFilter = UnaryOperator.identity();

+ @Inject
+ MetricsService metricsService;

@Produces
@ApplicationScoped
Map<String, KafkaContext> produceKafkaContexts(Function<Map<String, Object>, Admin> adminBuilder) {
@@ -168,7 +172,6 @@ Map<String, KafkaContext> produceKafkaContexts(Function<Map<String, Object>, Admin> adminBuilder) {
consoleConfig.getKafka().getClusters()
.stream()
.filter(c -> cachedKafkaResource(c).isEmpty())
- .filter(Predicate.not(KafkaClusterConfig::hasNamespace))
.forEach(clusterConfig -> putKafkaContext(contexts,
clusterConfig,
Optional.empty(),
@@ -303,6 +306,12 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
ctx.schemaRegistryClient(registryConfig, mapper);

+ if (clusterConfig.hasNamespace()) {
+     ctx.prometheus(metricsService.createClient(consoleConfig, clusterConfig));
+ } else if (clusterConfig.getMetricsSource() != null) {
+     log.infof("Skipping setup of metrics client for cluster %s. Reason: namespace is required for metrics retrieval but none was provided", clusterKey);
+ }

KafkaContext previous = contexts.put(clusterId, ctx);

if (previous == null) {
@@ -325,7 +334,7 @@ Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
String key = clusterConfig.clusterKey();

if (kafkaInformer.isPresent()) {
log.warnf("Configuration references Kubernetes Kafka resource %s, but it was not found", key);
log.infof("Kafka resource %s not found in Kubernetes cluster", key);
} else {
log.warnf("Configuration references Kubernetes Kafka resource %s, but Kubernetes access is disabled", key);
}
KafkaCluster.java
@@ -185,6 +185,9 @@ static class Attributes {
@JsonProperty
boolean cruiseControlEnabled;

+ @JsonProperty
+ Metrics metrics = new Metrics();

Attributes(List<Node> nodes, Node controller, List<String> authorizedOperations) {
this.nodes = nodes;
this.controller = controller;
@@ -328,4 +331,12 @@ public void nodePools(List<String> nodePools) {
public void cruiseControlEnabled(boolean cruiseControlEnabled) {
attributes.cruiseControlEnabled = cruiseControlEnabled;
}

+ public Metrics metrics() {
+     return attributes.metrics;
+ }
+
+ public void metrics(Metrics metrics) {
+     attributes.metrics = metrics;
+ }
}
Metrics.java (new file)
@@ -0,0 +1,51 @@
package com.github.streamshub.console.api.model;

import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.eclipse.microprofile.openapi.annotations.media.Schema;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

public record Metrics(
@JsonProperty
Map<String, List<Metrics.ValueMetric>> values,

@JsonProperty
Map<String, List<Metrics.RangeMetric>> ranges) {

public Metrics() {
this(new LinkedHashMap<>(), new LinkedHashMap<>());
}

@Schema(additionalProperties = String.class)
public static record ValueMetric(
@JsonProperty
String value,

@JsonAnyGetter
@Schema(hidden = true)
Map<String, String> attributes) {
}

@Schema(additionalProperties = String.class)
public static record RangeMetric(
@JsonProperty
@Schema(implementation = String[][].class)
List<RangeEntry> range,

@JsonAnyGetter
@Schema(hidden = true)
Map<String, String> attributes) {
}

@JsonFormat(shape = JsonFormat.Shape.ARRAY)
@JsonPropertyOrder({"when", "value"})
public static record RangeEntry(Instant when, String value) {
}
}
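Given the `@JsonFormat(shape = ARRAY)` on `RangeEntry` and the flattening `@JsonAnyGetter` attribute maps, a populated `Metrics` object serializes with each range point as a `[timestamp, value]` pair. A minimal sketch of that shape — the metric names and the `nodeId` label here are illustrative assumptions, not values taken from this change:

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import com.github.streamshub.console.api.model.Metrics;

public class MetricsShapeDemo {
    public static void main(String[] args) throws Exception {
        // JavaTimeModule (jackson-datatype-jsr310) lets Jackson write
        // Instant as an ISO-8601 string instead of a numeric timestamp.
        ObjectMapper mapper = new ObjectMapper()
                .registerModule(new JavaTimeModule())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);

        Metrics metrics = new Metrics();
        // Sample metric names/labels for illustration only
        metrics.values().put("broker_state",
                List.of(new Metrics.ValueMetric("3", Map.of("nodeId", "0"))));
        metrics.ranges().put("incoming_byte_rate",
                List.of(new Metrics.RangeMetric(
                        List.of(new Metrics.RangeEntry(
                                Instant.parse("2024-11-14T12:00:00Z"), "1024.0")),
                        Map.of("nodeId", "0"))));

        // Expected shape (approximate):
        // {"values":{"broker_state":[{"value":"3","nodeId":"0"}]},
        //  "ranges":{"incoming_byte_rate":[{"range":[["2024-11-14T12:00:00Z","1024.0"]],"nodeId":"0"}]}}
        System.out.println(mapper.writeValueAsString(metrics));
    }
}
```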
KafkaClusterService.java
@@ -1,12 +1,16 @@
package com.github.streamshub.console.api.service;

+ import java.io.IOException;
+ import java.io.UncheckedIOException;
+ import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -76,6 +80,9 @@ public class KafkaClusterService {
@Inject
ConsoleConfig consoleConfig;

+ @Inject
+ MetricsService metricsService;

@Inject
/**
* All Kafka contexts known to the application
@@ -148,6 +155,7 @@ public CompletionStage<KafkaCluster> describeCluster(List<String> fields) {
enumNames(get(result::authorizedOperations))))
.thenApplyAsync(this::addKafkaContextData, threadContext.currentContextExecutor())
.thenApply(this::addKafkaResourceData)
+ .thenCompose(cluster -> addMetrics(cluster, fields))
.thenApply(this::setManaged);
}
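Since `metrics` is fetched only when requested in `fields`, a client opts in through the sparse-fieldset query parameter. A sketch of such a request — the host, cluster ID, and the `fields[kafkas]` parameter name follow the console API's JSON:API conventions but are assumptions here, not taken from this diff:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FetchClusterMetrics {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint; "fields[kafkas]" is URL-encoded as fields%5Bkafkas%5D
        URI uri = URI.create("https://console-api.example.com/api/kafkas/my-cluster-id"
                + "?fields%5Bkafkas%5D=name,namespace,metrics");

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Accept", "application/json")
                .GET()
                .build();

        // The response is a JSON:API document; attributes.metrics carries
        // the values/ranges maps produced by addMetrics() below.
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```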

@@ -313,6 +321,42 @@ KafkaCluster setManaged(KafkaCluster cluster) {
return cluster;
}


+ CompletionStage<KafkaCluster> addMetrics(KafkaCluster cluster, List<String> fields) {
+     if (!fields.contains(KafkaCluster.Fields.METRICS)) {
+         return CompletableFuture.completedStage(cluster);
+     }
+
+     if (kafkaContext.prometheus() == null) {
+         logger.warnf("Kafka cluster metrics were requested, but Prometheus URL is not configured");
+         cluster.metrics(null);
+         return CompletableFuture.completedStage(cluster);
+     }
+
+     String namespace = cluster.namespace();
+     String name = cluster.name();
+     String rangeQuery;
+     String valueQuery;
+
+     try (var rangesStream = getClass().getResourceAsStream("/metrics/queries/kafkaCluster_ranges.promql");
+          var valuesStream = getClass().getResourceAsStream("/metrics/queries/kafkaCluster_values.promql")) {
+         rangeQuery = new String(rangesStream.readAllBytes(), StandardCharsets.UTF_8)
+                 .formatted(namespace, name);
+         valueQuery = new String(valuesStream.readAllBytes(), StandardCharsets.UTF_8)
+                 .formatted(namespace, name);
+     } catch (IOException e) {
+         throw new UncheckedIOException(e);
+     }
+
+     var rangeResults = metricsService.queryRanges(rangeQuery).toCompletableFuture();
+     var valueResults = metricsService.queryValues(valueQuery).toCompletableFuture();
+
+     return CompletableFuture.allOf(
+             rangeResults.thenAccept(cluster.metrics().ranges()::putAll),
+             valueResults.thenAccept(cluster.metrics().values()::putAll))
+         .thenApply(nothing -> cluster);
+ }

private Optional<Kafka> findCluster(KafkaCluster cluster) {
return findCluster(Cache.namespaceKeyFunc(cluster.namespace(), cluster.name()));
}
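The query templates above are classpath text resources containing PromQL with two positional `%s` placeholders — namespace first, then cluster name — filled via `String.formatted`. A sketch of that substitution; the metric name and label selectors are hypothetical, not the shipped `kafkaCluster_*.promql` contents:

```java
public class QueryTemplateDemo {
    public static void main(String[] args) {
        // Hypothetical template in the style the code above expects:
        // first %s = Kafka CR namespace, second %s = cluster name.
        String template =
                "sum(kafka_server_replicamanager_partitioncount"
                + "{namespace=\"%s\",pod=~\"%s-kafka-\\\\d+\"})";

        String valueQuery = template.formatted("kafka", "console-kafka");
        // sum(kafka_server_replicamanager_partitioncount{namespace="kafka",pod=~"console-kafka-kafka-\\d+"})
        System.out.println(valueQuery);
    }
}
```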
