diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java index 4a60df1f4d804..40648e67b3cfa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java @@ -121,7 +121,7 @@ public void testEvents(String topicTypePersistence, String topicTypePartitioned, boolean forceDelete) throws Exception { String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); - createTopicAndVerifyEvents(topicTypePartitioned, topicName); + createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName); events.clear(); if (topicTypePartitioned.equals("partitioned")) { @@ -145,7 +145,7 @@ public void testEventsWithUnload(String topicTypePersistence, String topicTypePa boolean forceDelete) throws Exception { String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); - createTopicAndVerifyEvents(topicTypePartitioned, topicName); + createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName); events.clear(); admin.topics().unload(topicName); @@ -177,7 +177,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar boolean forceDelete) throws Exception { String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); - createTopicAndVerifyEvents(topicTypePartitioned, topicName); + createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName); Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe(); Producer producer = pulsarClient.newProducer().topic(topicName).create(); @@ -233,7 +233,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar public void testTopicAutoGC(String topicTypePersistence, String topicTypePartitioned) throws Exception { String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); - createTopicAndVerifyEvents(topicTypePartitioned, topicName); + createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName); admin.namespaces().setInactiveTopicPolicies(namespace, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true)); @@ -257,25 +257,22 @@ public void testTopicAutoGC(String topicTypePersistence, String topicTypePartiti ); } - private void createTopicAndVerifyEvents(String topicTypePartitioned, String topicName) throws Exception { + private void createTopicAndVerifyEvents(String topicDomain, String topicTypePartitioned, String topicName) + throws Exception { final String[] expectedEvents; - if (topicTypePartitioned.equals("partitioned")) { - topicNameToWatch = topicName + "-partition-1"; - admin.topics().createPartitionedTopic(topicName, 2); - triggerPartitionsCreation(topicName); - + if (topicDomain.equalsIgnoreCase("persistent") || topicTypePartitioned.equals("partitioned")) { expectedEvents = new String[]{ "LOAD__BEFORE", "CREATE__BEFORE", "CREATE__SUCCESS", "LOAD__SUCCESS" }; - } else { - topicNameToWatch = topicName; - admin.topics().createNonPartitionedTopic(topicName); - expectedEvents = new String[]{ + // Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic + // was already exists, and the action "check topic exists" will try to load Managed ledger, + // the check triggers two exrtra events: [LOAD__BEFORE, LOAD__FAILURE]. + // #21995 fixed this wrong behavior, so remove these two events. "LOAD__BEFORE", "LOAD__FAILURE", "LOAD__BEFORE", @@ -283,7 +280,14 @@ private void createTopicAndVerifyEvents(String topicTypePartitioned, String topi "CREATE__SUCCESS", "LOAD__SUCCESS" }; - + } + if (topicTypePartitioned.equals("partitioned")) { + topicNameToWatch = topicName + "-partition-1"; + admin.topics().createPartitionedTopic(topicName, 2); + triggerPartitionsCreation(topicName); + } else { + topicNameToWatch = topicName; + admin.topics().createNonPartitionedTopic(topicName); } Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index a76f6159264ee..49b809d2a9bd4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -2643,6 +2643,7 @@ public void testFailedUpdatePartitionedTopic() throws Exception { URL pulsarUrl = new URL(pulsar.getWebServiceAddress()); admin.topics().createPartitionedTopic(partitionedTopicName, startPartitions); + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build(); Consumer consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1) .subscriptionType(SubscriptionType.Shared).subscribe(); @@ -2652,18 +2653,16 @@ public void testFailedUpdatePartitionedTopic() throws Exception { assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions); // create a subscription for few new partition which can fail - admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1, - MessageId.earliest); - try { - admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, false); - } catch (PulsarAdminException.PreconditionFailedException e) { - // Ok + admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1, + MessageId.earliest); + fail("Unexpected behaviour"); + } catch (PulsarAdminException.ConflictException ex) { + // OK } - assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions); + admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, true); // validate subscription is created for new partition. - assertNotNull(admin.topics().getStats(partitionedTopicName + "-partition-" + 6).getSubscriptions().get(subName1)); for (int i = startPartitions; i < newPartitions; i++) { assertNotNull( admin.topics().getStats(partitionedTopicName + "-partition-" + i).getSubscriptions().get(subName1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index 3628dfcee182c..e3e8481ca860c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -150,10 +150,11 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() .sendTimeout(1, TimeUnit.SECONDS) .topic(topic) .create();) { - } catch (PulsarClientException.LookupException expected) { - String msg = "Namespace bundle for topic (%s) not served by this instance"; + } catch (PulsarClientException.TopicDoesNotExistException expected) { + // Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false, + // so the "TopicDoesNotExistException" is expected. log.info("Expected error", expected); - assertTrue(expected.getMessage().contains(String.format(msg, topic)) + assertTrue(expected.getMessage().contains(topic) || expected.getMessage().contains(topicPoliciesServiceInitException)); } @@ -161,14 +162,14 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() .topic(topic) .subscriptionName("test") .subscribe();) { - } catch (PulsarClientException.LookupException expected) { - String msg = "Namespace bundle for topic (%s) not served by this instance"; + } catch (PulsarClientException.TopicDoesNotExistException expected) { + // Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false, + // so the "TopicDoesNotExistException" is expected. log.info("Expected error", expected); - assertTrue(expected.getMessage().contains(String.format(msg, topic)) + assertTrue(expected.getMessage().contains(topic) || expected.getMessage().contains(topicPoliciesServiceInitException)); } - // verify that the topic does not exist pulsar.getPulsarResources().getNamespaceResources() .setPolicies(NamespaceName.get(namespaceName), old -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index f9a26b77aa577..6d819d4f5a8a8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -20,13 +20,11 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Policy.Expiration; -import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerBrokerUsageStats; @@ -35,12 +33,9 @@ import org.apache.pulsar.broker.service.resource.usage.NetworkUsage; import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.TenantInfo; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 7d16ba240538e..eb889a8b7a0d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -284,7 +284,7 @@ public void testPersistentPartitionedTopicUnload() throws Exception { assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName)); pulsar.getBrokerService().getTopicIfExists(topicName).get(); - assertTrue(pulsar.getBrokerService().getTopics().containsKey(topicName)); + assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName)); // ref of partitioned-topic name should be empty assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());