diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 1526ae18a9057..f763804fff500 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.Constants; +import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -237,6 +238,9 @@ protected void validateTopicName(String property, String namespace, String encod try { this.namespaceName = NamespaceName.get(property, namespace); this.topicName = TopicName.get(domain(), namespaceName, topic); + if (config().isStrictTopicNameEnabled()) { + NamedEntity.checkName(topicName.getLocalName()); + } } catch (IllegalArgumentException e) { log.warn("[{}] Invalid topic name [{}://{}/{}/{}]", clientAppId(), domain(), property, namespace, topic); throw new RestException(Status.PRECONDITION_FAILED, "Topic name is not valid"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4e3eb9fd4ad27..7802fd37667dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -147,6 +147,7 @@ import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.intercept.InterceptException; import org.apache.pulsar.common.naming.Metadata; +import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; @@ -240,6 +241,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final long connectionLivenessCheckTimeoutMillis; + private final boolean strictTopicNameEnabled; + // Tracks and limits number of bytes pending to be published from a single specific IO thread. static final class PendingBytesPerThreadTracker { private static final FastThreadLocal pendingBytesPerThread = @@ -341,6 +344,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) { this.topicListService = new TopicListService(pulsar, this, enableSubscriptionPatternEvaluation, maxSubscriptionPatternLength); this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null; + this.strictTopicNameEnabled = conf.isStrictTopicNameEnabled(); } @Override @@ -3267,7 +3271,11 @@ private void disableTcpNoDelayIfNeeded(String topic, String producerName) { private TopicName validateTopicName(String topic, long requestId, Object requestCommand) { try { - return TopicName.get(topic); + TopicName topicName = TopicName.get(topic); + if (strictTopicNameEnabled) { + NamedEntity.checkName(topicName.getLocalName()); + } + return topicName; } catch (Throwable t) { if (log.isDebugEnabled()) { log.debug("[{}] Failed to parse topic name '{}'", remoteAddress, topic, t); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java index 68e1172f01d2e..10ea1ef0dba87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java @@ -75,6 +75,7 @@ public void testValidatePersistentTopicNameSuccess() { String topic = Codec.encode("test-topic"); AdminResource resource = mockResource(); + resource.setPulsar(pulsar); resource.validatePersistentTopicName(tenant, namespace, topic); } @@ -85,6 +86,7 @@ public void testValidatePersistentTopicNameInvalid() { String topic = Codec.encode("test-topic"); AdminResource nPResource = mockNonPersistentResource(); + nPResource.setPulsar(pulsar); try { nPResource.validatePersistentTopicName(tenant, namespace, topic); fail("Should fail validation on non-persistent topic"); @@ -100,6 +102,7 @@ public void testValidatePartitionedTopicNameSuccess() { String topic = Codec.encode("test-topic"); AdminResource resource = mockResource(); + resource.setPulsar(pulsar); resource.validatePartitionedTopicName(tenant, namespace, topic); } @@ -110,6 +113,7 @@ public void testValidatePartitionedTopicNameInvalid() { String topic = Codec.encode("test-topic-partition-0"); AdminResource resource = mockResource(); + resource.setPulsar(pulsar); try { resource.validatePartitionedTopicName(tenant, namespace, topic); fail("Should fail validation on invalid partitioned topic");