Skip to content

Commit

Permalink
Obtain topic/partition overview counts from topics API, not metrics (#1121)
Browse files Browse the repository at this point in the history

* Obtain topic/partition overview counts from topics API, not metrics

---------

Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar authored Oct 21, 2024
1 parent cccabbc commit b42cb6a
Show file tree
Hide file tree
Showing 14 changed files with 72 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public SingleResponse(Topic data) {
}

@JsonFilter("fieldFilter")
static class Attributes {
public static class Attributes {
@JsonProperty
String name;

Expand Down Expand Up @@ -193,6 +193,10 @@ public String visibility() {
@JsonProperty
@Schema(readOnly = true, description = "The number of partitions in this topic")
public Integer numPartitions() {
// partitions is null when the "partitions" field was not included in the
// request's sparse fieldset; return null (rather than 0) so callers can
// distinguish "not fetched" from "topic has no partitions".
if (partitions == null) {
return null;
}

// Primary data present: the partition count is the collection size;
// an empty optional primary maps to 0.
return partitions.getOptionalPrimary().map(Collection::size).orElse(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -163,7 +164,11 @@ public CompletionStage<List<Topic>> listTopics(List<String> fields, String offse

Admin adminClient = kafkaContext.admin();
final Map<String, Integer> statuses = new HashMap<>();
listSupport.meta().put("summary", Map.of("statuses", statuses));
final AtomicInteger partitionCount = new AtomicInteger(0);

listSupport.meta().put("summary", Map.of(
"statuses", statuses,
"totalPartitions", partitionCount));

return listTopics(adminClient, true)
.thenApply(list -> list.stream().map(Topic::fromTopicListing).toList())
Expand All @@ -172,7 +177,7 @@ public CompletionStage<List<Topic>> listTopics(List<String> fields, String offse
threadContext.currentContextExecutor())
.thenApply(list -> list.stream()
.filter(listSupport)
.map(topic -> tallyStatus(statuses, topic))
.map(topic -> tallySummary(statuses, partitionCount, topic))
.map(listSupport::tally)
.filter(listSupport::betweenCursors)
.sorted(listSupport.getSortComparator())
Expand All @@ -183,8 +188,15 @@ public CompletionStage<List<Topic>> listTopics(List<String> fields, String offse
threadContext.currentContextExecutor());
}

Topic tallyStatus(Map<String, Integer> statuses, Topic topic) {
// Accumulates per-status topic counts and the running total partition count
// for the list response's meta.summary, then passes the topic through
// unchanged so this can be used inside a stream map stage.
Topic tallySummary(Map<String, Integer> statuses, AtomicInteger partitionCount, Topic topic) {
// Increment the counter for this topic's status, starting at 1 on first sight.
statuses.compute(topic.status(), (k, v) -> v == null ? 1 : v + 1);

Integer numPartitions = topic.getAttributes().numPartitions();
// numPartitions may be null if it was not included in the requested fields
if (numPartitions != null) {
partitionCount.addAndGet(numPartitions);
}

return topic;
}

Expand Down
19 changes: 12 additions & 7 deletions ui/api/kafka/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ export async function getKafkaClusterKpis(
}

if (!prom || !cluster.attributes.namespace) {
log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable");
log.warn({ clusterId }, "getKafkaClusterKpis: " +
(!cluster.attributes.namespace
? "Kafka cluster namespace not available"
: "Prometheus not configured or client error"));
return { cluster, kpis: null };
}

Expand All @@ -98,7 +101,6 @@ export async function getKafkaClusterKpis(
values(
cluster.attributes.namespace,
cluster.attributes.name,
cluster.attributes.controller.id,
cluster.attributes.nodePools?.join("|") ?? "",
),
);
Expand Down Expand Up @@ -155,9 +157,6 @@ export async function getKafkaClusterKpis(
"1": 3,
"2": 3
},
"total_topics": 5,
"total_partitions": 55,
"underreplicated_topics": 0,
"replica_count": {
"byNode": {
"0": 57,
Expand Down Expand Up @@ -253,7 +252,10 @@ export async function getKafkaClusterMetrics(
}

if (!prom || !cluster.attributes.namespace) {
log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable");
log.warn({ clusterId }, "getKafkaClusterMetrics: " +
(!cluster.attributes.namespace
? "Kafka cluster namespace not available"
: "Prometheus not configured or client error"));
return { cluster, ranges: null };
}

Expand Down Expand Up @@ -328,7 +330,10 @@ export async function getKafkaTopicMetrics(

try {
if (!prom || !cluster.attributes.namespace) {
log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable");
log.warn({ clusterId }, "getKafkaTopicMetrics: " +
(!cluster.attributes.namespace
? "Kafka cluster namespace not available"
: "Prometheus not configured or client error"));
return { cluster, ranges: null };
}

Expand Down
42 changes: 0 additions & 42 deletions ui/api/kafka/kpi.promql.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
export const values = (
namespace: string,
cluster: string,
controller: number,
nodePools: string,
) => `
sum by (__console_metric_name__, nodeId) (
Expand All @@ -22,47 +21,6 @@ sum by (__console_metric_name__, nodeId) (
or
sum by (__console_metric_name__) (
label_replace(
kafka_controller_kafkacontroller_globaltopiccount{namespace="${namespace}",pod=~"${cluster}-.+-${controller}",strimzi_io_kind="Kafka"} > 0,
"__console_metric_name__",
"total_topics",
"",
""
)
)
or
sum by (__console_metric_name__) (
label_replace(
kafka_controller_kafkacontroller_globalpartitioncount{namespace="${namespace}",pod=~"${cluster}-.+-${controller}",strimzi_io_kind="Kafka"} > 0,
"__console_metric_name__",
"total_partitions",
"",
""
)
)
or
label_replace(
(
count(
sum by (topic) (
kafka_cluster_partition_underreplicated{namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+",strimzi_io_kind="Kafka"} > 0
)
)
OR on() vector(0)
),
"__console_metric_name__",
"underreplicated_topics",
"",
""
)
or
sum by (__console_metric_name__, nodeId) (
label_replace(
label_replace(
Expand Down
3 changes: 0 additions & 3 deletions ui/api/kafka/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ export type ClusterDetail = z.infer<typeof ClusterDetailSchema>;

export const ClusterKpisSchema = z.object({
broker_state: z.record(z.number()).optional(),
total_topics: z.number().optional(),
total_partitions: z.number().optional(),
underreplicated_topics: z.number().optional(),
replica_count: z
.object({
byNode: z.record(z.number()).optional(),
Expand Down
6 changes: 4 additions & 2 deletions ui/api/topics/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export async function getTopics(
params: {
name?: string;
id?: string;
fields?: string;
status?: TopicStatus[];
pageSize?: number;
pageCursor?: string;
Expand All @@ -36,7 +37,7 @@ export async function getTopics(
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[topics]":
"name,status,visibility,numPartitions,totalLeaderLogBytes,consumerGroups",
params.fields ?? "name,status,visibility,numPartitions,totalLeaderLogBytes,consumerGroups",
"filter[id]": params.id ? `eq,${params.id}` : undefined,
"filter[name]": params.name ? `like,*${params.name}*` : undefined,
"filter[status]":
Expand Down Expand Up @@ -210,7 +211,8 @@ export async function setTopicAsViewed(kafkaId: string, topicId: string) {
kafkaId,
kafkaName: cluster.attributes.name,
topicId,
topicName: topic.attributes.name,
// name is included in the `fields[topics]` param list so we are sure it is present
topicName: topic.attributes.name!,
};
if (viewedTopics.find((t) => t.topicId === viewedTopic.topicId)) {
log.trace(
Expand Down
15 changes: 7 additions & 8 deletions ui/api/topics/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ const TopicSchema = z.object({
managed: z.boolean().optional(),
}).optional(),
attributes: z.object({
name: z.string(),
status: TopicStatusSchema,
visibility: z.string(),
name: z.string().optional(),
status: TopicStatusSchema.optional(),
visibility: z.string().optional(),
partitions: z.array(PartitionSchema).optional(),
numPartitions: z.number().optional(),
authorizedOperations: z.array(z.string()),
Expand All @@ -86,7 +86,7 @@ const TopicSchema = z.object({
relationships: z.object({
consumerGroups: z.object({
data: z.array(z.any()),
}),
}).optional(),
}),
});
export const TopicResponse = z.object({
Expand All @@ -109,10 +109,8 @@ const TopicListSchema = z.object({
numPartitions: true,
totalLeaderLogBytes: true,
}),
relationships: z.object({
consumerGroups: z.object({
data: z.array(z.any()),
}),
relationships: TopicSchema.shape.relationships.pick({
consumerGroups: true
}),
});
export type TopicList = z.infer<typeof TopicListSchema>;
Expand All @@ -129,6 +127,7 @@ export const TopicsResponseSchema = z.object({
PartiallyOffline: z.number().optional(),
Offline: z.number().optional(),
}),
totalPartitions: z.number(),
}),
}),
links: z.object({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async function ConnectedTopicHeader({
Consumer groups&nbsp;
<Label isCompact={true}>
<Number
value={topic?.relationships.consumerGroups.data.length}
value={topic?.relationships.consumerGroups?.data.length ?? 0}
/>
</Label>
</NavItemLink>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
import { ClusterDetail, ClusterKpis } from "@/api/kafka/schema";
import { TopicsResponse } from "@/api/topics/schema";
import { TopicsPartitionsCard } from "@/components/ClusterOverview/TopicsPartitionsCard";

export async function ConnectedTopicsPartitionsCard({
data,
}: {
data: Promise<{ cluster: ClusterDetail; kpis: ClusterKpis | null } | null>;
data: Promise<TopicsResponse>;
}) {
const res = await data;
if (!res?.kpis) {
const summary = (await data).meta.summary;

if (!summary) {
return null;
}
const topicsTotal = res?.kpis.total_topics || 0;
const topicsUnderreplicated = res?.kpis.underreplicated_topics || 0;

const totalPartitions = summary.totalPartitions;
const totalReplicated = summary.statuses.FullyReplicated ?? 0;
const totalUnderReplicated = (summary.statuses.UnderReplicated ?? 0) + (summary.statuses.PartiallyOffline ?? 0);
const totalOffline = summary.statuses.Offline ?? 0;

return (
<TopicsPartitionsCard
isLoading={false}
partitions={Math.max(0, res?.kpis.total_partitions || 0)}
topicsReplicated={Math.max(0, topicsTotal - topicsUnderreplicated)}
topicsTotal={Math.max(0, topicsTotal)}
topicsUnderReplicated={Math.max(0, topicsUnderreplicated)}
partitions={totalPartitions}
topicsReplicated={totalReplicated}
topicsUnderReplicated={totalUnderReplicated}
topicsOffline={totalOffline}
/>
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
getKafkaClusterMetrics,
getKafkaTopicMetrics,
} from "@/api/kafka/actions";
import { getViewedTopics } from "@/api/topics/actions";
import { getTopics, getViewedTopics } from "@/api/topics/actions";
import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params";
import { ConnectedClusterCard } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterCard";
import { ConnectedClusterChartsCard } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterChartsCard";
Expand All @@ -25,6 +25,7 @@ export default function OverviewPage({ params }: { params: KafkaParams }) {
"outgoingByteRate",
"incomingByteRate",
]);
const topics = getTopics(params.kafkaId, { fields: "status", pageSize: 1 });
const consumerGroups = getConsumerGroups(params.kafkaId, { fields: "state" });
const viewedTopics = getViewedTopics().then((topics) =>
topics.filter((t) => t.kafkaId === params.kafkaId),
Expand All @@ -34,7 +35,7 @@ export default function OverviewPage({ params }: { params: KafkaParams }) {
clusterOverview={
<ConnectedClusterCard data={kpi} consumerGroups={consumerGroups} />
}
topicsPartitions={<ConnectedTopicsPartitionsCard data={kpi} />}
topicsPartitions={<ConnectedTopicsPartitionsCard data={topics} />}
clusterCharts={<ConnectedClusterChartsCard data={cluster} />}
topicCharts={<ConnectedTopicChartsCard data={topic} />}
recentTopics={<ConnectedRecentTopics data={viewedTopics} />}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export default async function ConnectedMessagesPage({
<ConnectedMessagesTable
kafkaId={kafkaId}
topicId={topicId}
topicName={topic.attributes.name}
topicName={topic.attributes.name!}
selectedMessage={selectedMessage}
partitions={topic.attributes.numPartitions ?? 0}
/>
Expand Down
12 changes: 5 additions & 7 deletions ui/components/ClusterOverview/TopicsPartitionsCard.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ import { Link } from "@/i18n/routing";
import { useTranslations } from "next-intl";

type TopicsPartitionsCardProps = {
topicsTotal: number;
topicsReplicated: number;
topicsUnderReplicated: number;
topicsOffline: number;
partitions: number;
};

export function TopicsPartitionsCard({
isLoading,
topicsTotal,
topicsReplicated,
topicsUnderReplicated,
topicsOffline,
partitions,
}:
| ({ isLoading: false } & TopicsPartitionsCardProps)
Expand Down Expand Up @@ -70,7 +70,7 @@ export function TopicsPartitionsCard({
<TextContent>
<Text component={"small"}>
<Link href={"./topics"}>
<Number value={topicsTotal} />{" "}
<Number value={topicsReplicated + topicsUnderReplicated + topicsOffline} />{" "}
{t("ClusterOverview.total_topics")}
</Link>
</Text>
Expand All @@ -81,7 +81,7 @@ export function TopicsPartitionsCard({
<TextContent>
<Text component={"small"}>
<Number value={partitions} />
&nbsp; {t("ClusterOverview.partition")}
&nbsp; {t("ClusterOverview.total_partitions")}
</Text>
</TextContent>
</FlexItem>
Expand Down Expand Up @@ -163,9 +163,7 @@ export function TopicsPartitionsCard({
/>
) : (
<Number
value={
topicsTotal - topicsReplicated - topicsUnderReplicated
}
value={topicsOffline}
/>
)}
</Link>
Expand Down
Loading

0 comments on commit b42cb6a

Please sign in to comment.