Skip to content

Commit

Permalink
feat: add get partition stats function (#27)
Browse files Browse the repository at this point in the history
Signed-off-by: wei <[email protected]>
  • Loading branch information
instpe committed Sep 1, 2024
1 parent fe6735d commit fdc135c
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.io.IOException;
import java.net.http.HttpResponse;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -69,6 +70,28 @@ public void updatePartitionedTopic(String tenant, String namespace, String encod
}
}

public PartitionedTopicStats getPartitionedStats(String tenant, String namespace, String encodedTopic,
boolean perPartition)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace,
encodedTopic, "/partitioned-stats");
try {
HttpResponse<String> response = httpClient.get(url, "perPartition",
Arrays.toString(new Object[] {perPartition}), "getPreciseBacklog",
Arrays.toString(new Object[] {false}), "subscriptionBacklogSize",
Arrays.toString(new Object[] {false}),
"getEarliestTimeInBacklog", Arrays.toString(new Object[] {false}));
if (response.statusCode() != 200) {
throw new PulsarAdminException(
String.format("failed to get partitioned stats of topic %s/%s/%s, status code %s, body : %s",
tenant, namespace, encodedTopic, response.statusCode(), response.body()));
}
return JacksonService.toObject(response.body(), PartitionedTopicStats.class);
} catch (IOException | InterruptedException e) {
throw new PulsarAdminException(e);
}
}

public PartitionedTopicMetadata getPartitionedMetadata(String tenant, String namespace, String encodedTopic,
boolean checkAllowAutoCreation, boolean authoritative)
throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class PartitionedTopicStats extends TopicStats{
public class PartitionedTopicStats extends TopicStats {

public PartitionedTopicMetadata metadata;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ public void partitionedTopicsTest() throws PulsarAdminException {
pulsarAdmin.nonPersistentTopics().getList(tenant, namespace, null, false));
}

@Test
public void getPartitionedStatsTest() throws PulsarAdminException {
String namespace = RandomUtil.randomString();
String topic = RandomUtil.randomString();
pulsarAdmin.namespaces().createNamespace(tenant, namespace);
pulsarAdmin.nonPersistentTopics().createPartitionedTopic(tenant, namespace, topic, 2, false);
Assertions.assertNotNull(pulsarAdmin.nonPersistentTopics().getPartitionedStats(tenant, namespace,
topic, false));
}

@Test
public void nonPartitionedTopicsTest() throws PulsarAdminException {
String namespace = RandomUtil.randomString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,13 @@ public void retentionTest() throws PulsarAdminException {
false, false, false));
}

@Test
public void getPartitionedStatsTest() throws PulsarAdminException {
String namespace = RandomUtil.randomString();
String topic = RandomUtil.randomString();
pulsarAdmin.namespaces().createNamespace(tenant, namespace);
pulsarAdmin.persistentTopics().createPartitionedTopic(tenant, namespace, topic, 2, false);
Assertions.assertNotNull(pulsarAdmin.persistentTopics().getPartitionedStats(tenant, namespace, topic, false));
}

}

0 comments on commit fdc135c

Please sign in to comment.