diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 42e2c00f73acf..81d1a105c175f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -150,6 +150,9 @@ public static String getTlsFileForClient(String name) { private final List closeables = new ArrayList<>(); + // Set to true in test's constructor to use a real Zookeeper (TestZKServer) + protected boolean useTestZookeeper; + public MockedPulsarServiceBaseTest() { resetConfig(); } @@ -461,7 +464,6 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig PulsarTestContext.Builder builder = PulsarTestContext.builder() .spyByDefault() .config(conf) - .withMockZookeeper(true) .pulsarServiceCustomizer(pulsarService -> { try { beforePulsarStart(pulsarService); @@ -470,9 +472,25 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig } }) .brokerServiceCustomizer(this::customizeNewBrokerService); + configureMetadataStores(builder); return builder; } + /** + * Configures the metadata stores for the PulsarTestContext.Builder instance. + * Set useTestZookeeper to true in the test's constructor to use TestZKServer which is a real ZooKeeper + * implementation. + * + * @param builder the PulsarTestContext.Builder instance to configure + */ + protected void configureMetadataStores(PulsarTestContext.Builder builder) { + if (useTestZookeeper) { + builder.withTestZookeeper(); + } else { + builder.withMockZookeeper(); + } + } + protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception { return createAdditionalPulsarTestContext(conf, null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index 6403c3bcec4c3..82a1b574ac921 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -65,6 +64,7 @@ import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; +import org.apache.pulsar.metadata.TestZKServer; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -72,9 +72,11 @@ import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.MockZooKeeperSession; -import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; import org.jetbrains.annotations.NotNull; import org.mockito.Mockito; import org.mockito.internal.util.MockUtil; @@ -477,16 +479,77 @@ public Builder withMockZookeeper(boolean useSeparateGlobalZk) { private MockZooKeeper createMockZooKeeper() throws Exception { MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService()); - List dummyAclList = new ArrayList<>(0); + initializeZookeeper(zk); + registerCloseable(zk::shutdown); + return zk; + } + private static void initializeZookeeper(ZooKeeper zk) throws KeeperException, InterruptedException { ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000, - "".getBytes(StandardCharsets.UTF_8), dummyAclList, CreateMode.PERSISTENT); + "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList, + zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } - registerCloseable(zk::shutdown); - return zk; + /** + * Configure this PulsarTestContext to use a test ZooKeeper instance which is + * shared for both the local and configuration metadata stores. + * + * @return the builder + */ + public Builder withTestZookeeper() { + return withTestZookeeper(false); + } + + /** + * Configure this PulsarTestContext to use a test ZooKeeper instance. + * + * @param useSeparateGlobalZk if true, the global (configuration) zookeeper will be a separate instance + * @return the builder + */ + public Builder withTestZookeeper(boolean useSeparateGlobalZk) { + try { + TestZKServer localZk = createTestZookeeper(); + MetadataStoreExtended localStore = + createTestZookeeperMetadataStore(localZk, MetadataStoreConfig.METADATA_STORE); + localMetadataStore(localStore); + MetadataStoreExtended configStore; + if (useSeparateGlobalZk) { + TestZKServer globalZk = createTestZookeeper(); + configStore = createTestZookeeperMetadataStore(globalZk, + MetadataStoreConfig.CONFIGURATION_METADATA_STORE); + } else { + configStore = + createTestZookeeperMetadataStore(localZk, MetadataStoreConfig.CONFIGURATION_METADATA_STORE); + } + configurationMetadataStore(configStore); + } catch (Exception e) { + throw new RuntimeException(e); + } + return this; + } + + private TestZKServer createTestZookeeper() throws Exception { + TestZKServer testZKServer = new TestZKServer(); + try (ZooKeeper zkc = new ZooKeeper(testZKServer.getConnectionString(), 5000, event -> { + })) { + initializeZookeeper(zkc); + } + registerCloseable(testZKServer); + return testZKServer; + } + + private MetadataStoreExtended createTestZookeeperMetadataStore(TestZKServer zkServer, + String metadataStoreName) { + try { + MetadataStoreExtended store = MetadataStoreExtended.create("zk:" + zkServer.getConnectionString(), + MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build()); + registerCloseable(store); + return store; + } catch (MetadataStoreException e) { + throw new RuntimeException(e); + } } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 07deb9007c487..34db053271e1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -112,14 +112,28 @@ import org.awaitility.reflect.WhiteboxImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.SkipException; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; import org.testng.annotations.Test; @Test(groups = "broker-api") public class BrokerServiceLookupTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(BrokerServiceLookupTest.class); + @DataProvider + private static Object[] booleanValues() { + return new Object[]{ true, false }; + } + + @Factory(dataProvider = "booleanValues") + public BrokerServiceLookupTest(boolean useTestZookeeper) { + // when set to true, TestZKServer is used which is a real ZooKeeper implementation + this.useTestZookeeper = useTestZookeeper; + } + @BeforeMethod @Override protected void setup() throws Exception { @@ -1197,6 +1211,9 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat @Test public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() throws Exception { + if (useTestZookeeper) { + throw new SkipException("This test case depends on MockZooKeeper"); + } String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); admin.topics().createNonPartitionedTopic(tpName); PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 4c24aa5938b93..f0971791ae0ef 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -439,8 +439,8 @@ private void internalStorePut(OpPut opPut) { future.completeExceptionally(getException(Code.BADVERSION, opPut.getPath())); } else { // The z-node does not exist, let's create it first - put(opPut.getPath(), opPut.getData(), Optional.of(-1L)).thenAccept( - s -> future.complete(s)) + put(opPut.getPath(), opPut.getData(), Optional.of(-1L), opPut.getOptions()) + .thenAccept(s -> future.complete(s)) .exceptionally(ex -> { if (ex.getCause() instanceof BadVersionException) { // The z-node exist now, let's overwrite it @@ -478,7 +478,7 @@ public void close() throws Exception { private Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) { return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(), - zkStat.getEphemeralOwner() != -1, + zkStat.getEphemeralOwner() != 0, zkStat.getEphemeralOwner() == zkc.getSessionId()); } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java index 9a38cdbcd2f85..b71511aabceae 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.metadata; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -66,4 +67,38 @@ public void sequentialKeys(String provider, Supplier urlSupplier) throws assertNotEquals(seq1, seq2); assertTrue(n1 < n2); } + + @Test(dataProvider = "impl") + public void testPersistentOrEphemeralPut(String provider, Supplier urlSupplier) throws Exception { + final String key1 = newKey(); + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); + store.put(key1, "value-1".getBytes(), Optional.empty(), EnumSet.noneOf(CreateOption.class)).join(); + var value = store.get(key1).join().get(); + assertEquals(value.getValue(), "value-1".getBytes()); + assertFalse(value.getStat().isEphemeral()); + assertTrue(value.getStat().isFirstVersion()); + var version = value.getStat().getVersion(); + + store.put(key1, "value-2".getBytes(), Optional.empty(), EnumSet.noneOf(CreateOption.class)).join(); + value = store.get(key1).join().get(); + assertEquals(value.getValue(), "value-2".getBytes()); + assertFalse(value.getStat().isEphemeral()); + assertEquals(value.getStat().getVersion(), version + 1); + + final String key2 = newKey(); + store.put(key2, "value-4".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join(); + value = store.get(key2).join().get(); + assertEquals(value.getValue(), "value-4".getBytes()); + assertTrue(value.getStat().isEphemeral()); + assertTrue(value.getStat().isFirstVersion()); + version = value.getStat().getVersion(); + + + store.put(key2, "value-5".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join(); + value = store.get(key2).join().get(); + assertEquals(value.getValue(), "value-5".getBytes()); + assertTrue(value.getStat().isEphemeral()); + assertEquals(value.getStat().getVersion(), version + 1); + } + } diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java index f32036e53f001..7b23d0814aab9 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java @@ -64,6 +64,9 @@ import org.slf4j.LoggerFactory; public class MockZooKeeper extends ZooKeeper { + // ephemeralOwner value for persistent nodes + private static final long NOT_EPHEMERAL = 0L; + @Data @AllArgsConstructor private static class MockZNode { @@ -84,13 +87,13 @@ static MockZNode of(byte[] content, int version, long ephemeralOwner) { private ExecutorService executor; private Watcher sessionWatcher; - private long sessionId = 0L; + private long sessionId = 1L; private int readOpDelayMs; private ReentrantLock mutex; private AtomicLong sequentialIdGenerator; - private ThreadLocal epheralOwnerThreadLocal; + private ThreadLocal ephemeralOwnerThreadLocal; //see details of Objenesis caching - http://objenesis.org/details.html //see supported jvms - https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md @@ -156,7 +159,7 @@ private static MockZooKeeper createMockZooKeeperInstance(ExecutorService executo ObjectInstantiator mockZooKeeperInstantiator = objenesis.getInstantiatorOf(MockZooKeeper.class); MockZooKeeper zk = mockZooKeeperInstantiator.newInstance(); - zk.epheralOwnerThreadLocal = new ThreadLocal<>(); + zk.ephemeralOwnerThreadLocal = new ThreadLocal<>(); zk.init(executor); zk.readOpDelayMs = readOpDelayMs; zk.mutex = new ReentrantLock(); @@ -278,7 +281,7 @@ public String create(String path, byte[] data, List acl, CreateMode createM MockZNode.of(parentNode.getContent(), parentVersion + 1, parentNode.getEphemeralOwner())); } - tree.put(path, MockZNode.of(data, 0, createMode.isEphemeral() ? getEphemeralOwner() : -1L)); + tree.put(path, MockZNode.of(data, 0, createMode.isEphemeral() ? getEphemeralOwner() : NOT_EPHEMERAL)); toNotifyCreate.addAll(watchers.get(path)); @@ -310,19 +313,19 @@ public String create(String path, byte[] data, List acl, CreateMode createM } protected long getEphemeralOwner() { - Long epheralOwner = epheralOwnerThreadLocal.get(); - if (epheralOwner != null) { - return epheralOwner; + Long ephemeralOwner = ephemeralOwnerThreadLocal.get(); + if (ephemeralOwner != null) { + return ephemeralOwner; } return getSessionId(); } - public void overrideEpheralOwner(long epheralOwner) { - epheralOwnerThreadLocal.set(epheralOwner); + public void overrideEphemeralOwner(long ephemeralOwner) { + ephemeralOwnerThreadLocal.set(ephemeralOwner); } - public void removeEpheralOwnerOverride() { - epheralOwnerThreadLocal.remove(); + public void removeEphemeralOwnerOverride() { + ephemeralOwnerThreadLocal.remove(); } @Override @@ -373,7 +376,7 @@ public void create(final String path, final byte[] data, final List acl, Cr cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); } else { tree.put(name, MockZNode.of(data, 0, - createMode != null && createMode.isEphemeral() ? getEphemeralOwner() : -1L)); + createMode != null && createMode.isEphemeral() ? getEphemeralOwner() : NOT_EPHEMERAL)); watchers.removeAll(name); unlockIfLocked(); cb.processResult(0, path, ctx, name); @@ -678,9 +681,7 @@ private static Stat createStatForZNode(MockZNode zNode) { private static Stat applyToStat(MockZNode zNode, Stat stat) { stat.setVersion(zNode.getVersion()); - if (zNode.getEphemeralOwner() != -1L) { - stat.setEphemeralOwner(zNode.getEphemeralOwner()); - } + stat.setEphemeralOwner(zNode.getEphemeralOwner()); return stat; } @@ -1199,5 +1200,16 @@ private void triggerPersistentWatches(String path, String parent, EventType even }); } + public void deleteEphemeralNodes(long sessionId) { + if (sessionId != NOT_EPHEMERAL) { + lock(); + try { + tree.values().removeIf(zNode -> zNode.getEphemeralOwner() == sessionId); + } finally { + unlockIfLocked(); + } + } + } + private static final Logger log = LoggerFactory.getLogger(MockZooKeeper.class); } diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java index a286a75aa9103..a75402018ac04 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java @@ -88,10 +88,10 @@ public void register(Watcher watcher) { public String create(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { try { - mockZooKeeper.overrideEpheralOwner(getSessionId()); + mockZooKeeper.overrideEphemeralOwner(getSessionId()); return mockZooKeeper.create(path, data, acl, createMode); } finally { - mockZooKeeper.removeEpheralOwnerOverride(); + mockZooKeeper.removeEphemeralOwnerOverride(); } } @@ -99,10 +99,10 @@ public String create(String path, byte[] data, List acl, CreateMode createM public void create(final String path, final byte[] data, final List acl, CreateMode createMode, final AsyncCallback.StringCallback cb, final Object ctx) { try { - mockZooKeeper.overrideEpheralOwner(getSessionId()); + mockZooKeeper.overrideEphemeralOwner(getSessionId()); mockZooKeeper.create(path, data, acl, createMode, cb, ctx); } finally { - mockZooKeeper.removeEpheralOwnerOverride(); + mockZooKeeper.removeEphemeralOwnerOverride(); } } @@ -188,12 +188,22 @@ public void delete(final String path, int version, final VoidCallback cb, final @Override public void multi(Iterable ops, AsyncCallback.MultiCallback cb, Object ctx) { - mockZooKeeper.multi(ops, cb, ctx); + try { + mockZooKeeper.overrideEphemeralOwner(getSessionId()); + mockZooKeeper.multi(ops, cb, ctx); + } finally { + mockZooKeeper.removeEphemeralOwnerOverride(); + } } @Override public List multi(Iterable ops) throws InterruptedException, KeeperException { - return mockZooKeeper.multi(ops); + try { + mockZooKeeper.overrideEphemeralOwner(getSessionId()); + return mockZooKeeper.multi(ops); + } finally { + mockZooKeeper.removeEphemeralOwnerOverride(); + } } @Override @@ -221,12 +231,16 @@ public void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, Object public void close() throws InterruptedException { if (closeMockZooKeeperOnClose) { mockZooKeeper.close(); + } else { + mockZooKeeper.deleteEphemeralNodes(getSessionId()); } } public void shutdown() throws InterruptedException { if (closeMockZooKeeperOnClose) { mockZooKeeper.shutdown(); + } else { + mockZooKeeper.deleteEphemeralNodes(getSessionId()); } }