Skip to content

Commit

Permalink
fix: fix non-persistent topic impl bug (#22)
Browse files Browse the repository at this point in the history
Signed-off-by: ZhangJian He <[email protected]>
  • Loading branch information
shoothzj committed Aug 28, 2024
1 parent f0dbc40 commit 6525edb
Show file tree
Hide file tree
Showing 48 changed files with 112 additions and 105 deletions.
12 changes: 7 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
<jackson.version>2.17.2</jackson.version>
<junit.version>5.11.0</junit.version>
<log4j.version>2.20.0</log4j.version>
<lombok.version>1.18.34</lombok.version>
<!-- plugin -->
<compiler-plugin.version>3.10.1</compiler-plugin.version>
<checkstyle-plugin.version>3.2.0</checkstyle-plugin.version>
<javadoc-plugin.version>3.4.1</javadoc-plugin.version>
<lombok.version>1.18.34</lombok.version>
<lombok-maven-plugin.version>1.18.20.0</lombok-maven-plugin.version>
<maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version>
<nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>
<puppycrawl.version>10.18.0</puppycrawl.version>
<release-plugin.version>3.0.0-M6</release-plugin.version>
Expand Down Expand Up @@ -243,7 +245,7 @@

<developers>
<developer>
<name>ShootHzj</name>
<name>shootHzj</name>
<email>[email protected]</email>
</developer>
</developers>
Expand Down Expand Up @@ -280,7 +282,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>3.0.1</version>
<version>${maven-gpg-plugin.version}</version>
<executions>
<execution>
<id>sign-artifacts</id>
Expand Down Expand Up @@ -311,12 +313,12 @@
<plugin>
<groupId>org.projectlombok</groupId>
<artifactId>lombok-maven-plugin</artifactId>
<version>1.18.20.0</version>
<version>${lombok-maven-plugin.version}</version>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<version>${lombok.version}</version>
</dependency>
</dependencies>
<executions>
Expand Down
11 changes: 11 additions & 0 deletions pulsar-admin-jdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,15 @@

<artifactId>pulsar-admin-jdk</artifactId>

<build>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<includes>
<include>**.jks</include>
</includes>
</testResource>
</testResources>
</build>

</project>

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import lombok.AllArgsConstructor;
import lombok.Builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

public enum BacklogQuotaType {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import com.fasterxml.jackson.core.type.TypeReference;

Expand All @@ -7,25 +7,20 @@
import java.util.List;
import java.util.Map;

public class PersistentTopicsImpl implements Topics {
public abstract class BaseTopicsImpl implements Topics {

protected final InnerHttpClient httpClient;

private static final String BASE_URL_PERSISTENT_DOMAIN = "/admin/v2" + "/persistent";

public PersistentTopicsImpl(InnerHttpClient httpClient) {
public BaseTopicsImpl(InnerHttpClient httpClient) {
this.httpClient = httpClient;
}

public String getDomainBaseUrl() {
return "/admin/v2" + "/persistent";
}
protected abstract String getDomainBaseUrl();

@Override
public void createPartitionedTopic(String tenant, String namespace, String encodedTopic, int numPartitions,
boolean createLocalTopicOnly) throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
"/partitions");
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, "/partitions");
try {
HttpResponse<String> response = httpClient.put(url, numPartitions, "createLocalTopicOnly",
String.valueOf(createLocalTopicOnly));
Expand All @@ -42,8 +37,7 @@ public void createPartitionedTopic(String tenant, String namespace, String encod
@Override
public void deletePartitionedTopic(String tenant, String namespace, String encodedTopic, boolean force,
boolean authoritative) throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
"/partitions");
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, "/partitions");
try {
HttpResponse<String> response = httpClient.delete(url, "force", String.valueOf(force),
"authoritative", String.valueOf(authoritative));
Expand All @@ -61,8 +55,7 @@ public void deletePartitionedTopic(String tenant, String namespace, String encod
public void updatePartitionedTopic(String tenant, String namespace, String encodedTopic,
boolean updateLocalTopicOnly, boolean authoritative,
boolean force, int numPartitions) throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
"/partitions");
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, "/partitions");
try {
HttpResponse<String> response = httpClient.post(url, numPartitions, "updateLocalTopicOnly",
String.valueOf(updateLocalTopicOnly),
Expand All @@ -82,8 +75,7 @@ public void updatePartitionedTopic(String tenant, String namespace, String encod
public PartitionedTopicMetadata getPartitionedMetadata(String tenant, String namespace, String encodedTopic,
boolean checkAllowAutoCreation, boolean authoritative)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent",
tenant, namespace, encodedTopic, "/partitions");
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, "/partitions");
try {
HttpResponse<String> response = httpClient.get(url,
"checkAllowAutoCreation", String.valueOf(checkAllowAutoCreation),
Expand All @@ -102,7 +94,7 @@ public PartitionedTopicMetadata getPartitionedMetadata(String tenant, String nam
@Override
public void createNonPartitionedTopic(String tenant, String namespace, String encodedTopic, boolean authoritative,
Map<String, String> properties) throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic);
String url = String.format("%s/%s/%s/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic);
try {
HttpResponse<String> response = httpClient.put(url, properties,
"authoritative", String.valueOf(authoritative));
Expand All @@ -119,7 +111,7 @@ public void createNonPartitionedTopic(String tenant, String namespace, String en
@Override
public void deleteTopic(String tenant, String namespace, String encodedTopic, boolean force, boolean authoritative)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic);
String url = String.format("%s/%s/%s/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic);
try {
HttpResponse<String> response = httpClient.delete(url,
"force", String.valueOf(force),
Expand All @@ -137,7 +129,7 @@ public void deleteTopic(String tenant, String namespace, String encodedTopic, bo
@Override
public List<String> getList(String tenant, String namespace, String bundle, boolean includeSystemTopic)
throws PulsarAdminException {
String url = String.format("%s/%s/%s", "/admin/v2" + "/persistent", tenant, namespace);
String url = String.format("%s/%s/%s", getDomainBaseUrl(), tenant, namespace);
try {
HttpResponse<String> response;
if (bundle != null) {
Expand All @@ -164,7 +156,7 @@ public List<String> getList(String tenant, String namespace, String bundle, bool
@Override
public List<String> getPartitionedTopicList(String tenant, String namespace, boolean includeSystemTopic)
throws PulsarAdminException {
String url = String.format("%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, UrlConst.PARTITIONED);
String url = String.format("%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, UrlConst.PARTITIONED);
try {
HttpResponse<String> response = httpClient.get(url, "includeSystemTopic",
String.valueOf(includeSystemTopic));
Expand All @@ -184,7 +176,7 @@ public List<String> getPartitionedTopicList(String tenant, String namespace, boo
@Override
public void createMissedPartitions(String tenant, String namespace, String encodedTopic)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
UrlConst.CREATE_MISSED_PARTITIONS);
try {
HttpResponse<String> response = httpClient.post(url);
Expand All @@ -202,7 +194,7 @@ public void createMissedPartitions(String tenant, String namespace, String encod
@Override
public MessageIdImpl getLastMessageId(String tenant, String namespace, String encodedTopic, boolean authoritative)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
UrlConst.LAST_MESSAGE_ID);
try {
HttpResponse<String> response = httpClient.get(url, "authoritative", String.valueOf(authoritative));
Expand All @@ -220,7 +212,7 @@ public MessageIdImpl getLastMessageId(String tenant, String namespace, String en
@Override
public RetentionPolicies getRetention(String tenant, String namespace, String encodedTopic, boolean isGlobal,
boolean applied, boolean authoritative) throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
UrlConst.RETENTION);
try {
HttpResponse<String> response = httpClient.get(url,
Expand All @@ -241,7 +233,7 @@ public RetentionPolicies getRetention(String tenant, String namespace, String en
@Override
public void setRetention(String tenant, String namespace, String encodedTopic, boolean authoritative,
boolean isGlobal, RetentionPolicies retention) throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
UrlConst.RETENTION);
try {
HttpResponse<String> response = httpClient.post(url, retention,
Expand All @@ -260,7 +252,7 @@ public void setRetention(String tenant, String namespace, String encodedTopic, b
@Override
public void removeRetention(String tenant, String namespace, String encodedTopic, boolean authoritative)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
UrlConst.RETENTION);
try {
HttpResponse<String> response = httpClient.delete(url, "authoritative", String.valueOf(authoritative));
Expand All @@ -279,7 +271,7 @@ public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String tenant, Str
String encodedTopic, boolean applied,
boolean authoritative, boolean isGlobal)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
UrlConst.BACKLOG_QUOTA_MAP);
try {
HttpResponse<String> response = httpClient.get(url,
Expand All @@ -302,7 +294,7 @@ public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String tenant, Str
public void setBacklogQuota(String tenant, String namespace, String encodedTopic, boolean authoritative,
boolean isGlobal, BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
UrlConst.BACKLOG_QUOTA);
try {
HttpResponse<String> response = httpClient.post(url, backlogQuota,
Expand All @@ -323,7 +315,7 @@ public void setBacklogQuota(String tenant, String namespace, String encodedTopic
public void removeBacklogQuota(String tenant, String namespace, String encodedTopic,
BacklogQuotaType backlogQuotaType, boolean authoritative, boolean isGlobal)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
UrlConst.BACKLOG_QUOTA);
try {
HttpResponse<String> response = httpClient.delete(url,
Expand All @@ -343,7 +335,7 @@ public void removeBacklogQuota(String tenant, String namespace, String encodedTo
@Override
public PersistentOfflineTopicStats getBacklog(String tenant, String namespace, String encodedTopic,
boolean authoritative) throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
UrlConst.BACKLOG);
try {
HttpResponse<String> response = httpClient.get(url, "authoritative", String.valueOf(authoritative));
Expand All @@ -361,7 +353,7 @@ public PersistentOfflineTopicStats getBacklog(String tenant, String namespace, S
@Override
public long getBacklogSizeByMessageId(String tenant, String namespace, String encodedTopic, boolean authoritative,
MessageIdImpl messageId) throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s%s", "/admin/v2" + "/persistent", tenant, namespace, encodedTopic,
String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
UrlConst.BACKLOG_SIZE);
try {
HttpResponse<String> response = httpClient.put(url, messageId, "authoritative",
Expand All @@ -376,5 +368,4 @@ public long getBacklogSizeByMessageId(String tenant, String namespace, String en
throw new PulsarAdminException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

public interface Brokers {
void healthcheck(TopicVersion topicVersion) throws PulsarAdminException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import java.net.http.HttpResponse;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import com.fasterxml.jackson.core.type.TypeReference;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import lombok.Getter;
import lombok.Setter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import com.fasterxml.jackson.core.JsonProcessingException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import lombok.AllArgsConstructor;
import lombok.Builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

public enum Mode {
PERSISTENT,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.protocol.pulsar;
package io.github.protocol.pulsar.admin.jdk;

import com.fasterxml.jackson.core.type.TypeReference;

Expand Down
Loading

0 comments on commit 6525edb

Please sign in to comment.