From 4fbeaad74aa723f1b5a4083808feb3da970f39dd Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 23 Jan 2025 19:52:57 +0530 Subject: [PATCH] Remove code for CuratorLoadQueuePeon (#17657) ZK-based segment loading was completely disabled in #16360 and #15705. This PR removes all related code which is now unused. Changes: - Remove CuratorLoadQueuePeon and CuratorLoadQueuePeonTest - Refactor DruidCoordinatorTest to use a TestLoadQueuePeon instead --- .../loading/CuratorLoadQueuePeon.java | 427 ----------------- .../coordinator/loading/LoadQueuePeon.java | 1 - .../CuratorDruidCoordinatorTest.java | 439 ------------------ .../coordinator/DruidCoordinatorTest.java | 311 +++---------- .../loading/CuratorLoadQueuePeonTest.java | 401 ---------------- .../server/coordinator/simulate/README.md | 1 - 6 files changed, 67 insertions(+), 1513 deletions(-) delete mode 100644 server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java delete mode 100644 server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java delete mode 100644 server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java deleted file mode 100644 index 0ab9d8ab3297..000000000000 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java +++ /dev/null @@ -1,427 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.server.coordinator.loading; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.utils.ZKPaths; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.server.coordination.SegmentChangeRequestNoop; -import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; -import org.apache.druid.server.coordinator.stats.Stats; -import org.apache.druid.timeline.DataSegment; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import org.joda.time.Duration; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Use {@link HttpLoadQueuePeon} instead. - *
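- * ZK-based segment loading was disabled in #16360 and #15705, leaving this class unused.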

- * Objects of this class can be accessed by multiple threads. State-wise, this class
- * is thread safe and callers of the public methods can expect thread safe behavior.
- * Though, like a typical object being accessed by multiple threads,
- * callers shouldn't expect strict consistency in results between two calls
- * of the same or different methods.
- */
-@Deprecated
-public class CuratorLoadQueuePeon implements LoadQueuePeon
-{
-  private static final EmittingLogger log = new EmittingLogger(CuratorLoadQueuePeon.class);
-
-  private final CuratorFramework curator;
-  private final String basePath;
-  private final ObjectMapper jsonMapper;
-  private final ScheduledExecutorService processingExecutor;
-
-  /**
-   * Threadpool with daemon threads that execute callback actions associated
-   * with loading or dropping segments.
-   */
-  private final ExecutorService callBackExecutor;
-  private final Duration loadTimeout;
-
-  private final AtomicLong queuedSize = new AtomicLong(0);
-  private final AtomicReference<CoordinatorRunStats> stats = new AtomicReference<>(new CoordinatorRunStats());
-
-  /**
-   * Needs to be thread safe since it can be concurrently accessed via
-   * {@link #loadSegment(DataSegment, SegmentAction, LoadPeonCallback)},
-   * {@link #actionCompleted(SegmentHolder)}, {@link #getSegmentsToLoad()} and
-   * {@link #stop()}.
-   */
-  private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad
-      = new ConcurrentSkipListMap<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
-
-  /**
-   * Needs to be thread safe since it can be concurrently accessed via
-   * {@link #dropSegment(DataSegment, LoadPeonCallback)}, {@link #actionCompleted(SegmentHolder)},
-   * {@link #getSegmentsToDrop()} and {@link #stop()}
-   */
-  private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop
-      = new ConcurrentSkipListMap<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
-
-  /**
-   * Needs to be thread safe since it can be concurrently accessed via
-   * {@link #markSegmentToDrop(DataSegment)}, {@link #unmarkSegmentToDrop(DataSegment)}
-   * and {@link #getSegmentsToDrop()}
-   */
-  private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop
-      = new ConcurrentSkipListSet<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
-
-  /**
-   * Needs to be thread safe since it can be concurrently accessed via
-   * {@link #failAssign(SegmentHolder, boolean, Exception)}, {@link #actionCompleted(SegmentHolder)},
-   * {@link #getTimedOutSegments()} and {@link #stop()}
-   */
-  private final ConcurrentSkipListSet<DataSegment> timedOutSegments =
-      new ConcurrentSkipListSet<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
-
-  public CuratorLoadQueuePeon(
-      CuratorFramework curator,
-      String basePath,
-      ObjectMapper jsonMapper,
-      ScheduledExecutorService processingExecutor,
-      ExecutorService callbackExecutor,
-      Duration loadTimeout
-  )
-  {
-    this.curator = curator;
-    this.basePath = basePath;
-    this.jsonMapper = jsonMapper;
-    this.callBackExecutor = callbackExecutor;
-    this.processingExecutor = processingExecutor;
-    this.loadTimeout = loadTimeout;
-  }
-
-  @JsonProperty
-  @Override
-  public Set<DataSegment> getSegmentsToLoad()
-  {
-    return segmentsToLoad.keySet();
-  }
-
-  @Override
-  public Set<SegmentHolder> getSegmentsInQueue()
-  {
-    final Set<SegmentHolder> segmentsInQueue = new HashSet<>();
-    segmentsInQueue.addAll(segmentsToLoad.values());
-    segmentsInQueue.addAll(segmentsToDrop.values());
-    return segmentsInQueue;
-  }
-
-  @JsonProperty
-  @Override
-  public Set<DataSegment> getSegmentsToDrop()
-  {
-    return segmentsToDrop.keySet();
-  }
-
-  @JsonProperty
-  @Override
-  public Set<DataSegment> getSegmentsMarkedToDrop()
-  {
-    return segmentsMarkedToDrop;
-  }
-
-  @Override
-  public Set<DataSegment> getTimedOutSegments()
-  {
-    return timedOutSegments;
-  }
-
-  @Override
-  public long getSizeOfSegmentsToLoad()
-  {
-    return queuedSize.get();
-  }
-
-  @Override
-  public long getLoadRateKbps()
-  {
-    return 0;
-  }
-
-  @Override
-  public CoordinatorRunStats getAndResetStats()
-  {
-    return stats.getAndSet(new CoordinatorRunStats());
-  }
-
-  @Override
-  public void loadSegment(final DataSegment segment, SegmentAction action, @Nullable final LoadPeonCallback callback)
-  {
-    SegmentHolder segmentHolder = new SegmentHolder(segment, action, Duration.ZERO, callback);
-    final SegmentHolder existingHolder = segmentsToLoad.putIfAbsent(segment, segmentHolder);
-    if (existingHolder != null) {
-      existingHolder.addCallback(callback);
-      return;
-    }
-    log.debug("Asking server peon[%s] to load segment[%s]", basePath, segment.getId());
-    queuedSize.addAndGet(segment.getSize());
-    processingExecutor.submit(new SegmentChangeProcessor(segmentHolder));
-  }
-
-  @Override
-  public void dropSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
-  {
-    SegmentHolder segmentHolder = new SegmentHolder(segment, SegmentAction.DROP, Duration.ZERO, callback);
-    final SegmentHolder existingHolder = segmentsToDrop.putIfAbsent(segment, segmentHolder);
-    if (existingHolder != null) {
-      existingHolder.addCallback(callback);
-      return;
-    }
-    log.debug("Asking server peon[%s] to drop segment[%s]", basePath, segment.getId());
-    processingExecutor.submit(new SegmentChangeProcessor(segmentHolder));
-  }
-
-  @Override
-  public void markSegmentToDrop(DataSegment dataSegment)
-  {
-    segmentsMarkedToDrop.add(dataSegment);
-  }
-
-  @Override
-  public void unmarkSegmentToDrop(DataSegment dataSegment)
-  {
-    segmentsMarkedToDrop.remove(dataSegment);
-  }
-
-  private class SegmentChangeProcessor implements Runnable
-  {
-    private final SegmentHolder segmentHolder;
-
-    private SegmentChangeProcessor(SegmentHolder segmentHolder)
-    {
-      this.segmentHolder = segmentHolder;
-    }
-
-    @Override
-    public void run()
-    {
-      try {
-        final String path = ZKPaths.makePath(basePath, segmentHolder.getSegmentIdentifier());
-        final byte[] payload = jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest());
-        curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
-        if (log.isDebugEnabled()) {
-          log.debug(
-              "ZKNode created for server to [%s] %s [%s]",
-              basePath, segmentHolder.getAction(), segmentHolder.getSegmentIdentifier()
-          );
-        }
-        final ScheduledFuture<?> nodeDeletedCheck = scheduleNodeDeletedCheck(path);
-        final Stat stat = curator.checkExists().usingWatcher(
-            (CuratorWatcher) watchedEvent -> {
-              switch (watchedEvent.getType()) {
-                case NodeDeleted:
-                  // Cancel the check node deleted task since we have already
-                  // been notified by the zk watcher
-                  nodeDeletedCheck.cancel(true);
-                  onZkNodeDeleted(segmentHolder, watchedEvent.getPath());
-                  break;
-                default:
-                  // do nothing
-              }
-            }
-        ).forPath(path);
-
-        // Clean up the watcher to avoid a memory leak if we missed the NodeDeleted event.
-        if (stat == null) {
-          // Create a node and then delete it to remove the registered watcher. This is a work-around for
-          // a ZooKeeper race condition. Specifically, when you set a watcher, it fires on the next event
-          // that happens for that node. If no events happen, the watcher stays registered forever.
-          // Couple that with the fact that you cannot set a watcher when you create a node, but what we
-          // want is to create a node and then watch for it to get deleted. The solution is that you *can*
-          // set a watcher when you check whether the node exists, so we first create the node and then set a
-          // watcher on its existence. However, if the node already does not exist by the time the existence
-          // check returns, the watcher that was set will never fire (nobody will ever create the node
-          // again), and thus leads to a slow, but real, memory leak. So, we create another node to cause
-          // that watcher to fire and delete it right away.
-          //
-          // We do not create the existence watcher first, because then it would fire when we create the
-          // node and we'd face the same race when trying to refresh that watcher.
-          final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
-          curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
-          onZkNodeDeleted(segmentHolder, path);
-        }
-      }
-      catch (KeeperException.NodeExistsException ne) {
-        // This is expected when historicals haven't yet picked up processing this segment and the
-        // coordinator tries reassigning it to the same node.
-        log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
-        failAssign(segmentHolder, true, null);
-      }
-      catch (Exception e) {
-        failAssign(segmentHolder, false, e);
-      }
-    }
-
-    @Nonnull
-    private ScheduledFuture<?> scheduleNodeDeletedCheck(String path)
-    {
-      return processingExecutor.schedule(
-          () -> {
-            try {
-              if (curator.checkExists().forPath(path) != null) {
-                failAssign(
-                    segmentHolder,
-                    true,
-                    new ISE(
-                        "%s operation timed out and [%s] was never removed! "
-                        + "These segments may still get processed.",
-                        segmentHolder.getAction(),
-                        path
-                    )
-                );
-              } else {
-                log.debug("Path [%s] has been removed.", path);
-              }
-            }
-            catch (Exception e) {
-              log.error(e, "Exception caught and ignored when checking whether zk node was deleted");
-              failAssign(segmentHolder, false, e);
-            }
-          },
-          loadTimeout.getMillis(),
-          TimeUnit.MILLISECONDS
-      );
-    }
-  }
-
-  private void actionCompleted(SegmentHolder segmentHolder)
-  {
-    switch (segmentHolder.getAction()) {
-      case LOAD:
-      case REPLICATE:
-      case MOVE_TO:
-        // When a load fails, the segment may be removed from segmentsToLoad twice; the second
-        // remove() returns null, and decrementing queuedSize again would make it negative.
-        // See https://github.com/apache/druid/pull/10362 for more details.
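-        // For example, if a failed 100-byte load completes twice, only the first remove()
-        // returns the holder and decrements queuedSize by 100; the second returns null and
-        // the decrement is skipped, so queuedSize stays non-negative: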
- if (null != segmentsToLoad.remove(segmentHolder.getSegment())) { - queuedSize.addAndGet(-segmentHolder.getSegment().getSize()); - timedOutSegments.remove(segmentHolder.getSegment()); - } - break; - case DROP: - segmentsToDrop.remove(segmentHolder.getSegment()); - timedOutSegments.remove(segmentHolder.getSegment()); - break; - default: - throw new UnsupportedOperationException(); - } - executeCallbacks(segmentHolder, true); - } - - - @Override - public void start() - { - } - - @Override - public void stop() - { - for (SegmentHolder holder : segmentsToDrop.values()) { - executeCallbacks(holder, false); - } - segmentsToDrop.clear(); - - for (SegmentHolder holder : segmentsToLoad.values()) { - executeCallbacks(holder, false); - } - segmentsToLoad.clear(); - - timedOutSegments.clear(); - queuedSize.set(0L); - stats.get().clear(); - } - - private void onZkNodeDeleted(SegmentHolder segmentHolder, String path) - { - if (!ZKPaths.getNodeFromPath(path).equals(segmentHolder.getSegmentIdentifier())) { - log.warn( - "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]", - basePath, - path, - segmentHolder - ); - return; - } - actionCompleted(segmentHolder); - log.debug( - "Server[%s] done processing %s of segment [%s]", - basePath, - segmentHolder.getAction(), - path - ); - } - - private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout, Exception e) - { - if (e != null) { - log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, segmentHolder); - } - stats.get().add(Stats.SegmentQueue.FAILED_ACTIONS, 1); - - if (handleTimeout) { - // Avoid removing the segment entry from the load/drop list in case config.getLoadTimeoutDelay() expires. - // This is because the ZK Node is still present and it may be processed after this timeout and so the coordinator - // needs to take this into account. - log.debug( - "Skipping segment removal from [%s] queue, since ZK Node still exists!", - segmentHolder.getAction() - ); - timedOutSegments.add(segmentHolder.getSegment()); - executeCallbacks(segmentHolder, false); - } else { - // This may have failed for a different reason and so act like it was completed. - actionCompleted(segmentHolder); - } - } - - @Override - public boolean cancelOperation(DataSegment segment) - { - return false; - } - - private void executeCallbacks(SegmentHolder holder, boolean success) - { - for (LoadPeonCallback callback : holder.getCallbacks()) { - callBackExecutor.submit(() -> callback.execute(success)); - } - } -} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java index 8e9989717ab9..2d42a49febaf 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java @@ -27,7 +27,6 @@ /** * Supports load queue management. */ -@Deprecated public interface LoadQueuePeon { void start(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java deleted file mode 100644 index c312cb493b59..000000000000 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.coordinator; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.utils.ZKPaths; -import org.apache.druid.client.BatchServerInventoryView; -import org.apache.druid.client.CoordinatorSegmentWatcherConfig; -import org.apache.druid.client.CoordinatorServerView; -import org.apache.druid.client.DataSourcesSnapshot; -import org.apache.druid.client.DruidServer; -import org.apache.druid.client.ImmutableDruidDataSource; -import org.apache.druid.curator.CuratorTestBase; -import org.apache.druid.curator.CuratorUtils; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.realtime.appenderator.SegmentSchemas; -import org.apache.druid.server.coordination.DruidServerMetadata; -import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.coordinator.balancer.BalancerStrategy; -import org.apache.druid.server.coordinator.loading.CuratorLoadQueuePeon; -import org.apache.druid.server.coordinator.loading.LoadQueuePeon; -import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster; -import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; -import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig; -import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner; -import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; -import org.apache.druid.server.initialization.ZkPathsConfig; -import org.apache.druid.server.metrics.NoopServiceEmitter; -import org.apache.druid.testing.DeadlockDetectingTimeout; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.partition.NoneShardSpec; -import org.easymock.EasyMock; -import org.joda.time.Duration; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestRule; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; 
-
-/**
- * This tests ZooKeeper-specific coordinator/load queue/historical interactions,
- * such as moving segments by the balancer.
- */
-public class CuratorDruidCoordinatorTest extends CuratorTestBase
-{
-  private DataSourcesSnapshot dataSourcesSnapshot;
-  private DruidCoordinatorRuntimeParams coordinatorRuntimeParams;
-
-  private LoadQueuePeon sourceLoadQueuePeon;
-  private LoadQueuePeon destinationLoadQueuePeon;
-  private PathChildrenCache sourceLoadQueueChildrenCache;
-  private PathChildrenCache destinationLoadQueueChildrenCache;
-
-  private static final String SEGPATH = "/druid/segments";
-  private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1";
-  private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2";
-  private static final long COORDINATOR_START_DELAY = 1;
-  private static final long COORDINATOR_PERIOD = 100;
-
-  private BatchServerInventoryView baseView;
-  private CoordinatorServerView serverView;
-  private CountDownLatch segmentViewInitLatch;
-  /**
-   * The following two fields are changed during {@link #testMoveSegment()}; the change might not be
-   * visible to the thread that runs the callback registered in {@link #setupView()}. The volatile
-   * modifier doesn't guarantee visibility either, but somewhat increases the chances.
-   */
-  private volatile CountDownLatch segmentAddedLatch;
-  private volatile CountDownLatch segmentRemovedLatch;
-  private final ObjectMapper jsonMapper;
-  private final ZkPathsConfig zkPathsConfig;
-
-  private final ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d");
-  private final ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d");
-
-  public CuratorDruidCoordinatorTest()
-  {
-    jsonMapper = TestHelper.makeJsonMapper();
-    zkPathsConfig = new ZkPathsConfig();
-  }
-
-  @Before
-  public void setUp() throws Exception
-  {
-    dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
-    coordinatorRuntimeParams = EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);
-
-    setupServerAndCurator();
-    curator.start();
-    curator.blockUntilConnected();
-    curator.create().creatingParentsIfNeeded().forPath(SEGPATH);
-    curator.create().creatingParentsIfNeeded().forPath(SOURCE_LOAD_PATH);
-    curator.create().creatingParentsIfNeeded().forPath(DESTINATION_LOAD_PATH);
-
-    final ObjectMapper objectMapper = new DefaultObjectMapper();
-    sourceLoadQueueChildrenCache = new PathChildrenCache(
-        curator,
-        SOURCE_LOAD_PATH,
-        true,
-        true,
-        Execs.singleThreaded("coordinator_test_path_children_cache_src-%d")
-    );
-    destinationLoadQueueChildrenCache = new PathChildrenCache(
-        curator,
-        DESTINATION_LOAD_PATH,
-        true,
-        true,
-        Execs.singleThreaded("coordinator_test_path_children_cache_dest-%d")
-    );
-    sourceLoadQueuePeon = new CuratorLoadQueuePeon(
-        curator,
-        SOURCE_LOAD_PATH,
-        objectMapper,
-        peonExec,
-        callbackExec,
-        Duration.standardMinutes(15)
-    );
-    destinationLoadQueuePeon = new CuratorLoadQueuePeon(
-        curator,
-        DESTINATION_LOAD_PATH,
-        objectMapper,
-        peonExec,
-        callbackExec,
-        Duration.standardMinutes(15)
-    );
-  }
-
-  @After
-  public void tearDown() throws Exception
-  {
-    baseView.stop();
-    sourceLoadQueuePeon.stop();
-    sourceLoadQueueChildrenCache.close();
-    destinationLoadQueueChildrenCache.close();
-    tearDownServerAndCurator();
-  }
-
-  @Rule
-  public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS);
-
-  @Test
-  public void testStopDoesntKillPoolItDoesntOwn() throws Exception
-  {
-    setupView();
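-
-    // stop() should only clear the peon's queues and fire pending callbacks;
-    // the executors were created by this test and must not be shut down.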
sourceLoadQueuePeon.stop(); - Assert.assertFalse(peonExec.isShutdown()); - Assert.assertFalse(callbackExec.isShutdown()); - } - - @Test - public void testMoveSegment() throws Exception - { - segmentViewInitLatch = new CountDownLatch(1); - segmentAddedLatch = new CountDownLatch(4); - - segmentRemovedLatch = new CountDownLatch(0); - - setupView(); - - DruidServer source = new DruidServer( - "localhost:1", - "localhost:1", - null, - 10000000L, - ServerType.HISTORICAL, - "default_tier", - 0 - ); - - DruidServer dest = new DruidServer( - "localhost:2", - "localhost:2", - null, - 10000000L, - ServerType.HISTORICAL, - "default_tier", - 0 - ); - - setupZNodeForServer(source, zkPathsConfig, jsonMapper); - setupZNodeForServer(dest, zkPathsConfig, jsonMapper); - - final List sourceSegments = Arrays.asList( - createSegment("2011-04-01/2011-04-03", "v1"), - createSegment("2011-04-03/2011-04-06", "v1"), - createSegment("2011-04-06/2011-04-09", "v1") - ); - - final List destinationSegments = Collections.singletonList( - createSegment("2011-03-31/2011-04-01", "v1") - ); - - DataSegment segmentToMove = sourceSegments.get(2); - - List sourceSegKeys = new ArrayList<>(); - - for (DataSegment segment : sourceSegments) { - sourceSegKeys.add(announceBatchSegmentsForServer(source, ImmutableSet.of(segment), zkPathsConfig, jsonMapper)); - } - - for (DataSegment segment : destinationSegments) { - announceBatchSegmentsForServer(dest, ImmutableSet.of(segment), zkPathsConfig, jsonMapper); - } - - Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); - Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); - - // these child watchers are used to simulate actions of historicals, announcing a segment on noticing a load queue - // for the destination and unannouncing from source server when noticing a drop request - - CountDownLatch srcCountdown = new CountDownLatch(1); - sourceLoadQueueChildrenCache.getListenable().addListener( - (CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> { - if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) { - srcCountdown.countDown(); - } else if (CuratorUtils.isChildAdded(event)) { - //Simulate source server dropping segment - unannounceSegmentFromBatchForServer(source, segmentToMove, sourceSegKeys.get(2), zkPathsConfig); - } - } - ); - - CountDownLatch destCountdown = new CountDownLatch(1); - destinationLoadQueueChildrenCache.getListenable().addListener( - (CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> { - if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) { - destCountdown.countDown(); - } else if (CuratorUtils.isChildAdded(event)) { - //Simulate destination server loading segment - announceBatchSegmentsForServer(dest, ImmutableSet.of(segmentToMove), zkPathsConfig, jsonMapper); - } - } - ); - - sourceLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); - destinationLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); - - Assert.assertTrue(timing.forWaiting().awaitLatch(srcCountdown)); - Assert.assertTrue(timing.forWaiting().awaitLatch(destCountdown)); - - sourceSegments.forEach(source::addDataSegment); - destinationSegments.forEach(dest::addDataSegment); - - segmentRemovedLatch = new CountDownLatch(1); - segmentAddedLatch = new CountDownLatch(1); - - ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class); - 
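-    // createNiceMock() returns type-appropriate defaults for any method that is
-    // not explicitly stubbed, so only getSegment() needs an expectation here.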
EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(sourceSegments.get(2)); - EasyMock.replay(druidDataSource); - EasyMock.expect(coordinatorRuntimeParams.getDataSourcesSnapshot()) - .andReturn(dataSourcesSnapshot).anyTimes(); - final CoordinatorDynamicConfig dynamicConfig = - CoordinatorDynamicConfig.builder().withUseRoundRobinSegmentAssignment(false).build(); - EasyMock.expect(coordinatorRuntimeParams.getCoordinatorDynamicConfig()) - .andReturn(dynamicConfig) - .anyTimes(); - EasyMock.expect(coordinatorRuntimeParams.getSegmentLoadingConfig()) - .andReturn(SegmentLoadingConfig.create(dynamicConfig, 100)) - .anyTimes(); - - final ServerHolder sourceServer = new ServerHolder(source.toImmutableDruidServer(), sourceLoadQueuePeon); - final ServerHolder destinationServer = new ServerHolder(dest.toImmutableDruidServer(), destinationLoadQueuePeon); - final DruidCluster cluster = DruidCluster.builder().add(sourceServer).add(destinationServer).build(); - - final BalancerStrategy balancerStrategy = EasyMock.mock(BalancerStrategy.class); - EasyMock.expect( - balancerStrategy.findDestinationServerToMoveSegment( - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.anyObject() - ) - ).andReturn(destinationServer).atLeastOnce(); - EasyMock.expect(coordinatorRuntimeParams.getBalancerStrategy()) - .andReturn(balancerStrategy).anyTimes(); - EasyMock.expect(coordinatorRuntimeParams.getDruidCluster()).andReturn(cluster).anyTimes(); - EasyMock.replay(coordinatorRuntimeParams, balancerStrategy); - - EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())) - .andReturn(druidDataSource).anyTimes(); - EasyMock.replay(dataSourcesSnapshot); - - LoadQueueTaskMaster taskMaster = EasyMock.createMock(LoadQueueTaskMaster.class); - EasyMock.expect(taskMaster.isHttpLoading()).andReturn(false).anyTimes(); - EasyMock.replay(taskMaster); - - // Move the segment from source to dest - SegmentLoadQueueManager loadQueueManager = - new SegmentLoadQueueManager(baseView, taskMaster); - StrategicSegmentAssigner segmentAssigner = createSegmentAssigner(loadQueueManager, coordinatorRuntimeParams); - segmentAssigner.moveSegment( - segmentToMove, - sourceServer, - Collections.singletonList(destinationServer) - ); - - // wait for destination server to load segment - Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); - - // remove load queue key from destination server to trigger adding drop to load queue - curator.delete().guaranteed().forPath(ZKPaths.makePath(DESTINATION_LOAD_PATH, segmentToMove.getId().toString())); - - // wait for drop - Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); - - // clean up drop from load queue - curator.delete().guaranteed().forPath(ZKPaths.makePath(SOURCE_LOAD_PATH, segmentToMove.getId().toString())); - - List servers = new ArrayList<>(serverView.getInventory()); - - Assert.assertEquals(2, servers.get(0).getTotalSegments()); - Assert.assertEquals(2, servers.get(1).getTotalSegments()); - } - - private void setupView() throws Exception - { - baseView = new BatchServerInventoryView( - zkPathsConfig, - curator, - jsonMapper, - Predicates.alwaysTrue(), - "test" - ) - { - @Override - public void registerSegmentCallback(Executor exec, final SegmentCallback callback) - { - super.registerSegmentCallback( - exec, - new SegmentCallback() - { - @Override - public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) - { - CallbackAction res = callback.segmentAdded(server, segment); - 
segmentAddedLatch.countDown(); - return res; - } - - @Override - public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) - { - CallbackAction res = callback.segmentRemoved(server, segment); - segmentRemovedLatch.countDown(); - return res; - } - - @Override - public CallbackAction segmentViewInitialized() - { - CallbackAction res = callback.segmentViewInitialized(); - segmentViewInitLatch.countDown(); - return res; - } - - @Override - public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) - { - return CallbackAction.CONTINUE; - } - } - ); - } - }; - - serverView = new CoordinatorServerView(baseView, new CoordinatorSegmentWatcherConfig(), new NoopServiceEmitter(), null); - - baseView.start(); - - sourceLoadQueuePeon.start(); - destinationLoadQueuePeon.start(); - } - - private DataSegment createSegment(String intervalStr, String version) - { - return DataSegment.builder() - .dataSource("test_curator_druid_coordinator") - .interval(Intervals.of(intervalStr)) - .loadSpec(ImmutableMap.of("type", "local", "path", "somewhere")) - .version(version) - .dimensions(ImmutableList.of()) - .metrics(ImmutableList.of()) - .shardSpec(NoneShardSpec.instance()) - .binaryVersion(9) - .size(0) - .build(); - } - - private StrategicSegmentAssigner createSegmentAssigner( - SegmentLoadQueueManager loadQueueManager, - DruidCoordinatorRuntimeParams params - ) - { - return new StrategicSegmentAssigner( - loadQueueManager, - params.getDruidCluster(), - params.getBalancerStrategy(), - params.getSegmentLoadingConfig(), - new CoordinatorRunStats() - ); - } -} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 512efd0316e8..d50e82e7baa0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -25,27 +25,22 @@ import com.google.common.collect.ImmutableSet; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2LongMap; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.druid.client.BatchServerInventoryView; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.client.ServerInventoryView; import org.apache.druid.common.config.JacksonConfigManager; -import org.apache.druid.curator.CuratorTestBase; -import org.apache.druid.curator.CuratorUtils; import org.apache.druid.curator.discovery.LatchableServiceAnnouncer; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import 
org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.rpc.indexing.OverlordClient; @@ -65,37 +60,37 @@ import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; import org.apache.druid.server.coordinator.duty.DutyGroupStatus; import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty; -import org.apache.druid.server.coordinator.loading.CuratorLoadQueuePeon; +import org.apache.druid.server.coordinator.loading.LoadPeonCallback; import org.apache.druid.server.coordinator.loading.LoadQueuePeon; import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster; +import org.apache.druid.server.coordinator.loading.SegmentAction; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; import org.apache.druid.server.coordinator.rules.ForeverLoadRule; import org.apache.druid.server.coordinator.rules.IntervalLoadRule; import org.apache.druid.server.coordinator.rules.Rule; +import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.server.lookup.cache.LookupCoordinatorManager; import org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.Duration; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import javax.annotation.Nullable; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; /** */ -public class DruidCoordinatorTest extends CuratorTestBase +public class DruidCoordinatorTest { - private static final String LOADPATH = "/druid/loadqueue/localhost:1234"; - private static final Duration LOAD_TIMEOUT = Duration.standardMinutes(15); private static final long COORDINATOR_START_DELAY = 1; private static final long COORDINATOR_PERIOD = 100; private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); @@ -104,27 +99,22 @@ public class DruidCoordinatorTest extends CuratorTestBase private SegmentsMetadataManager segmentsMetadataManager; private DataSourcesSnapshot dataSourcesSnapshot; - private BatchServerInventoryView serverInventoryView; + private ServerInventoryView serverInventoryView; private ScheduledExecutorFactory scheduledExecutorFactory; - private DruidServer druidServer; - private LoadQueuePeon loadQueuePeon; private LoadQueueTaskMaster loadQueueTaskMaster; private MetadataRuleManager metadataRuleManager; private CountDownLatch leaderAnnouncerLatch; private CountDownLatch leaderUnannouncerLatch; - private PathChildrenCache pathChildrenCache; private DruidCoordinatorConfig druidCoordinatorConfig; - private ObjectMapper objectMapper; private DruidNode druidNode; private OverlordClient overlordClient; private CompactionStatusTracker statusTracker; - private final LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter(); + private LatchableServiceEmitter serviceEmitter; @Before public void setUp() throws Exception { - druidServer = new DruidServer("from", "from", null, 5L, ServerType.HISTORICAL, "tier1", 0); - serverInventoryView = EasyMock.createMock(BatchServerInventoryView.class); + serverInventoryView = EasyMock.createMock(ServerInventoryView.class); segmentsMetadataManager = 
EasyMock.createNiceMock(SegmentsMetadataManager.class); dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class); metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class); @@ -147,11 +137,7 @@ public void setUp() throws Exception ) ).andReturn(new AtomicReference<>(DruidCompactionConfig.empty())).anyTimes(); EasyMock.replay(configManager); - setupServerAndCurator(); - curator.start(); - curator.blockUntilConnected(); - curator.create().creatingParentsIfNeeded().forPath(LOADPATH); - objectMapper = new DefaultObjectMapper(); + final ObjectMapper objectMapper = new DefaultObjectMapper(); statusTracker = new CompactionStatusTracker(objectMapper); druidCoordinatorConfig = new DruidCoordinatorConfig( new CoordinatorRunConfig(new Duration(COORDINATOR_START_DELAY), new Duration(COORDINATOR_PERIOD)), @@ -160,26 +146,11 @@ public void setUp() throws Exception new CostBalancerStrategyFactory(), null ); - pathChildrenCache = new PathChildrenCache( - curator, - LOADPATH, - true, - true, - Execs.singleThreaded("coordinator_test_path_children_cache-%d") - ); - loadQueuePeon = new CuratorLoadQueuePeon( - curator, - LOADPATH, - objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon-%d"), - LOAD_TIMEOUT - ); - loadQueuePeon.start(); druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); scheduledExecutorFactory = ScheduledExecutors::fixed; leaderAnnouncerLatch = new CountDownLatch(1); leaderUnannouncerLatch = new CountDownLatch(1); + serviceEmitter = new LatchableServiceEmitter(); coordinator = new DruidCoordinator( druidCoordinatorConfig, createMetadataManager(configManager), @@ -213,14 +184,6 @@ private MetadataManager createMetadataManager(JacksonConfigManager configManager ); } - @After - public void tearDown() throws Exception - { - loadQueuePeon.stop(); - pathChildrenCache.close(); - tearDownServerAndCurator(); - } - @Test(timeout = 60_000L) public void testCoordinatorRun() throws Exception { @@ -261,7 +224,8 @@ public void testCoordinatorRun() throws Exception EasyMock.replay(immutableDruidDataSource); // Setup ServerInventoryView - druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, tier, 0); + final DruidServer druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, tier, 0); + final LoadQueuePeon loadQueuePeon = createImmediateLoadPeonFor(druidServer); setupPeons(Collections.singletonMap("server1", loadQueuePeon)); EasyMock.expect(serverInventoryView.getInventory()).andReturn( ImmutableList.of(druidServer) @@ -280,19 +244,8 @@ public void testCoordinatorRun() throws Exception // This coordinator should be leader by now Assert.assertTrue(coordinator.isLeader()); Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader()); - pathChildrenCache.start(); - final CountDownLatch assignSegmentLatch = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch( - 1, - pathChildrenCache, - ImmutableMap.of("2010-01-01T00:00:00.000Z_2010-01-02T00:00:00.000Z", dataSegment), - druidServer - ); - assignSegmentLatch.await(); - - final CountDownLatch coordinatorRunLatch = new CountDownLatch(2); - serviceEmitter.latch = coordinatorRunLatch; - coordinatorRunLatch.await(); + serviceEmitter.coordinatorRunLatch.await(); Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus()); @@ -350,42 +303,24 @@ public void testCoordinatorTieredRun() throws 
Exception final String dataSource = "dataSource", hotTierName = "hot", coldTierName = "cold"; final Rule hotTier = new IntervalLoadRule(Intervals.of("2018-01-01/P1M"), ImmutableMap.of(hotTierName, 1), null); final Rule coldTier = new ForeverLoadRule(ImmutableMap.of(coldTierName, 1), null); - final String loadPathCold = "/druid/loadqueue/cold:1234"; final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0); final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0); - final Map dataSegments = ImmutableMap.of( - "2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z", + final Set dataSegments = Set.of( new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0), - "2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z", new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0), - "2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z", new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0) ); - final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon( - curator, - loadPathCold, - objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"), - LOAD_TIMEOUT - ); - final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache( - curator, - loadPathCold, - true, - true, - Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d") - ); - setupPeons(ImmutableMap.of("hot", loadQueuePeon, "cold", loadQueuePeonCold)); + final LoadQueuePeon loadQueuePeonHot = createImmediateLoadPeonFor(hotServer); + final LoadQueuePeon loadQueuePeonCold = createImmediateLoadPeonFor(coldServer); + setupPeons(ImmutableMap.of("hot", loadQueuePeonHot, "cold", loadQueuePeonCold)); + loadQueuePeonHot.start(); loadQueuePeonCold.start(); - pathChildrenCache.start(); - pathChildrenCacheCold.start(); DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())}; - dataSegments.values().forEach(druidDataSources[0]::addSegment); + dataSegments.forEach(druidDataSources[0]::addSegment); setupSegmentsMetadataMock(druidDataSources[0]); @@ -402,14 +337,7 @@ public void testCoordinatorTieredRun() throws Exception coordinator.start(); leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader - final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(2, pathChildrenCache, dataSegments, hotServer); - final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(1, pathChildrenCacheCold, dataSegments, coldServer); - assignSegmentLatchHot.await(); - assignSegmentLatchCold.await(); - - final CountDownLatch coordinatorRunLatch = new CountDownLatch(2); - serviceEmitter.latch = coordinatorRunLatch; - coordinatorRunLatch.await(); + serviceEmitter.coordinatorRunLatch.await(); Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus()); @@ -425,7 +353,7 @@ public void testCoordinatorTieredRun() throws Exception Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(hotTierName).getLong(dataSource)); Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(coldTierName).getLong(dataSource)); - dataSegments.values().forEach(dataSegment -> 
Assert.assertEquals(Integer.valueOf(1), coordinator.getReplicationFactor(dataSegment.getId()))); + dataSegments.forEach(dataSegment -> Assert.assertEquals(Integer.valueOf(1), coordinator.getReplicationFactor(dataSegment.getId()))); coordinator.stop(); leaderUnannouncerLatch.await(); @@ -443,109 +371,39 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith final String coldTierName = "cold"; final String tierName1 = "tier1"; final String tierName2 = "tier2"; - final String loadPathCold = "/druid/loadqueue/cold:1234"; - final String loadPathBroker1 = "/druid/loadqueue/broker1:1234"; - final String loadPathBroker2 = "/druid/loadqueue/broker2:1234"; - final String loadPathPeon = "/druid/loadqueue/peon:1234"; final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0); final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0); final DruidServer brokerServer1 = new DruidServer("broker1", "broker1", null, 5L, ServerType.BROKER, tierName1, 0); final DruidServer brokerServer2 = new DruidServer("broker2", "broker2", null, 5L, ServerType.BROKER, tierName2, 0); final DruidServer peonServer = new DruidServer("peon", "peon", null, 5L, ServerType.INDEXER_EXECUTOR, tierName2, 0); - final Map dataSegments = ImmutableMap.of( - "2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z", + final Set dataSegments = Set.of( new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0), - "2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z", new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0), - "2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z", new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0) ); - final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon( - curator, - loadPathCold, - objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"), - LOAD_TIMEOUT - ); - - final LoadQueuePeon loadQueuePeonBroker1 = new CuratorLoadQueuePeon( - curator, - loadPathBroker1, - objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker1_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_broker1-%d"), - LOAD_TIMEOUT - ); - - final LoadQueuePeon loadQueuePeonBroker2 = new CuratorLoadQueuePeon( - curator, - loadPathBroker2, - objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker2_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_broker2-%d"), - LOAD_TIMEOUT - ); - - final LoadQueuePeon loadQueuePeonPoenServer = new CuratorLoadQueuePeon( - curator, - loadPathPeon, - objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_peon_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_peon-%d"), - LOAD_TIMEOUT - ); - final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache( - curator, - loadPathCold, - true, - true, - Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d") - ); - final PathChildrenCache pathChildrenCacheBroker1 = new PathChildrenCache( - curator, - loadPathBroker1, - true, - true, - Execs.singleThreaded("coordinator_test_path_children_cache_broker1-%d") - ); - final PathChildrenCache pathChildrenCacheBroker2 = new PathChildrenCache( - curator, - loadPathBroker2, - true, - true, 
- Execs.singleThreaded("coordinator_test_path_children_cache_broker2-%d") - ); - final PathChildrenCache pathChildrenCachePeon = new PathChildrenCache( - curator, - loadPathPeon, - true, - true, - Execs.singleThreaded("coordinator_test_path_children_cache_peon-%d") - ); - + final LoadQueuePeon loadQueuePeonHot = createImmediateLoadPeonFor(hotServer); + final LoadQueuePeon loadQueuePeonCold = createImmediateLoadPeonFor(coldServer); + final LoadQueuePeon loadQueuePeonBroker1 = createImmediateLoadPeonFor(brokerServer1); + final LoadQueuePeon loadQueuePeonBroker2 = createImmediateLoadPeonFor(brokerServer2); + final LoadQueuePeon loadQueuePeonPoenServer = createImmediateLoadPeonFor(peonServer); setupPeons(ImmutableMap.of( - "hot", loadQueuePeon, + "hot", loadQueuePeonHot, "cold", loadQueuePeonCold, "broker1", loadQueuePeonBroker1, "broker2", loadQueuePeonBroker2, "peon", loadQueuePeonPoenServer )); + loadQueuePeonHot.start(); loadQueuePeonCold.start(); loadQueuePeonBroker1.start(); loadQueuePeonBroker2.start(); loadQueuePeonPoenServer.start(); - pathChildrenCache.start(); - pathChildrenCacheCold.start(); - pathChildrenCacheBroker1.start(); - pathChildrenCacheBroker2.start(); - pathChildrenCachePeon.start(); DruidDataSource druidDataSource = new DruidDataSource(dataSource, Collections.emptyMap()); - dataSegments.values().forEach(druidDataSource::addSegment); + dataSegments.forEach(druidDataSource::addSegment); setupSegmentsMetadataMock(druidDataSource); @@ -563,23 +421,10 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith coordinator.start(); leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader - final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCache, dataSegments, hotServer); - final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheCold, dataSegments, coldServer); - final CountDownLatch assignSegmentLatchBroker1 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker1, dataSegments, brokerServer1); - final CountDownLatch assignSegmentLatchBroker2 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker2, dataSegments, brokerServer2); - final CountDownLatch assignSegmentLatchPeon = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCachePeon, dataSegments, peonServer); - assignSegmentLatchHot.await(); - assignSegmentLatchCold.await(); - assignSegmentLatchBroker1.await(); - assignSegmentLatchBroker2.await(); - assignSegmentLatchPeon.await(); - - final CountDownLatch coordinatorRunLatch = new CountDownLatch(2); - serviceEmitter.latch = coordinatorRunLatch; - coordinatorRunLatch.await(); + serviceEmitter.coordinatorRunLatch.await(); Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus()); - Assert.assertEquals(new HashSet<>(dataSegments.values()), coordinator.getBroadcastSegments()); + Assert.assertEquals(dataSegments, coordinator.getBroadcastSegments()); // Under-replicated counts are updated only after the next coordinator run Map> underReplicationCountsPerDataSourcePerTier = @@ -875,7 +720,6 @@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception // Setup SegmentsMetadataManager DruidDataSource[] dataSources = { new DruidDataSource(dataSource, Collections.emptyMap()) - }; final DataSegment dataSegment = new DataSegment( dataSource, @@ -908,11 +752,16 
@@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception EasyMock.replay(immutableDruidDataSource); // Setup ServerInventoryView - druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, hotTier, 0); - DruidServer druidServer2 = new DruidServer("server2", "localhost", null, 5L, ServerType.HISTORICAL, coldTier, 0); - setupPeons(ImmutableMap.of("server1", loadQueuePeon, "server2", loadQueuePeon)); + final DruidServer druidServer1 = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, hotTier, 0); + final DruidServer druidServer2 = new DruidServer("server2", "localhost", null, 5L, ServerType.HISTORICAL, coldTier, 0); + + // For hot server, use a load queue peon that does not perform immediate load + setupPeons(ImmutableMap.of( + "server1", new TestLoadQueuePeon(), + "server2", createImmediateLoadPeonFor(druidServer2)) + ); EasyMock.expect(serverInventoryView.getInventory()).andReturn( - ImmutableList.of(druidServer, druidServer2) + ImmutableList.of(druidServer1, druidServer2) ).atLeastOnce(); EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes(); EasyMock.replay(serverInventoryView, loadQueueTaskMaster); @@ -925,11 +774,8 @@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception // This coordinator should be leader by now Assert.assertTrue(coordinator.isLeader()); Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader()); - pathChildrenCache.start(); - final CountDownLatch coordinatorRunLatch = new CountDownLatch(2); - serviceEmitter.latch = coordinatorRunLatch; - coordinatorRunLatch.await(); + serviceEmitter.coordinatorRunLatch.await(); Object2IntMap numsUnavailableUsedSegmentsPerDataSource = coordinator.getDatasourceToUnavailableSegmentCount(); @@ -980,33 +826,6 @@ public void testSimulateRunWithEmptyDatasourceCompactionConfigs() Assert.assertEquals(Collections.emptyMap(), result.getCompactionStates()); } - private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch( - int latchCount, - PathChildrenCache pathChildrenCache, - Map segments, - DruidServer server - ) - { - final CountDownLatch countDownLatch = new CountDownLatch(latchCount); - pathChildrenCache.getListenable().addListener( - (CuratorFramework client, PathChildrenCacheEvent event) -> { - if (CuratorUtils.isChildAdded(event)) { - DataSegment segment = findSegmentRelatedToCuratorEvent(segments, event); - if (segment != null && server.getSegment(segment.getId()) == null) { - if (countDownLatch.getCount() > 0) { - server.addDataSegment(segment); - curator.delete().guaranteed().forPath(event.getData().getPath()); - countDownLatch.countDown(); - } else { - Assert.fail("The segment path " + event.getData().getPath() + " is not expected"); - } - } - } - } - ); - return countDownLatch; - } - private void setupSegmentsMetadataMock(DruidDataSource dataSource) { EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes(); @@ -1041,21 +860,6 @@ private void setupSegmentsMetadataMock(DruidDataSource dataSource) EasyMock.replay(this.dataSourcesSnapshot); } - @Nullable - private static DataSegment findSegmentRelatedToCuratorEvent( - Map dataSegments, - PathChildrenCacheEvent event - ) - { - return dataSegments - .entrySet() - .stream() - .filter(x -> event.getData().getPath().contains(x.getKey())) - .map(Map.Entry::getValue) - .findFirst() - .orElse(null); - } - private void setupPeons(Map peonMap) { 
loadQueueTaskMaster.resetPeonsForNewServers(EasyMock.anyObject()); @@ -1071,6 +875,18 @@ private void setupPeons(Map peonMap) () -> peonMap.get(((ImmutableDruidServer) EasyMock.getCurrentArgument(0)).getName()) ).anyTimes(); } + + private LoadQueuePeon createImmediateLoadPeonFor(DruidServer server) + { + return new TestLoadQueuePeon() { + @Override + public void loadSegment(DataSegment segment, SegmentAction action, @Nullable LoadPeonCallback callback) + { + server.addDataSegment(segment); + super.loadSegment(segment, action, callback); + } + }; + } private static class TestDruidLeaderSelector implements DruidLeaderSelector { @@ -1113,7 +929,7 @@ public void unregisterListener() private static class LatchableServiceEmitter extends ServiceEmitter { - private CountDownLatch latch; + private final CountDownLatch coordinatorRunLatch = new CountDownLatch(2); private LatchableServiceEmitter() { @@ -1123,8 +939,15 @@ private LatchableServiceEmitter() @Override public void emit(Event event) { - if (latch != null && "segment/count".equals(event.toMap().get("metric"))) { - latch.countDown(); + if (event instanceof ServiceMetricEvent) { + final ServiceMetricEvent metricEvent = (ServiceMetricEvent) event; + + // Count down when the historical management duties group has finished + String dutyGroupName = (String) metricEvent.getUserDims().get("dutyGroup"); + if (Stats.CoordinatorRun.GROUP_RUN_TIME.getMetricName().equals(metricEvent.getMetric()) + && "HistoricalManagementDuties".equals(dutyGroupName)) { + coordinatorRunLatch.countDown(); + } } } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java deleted file mode 100644 index 4de59af1f2d3..000000000000 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java +++ /dev/null @@ -1,401 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
-
-package org.apache.druid.server.coordinator.loading;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.segment.TestHelper;
-import org.apache.druid.server.coordination.DataSegmentChangeCallback;
-import org.apache.druid.server.coordination.DataSegmentChangeHandler;
-import org.apache.druid.server.coordination.DataSegmentChangeRequest;
-import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
-import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.joda.time.Duration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-public class CuratorLoadQueuePeonTest extends CuratorTestBase
-{
-  private static final String LOAD_QUEUE_PATH = "/druid/loadqueue/localhost:1234";
-
-  private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
-
-  private CuratorLoadQueuePeon loadQueuePeon;
-  private PathChildrenCache loadQueueCache;
-
-  @Before
-  public void setUp() throws Exception
-  {
-    setupServerAndCurator();
-    curator.start();
-    curator.blockUntilConnected();
-    curator.create().creatingParentsIfNeeded().forPath(LOAD_QUEUE_PATH);
-
-    loadQueueCache = new PathChildrenCache(
-        curator,
-        LOAD_QUEUE_PATH,
-        true,
-        true,
-        Execs.singleThreaded("load_queue_cache-%d")
-    );
-  }
-
-  @Test
-  public void testMultipleLoadDropSegments() throws Exception
-  {
-    loadQueuePeon = new CuratorLoadQueuePeon(
-        curator,
-        LOAD_QUEUE_PATH,
-        jsonMapper,
-        Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
-        Execs.singleThreaded("test_load_queue_peon-%d"),
-        Duration.standardMinutes(15)
-    );
-
-    loadQueuePeon.start();
-
-    ConcurrentMap<SegmentId, CountDownLatch> loadRequestSignals = new ConcurrentHashMap<>(5);
-    ConcurrentMap<SegmentId, CountDownLatch> dropRequestSignals = new ConcurrentHashMap<>(5);
-    ConcurrentMap<SegmentId, CountDownLatch> segmentLoadedSignals = new ConcurrentHashMap<>(5);
-    ConcurrentMap<SegmentId, CountDownLatch> segmentDroppedSignals = new ConcurrentHashMap<>(5);
-
-    final List<DataSegment> segmentToDrop = Lists.transform(
-        ImmutableList.of(
-            "2014-10-26T00:00:00Z/P1D",
-            "2014-10-25T00:00:00Z/P1D",
-            "2014-10-24T00:00:00Z/P1D",
-            "2014-10-23T00:00:00Z/P1D",
-            "2014-10-22T00:00:00Z/P1D"
-        ), new Function<>()
-        {
-          @Override
-          public DataSegment apply(String intervalStr)
-          {
-            DataSegment dataSegment = dataSegmentWithInterval(intervalStr);
-            return dataSegment;
-          }
-        }
-    );
-
-    final CountDownLatch[] dropRequestLatches = new CountDownLatch[5];
-    final CountDownLatch[] dropSegmentLatches = new CountDownLatch[5];
-    for (int i = 0; i < 5; i++) {
-      dropRequestLatches[i] = new CountDownLatch(1);
-      dropSegmentLatches[i] = new CountDownLatch(1);
-    }
-    int i = 0;
-    for (DataSegment s : segmentToDrop) {
-      dropRequestSignals.put(s.getId(), dropRequestLatches[i]);
-      segmentDroppedSignals.put(s.getId(), dropSegmentLatches[i++]);
-    }
-
-    final List<DataSegment> segmentToLoad = Lists.transform(
-        ImmutableList.of(
-            "2014-10-27T00:00:00Z/P1D",
-            "2014-10-29T00:00:00Z/P1M",
-            "2014-10-31T00:00:00Z/P1D",
-            "2014-10-30T00:00:00Z/P1D",
-            "2014-10-28T00:00:00Z/P1D"
-        ), new Function<>()
-        {
-          @Override
-          public DataSegment apply(String intervalStr)
-          {
-            DataSegment dataSegment = dataSegmentWithInterval(intervalStr);
-            loadRequestSignals.put(dataSegment.getId(), new CountDownLatch(1));
-            segmentLoadedSignals.put(dataSegment.getId(), new CountDownLatch(1));
-            return dataSegment;
-          }
-        }
-    );
-
-    final CountDownLatch[] loadRequestLatches = new CountDownLatch[5];
-    final CountDownLatch[] segmentLoadedLatches = new CountDownLatch[5];
-    for (i = 0; i < 5; i++) {
-      loadRequestLatches[i] = new CountDownLatch(1);
-      segmentLoadedLatches[i] = new CountDownLatch(1);
-    }
-    i = 0;
-    for (DataSegment s : segmentToDrop) {
-      loadRequestSignals.put(s.getId(), loadRequestLatches[i]);
-      segmentLoadedSignals.put(s.getId(), segmentLoadedLatches[i++]);
-    }
-
-    // segment with latest interval should be loaded first
-    final List<DataSegment> expectedLoadOrder = Lists.transform(
-        ImmutableList.of(
-            "2014-10-29T00:00:00Z/P1M",
-            "2014-10-31T00:00:00Z/P1D",
-            "2014-10-30T00:00:00Z/P1D",
-            "2014-10-28T00:00:00Z/P1D",
-            "2014-10-27T00:00:00Z/P1D"
-        ), intervalStr -> dataSegmentWithInterval(intervalStr)
-    );
-
-    final DataSegmentChangeHandler handler = new DataSegmentChangeHandler()
-    {
-      @Override
-      public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
-      {
-        loadRequestSignals.get(segment.getId()).countDown();
-      }
-
-      @Override
-      public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
-      {
-        dropRequestSignals.get(segment.getId()).countDown();
-      }
-    };
-
-    loadQueueCache.getListenable().addListener(
-        (client, event) -> {
-          if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
-            DataSegmentChangeRequest request = jsonMapper.readValue(
-                event.getData().getData(),
-                DataSegmentChangeRequest.class
-            );
-            request.go(handler, null);
-          }
-        }
-    );
-    loadQueueCache.start();
-
-    for (final DataSegment segment : segmentToDrop) {
-      loadQueuePeon.dropSegment(
-          segment,
-          success -> segmentDroppedSignals.get(segment.getId()).countDown()
-      );
-    }
-
-    for (final DataSegment segment : segmentToLoad) {
-      loadQueuePeon.loadSegment(
-          segment,
-          SegmentAction.LOAD,
-          success -> segmentLoadedSignals.get(segment.getId()).countDown()
-      );
-    }
-
-    Assert.assertEquals(6000, loadQueuePeon.getSizeOfSegmentsToLoad());
-    Assert.assertEquals(5, loadQueuePeon.getSegmentsToLoad().size());
-    Assert.assertEquals(5, loadQueuePeon.getSegmentsToDrop().size());
-    Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size());
-
-    for (DataSegment segment : segmentToDrop) {
-      String dropRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
-      Assert.assertTrue(
-          "Latch not counted down for " + dropRequestSignals.get(segment.getId()),
-          dropRequestSignals.get(segment.getId()).await(10, TimeUnit.SECONDS)
-      );
-      Assert.assertNotNull(
-          "Path " + dropRequestPath + " doesn't exist",
-          curator.checkExists().forPath(dropRequestPath)
-      );
-      Assert.assertEquals(
-          segment,
-          ((SegmentChangeRequestDrop) jsonMapper.readValue(
-              curator.getData()
-                     .decompressed()
-                     .forPath(dropRequestPath), DataSegmentChangeRequest.class
-          )).getSegment()
-      );
-
-      // simulate completion of drop request by historical
-      curator.delete().guaranteed().forPath(dropRequestPath);
-      Assert.assertTrue(timing.forWaiting().awaitLatch(segmentDroppedSignals.get(segment.getId())));
-    }
-
-    for (DataSegment segment : expectedLoadOrder) {
-      String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
-      Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignals.get(segment.getId())));
-      Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
-      Assert.assertEquals(
-          segment,
-          ((SegmentChangeRequestLoad) jsonMapper
-              .readValue(curator.getData().decompressed().forPath(loadRequestPath), DataSegmentChangeRequest.class))
-              .getSegment()
-      );
-
-      // simulate completion of load request by historical
-      curator.delete().guaranteed().forPath(loadRequestPath);
-      Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignals.get(segment.getId())));
-    }
-  }
-
-  @Test
-  public void testFailAssignForNonTimeoutFailures() throws Exception
-  {
-    final DataSegment segment = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
-
-    final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
-
-    loadQueuePeon = new CuratorLoadQueuePeon(
-        curator,
-        LOAD_QUEUE_PATH,
-        // This will fail inside SegmentChangeProcessor.run()
-        null,
-        Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
-        Execs.singleThreaded("test_load_queue_peon-%d"),
-        // set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly
-        new Duration(1)
-    );
-
-    loadQueuePeon.start();
-
-    loadQueueCache.start();
-
-    loadQueuePeon.loadSegment(
-        segment,
-        SegmentAction.LOAD,
-        success -> segmentLoadedSignal.countDown()
-    );
-
-    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal));
-    Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
-    Assert.assertEquals(0L, loadQueuePeon.getSizeOfSegmentsToLoad());
-    Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size());
-  }
-
-  @Test
-  public void testFailAssignForLoadDropTimeout() throws Exception
-  {
-    final DataSegment segment = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
-
-    final CountDownLatch loadRequestSignal = new CountDownLatch(1);
-    final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
-    final CountDownLatch delayedSegmentLoadedSignal = new CountDownLatch(2);
-    final CountDownLatch loadRequestRemoveSignal = new CountDownLatch(1);
-
-    loadQueuePeon = new CuratorLoadQueuePeon(
-        curator,
-        LOAD_QUEUE_PATH,
-        jsonMapper,
-        Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
-        Execs.singleThreaded("test_load_queue_peon-%d"),
-        // The timeout here was set to 1ms when this test was acting flaky. A cursory glance makes me wonder if
-        // there's a race where the timeout actually happens before other code can run. A 1ms timeout seems aggressive.
-        // 100ms is a great price to pay if it removes the flakiness.
-        new Duration(100)
-    );
-
-    loadQueuePeon.start();
-
-    loadQueueCache.getListenable().addListener(
-        new PathChildrenCacheListener()
-        {
-          @Override
-          public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
-          {
-            switch (event.getType()) {
-              case CHILD_ADDED:
-                loadRequestSignal.countDown();
-                break;
-              case CHILD_REMOVED:
-                loadRequestRemoveSignal.countDown();
-                break;
-              default:
-                // pass
-            }
-          }
-        }
-    );
-    loadQueueCache.start();
-
-    loadQueuePeon.loadSegment(
-        segment,
-        SegmentAction.LOAD,
-        success -> {
-          segmentLoadedSignal.countDown();
-          delayedSegmentLoadedSignal.countDown();
-        }
-    );
-
-    String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
-
-    Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal));
-    Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
-    Assert.assertEquals(
-        segment,
-        ((SegmentChangeRequestLoad) jsonMapper
-            .readValue(curator.getData().decompressed().forPath(loadRequestPath), DataSegmentChangeRequest.class))
-            .getSegment()
-    );
-
-    // simulate non-completion of the load request since the request has timed out
-    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal));
-    Assert.assertEquals(1, loadQueuePeon.getSegmentsToLoad().size());
-    Assert.assertEquals(1200L, loadQueuePeon.getSizeOfSegmentsToLoad());
-    Assert.assertEquals(1, loadQueuePeon.getTimedOutSegments().size());
-
-    // simulate completion of the load request by the historical after the coordinator has timed out
-    curator.delete().guaranteed().forPath(loadRequestPath);
-    Assert.assertTrue(timing.forWaiting().awaitLatch(delayedSegmentLoadedSignal));
-    Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestRemoveSignal));
-    Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
-    Assert.assertEquals(0L, loadQueuePeon.getSizeOfSegmentsToLoad());
-    Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size());
-  }
-
-  private DataSegment dataSegmentWithInterval(String intervalStr)
-  {
-    return DataSegment.builder()
-                      .dataSource("test_load_queue_peon")
-                      .interval(Intervals.of(intervalStr))
-                      .loadSpec(ImmutableMap.of())
-                      .version("2015-05-27T03:38:35.683Z")
-                      .dimensions(ImmutableList.of())
-                      .metrics(ImmutableList.of())
-                      .shardSpec(NoneShardSpec.instance())
-                      .binaryVersion(9)
-                      .size(1200)
-                      .build();
-  }
-
-  @After
-  public void tearDown() throws Exception
-  {
-    loadQueueCache.close();
-    loadQueuePeon.stop();
-    tearDownServerAndCurator();
-  }
-}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
index 9e8270832845..cd925ccfa724 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
@@ -74,7 +74,6 @@ of the coordinator in these situations.
 - It should not be used to verify the absolute values of execution latencies, e.g. the time taken to compute the
   balancing cost of a segment. But the relative values can still be a good indicator while doing comparisons between,
   say, two balancing strategies.
-- It does not support simulation of the zk-based `CuratorLoadQueuePeon`.
 
 ## Usage
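
Note on the test synchronization pattern introduced above: DruidCoordinatorTest now blocks on a
metric-driven latch (LatchableServiceEmitter.coordinatorRunLatch) instead of a ZK PathChildrenCache
listener. The following standalone sketch illustrates that pattern with plain JDK primitives only.
It is an illustration, not Druid code: SimpleMetricEvent and LatchedEmitter are invented stand-ins
for Druid's ServiceMetricEvent and ServiceEmitter, and the metric name "coordinator/global/time" is
an assumption about what Stats.CoordinatorRun.GROUP_RUN_TIME resolves to.

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchedEmitterSketch
{
  // Simplified stand-in for a metric event: just a metric name and its dimensions.
  static final class SimpleMetricEvent
  {
    final String metric;
    final Map<String, Object> userDims;

    SimpleMetricEvent(String metric, Map<String, Object> userDims)
    {
      this.metric = metric;
      this.userDims = userDims;
    }
  }

  // Stand-in emitter: counts down once per observed coordinator run, like LatchableServiceEmitter.
  static final class LatchedEmitter
  {
    // Wait for two full coordinator runs before letting the test proceed.
    final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);

    void emit(SimpleMetricEvent event)
    {
      // Count down only when the duty group under test reports its run time.
      if ("coordinator/global/time".equals(event.metric)
          && "HistoricalManagementDuties".equals(event.userDims.get("dutyGroup"))) {
        coordinatorRunLatch.countDown();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    final LatchedEmitter emitter = new LatchedEmitter();

    // Simulate two coordinator runs, each emitting the duty-group run-time metric.
    for (int i = 0; i < 2; i++) {
      emitter.emit(new SimpleMetricEvent(
          "coordinator/global/time",
          Map.of("dutyGroup", "HistoricalManagementDuties")
      ));
    }

    // A test thread would block here until both runs have been observed.
    boolean bothRunsObserved = emitter.coordinatorRunLatch.await(10, TimeUnit.SECONDS);
    System.out.println("Both coordinator runs observed: " + bothRunsObserved);
  }
}

Compared to the removed createCountDownLatchAndSetPathChildrenCacheListenerWithLatch helper, this
synchronizes the test on the coordinator's own run cycle rather than on ZK child events, which is
what allows all of the ZK-specific plumbing in this patch to be deleted.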