Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Obtain topic/partition overview counts from topics API, not metrics #1121

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public SingleResponse(Topic data) {
}

@JsonFilter("fieldFilter")
static class Attributes {
public static class Attributes {
@JsonProperty
String name;

Expand Down Expand Up @@ -193,6 +193,10 @@ public String visibility() {
@JsonProperty
@Schema(readOnly = true, description = "The number of partitions in this topic")
public Integer numPartitions() {
if (partitions == null) {
return null;
}

return partitions.getOptionalPrimary().map(Collection::size).orElse(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -163,7 +164,11 @@ public CompletionStage<List<Topic>> listTopics(List<String> fields, String offse

Admin adminClient = kafkaContext.admin();
final Map<String, Integer> statuses = new HashMap<>();
listSupport.meta().put("summary", Map.of("statuses", statuses));
final AtomicInteger partitionCount = new AtomicInteger(0);

listSupport.meta().put("summary", Map.of(
"statuses", statuses,
"totalPartitions", partitionCount));

return listTopics(adminClient, true)
.thenApply(list -> list.stream().map(Topic::fromTopicListing).toList())
Expand All @@ -172,7 +177,7 @@ public CompletionStage<List<Topic>> listTopics(List<String> fields, String offse
threadContext.currentContextExecutor())
.thenApply(list -> list.stream()
.filter(listSupport)
.map(topic -> tallyStatus(statuses, topic))
.map(topic -> tallySummary(statuses, partitionCount, topic))
.map(listSupport::tally)
.filter(listSupport::betweenCursors)
.sorted(listSupport.getSortComparator())
Expand All @@ -183,8 +188,15 @@ public CompletionStage<List<Topic>> listTopics(List<String> fields, String offse
threadContext.currentContextExecutor());
}

Topic tallyStatus(Map<String, Integer> statuses, Topic topic) {
Topic tallySummary(Map<String, Integer> statuses, AtomicInteger partitionCount, Topic topic) {
statuses.compute(topic.status(), (k, v) -> v == null ? 1 : v + 1);

Integer numPartitions = topic.getAttributes().numPartitions();
//numPartitions may be null if it was not included in the requested fields
if (numPartitions != null) {
partitionCount.addAndGet(numPartitions);
}

return topic;
}

Expand Down
19 changes: 12 additions & 7 deletions ui/api/kafka/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ export async function getKafkaClusterKpis(
}

if (!prom || !cluster.attributes.namespace) {
log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable");
log.warn({ clusterId }, "getKafkaClusterKpis: " +
(!cluster.attributes.namespace
? "Kafka cluster namespace not available"
: "Prometheus not configured or client error"));
return { cluster, kpis: null };
}

Expand All @@ -98,7 +101,6 @@ export async function getKafkaClusterKpis(
values(
cluster.attributes.namespace,
cluster.attributes.name,
cluster.attributes.controller.id,
cluster.attributes.nodePools?.join("|") ?? "",
),
);
Expand Down Expand Up @@ -155,9 +157,6 @@ export async function getKafkaClusterKpis(
"1": 3,
"2": 3
},
"total_topics": 5,
"total_partitions": 55,
"underreplicated_topics": 0,
"replica_count": {
"byNode": {
"0": 57,
Expand Down Expand Up @@ -253,7 +252,10 @@ export async function getKafkaClusterMetrics(
}

if (!prom || !cluster.attributes.namespace) {
log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable");
log.warn({ clusterId }, "getKafkaClusterMetrics: " +
(!cluster.attributes.namespace
? "Kafka cluster namespace not available"
: "Prometheus not configured or client error"));
return { cluster, ranges: null };
}

Expand Down Expand Up @@ -328,7 +330,10 @@ export async function getKafkaTopicMetrics(

try {
if (!prom || !cluster.attributes.namespace) {
log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable");
log.warn({ clusterId }, "getKafkaTopicMetrics: " +
(!cluster.attributes.namespace
? "Kafka cluster namespace not available"
: "Prometheus not configured or client error"));
return { cluster, ranges: null };
}

Expand Down
42 changes: 0 additions & 42 deletions ui/api/kafka/kpi.promql.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
export const values = (
namespace: string,
cluster: string,
controller: number,
nodePools: string,
) => `
sum by (__console_metric_name__, nodeId) (
Expand All @@ -22,47 +21,6 @@ sum by (__console_metric_name__, nodeId) (

or

sum by (__console_metric_name__) (
label_replace(
kafka_controller_kafkacontroller_globaltopiccount{namespace="${namespace}",pod=~"${cluster}-.+-${controller}",strimzi_io_kind="Kafka"} > 0,
"__console_metric_name__",
"total_topics",
"",
""
)
)

or

sum by (__console_metric_name__) (
label_replace(
kafka_controller_kafkacontroller_globalpartitioncount{namespace="${namespace}",pod=~"${cluster}-.+-${controller}",strimzi_io_kind="Kafka"} > 0,
"__console_metric_name__",
"total_partitions",
"",
""
)
)

or

label_replace(
(
count(
sum by (topic) (
kafka_cluster_partition_underreplicated{namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+",strimzi_io_kind="Kafka"} > 0
)
)
OR on() vector(0)
),
"__console_metric_name__",
"underreplicated_topics",
"",
""
)

or

sum by (__console_metric_name__, nodeId) (
label_replace(
label_replace(
Expand Down
3 changes: 0 additions & 3 deletions ui/api/kafka/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ export type ClusterDetail = z.infer<typeof ClusterDetailSchema>;

export const ClusterKpisSchema = z.object({
broker_state: z.record(z.number()).optional(),
total_topics: z.number().optional(),
total_partitions: z.number().optional(),
underreplicated_topics: z.number().optional(),
replica_count: z
.object({
byNode: z.record(z.number()).optional(),
Expand Down
6 changes: 4 additions & 2 deletions ui/api/topics/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export async function getTopics(
params: {
name?: string;
id?: string;
fields?: string;
status?: TopicStatus[];
pageSize?: number;
pageCursor?: string;
Expand All @@ -36,7 +37,7 @@ export async function getTopics(
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[topics]":
"name,status,visibility,numPartitions,totalLeaderLogBytes,consumerGroups",
params.fields ?? "name,status,visibility,numPartitions,totalLeaderLogBytes,consumerGroups",
"filter[id]": params.id ? `eq,${params.id}` : undefined,
"filter[name]": params.name ? `like,*${params.name}*` : undefined,
"filter[status]":
Expand Down Expand Up @@ -210,7 +211,8 @@ export async function setTopicAsViewed(kafkaId: string, topicId: string) {
kafkaId,
kafkaName: cluster.attributes.name,
topicId,
topicName: topic.attributes.name,
// name is included in the `fields[topics]` param list so we are sure it is present
topicName: topic.attributes.name!,
};
if (viewedTopics.find((t) => t.topicId === viewedTopic.topicId)) {
log.trace(
Expand Down
15 changes: 7 additions & 8 deletions ui/api/topics/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ const TopicSchema = z.object({
managed: z.boolean().optional(),
}).optional(),
attributes: z.object({
name: z.string(),
status: TopicStatusSchema,
visibility: z.string(),
name: z.string().optional(),
status: TopicStatusSchema.optional(),
visibility: z.string().optional(),
partitions: z.array(PartitionSchema).optional(),
numPartitions: z.number().optional(),
authorizedOperations: z.array(z.string()),
Expand All @@ -86,7 +86,7 @@ const TopicSchema = z.object({
relationships: z.object({
consumerGroups: z.object({
data: z.array(z.any()),
}),
}).optional(),
}),
});
export const TopicResponse = z.object({
Expand All @@ -109,10 +109,8 @@ const TopicListSchema = z.object({
numPartitions: true,
totalLeaderLogBytes: true,
}),
relationships: z.object({
consumerGroups: z.object({
data: z.array(z.any()),
}),
relationships: TopicSchema.shape.relationships.pick({
consumerGroups: true
}),
});
export type TopicList = z.infer<typeof TopicListSchema>;
Expand All @@ -129,6 +127,7 @@ export const TopicsResponseSchema = z.object({
PartiallyOffline: z.number().optional(),
Offline: z.number().optional(),
}),
totalPartitions: z.number(),
}),
}),
links: z.object({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async function ConnectedTopicHeader({
Consumer groups&nbsp;
<Label isCompact={true}>
<Number
value={topic?.relationships.consumerGroups.data.length}
value={topic?.relationships.consumerGroups?.data.length ?? 0}
/>
</Label>
</NavItemLink>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
import { ClusterDetail, ClusterKpis } from "@/api/kafka/schema";
import { TopicsResponse } from "@/api/topics/schema";
import { TopicsPartitionsCard } from "@/components/ClusterOverview/TopicsPartitionsCard";

export async function ConnectedTopicsPartitionsCard({
data,
}: {
data: Promise<{ cluster: ClusterDetail; kpis: ClusterKpis | null } | null>;
data: Promise<TopicsResponse>;
}) {
const res = await data;
if (!res?.kpis) {
const summary = (await data).meta.summary;

if (!summary) {
return null;
}
const topicsTotal = res?.kpis.total_topics || 0;
const topicsUnderreplicated = res?.kpis.underreplicated_topics || 0;

const totalPartitions = summary.totalPartitions;
const totalReplicated = summary.statuses.FullyReplicated ?? 0;
const totalUnderReplicated = (summary.statuses.UnderReplicated ?? 0) + (summary.statuses.PartiallyOffline ?? 0);
const totalOffline = summary.statuses.Offline ?? 0;

return (
<TopicsPartitionsCard
isLoading={false}
partitions={Math.max(0, res?.kpis.total_partitions || 0)}
topicsReplicated={Math.max(0, topicsTotal - topicsUnderreplicated)}
topicsTotal={Math.max(0, topicsTotal)}
topicsUnderReplicated={Math.max(0, topicsUnderreplicated)}
partitions={totalPartitions}
topicsReplicated={totalReplicated}
topicsUnderReplicated={totalUnderReplicated}
topicsOffline={totalOffline}
/>
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
getKafkaClusterMetrics,
getKafkaTopicMetrics,
} from "@/api/kafka/actions";
import { getViewedTopics } from "@/api/topics/actions";
import { getTopics, getViewedTopics } from "@/api/topics/actions";
import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params";
import { ConnectedClusterCard } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterCard";
import { ConnectedClusterChartsCard } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterChartsCard";
Expand All @@ -25,6 +25,7 @@ export default function OverviewPage({ params }: { params: KafkaParams }) {
"outgoingByteRate",
"incomingByteRate",
]);
const topics = getTopics(params.kafkaId, { fields: "status", pageSize: 1 });
MikeEdgar marked this conversation as resolved.
Show resolved Hide resolved
const consumerGroups = getConsumerGroups(params.kafkaId, { fields: "state" });
const viewedTopics = getViewedTopics().then((topics) =>
topics.filter((t) => t.kafkaId === params.kafkaId),
Expand All @@ -34,7 +35,7 @@ export default function OverviewPage({ params }: { params: KafkaParams }) {
clusterOverview={
<ConnectedClusterCard data={kpi} consumerGroups={consumerGroups} />
}
topicsPartitions={<ConnectedTopicsPartitionsCard data={kpi} />}
topicsPartitions={<ConnectedTopicsPartitionsCard data={topics} />}
clusterCharts={<ConnectedClusterChartsCard data={cluster} />}
topicCharts={<ConnectedTopicChartsCard data={topic} />}
recentTopics={<ConnectedRecentTopics data={viewedTopics} />}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export default async function ConnectedMessagesPage({
<ConnectedMessagesTable
kafkaId={kafkaId}
topicId={topicId}
topicName={topic.attributes.name}
topicName={topic.attributes.name!}
selectedMessage={selectedMessage}
partitions={topic.attributes.numPartitions ?? 0}
/>
Expand Down
12 changes: 5 additions & 7 deletions ui/components/ClusterOverview/TopicsPartitionsCard.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ import { Link } from "@/i18n/routing";
import { useTranslations } from "next-intl";

type TopicsPartitionsCardProps = {
topicsTotal: number;
topicsReplicated: number;
topicsUnderReplicated: number;
topicsOffline: number;
partitions: number;
};

export function TopicsPartitionsCard({
isLoading,
topicsTotal,
topicsReplicated,
topicsUnderReplicated,
topicsOffline,
partitions,
}:
| ({ isLoading: false } & TopicsPartitionsCardProps)
Expand Down Expand Up @@ -70,7 +70,7 @@ export function TopicsPartitionsCard({
<TextContent>
<Text component={"small"}>
<Link href={"./topics"}>
<Number value={topicsTotal} />{" "}
<Number value={topicsReplicated + topicsUnderReplicated + topicsOffline} />{" "}
{t("ClusterOverview.total_topics")}
</Link>
</Text>
Expand All @@ -81,7 +81,7 @@ export function TopicsPartitionsCard({
<TextContent>
<Text component={"small"}>
<Number value={partitions} />
&nbsp; {t("ClusterOverview.partition")}
&nbsp; {t("ClusterOverview.total_partitions")}
</Text>
</TextContent>
</FlexItem>
Expand Down Expand Up @@ -163,9 +163,7 @@ export function TopicsPartitionsCard({
/>
) : (
<Number
value={
topicsTotal - topicsReplicated - topicsUnderReplicated
}
value={topicsOffline}
/>
)}
</Link>
Expand Down
Loading