Skip to content

Commit

Permalink
[improve][broker] Add topic name validation by NamedEntity
Browse files Browse the repository at this point in the history
  • Loading branch information
erobot committed Jan 9, 2024
1 parent 6560a21 commit a7e6ac1
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -237,6 +238,9 @@ protected void validateTopicName(String property, String namespace, String encod
try {
this.namespaceName = NamespaceName.get(property, namespace);
this.topicName = TopicName.get(domain(), namespaceName, topic);
if (config().isStrictTopicNameEnabled()) {
NamedEntity.checkName(topicName.getLocalName());
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Invalid topic name [{}://{}/{}/{}]", clientAppId(), domain(), property, namespace, topic);
throw new RestException(Status.PRECONDITION_FAILED, "Topic name is not valid");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -240,6 +241,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {

private final long connectionLivenessCheckTimeoutMillis;

private final boolean strictTopicNameEnabled;

// Tracks and limits number of bytes pending to be published from a single specific IO thread.
static final class PendingBytesPerThreadTracker {
private static final FastThreadLocal<PendingBytesPerThreadTracker> pendingBytesPerThread =
Expand Down Expand Up @@ -341,6 +344,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
this.topicListService = new TopicListService(pulsar, this,
enableSubscriptionPatternEvaluation, maxSubscriptionPatternLength);
this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
this.strictTopicNameEnabled = conf.isStrictTopicNameEnabled();
}

@Override
Expand Down Expand Up @@ -3267,7 +3271,11 @@ private void disableTcpNoDelayIfNeeded(String topic, String producerName) {

private TopicName validateTopicName(String topic, long requestId, Object requestCommand) {
try {
return TopicName.get(topic);
TopicName topicName = TopicName.get(topic);
if (strictTopicNameEnabled) {
NamedEntity.checkName(topicName.getLocalName());
}
return topicName;
} catch (Throwable t) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to parse topic name '{}'", remoteAddress, topic, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void testValidatePersistentTopicNameSuccess() {
String topic = Codec.encode("test-topic");

AdminResource resource = mockResource();
resource.setPulsar(pulsar);
resource.validatePersistentTopicName(tenant, namespace, topic);
}

Expand All @@ -85,6 +86,7 @@ public void testValidatePersistentTopicNameInvalid() {
String topic = Codec.encode("test-topic");

AdminResource nPResource = mockNonPersistentResource();
nPResource.setPulsar(pulsar);
try {
nPResource.validatePersistentTopicName(tenant, namespace, topic);
fail("Should fail validation on non-persistent topic");
Expand All @@ -100,6 +102,7 @@ public void testValidatePartitionedTopicNameSuccess() {
String topic = Codec.encode("test-topic");

AdminResource resource = mockResource();
resource.setPulsar(pulsar);
resource.validatePartitionedTopicName(tenant, namespace, topic);
}

Expand All @@ -110,6 +113,7 @@ public void testValidatePartitionedTopicNameInvalid() {
String topic = Codec.encode("test-topic-partition-0");

AdminResource resource = mockResource();
resource.setPulsar(pulsar);
try {
resource.validatePartitionedTopicName(tenant, namespace, topic);
fail("Should fail validation on invalid partitioned topic");
Expand Down

0 comments on commit a7e6ac1

Please sign in to comment.