Skip to content

Commit

Permalink
[improve] [broker] Add additionalSystemCursorNames ignore list for TT…
Browse files Browse the repository at this point in the history
…L check (apache#22614)
  • Loading branch information
hangc0276 authored May 9, 2024
1 parent 0fd223d commit bed032e
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 1 deletion.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ backlogQuotaDefaultRetentionPolicy=producer_request_hold
# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0

# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated.
# Default is empty.
# additionalSystemCursorNames=

# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
allowAutoTopicCreation=true

Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ backlogQuotaDefaultLimitSecond=-1
# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0

# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated.
# Default is empty.
# additionalSystemCursorNames=

# Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter.
# If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true.
brokerDeleteInactiveTopicsEnabled=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,13 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int ttlDurationDefaultInSeconds = 0;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Additional system subscriptions that will be ignored by ttl check. "
+ "The cursor names are comma separated. Default is empty."
)
private Set<String> additionalSystemCursorNames = new TreeSet<>();

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -279,6 +280,7 @@ protected TopicStatsHelper initialValue() {
private final ExecutorService orderedExecutor;

private volatile CloseFutures closeFutures;
private Set<String> additionalSystemCursorNames = new TreeSet<>();

@Getter
private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();
Expand Down Expand Up @@ -414,6 +416,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
} else {
shadowSourceTopic = null;
}
additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
}

@Override
Expand Down Expand Up @@ -1934,7 +1937,9 @@ public void checkMessageExpiry() {
int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
if (messageTtlInSeconds != 0) {
subscriptions.forEach((__, sub) -> {
if (!isCompactionSubscription(sub.getName())) {
if (!isCompactionSubscription(sub.getName())
&& (additionalSystemCursorNames.isEmpty()
|| !additionalSystemCursorNames.contains(sub.getName()))) {
sub.expireMessages(messageTtlInSeconds);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,23 @@
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -138,4 +146,92 @@ public void testTTLPoliciesUpdate() throws Exception {
topicRefMock.onUpdate(topicPolicies);
verify(topicRefMock, times(2)).checkMessageExpiry();
}

@Test
public void testTtlFilteredByIgnoreSubscriptions() throws Exception {
String topicName = "persistent://prop/ns-abc/testTTLFilteredByIgnoreSubscriptions";
String subName = "__SUB_FILTER";
cleanup();
Set<String> ignoredSubscriptions = new HashSet<>();
ignoredSubscriptions.add(subName);
int defaultTtl = 5;
conf.setAdditionalSystemCursorNames(ignoredSubscriptions);
conf.setTtlDurationDefaultInSeconds(defaultTtl);
super.baseSetup();

pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName)
.subscribe().close();

@Cleanup
org.apache.pulsar.client.api.Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

final int messages = 10;

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
producer.send(message);
}
producer.close();

Optional<Topic> topic = pulsar.getBrokerService().getTopicReference(topicName);
assertTrue(topic.isPresent());
PersistentSubscription subscription = (PersistentSubscription) topic.get().getSubscription(subName);

Thread.sleep((defaultTtl - 1) * 1000);
topic.get().checkMessageExpiry();
// Wait the message expire task done and make sure the message does not expire early.
Thread.sleep(1000);
assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);
Thread.sleep(2000);
topic.get().checkMessageExpiry();
// Wait the message expire task done.
retryStrategically((test) -> subscription.getNumberOfEntriesInBacklog(false) == 0, 5, 200);
// The message should not expire because the subscription is ignored.
assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);

conf.setAdditionalSystemCursorNames(new TreeSet<>());
}

@Test
public void testTtlWithoutIgnoreSubscriptions() throws Exception {
String topicName = "persistent://prop/ns-abc/testTTLWithoutIgnoreSubscriptions";
String subName = "__SUB_FILTER";
cleanup();
int defaultTtl = 5;
conf.setTtlDurationDefaultInSeconds(defaultTtl);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
super.baseSetup();

pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName)
.subscribe().close();

@Cleanup
org.apache.pulsar.client.api.Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

final int messages = 10;

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
producer.send(message);
}
producer.close();

Optional<Topic> topic = pulsar.getBrokerService().getTopicReference(topicName);
assertTrue(topic.isPresent());
PersistentSubscription subscription = (PersistentSubscription) topic.get().getSubscription(subName);

Thread.sleep((defaultTtl - 1) * 1000);
topic.get().checkMessageExpiry();
// Wait the message expire task done and make sure the message does not expire early.
Thread.sleep(1000);
assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);
Thread.sleep(2000);
topic.get().checkMessageExpiry();
// Wait the message expire task done and make sure the message expired.
retryStrategically((test) -> subscription.getNumberOfEntriesInBacklog(false) == 0, 5, 200);
assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0);
}

}

0 comments on commit bed032e

Please sign in to comment.