From c82825bb9b8ef98e2b120bcdd65a7d5ddb134549 Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Fri, 30 Jun 2023 16:35:47 +0800 Subject: [PATCH] [feat][broker][PIP-278] Support pluggable topic compaction service - part1 (#20645) --- .../pulsar/broker/ServiceConfiguration.java | 6 + .../apache/pulsar/broker/PulsarService.java | 2 +- .../service/AbstractBaseDispatcher.java | 4 +- .../service/persistent/PersistentTopic.java | 2 +- ....java => PulsarCompactorSubscription.java} | 8 +- .../pulsar/compaction/CompactedTopicImpl.java | 10 +- .../compaction/CompactionServiceFactory.java | 48 +++++++ .../PulsarCompactionServiceFactory.java | 84 +++++++++++++ .../PulsarTopicCompactionService.java | 111 ++++++++++++++++ .../compaction/TopicCompactionService.java | 63 ++++++++++ .../broker/service/PersistentTopicTest.java | 6 +- .../TopicCompactionServiceTest.java | 118 ++++++++++++++++++ 12 files changed, 449 insertions(+), 13 deletions(-) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/{CompactorSubscription.java => PulsarCompactorSubscription.java} (93%) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 9398d349be7d9..b41a562fbd788 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -3166,6 +3166,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, ) private int transactionPendingAckBatchedWriteMaxDelayInMillis = 1; + @FieldContext( + category = CATEGORY_SERVER, + doc = "The class name of the factory that implements the topic compaction service." + ) + private String compactionServiceFactoryClassName = "org.apache.pulsar.compaction.PulsarCompactionServiceFactory"; + /**** --- KeyStore TLS config variables. --- ****/ @FieldContext( category = CATEGORY_KEYSTORE_TLS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 00dfad40191bd..2b64a8cfb5c6f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1470,7 +1470,7 @@ public BookKeeperClientFactory getBookKeeperClientFactory() { return bkClientFactory; } - protected synchronized ScheduledExecutorService getCompactorExecutor() { + public synchronized ScheduledExecutorService getCompactorExecutor() { if (this.compactorExecutor == null) { compactorExecutor = Executors.newSingleThreadScheduledExecutor( new ExecutorProvider.ExtendedThreadFactory("compaction")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 8f6caa7a20801..437a6527e8538 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -37,10 +37,10 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; -import org.apache.pulsar.broker.service.persistent.CompactorSubscription; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.api.transaction.TxnID; @@ -301,7 +301,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i } private void individualAcknowledgeMessageIfNeeded(Position position, Map properties) { - if (!(subscription instanceof CompactorSubscription)) { + if (!(subscription instanceof PulsarCompactorSubscription)) { subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Individual, properties); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2a0c229daf6c2..12691d1c67796 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -483,7 +483,7 @@ private PersistentSubscription createPersistentSubscription(String subscriptionN boolean replicated, Map subscriptionProperties) { Objects.requireNonNull(compactedTopic); if (isCompactionSubscription(subscriptionName)) { - return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor); + return new PulsarCompactorSubscription(this, compactedTopic, subscriptionName, cursor); } else { return new PersistentSubscription(this, subscriptionName, cursor, replicated, subscriptionProperties); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java similarity index 93% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java index ec34aeffbec4c..dbb09f6ac39fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java @@ -32,11 +32,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class CompactorSubscription extends PersistentSubscription { +public class PulsarCompactorSubscription extends PersistentSubscription { private final CompactedTopic compactedTopic; - public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic, - String subscriptionName, ManagedCursor cursor) { + public PulsarCompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic, + String subscriptionName, ManagedCursor cursor) { super(topic, subscriptionName, cursor, false); checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)); this.compactedTopic = compactedTopic; @@ -106,5 +106,5 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { } } - private static final Logger log = LoggerFactory.getLogger(CompactorSubscription.class); + private static final Logger log = LoggerFactory.getLogger(PulsarCompactorSubscription.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index c8114f9adb652..e2d3de9c19a29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; @@ -60,7 +61,7 @@ public class CompactedTopicImpl implements CompactedTopic { private final BookKeeper bk; private PositionImpl compactionHorizon = null; - private CompletableFuture compactedTopicContext = null; + private volatile CompletableFuture compactedTopicContext = null; public CompactedTopicImpl(BookKeeper bk) { this.bk = bk; @@ -258,7 +259,7 @@ private static CompletableFuture tryDeleteCompactedLedger(BookKeeper bk, l return promise; } - private static CompletableFuture> readEntries(LedgerHandle lh, long from, long to) { + static CompletableFuture> readEntries(LedgerHandle lh, long from, long to) { CompletableFuture> promise = new CompletableFuture<>(); lh.asyncReadEntries(from, to, @@ -320,6 +321,11 @@ private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) public synchronized Optional getCompactionHorizon() { return Optional.ofNullable(this.compactionHorizon); } + + @Nullable + public CompletableFuture getCompactedTopicContextFuture() { + return compactedTopicContext; + } private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java new file mode 100644 index 0000000000000..de1abfbea9558 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import com.google.common.annotations.Beta; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.common.classification.InterfaceAudience; + +@Beta +@InterfaceAudience.Public +public interface CompactionServiceFactory extends AutoCloseable { + + /** + * Initialize the compaction service factory. + * + * @param pulsarService + * the pulsar service instance + * @return a future represents the initialization result + */ + CompletableFuture initialize(@Nonnull PulsarService pulsarService); + + /** + * Create a new topic compaction service for topic. + * + * @param topic + * the topic name + * @return a future represents the topic compaction service + */ + CompletableFuture newTopicCompactionService(@Nonnull String topic); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java new file mode 100644 index 0000000000000..dd817ca35f145 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; + +public class PulsarCompactionServiceFactory implements CompactionServiceFactory { + + private PulsarService pulsarService; + + private volatile Compactor compactor; + + @VisibleForTesting + public Compactor getCompactor() throws PulsarServerException { + if (compactor == null) { + synchronized (this) { + if (compactor == null) { + compactor = newCompactor(); + } + } + } + return compactor; + } + + @Nullable + public Compactor getNullableCompactor() { + return compactor; + } + + protected Compactor newCompactor() throws PulsarServerException { + return new TwoPhaseCompactor(pulsarService.getConfiguration(), + pulsarService.getClient(), pulsarService.getBookKeeperClient(), + pulsarService.getCompactorExecutor()); + } + + @Override + public CompletableFuture initialize(@Nonnull PulsarService pulsarService) { + Objects.requireNonNull(pulsarService); + this.pulsarService = pulsarService; + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture newTopicCompactionService(@Nonnull String topic) { + Objects.requireNonNull(topic); + PulsarTopicCompactionService pulsarTopicCompactionService = + new PulsarTopicCompactionService(topic, pulsarService.getBookKeeperClient(), () -> { + try { + return this.getCompactor(); + } catch (Throwable e) { + throw new CompletionException(e); + } + }); + return CompletableFuture.completedFuture(pulsarTopicCompactionService); + } + + @Override + public void close() throws Exception { + // noop + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java new file mode 100644 index 0000000000000..0a8bf9d69a277 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY; +import static org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED; +import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import javax.annotation.Nonnull; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.util.FutureUtil; + + +public class PulsarTopicCompactionService implements TopicCompactionService { + + private final String topic; + + private final CompactedTopicImpl compactedTopic; + + private final Supplier compactorSupplier; + + public PulsarTopicCompactionService(String topic, BookKeeper bookKeeper, + Supplier compactorSupplier) { + this.topic = topic; + this.compactedTopic = new CompactedTopicImpl(bookKeeper); + this.compactorSupplier = compactorSupplier; + } + + @Override + public CompletableFuture compact() { + Compactor compactor; + try { + compactor = compactorSupplier.get(); + } catch (Throwable e) { + return CompletableFuture.failedFuture(e); + } + return compactor.compact(topic).thenApply(x -> null); + } + + @Override + public CompletableFuture> readCompactedEntries(@Nonnull Position startPosition, + int numberOfEntriesToRead) { + Objects.requireNonNull(startPosition); + checkArgument(numberOfEntriesToRead > 0); + + CompletableFuture> resultFuture = new CompletableFuture<>(); + + Objects.requireNonNull(compactedTopic.getCompactedTopicContextFuture()).thenCompose( + (context) -> findStartPoint((PositionImpl) startPosition, context.ledger.getLastAddConfirmed(), + context.cache).thenCompose((startPoint) -> { + if (startPoint == COMPACT_LEDGER_EMPTY || startPoint == NEWER_THAN_COMPACTED) { + return CompletableFuture.completedFuture(Collections.emptyList()); + } + long endPoint = + Math.min(context.ledger.getLastAddConfirmed(), startPoint + numberOfEntriesToRead); + return CompactedTopicImpl.readEntries(context.ledger, startPoint, endPoint); + })).whenComplete((result, ex) -> { + if (ex == null) { + resultFuture.complete(result); + } else { + ex = FutureUtil.unwrapCompletionException(ex); + if (ex instanceof NoSuchElementException) { + resultFuture.complete(Collections.emptyList()); + } else { + resultFuture.completeExceptionally(ex); + } + } + }); + + return resultFuture; + } + + @Override + public CompletableFuture readLastCompactedEntry() { + return compactedTopic.readLastEntryOfCompactedLedger(); + } + + @Override + public CompletableFuture getLastCompactedPosition() { + return CompletableFuture.completedFuture(compactedTopic.getCompactionHorizon().orElse(null)); + } + + public CompactedTopicImpl getCompactedTopic() { + return compactedTopic; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java new file mode 100644 index 0000000000000..6b64b9ce0fda6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import com.google.common.annotations.Beta; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.common.classification.InterfaceAudience; + +@Beta +@InterfaceAudience.Public +public interface TopicCompactionService { + /** + * Compact the topic. + * Topic Compaction is a key-based retention mechanism. It keeps the most recent value for a given key and + * user reads compacted data from TopicCompactionService. + * + * @return a future that will be completed when the compaction is done. + */ + CompletableFuture compact(); + + /** + * Read the compacted entries from the TopicCompactionService. + * + * @param startPosition the position to start reading from. + * @param numberOfEntriesToRead the maximum number of entries to read. + * @return a future that will be completed with the list of entries, this list can be null. + */ + CompletableFuture> readCompactedEntries(@Nonnull Position startPosition, int numberOfEntriesToRead); + + /** + * Read the last compacted entry from the TopicCompactionService. + * + * @return a future that will be completed with the compacted last entry, this entry can be null. + */ + CompletableFuture readLastCompactedEntry(); + + /** + * Get the last compacted position from the TopicCompactionService. + * + * @return a future that will be completed with the last compacted position, this position can be null. + */ + CompletableFuture getLastCompactedPosition(); +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 45ef58bb7038c..fefed1aaa0a6e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -93,7 +93,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.persistent.CompactorSubscription; +import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription; import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; @@ -1792,7 +1792,7 @@ public void testCompactorSubscription() { CompactedTopic compactedTopic = mock(CompactedTopic.class); when(compactedTopic.newCompactedLedger(any(Position.class), anyLong())) .thenReturn(CompletableFuture.completedFuture(mock(CompactedTopicContext.class))); - PersistentSubscription sub = new CompactorSubscription(topic, compactedTopic, + PersistentSubscription sub = new PulsarCompactorSubscription(topic, compactedTopic, Compactor.COMPACTION_SUBSCRIPTION, cursorMock); PositionImpl position = new PositionImpl(1, 1); @@ -1816,7 +1816,7 @@ public void testCompactorSubscriptionUpdatedOnInit() { CompactedTopic compactedTopic = mock(CompactedTopic.class); when(compactedTopic.newCompactedLedger(any(Position.class), anyLong())) .thenReturn(CompletableFuture.completedFuture(null)); - new CompactorSubscription(topic, compactedTopic, Compactor.COMPACTION_SUBSCRIPTION, cursorMock); + new PulsarCompactorSubscription(topic, compactedTopic, Compactor.COMPACTION_SUBSCRIPTION, cursorMock); verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java new file mode 100644 index 0000000000000..5810e0180d07c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.apache.pulsar.compaction.Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import static org.testng.Assert.assertEquals; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import lombok.Cleanup; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.testng.annotations.Test; + +public class TopicCompactionServiceTest extends CompactorTest { + + @Test + public void test() throws PulsarClientException, PulsarAdminException { + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); + String defaultTenant = "prop-xyz"; + admin.tenants().createTenant(defaultTenant, tenantInfo); + String defaultNamespace = defaultTenant + "/ns1"; + admin.namespaces().createNamespace(defaultNamespace, Set.of("test")); + + String topic = "persistent://prop-xyz/ns1/my-topic"; + + PulsarTopicCompactionService service = new PulsarTopicCompactionService(topic, bk, () -> compactor); + + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + producer.newMessage() + .key("a") + .value("A_1".getBytes()) + .send(); + producer.newMessage() + .key("b") + .value("B_1".getBytes()) + .send(); + producer.newMessage() + .key("a") + .value("A_2".getBytes()) + .send(); + producer.newMessage() + .key("b") + .value("B_2".getBytes()) + .send(); + producer.newMessage() + .key("b") + .value("B_3".getBytes()) + .send(); + + producer.flush(); + + service.compact().join(); + + + CompactedTopicImpl compactedTopic = service.getCompactedTopic(); + + Long compactedLedger = admin.topics().getInternalStats(topic).cursors.get(COMPACTION_SUBSCRIPTION).properties.get( + COMPACTED_TOPIC_LEDGER_PROPERTY); + String markDeletePosition = + admin.topics().getInternalStats(topic).cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition; + String[] split = markDeletePosition.split(":"); + compactedTopic.newCompactedLedger(PositionImpl.get(Long.valueOf(split[0]), Long.valueOf(split[1])), + compactedLedger).join(); + + Position lastCompactedPosition = service.getLastCompactedPosition().join(); + assertEquals(admin.topics().getInternalStats(topic).lastConfirmedEntry, lastCompactedPosition.toString()); + + List entries = service.readCompactedEntries(PositionImpl.EARLIEST, 4).join(); + assertEquals(entries.size(), 2); + entries.stream().map(e -> { + try { + return MessageImpl.deserialize(e.getDataBuffer()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }).forEach(message -> { + String data = new String(message.getData()); + if (Objects.equals(message.getKey(), "a")) { + assertEquals(data, "A_2"); + } else { + assertEquals(data, "B_3"); + } + }); + } +}