Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][meta] Fix ephemeral Zookeeper put which creates a persistent znode #23902

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ public static String getTlsFileForClient(String name) {

private final List<AutoCloseable> closeables = new ArrayList<>();

// Set to true in test's constructor to use a real Zookeeper (TestZKServer)
protected boolean useTestZookeeper;

public MockedPulsarServiceBaseTest() {
resetConfig();
}
Expand Down Expand Up @@ -461,7 +464,6 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
PulsarTestContext.Builder builder = PulsarTestContext.builder()
.spyByDefault()
.config(conf)
.withMockZookeeper(true)
.pulsarServiceCustomizer(pulsarService -> {
try {
beforePulsarStart(pulsarService);
Expand All @@ -470,9 +472,25 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
}
})
.brokerServiceCustomizer(this::customizeNewBrokerService);
configureMetadataStores(builder);
return builder;
}

/**
* Configures the metadata stores for the PulsarTestContext.Builder instance.
* Set useTestZookeeper to true in the test's constructor to use TestZKServer which is a real ZooKeeper
* implementation.
*
* @param builder the PulsarTestContext.Builder instance to configure
*/
protected void configureMetadataStores(PulsarTestContext.Builder builder) {
if (useTestZookeeper) {
builder.withTestZookeeper();
} else {
builder.withMockZookeeper();
}
}

protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
return createAdditionalPulsarTestContext(conf, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -65,16 +64,19 @@
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.metadata.TestZKServer;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.MockZooKeeperSession;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mockito;
import org.mockito.internal.util.MockUtil;
Expand Down Expand Up @@ -477,16 +479,77 @@ public Builder withMockZookeeper(boolean useSeparateGlobalZk) {

private MockZooKeeper createMockZooKeeper() throws Exception {
MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
List<ACL> dummyAclList = new ArrayList<>(0);
initializeZookeeper(zk);
registerCloseable(zk::shutdown);
return zk;
}

private static void initializeZookeeper(ZooKeeper zk) throws KeeperException, InterruptedException {
ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
"".getBytes(StandardCharsets.UTF_8), dummyAclList, CreateMode.PERSISTENT);
"".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList,
zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}

registerCloseable(zk::shutdown);
return zk;
/**
* Configure this PulsarTestContext to use a test ZooKeeper instance which is
* shared for both the local and configuration metadata stores.
*
* @return the builder
*/
public Builder withTestZookeeper() {
return withTestZookeeper(false);
}

/**
* Configure this PulsarTestContext to use a test ZooKeeper instance.
*
* @param useSeparateGlobalZk if true, the global (configuration) zookeeper will be a separate instance
* @return the builder
*/
public Builder withTestZookeeper(boolean useSeparateGlobalZk) {
try {
TestZKServer localZk = createTestZookeeper();
MetadataStoreExtended localStore =
createTestZookeeperMetadataStore(localZk, MetadataStoreConfig.METADATA_STORE);
localMetadataStore(localStore);
MetadataStoreExtended configStore;
if (useSeparateGlobalZk) {
TestZKServer globalZk = createTestZookeeper();
configStore = createTestZookeeperMetadataStore(globalZk,
MetadataStoreConfig.CONFIGURATION_METADATA_STORE);
} else {
configStore =
createTestZookeeperMetadataStore(localZk, MetadataStoreConfig.CONFIGURATION_METADATA_STORE);
}
configurationMetadataStore(configStore);
} catch (Exception e) {
throw new RuntimeException(e);
}
return this;
}

private TestZKServer createTestZookeeper() throws Exception {
TestZKServer testZKServer = new TestZKServer();
try (ZooKeeper zkc = new ZooKeeper(testZKServer.getConnectionString(), 5000, event -> {
})) {
initializeZookeeper(zkc);
}
registerCloseable(testZKServer);
return testZKServer;
}

private MetadataStoreExtended createTestZookeeperMetadataStore(TestZKServer zkServer,
String metadataStoreName) {
try {
MetadataStoreExtended store = MetadataStoreExtended.create("zk:" + zkServer.getConnectionString(),
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build());
registerCloseable(store);
return store;
} catch (MetadataStoreException e) {
throw new RuntimeException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,28 @@
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.SkipException;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class BrokerServiceLookupTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(BrokerServiceLookupTest.class);

@DataProvider
private static Object[] booleanValues() {
return new Object[]{ true, false };
}

@Factory(dataProvider = "booleanValues")
public BrokerServiceLookupTest(boolean useTestZookeeper) {
// when set to true, TestZKServer is used which is a real ZooKeeper implementation
this.useTestZookeeper = useTestZookeeper;
}

@BeforeMethod
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -1197,6 +1211,9 @@

@Test
public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() throws Exception {
if (useTestZookeeper) {
throw new SkipException("This test case depends on MockZooKeeper");
}
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(tpName);
PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
Expand Down Expand Up @@ -1338,7 +1355,7 @@

try {
future.get();
fail();

Check failure on line 1358 in pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Brokers - Client Api

BrokerServiceLookupTest.testLookupConnectionNotCloseIfFailedToAcquireOwnershipOfBundle

null
} catch (ExecutionException e) {
log.info("getBroker failed with {}: {}", e.getCause().getClass().getName(), e.getMessage());
assertTrue(e.getCause() instanceof PulsarClientException.BrokerMetadataException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ private void internalStorePut(OpPut opPut) {
future.completeExceptionally(getException(Code.BADVERSION, opPut.getPath()));
} else {
// The z-node does not exist, let's create it first
put(opPut.getPath(), opPut.getData(), Optional.of(-1L)).thenAccept(
s -> future.complete(s))
put(opPut.getPath(), opPut.getData(), Optional.of(-1L), opPut.getOptions())
.thenAccept(s -> future.complete(s))
.exceptionally(ex -> {
if (ex.getCause() instanceof BadVersionException) {
// The z-node exist now, let's overwrite it
Expand Down Expand Up @@ -478,7 +478,7 @@ public void close() throws Exception {

private Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) {
return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(),
zkStat.getEphemeralOwner() != -1,
zkStat.getEphemeralOwner() != 0,
zkStat.getEphemeralOwner() == zkc.getSessionId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.metadata;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -66,4 +67,38 @@ public void sequentialKeys(String provider, Supplier<String> urlSupplier) throws
assertNotEquals(seq1, seq2);
assertTrue(n1 < n2);
}

@Test(dataProvider = "impl")
public void testPersistentOrEphemeralPut(String provider, Supplier<String> urlSupplier) throws Exception {
final String key1 = newKey();
MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
store.put(key1, "value-1".getBytes(), Optional.empty(), EnumSet.noneOf(CreateOption.class)).join();
var value = store.get(key1).join().get();
assertEquals(value.getValue(), "value-1".getBytes());
assertFalse(value.getStat().isEphemeral());
assertTrue(value.getStat().isFirstVersion());
var version = value.getStat().getVersion();

store.put(key1, "value-2".getBytes(), Optional.empty(), EnumSet.noneOf(CreateOption.class)).join();
value = store.get(key1).join().get();
assertEquals(value.getValue(), "value-2".getBytes());
assertFalse(value.getStat().isEphemeral());
assertEquals(value.getStat().getVersion(), version + 1);

final String key2 = newKey();
store.put(key2, "value-4".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
value = store.get(key2).join().get();
assertEquals(value.getValue(), "value-4".getBytes());
assertTrue(value.getStat().isEphemeral());
assertTrue(value.getStat().isFirstVersion());
version = value.getStat().getVersion();


store.put(key2, "value-5".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
value = store.get(key2).join().get();
assertEquals(value.getValue(), "value-5".getBytes());
assertTrue(value.getStat().isEphemeral());
assertEquals(value.getStat().getVersion(), version + 1);
}

}
42 changes: 27 additions & 15 deletions testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
import org.slf4j.LoggerFactory;

public class MockZooKeeper extends ZooKeeper {
// ephemeralOwner value for persistent nodes
private static final long NOT_EPHEMERAL = 0L;

@Data
@AllArgsConstructor
private static class MockZNode {
Expand All @@ -84,13 +87,13 @@ static MockZNode of(byte[] content, int version, long ephemeralOwner) {
private ExecutorService executor;

private Watcher sessionWatcher;
private long sessionId = 0L;
private long sessionId = 1L;
private int readOpDelayMs;

private ReentrantLock mutex;

private AtomicLong sequentialIdGenerator;
private ThreadLocal<Long> epheralOwnerThreadLocal;
private ThreadLocal<Long> ephemeralOwnerThreadLocal;

//see details of Objenesis caching - http://objenesis.org/details.html
//see supported jvms - https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md
Expand Down Expand Up @@ -156,7 +159,7 @@ private static MockZooKeeper createMockZooKeeperInstance(ExecutorService executo
ObjectInstantiator<MockZooKeeper> mockZooKeeperInstantiator =
objenesis.getInstantiatorOf(MockZooKeeper.class);
MockZooKeeper zk = mockZooKeeperInstantiator.newInstance();
zk.epheralOwnerThreadLocal = new ThreadLocal<>();
zk.ephemeralOwnerThreadLocal = new ThreadLocal<>();
zk.init(executor);
zk.readOpDelayMs = readOpDelayMs;
zk.mutex = new ReentrantLock();
Expand Down Expand Up @@ -278,7 +281,7 @@ public String create(String path, byte[] data, List<ACL> acl, CreateMode createM
MockZNode.of(parentNode.getContent(), parentVersion + 1, parentNode.getEphemeralOwner()));
}

tree.put(path, MockZNode.of(data, 0, createMode.isEphemeral() ? getEphemeralOwner() : -1L));
tree.put(path, MockZNode.of(data, 0, createMode.isEphemeral() ? getEphemeralOwner() : NOT_EPHEMERAL));

toNotifyCreate.addAll(watchers.get(path));

Expand Down Expand Up @@ -310,19 +313,19 @@ public String create(String path, byte[] data, List<ACL> acl, CreateMode createM
}

protected long getEphemeralOwner() {
Long epheralOwner = epheralOwnerThreadLocal.get();
if (epheralOwner != null) {
return epheralOwner;
Long ephemeralOwner = ephemeralOwnerThreadLocal.get();
if (ephemeralOwner != null) {
return ephemeralOwner;
}
return getSessionId();
}

public void overrideEpheralOwner(long epheralOwner) {
epheralOwnerThreadLocal.set(epheralOwner);
public void overrideEphemeralOwner(long ephemeralOwner) {
ephemeralOwnerThreadLocal.set(ephemeralOwner);
}

public void removeEpheralOwnerOverride() {
epheralOwnerThreadLocal.remove();
public void removeEphemeralOwnerOverride() {
ephemeralOwnerThreadLocal.remove();
}

@Override
Expand Down Expand Up @@ -373,7 +376,7 @@ public void create(final String path, final byte[] data, final List<ACL> acl, Cr
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
} else {
tree.put(name, MockZNode.of(data, 0,
createMode != null && createMode.isEphemeral() ? getEphemeralOwner() : -1L));
createMode != null && createMode.isEphemeral() ? getEphemeralOwner() : NOT_EPHEMERAL));
watchers.removeAll(name);
unlockIfLocked();
cb.processResult(0, path, ctx, name);
Expand Down Expand Up @@ -678,9 +681,7 @@ private static Stat createStatForZNode(MockZNode zNode) {

private static Stat applyToStat(MockZNode zNode, Stat stat) {
stat.setVersion(zNode.getVersion());
if (zNode.getEphemeralOwner() != -1L) {
stat.setEphemeralOwner(zNode.getEphemeralOwner());
}
stat.setEphemeralOwner(zNode.getEphemeralOwner());
return stat;
}

Expand Down Expand Up @@ -1199,5 +1200,16 @@ private void triggerPersistentWatches(String path, String parent, EventType even
});
}

public void deleteEphemeralNodes(long sessionId) {
if (sessionId != NOT_EPHEMERAL) {
lock();
try {
tree.values().removeIf(zNode -> zNode.getEphemeralOwner() == sessionId);
} finally {
unlockIfLocked();
}
}
}

private static final Logger log = LoggerFactory.getLogger(MockZooKeeper.class);
}
Loading
Loading