diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java
deleted file mode 100644
index 0ab9d8ab3297..000000000000
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.loading;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.server.coordination.SegmentChangeRequestNoop;
-import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
-import org.apache.druid.server.coordinator.stats.Stats;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.joda.time.Duration;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Use {@link HttpLoadQueuePeon} instead.
- *
- * Objects of this class can be accessed by multiple threads. State-wise, this class
- * is thread-safe, and callers of the public methods can expect thread-safe behavior.
- * However, as with any object accessed by multiple threads, callers shouldn't expect
- * strict consistency between the results of two calls to the same or different methods.
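- *
- * For example, {@link #getSegmentsToLoad()} and {@link #getSizeOfSegmentsToLoad()}
- * may transiently disagree while a load request is being enqueued.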
- */
-@Deprecated
-public class CuratorLoadQueuePeon implements LoadQueuePeon
-{
- private static final EmittingLogger log = new EmittingLogger(CuratorLoadQueuePeon.class);
-
- private final CuratorFramework curator;
- private final String basePath;
- private final ObjectMapper jsonMapper;
- private final ScheduledExecutorService processingExecutor;
-
- /**
- * Threadpool with daemon threads that execute callback actions associated
- * with loading or dropping segments.
- */
- private final ExecutorService callBackExecutor;
- private final Duration loadTimeout;
-
- private final AtomicLong queuedSize = new AtomicLong(0);
- private final AtomicReference<CoordinatorRunStats> stats = new AtomicReference<>(new CoordinatorRunStats());
-
- /**
- * Needs to be thread safe since it can be concurrently accessed via
- * {@link #loadSegment(DataSegment, SegmentAction, LoadPeonCallback)},
- * {@link #actionCompleted(SegmentHolder)}, {@link #getSegmentsToLoad()} and
- * {@link #stop()}.
- */
- private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad
- = new ConcurrentSkipListMap<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
-
- /**
- * Needs to be thread safe since it can be concurrently accessed via
- * {@link #dropSegment(DataSegment, LoadPeonCallback)}, {@link #actionCompleted(SegmentHolder)},
- * {@link #getSegmentsToDrop()} and {@link #stop()}
- */
- private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop
- = new ConcurrentSkipListMap<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
-
- /**
- * Needs to be thread safe since it can be concurrently accessed via
- * {@link #markSegmentToDrop(DataSegment)}, {@link #unmarkSegmentToDrop(DataSegment)}
- * and {@link #getSegmentsToDrop()}
- */
- private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop
- = new ConcurrentSkipListSet<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
-
- /**
- * Needs to be thread safe since it can be concurrently accessed via
- * {@link #failAssign(SegmentHolder, boolean, Exception)}, {@link #actionCompleted(SegmentHolder)},
- * {@link #getTimedOutSegments()} and {@link #stop()}
- */
- private final ConcurrentSkipListSet<DataSegment> timedOutSegments =
- new ConcurrentSkipListSet<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
-
- public CuratorLoadQueuePeon(
- CuratorFramework curator,
- String basePath,
- ObjectMapper jsonMapper,
- ScheduledExecutorService processingExecutor,
- ExecutorService callbackExecutor,
- Duration loadTimeout
- )
- {
- this.curator = curator;
- this.basePath = basePath;
- this.jsonMapper = jsonMapper;
- this.callBackExecutor = callbackExecutor;
- this.processingExecutor = processingExecutor;
- this.loadTimeout = loadTimeout;
- }
-
- @JsonProperty
- @Override
- public Set<DataSegment> getSegmentsToLoad()
- {
- return segmentsToLoad.keySet();
- }
-
- @Override
- public Set<SegmentHolder> getSegmentsInQueue()
- {
- final Set<SegmentHolder> segmentsInQueue = new HashSet<>();
- segmentsInQueue.addAll(segmentsToLoad.values());
- segmentsInQueue.addAll(segmentsToDrop.values());
- return segmentsInQueue;
- }
-
- @JsonProperty
- @Override
- public Set<DataSegment> getSegmentsToDrop()
- {
- return segmentsToDrop.keySet();
- }
-
- @JsonProperty
- @Override
- public Set<DataSegment> getSegmentsMarkedToDrop()
- {
- return segmentsMarkedToDrop;
- }
-
- @Override
- public Set<DataSegment> getTimedOutSegments()
- {
- return timedOutSegments;
- }
-
- @Override
- public long getSizeOfSegmentsToLoad()
- {
- return queuedSize.get();
- }
-
- @Override
- public long getLoadRateKbps()
- {
- return 0;
- }
-
- @Override
- public CoordinatorRunStats getAndResetStats()
- {
- return stats.getAndSet(new CoordinatorRunStats());
- }
-
- @Override
- public void loadSegment(final DataSegment segment, SegmentAction action, @Nullable final LoadPeonCallback callback)
- {
- SegmentHolder segmentHolder = new SegmentHolder(segment, action, Duration.ZERO, callback);
- final SegmentHolder existingHolder = segmentsToLoad.putIfAbsent(segment, segmentHolder);
- if (existingHolder != null) {
- existingHolder.addCallback(callback);
- return;
- }
- log.debug("Asking server peon[%s] to load segment[%s]", basePath, segment.getId());
- queuedSize.addAndGet(segment.getSize());
- processingExecutor.submit(new SegmentChangeProcessor(segmentHolder));
- }
-
- @Override
- public void dropSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
- {
- SegmentHolder segmentHolder = new SegmentHolder(segment, SegmentAction.DROP, Duration.ZERO, callback);
- final SegmentHolder existingHolder = segmentsToDrop.putIfAbsent(segment, segmentHolder);
- if (existingHolder != null) {
- existingHolder.addCallback(callback);
- return;
- }
- log.debug("Asking server peon[%s] to drop segment[%s]", basePath, segment.getId());
- processingExecutor.submit(new SegmentChangeProcessor(segmentHolder));
- }
-
- @Override
- public void markSegmentToDrop(DataSegment dataSegment)
- {
- segmentsMarkedToDrop.add(dataSegment);
- }
-
- @Override
- public void unmarkSegmentToDrop(DataSegment dataSegment)
- {
- segmentsMarkedToDrop.remove(dataSegment);
- }
-
- private class SegmentChangeProcessor implements Runnable
- {
- private final SegmentHolder segmentHolder;
-
- private SegmentChangeProcessor(SegmentHolder segmentHolder)
- {
- this.segmentHolder = segmentHolder;
- }
-
- @Override
- public void run()
- {
- try {
- final String path = ZKPaths.makePath(basePath, segmentHolder.getSegmentIdentifier());
- final byte[] payload = jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest());
- curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
- if (log.isDebugEnabled()) {
- log.debug(
- "ZKNode created for server to [%s] %s [%s]",
- basePath, segmentHolder.getAction(), segmentHolder.getSegmentIdentifier()
- );
- }
- final ScheduledFuture<?> nodeDeletedCheck = scheduleNodeDeletedCheck(path);
- final Stat stat = curator.checkExists().usingWatcher(
- (CuratorWatcher) watchedEvent -> {
- switch (watchedEvent.getType()) {
- case NodeDeleted:
- // Cancel the check node deleted task since we have already
- // been notified by the zk watcher
- nodeDeletedCheck.cancel(true);
- onZkNodeDeleted(segmentHolder, watchedEvent.getPath());
- break;
- default:
- // do nothing
- }
- }
- ).forPath(path);
-
- // Clean up the watcher to avoid a memory leak in case we missed the NodeDeleted event
- if (stat == null) {
- // Create a node and then delete it to remove the registered watcher. This is a work-around for
- // a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event
- // that happens for that node. If no events happen, the watcher stays registered forever.
- // Couple that with the fact that you cannot set a watcher when you create a node, but what we
- // want is to create a node and then watch for it to get deleted. The solution is that you *can*
- // set a watcher when you check whether the node exists. So, we first create the node and then
- // set a watcher on its existence. However, if the node is already gone by the time the existence
- // check returns, the watcher that was set will never fire (nobody will ever create the node
- // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause
- // that watcher to fire and delete it right away.
- //
- // We do not create the existence watcher first, because then it would fire when we create the
- // node and we would have the same race when trying to refresh that watcher.
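- //
- // In short: ZK watches are one-shot, so creating the noop request node fires (and
- // thereby unregisters) the stale watcher; the historical then processes the noop
- // request and deletes the node as it would any other change request.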
- final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
- curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
- onZkNodeDeleted(segmentHolder, path);
- }
- }
- catch (KeeperException.NodeExistsException ne) {
- // This is expected when historicals haven't yet picked up processing this segment and the
- // coordinator tries to reassign it to the same node.
- log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
- failAssign(segmentHolder, true, null);
- }
- catch (Exception e) {
- failAssign(segmentHolder, false, e);
- }
- }
-
- @Nonnull
- private ScheduledFuture<?> scheduleNodeDeletedCheck(String path)
- {
- return processingExecutor.schedule(
- () -> {
- try {
- if (curator.checkExists().forPath(path) != null) {
- failAssign(
- segmentHolder,
- true,
- new ISE(
- "%s operation timed out and [%s] was never removed! "
- + "These segments may still get processed.",
- segmentHolder.getAction(),
- path
- )
- );
- } else {
- log.debug("Path [%s] has been removed.", path);
- }
- }
- catch (Exception e) {
- log.error(e, "Exception caught and ignored when checking whether zk node was deleted");
- failAssign(segmentHolder, false, e);
- }
- },
- loadTimeout.getMillis(),
- TimeUnit.MILLISECONDS
- );
- }
- }
-
- private void actionCompleted(SegmentHolder segmentHolder)
- {
- switch (segmentHolder.getAction()) {
- case LOAD:
- case REPLICATE:
- case MOVE_TO:
- // When a load fails, the segment may be removed from segmentsToLoad twice, with the
- // second removal returning null; without this check, queuedSize could go negative.
- // See https://github.com/apache/druid/pull/10362 for more details.
- if (null != segmentsToLoad.remove(segmentHolder.getSegment())) {
- queuedSize.addAndGet(-segmentHolder.getSegment().getSize());
- timedOutSegments.remove(segmentHolder.getSegment());
- }
- break;
- case DROP:
- segmentsToDrop.remove(segmentHolder.getSegment());
- timedOutSegments.remove(segmentHolder.getSegment());
- break;
- default:
- throw new UnsupportedOperationException();
- }
- executeCallbacks(segmentHolder, true);
- }
-
- @Override
- public void start()
- {
- }
-
- @Override
- public void stop()
- {
- for (SegmentHolder holder : segmentsToDrop.values()) {
- executeCallbacks(holder, false);
- }
- segmentsToDrop.clear();
-
- for (SegmentHolder holder : segmentsToLoad.values()) {
- executeCallbacks(holder, false);
- }
- segmentsToLoad.clear();
-
- timedOutSegments.clear();
- queuedSize.set(0L);
- stats.get().clear();
- }
-
- private void onZkNodeDeleted(SegmentHolder segmentHolder, String path)
- {
- if (!ZKPaths.getNodeFromPath(path).equals(segmentHolder.getSegmentIdentifier())) {
- log.warn(
- "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
- basePath,
- path,
- segmentHolder
- );
- return;
- }
- actionCompleted(segmentHolder);
- log.debug(
- "Server[%s] done processing %s of segment [%s]",
- basePath,
- segmentHolder.getAction(),
- path
- );
- }
-
- private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout, Exception e)
- {
- if (e != null) {
- log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, segmentHolder);
- }
- stats.get().add(Stats.SegmentQueue.FAILED_ACTIONS, 1);
-
- if (handleTimeout) {
- // Do not remove the segment entry from the load/drop list when config.getLoadTimeoutDelay() expires.
- // The ZK node is still present and may be processed after this timeout, so the coordinator
- // needs to take that into account.
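- // The segment is also added to timedOutSegments below so that a late completion,
- // signalled via onZkNodeDeleted(), is still cleaned up by actionCompleted().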
- log.debug(
- "Skipping segment removal from [%s] queue, since ZK Node still exists!",
- segmentHolder.getAction()
- );
- timedOutSegments.add(segmentHolder.getSegment());
- executeCallbacks(segmentHolder, false);
- } else {
- // This may have failed for a different reason and so act like it was completed.
- actionCompleted(segmentHolder);
- }
- }
-
- @Override
- public boolean cancelOperation(DataSegment segment)
- {
- return false;
- }
-
- private void executeCallbacks(SegmentHolder holder, boolean success)
- {
- for (LoadPeonCallback callback : holder.getCallbacks()) {
- callBackExecutor.submit(() -> callback.execute(success));
- }
- }
-}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
index 8e9989717ab9..2d42a49febaf 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
@@ -27,7 +27,6 @@
/**
* Supports load queue management.
*/
-@Deprecated
public interface LoadQueuePeon
{
void start();
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
deleted file mode 100644
index c312cb493b59..000000000000
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.druid.client.BatchServerInventoryView;
-import org.apache.druid.client.CoordinatorSegmentWatcherConfig;
-import org.apache.druid.client.CoordinatorServerView;
-import org.apache.druid.client.DataSourcesSnapshot;
-import org.apache.druid.client.DruidServer;
-import org.apache.druid.client.ImmutableDruidDataSource;
-import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.curator.CuratorUtils;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
-import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
-import org.apache.druid.server.coordinator.loading.CuratorLoadQueuePeon;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
-import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
-import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
-import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
-import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
-import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
-import org.apache.druid.server.initialization.ZkPathsConfig;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
-import org.apache.druid.testing.DeadlockDetectingTimeout;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.EasyMock;
-import org.joda.time.Duration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestRule;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This tests ZooKeeper-specific coordinator/load queue/historical interactions, such as the balancer moving segments.
- */
-public class CuratorDruidCoordinatorTest extends CuratorTestBase
-{
- private DataSourcesSnapshot dataSourcesSnapshot;
- private DruidCoordinatorRuntimeParams coordinatorRuntimeParams;
-
- private LoadQueuePeon sourceLoadQueuePeon;
- private LoadQueuePeon destinationLoadQueuePeon;
- private PathChildrenCache sourceLoadQueueChildrenCache;
- private PathChildrenCache destinationLoadQueueChildrenCache;
-
- private static final String SEGPATH = "/druid/segments";
- private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1";
- private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2";
- private static final long COORDINATOR_START_DELAY = 1;
- private static final long COORDINATOR_PERIOD = 100;
-
- private BatchServerInventoryView baseView;
- private CoordinatorServerView serverView;
- private CountDownLatch segmentViewInitLatch;
- /**
- * The following two fields are changed during {@link #testMoveSegment()}; the change might not be visible from
- * the thread that runs the callback registered in {@link #setupView()}. The volatile modifier doesn't guarantee
- * visibility either, but somewhat increases the chances.
- */
- private volatile CountDownLatch segmentAddedLatch;
- private volatile CountDownLatch segmentRemovedLatch;
- private final ObjectMapper jsonMapper;
- private final ZkPathsConfig zkPathsConfig;
-
- private final ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d");
- private final ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d");
-
- public CuratorDruidCoordinatorTest()
- {
- jsonMapper = TestHelper.makeJsonMapper();
- zkPathsConfig = new ZkPathsConfig();
- }
-
- @Before
- public void setUp() throws Exception
- {
- dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
- coordinatorRuntimeParams = EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);
-
- setupServerAndCurator();
- curator.start();
- curator.blockUntilConnected();
- curator.create().creatingParentsIfNeeded().forPath(SEGPATH);
- curator.create().creatingParentsIfNeeded().forPath(SOURCE_LOAD_PATH);
- curator.create().creatingParentsIfNeeded().forPath(DESTINATION_LOAD_PATH);
-
- final ObjectMapper objectMapper = new DefaultObjectMapper();
- sourceLoadQueueChildrenCache = new PathChildrenCache(
- curator,
- SOURCE_LOAD_PATH,
- true,
- true,
- Execs.singleThreaded("coordinator_test_path_children_cache_src-%d")
- );
- destinationLoadQueueChildrenCache = new PathChildrenCache(
- curator,
- DESTINATION_LOAD_PATH,
- true,
- true,
- Execs.singleThreaded("coordinator_test_path_children_cache_dest-%d")
- );
- sourceLoadQueuePeon = new CuratorLoadQueuePeon(
- curator,
- SOURCE_LOAD_PATH,
- objectMapper,
- peonExec,
- callbackExec,
- Duration.standardMinutes(15)
- );
- destinationLoadQueuePeon = new CuratorLoadQueuePeon(
- curator,
- DESTINATION_LOAD_PATH,
- objectMapper,
- peonExec,
- callbackExec,
- Duration.standardMinutes(15)
- );
- }
-
- @After
- public void tearDown() throws Exception
- {
- baseView.stop();
- sourceLoadQueuePeon.stop();
- sourceLoadQueueChildrenCache.close();
- destinationLoadQueueChildrenCache.close();
- tearDownServerAndCurator();
- }
-
- @Rule
- public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS);
-
- @Test
- public void testStopDoesntKillPoolItDoesntOwn() throws Exception
- {
- setupView();
- sourceLoadQueuePeon.stop();
- Assert.assertFalse(peonExec.isShutdown());
- Assert.assertFalse(callbackExec.isShutdown());
- }
-
- @Test
- public void testMoveSegment() throws Exception
- {
- segmentViewInitLatch = new CountDownLatch(1);
- segmentAddedLatch = new CountDownLatch(4);
-
- segmentRemovedLatch = new CountDownLatch(0);
-
- setupView();
-
- DruidServer source = new DruidServer(
- "localhost:1",
- "localhost:1",
- null,
- 10000000L,
- ServerType.HISTORICAL,
- "default_tier",
- 0
- );
-
- DruidServer dest = new DruidServer(
- "localhost:2",
- "localhost:2",
- null,
- 10000000L,
- ServerType.HISTORICAL,
- "default_tier",
- 0
- );
-
- setupZNodeForServer(source, zkPathsConfig, jsonMapper);
- setupZNodeForServer(dest, zkPathsConfig, jsonMapper);
-
- final List<DataSegment> sourceSegments = Arrays.asList(
- createSegment("2011-04-01/2011-04-03", "v1"),
- createSegment("2011-04-03/2011-04-06", "v1"),
- createSegment("2011-04-06/2011-04-09", "v1")
- );
-
- final List<DataSegment> destinationSegments = Collections.singletonList(
- createSegment("2011-03-31/2011-04-01", "v1")
- );
-
- DataSegment segmentToMove = sourceSegments.get(2);
-
- List<String> sourceSegKeys = new ArrayList<>();
-
- for (DataSegment segment : sourceSegments) {
- sourceSegKeys.add(announceBatchSegmentsForServer(source, ImmutableSet.of(segment), zkPathsConfig, jsonMapper));
- }
-
- for (DataSegment segment : destinationSegments) {
- announceBatchSegmentsForServer(dest, ImmutableSet.of(segment), zkPathsConfig, jsonMapper);
- }
-
- Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
- Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
-
- // These child watchers simulate the actions of historicals: announcing a segment upon noticing a load
- // request for the destination, and unannouncing from the source server upon noticing a drop request.
-
- CountDownLatch srcCountdown = new CountDownLatch(1);
- sourceLoadQueueChildrenCache.getListenable().addListener(
- (CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> {
- if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
- srcCountdown.countDown();
- } else if (CuratorUtils.isChildAdded(event)) {
- // Simulate source server dropping segment
- unannounceSegmentFromBatchForServer(source, segmentToMove, sourceSegKeys.get(2), zkPathsConfig);
- }
- }
- );
-
- CountDownLatch destCountdown = new CountDownLatch(1);
- destinationLoadQueueChildrenCache.getListenable().addListener(
- (CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> {
- if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
- destCountdown.countDown();
- } else if (CuratorUtils.isChildAdded(event)) {
- // Simulate destination server loading segment
- announceBatchSegmentsForServer(dest, ImmutableSet.of(segmentToMove), zkPathsConfig, jsonMapper);
- }
- }
- );
-
- sourceLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
- destinationLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
-
- Assert.assertTrue(timing.forWaiting().awaitLatch(srcCountdown));
- Assert.assertTrue(timing.forWaiting().awaitLatch(destCountdown));
-
- sourceSegments.forEach(source::addDataSegment);
- destinationSegments.forEach(dest::addDataSegment);
-
- segmentRemovedLatch = new CountDownLatch(1);
- segmentAddedLatch = new CountDownLatch(1);
-
- ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
- EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(sourceSegments.get(2));
- EasyMock.replay(druidDataSource);
- EasyMock.expect(coordinatorRuntimeParams.getDataSourcesSnapshot())
- .andReturn(dataSourcesSnapshot).anyTimes();
- final CoordinatorDynamicConfig dynamicConfig =
- CoordinatorDynamicConfig.builder().withUseRoundRobinSegmentAssignment(false).build();
- EasyMock.expect(coordinatorRuntimeParams.getCoordinatorDynamicConfig())
- .andReturn(dynamicConfig)
- .anyTimes();
- EasyMock.expect(coordinatorRuntimeParams.getSegmentLoadingConfig())
- .andReturn(SegmentLoadingConfig.create(dynamicConfig, 100))
- .anyTimes();
-
- final ServerHolder sourceServer = new ServerHolder(source.toImmutableDruidServer(), sourceLoadQueuePeon);
- final ServerHolder destinationServer = new ServerHolder(dest.toImmutableDruidServer(), destinationLoadQueuePeon);
- final DruidCluster cluster = DruidCluster.builder().add(sourceServer).add(destinationServer).build();
-
- final BalancerStrategy balancerStrategy = EasyMock.mock(BalancerStrategy.class);
- EasyMock.expect(
- balancerStrategy.findDestinationServerToMoveSegment(
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject()
- )
- ).andReturn(destinationServer).atLeastOnce();
- EasyMock.expect(coordinatorRuntimeParams.getBalancerStrategy())
- .andReturn(balancerStrategy).anyTimes();
- EasyMock.expect(coordinatorRuntimeParams.getDruidCluster()).andReturn(cluster).anyTimes();
- EasyMock.replay(coordinatorRuntimeParams, balancerStrategy);
-
- EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString()))
- .andReturn(druidDataSource).anyTimes();
- EasyMock.replay(dataSourcesSnapshot);
-
- LoadQueueTaskMaster taskMaster = EasyMock.createMock(LoadQueueTaskMaster.class);
- EasyMock.expect(taskMaster.isHttpLoading()).andReturn(false).anyTimes();
- EasyMock.replay(taskMaster);
-
- // Move the segment from source to dest
- SegmentLoadQueueManager loadQueueManager =
- new SegmentLoadQueueManager(baseView, taskMaster);
- StrategicSegmentAssigner segmentAssigner = createSegmentAssigner(loadQueueManager, coordinatorRuntimeParams);
- segmentAssigner.moveSegment(
- segmentToMove,
- sourceServer,
- Collections.singletonList(destinationServer)
- );
-
- // wait for destination server to load segment
- Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
-
- // remove load queue key from destination server to trigger adding drop to load queue
- curator.delete().guaranteed().forPath(ZKPaths.makePath(DESTINATION_LOAD_PATH, segmentToMove.getId().toString()));
-
- // wait for drop
- Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
-
- // clean up drop from load queue
- curator.delete().guaranteed().forPath(ZKPaths.makePath(SOURCE_LOAD_PATH, segmentToMove.getId().toString()));
-
- List<DruidServer> servers = new ArrayList<>(serverView.getInventory());
-
- Assert.assertEquals(2, servers.get(0).getTotalSegments());
- Assert.assertEquals(2, servers.get(1).getTotalSegments());
- }
-
- private void setupView() throws Exception
- {
- baseView = new BatchServerInventoryView(
- zkPathsConfig,
- curator,
- jsonMapper,
- Predicates.alwaysTrue(),
- "test"
- )
- {
- @Override
- public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
- {
- super.registerSegmentCallback(
- exec,
- new SegmentCallback()
- {
- @Override
- public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
- {
- CallbackAction res = callback.segmentAdded(server, segment);
- segmentAddedLatch.countDown();
- return res;
- }
-
- @Override
- public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
- {
- CallbackAction res = callback.segmentRemoved(server, segment);
- segmentRemovedLatch.countDown();
- return res;
- }
-
- @Override
- public CallbackAction segmentViewInitialized()
- {
- CallbackAction res = callback.segmentViewInitialized();
- segmentViewInitLatch.countDown();
- return res;
- }
-
- @Override
- public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
- {
- return CallbackAction.CONTINUE;
- }
- }
- );
- }
- };
-
- serverView = new CoordinatorServerView(baseView, new CoordinatorSegmentWatcherConfig(), new NoopServiceEmitter(), null);
-
- baseView.start();
-
- sourceLoadQueuePeon.start();
- destinationLoadQueuePeon.start();
- }
-
- private DataSegment createSegment(String intervalStr, String version)
- {
- return DataSegment.builder()
- .dataSource("test_curator_druid_coordinator")
- .interval(Intervals.of(intervalStr))
- .loadSpec(ImmutableMap.of("type", "local", "path", "somewhere"))
- .version(version)
- .dimensions(ImmutableList.of())
- .metrics(ImmutableList.of())
- .shardSpec(NoneShardSpec.instance())
- .binaryVersion(9)
- .size(0)
- .build();
- }
-
- private StrategicSegmentAssigner createSegmentAssigner(
- SegmentLoadQueueManager loadQueueManager,
- DruidCoordinatorRuntimeParams params
- )
- {
- return new StrategicSegmentAssigner(
- loadQueueManager,
- params.getDruidCluster(),
- params.getBalancerStrategy(),
- params.getSegmentLoadingConfig(),
- new CoordinatorRunStats()
- );
- }
-}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 512efd0316e8..d50e82e7baa0 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -25,27 +25,22 @@
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.discovery.LatchableServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.rpc.indexing.OverlordClient;
@@ -65,37 +60,37 @@
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.DutyGroupStatus;
import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
-import org.apache.druid.server.coordinator.loading.CuratorLoadQueuePeon;
+import org.apache.druid.server.coordinator.loading.LoadPeonCallback;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Duration;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
-public class DruidCoordinatorTest extends CuratorTestBase
+public class DruidCoordinatorTest
{
- private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
- private static final Duration LOAD_TIMEOUT = Duration.standardMinutes(15);
private static final long COORDINATOR_START_DELAY = 1;
private static final long COORDINATOR_PERIOD = 100;
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
@@ -104,27 +99,22 @@ public class DruidCoordinatorTest extends CuratorTestBase
private SegmentsMetadataManager segmentsMetadataManager;
private DataSourcesSnapshot dataSourcesSnapshot;
- private BatchServerInventoryView serverInventoryView;
+ private ServerInventoryView serverInventoryView;
private ScheduledExecutorFactory scheduledExecutorFactory;
- private DruidServer druidServer;
- private LoadQueuePeon loadQueuePeon;
private LoadQueueTaskMaster loadQueueTaskMaster;
private MetadataRuleManager metadataRuleManager;
private CountDownLatch leaderAnnouncerLatch;
private CountDownLatch leaderUnannouncerLatch;
- private PathChildrenCache pathChildrenCache;
private DruidCoordinatorConfig druidCoordinatorConfig;
- private ObjectMapper objectMapper;
private DruidNode druidNode;
private OverlordClient overlordClient;
private CompactionStatusTracker statusTracker;
- private final LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
+ private LatchableServiceEmitter serviceEmitter;
@Before
public void setUp() throws Exception
{
- druidServer = new DruidServer("from", "from", null, 5L, ServerType.HISTORICAL, "tier1", 0);
- serverInventoryView = EasyMock.createMock(BatchServerInventoryView.class);
+ serverInventoryView = EasyMock.createMock(ServerInventoryView.class);
segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
@@ -147,11 +137,7 @@ public void setUp() throws Exception
)
).andReturn(new AtomicReference<>(DruidCompactionConfig.empty())).anyTimes();
EasyMock.replay(configManager);
- setupServerAndCurator();
- curator.start();
- curator.blockUntilConnected();
- curator.create().creatingParentsIfNeeded().forPath(LOADPATH);
- objectMapper = new DefaultObjectMapper();
+ final ObjectMapper objectMapper = new DefaultObjectMapper();
statusTracker = new CompactionStatusTracker(objectMapper);
druidCoordinatorConfig = new DruidCoordinatorConfig(
new CoordinatorRunConfig(new Duration(COORDINATOR_START_DELAY), new Duration(COORDINATOR_PERIOD)),
@@ -160,26 +146,11 @@ public void setUp() throws Exception
new CostBalancerStrategyFactory(),
null
);
- pathChildrenCache = new PathChildrenCache(
- curator,
- LOADPATH,
- true,
- true,
- Execs.singleThreaded("coordinator_test_path_children_cache-%d")
- );
- loadQueuePeon = new CuratorLoadQueuePeon(
- curator,
- LOADPATH,
- objectMapper,
- Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_scheduled-%d"),
- Execs.singleThreaded("coordinator_test_load_queue_peon-%d"),
- LOAD_TIMEOUT
- );
- loadQueuePeon.start();
druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
scheduledExecutorFactory = ScheduledExecutors::fixed;
leaderAnnouncerLatch = new CountDownLatch(1);
leaderUnannouncerLatch = new CountDownLatch(1);
+ serviceEmitter = new LatchableServiceEmitter();
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
createMetadataManager(configManager),
@@ -213,14 +184,6 @@ private MetadataManager createMetadataManager(JacksonConfigManager configManager
);
}
- @After
- public void tearDown() throws Exception
- {
- loadQueuePeon.stop();
- pathChildrenCache.close();
- tearDownServerAndCurator();
- }
-
@Test(timeout = 60_000L)
public void testCoordinatorRun() throws Exception
{
@@ -261,7 +224,8 @@ public void testCoordinatorRun() throws Exception
EasyMock.replay(immutableDruidDataSource);
// Setup ServerInventoryView
- druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, tier, 0);
+ final DruidServer druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, tier, 0);
+ final LoadQueuePeon loadQueuePeon = createImmediateLoadPeonFor(druidServer);
setupPeons(Collections.singletonMap("server1", loadQueuePeon));
EasyMock.expect(serverInventoryView.getInventory()).andReturn(
ImmutableList.of(druidServer)
@@ -280,19 +244,8 @@ public void testCoordinatorRun() throws Exception
// This coordinator should be leader by now
Assert.assertTrue(coordinator.isLeader());
Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());
- pathChildrenCache.start();
- final CountDownLatch assignSegmentLatch = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
- 1,
- pathChildrenCache,
- ImmutableMap.of("2010-01-01T00:00:00.000Z_2010-01-02T00:00:00.000Z", dataSegment),
- druidServer
- );
- assignSegmentLatch.await();
-
- final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
- serviceEmitter.latch = coordinatorRunLatch;
- coordinatorRunLatch.await();
+ serviceEmitter.coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus());
@@ -350,42 +303,24 @@ public void testCoordinatorTieredRun() throws Exception
final String dataSource = "dataSource", hotTierName = "hot", coldTierName = "cold";
final Rule hotTier = new IntervalLoadRule(Intervals.of("2018-01-01/P1M"), ImmutableMap.of(hotTierName, 1), null);
final Rule coldTier = new ForeverLoadRule(ImmutableMap.of(coldTierName, 1), null);
- final String loadPathCold = "/druid/loadqueue/cold:1234";
final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);
- final Map<String, DataSegment> dataSegments = ImmutableMap.of(
- "2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
+ final Set<DataSegment> dataSegments = Set.of(
new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
- "2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
- "2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
);
- final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
- curator,
- loadPathCold,
- objectMapper,
- Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
- Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
- LOAD_TIMEOUT
- );
- final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
- curator,
- loadPathCold,
- true,
- true,
- Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
- );
- setupPeons(ImmutableMap.of("hot", loadQueuePeon, "cold", loadQueuePeonCold));
+ final LoadQueuePeon loadQueuePeonHot = createImmediateLoadPeonFor(hotServer);
+ final LoadQueuePeon loadQueuePeonCold = createImmediateLoadPeonFor(coldServer);
+ setupPeons(ImmutableMap.of("hot", loadQueuePeonHot, "cold", loadQueuePeonCold));
+ loadQueuePeonHot.start();
loadQueuePeonCold.start();
- pathChildrenCache.start();
- pathChildrenCacheCold.start();
DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
- dataSegments.values().forEach(druidDataSources[0]::addSegment);
+ dataSegments.forEach(druidDataSources[0]::addSegment);
setupSegmentsMetadataMock(druidDataSources[0]);
@@ -402,14 +337,7 @@ public void testCoordinatorTieredRun() throws Exception
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
- final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(2, pathChildrenCache, dataSegments, hotServer);
- final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(1, pathChildrenCacheCold, dataSegments, coldServer);
- assignSegmentLatchHot.await();
- assignSegmentLatchCold.await();
-
- final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
- serviceEmitter.latch = coordinatorRunLatch;
- coordinatorRunLatch.await();
+ serviceEmitter.coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus());
@@ -425,7 +353,7 @@ public void testCoordinatorTieredRun() throws Exception
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(coldTierName).getLong(dataSource));
- dataSegments.values().forEach(dataSegment -> Assert.assertEquals(Integer.valueOf(1), coordinator.getReplicationFactor(dataSegment.getId())));
+ dataSegments.forEach(dataSegment -> Assert.assertEquals(Integer.valueOf(1), coordinator.getReplicationFactor(dataSegment.getId())));
coordinator.stop();
leaderUnannouncerLatch.await();
@@ -443,109 +371,39 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith
final String coldTierName = "cold";
final String tierName1 = "tier1";
final String tierName2 = "tier2";
- final String loadPathCold = "/druid/loadqueue/cold:1234";
- final String loadPathBroker1 = "/druid/loadqueue/broker1:1234";
- final String loadPathBroker2 = "/druid/loadqueue/broker2:1234";
- final String loadPathPeon = "/druid/loadqueue/peon:1234";
final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);
final DruidServer brokerServer1 = new DruidServer("broker1", "broker1", null, 5L, ServerType.BROKER, tierName1, 0);
final DruidServer brokerServer2 = new DruidServer("broker2", "broker2", null, 5L, ServerType.BROKER, tierName2, 0);
final DruidServer peonServer = new DruidServer("peon", "peon", null, 5L, ServerType.INDEXER_EXECUTOR, tierName2, 0);
- final Map<String, DataSegment> dataSegments = ImmutableMap.of(
- "2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
+ final Set<DataSegment> dataSegments = Set.of(
new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
- "2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
- "2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
);
- final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
- curator,
- loadPathCold,
- objectMapper,
- Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
- Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
- LOAD_TIMEOUT
- );
-
- final LoadQueuePeon loadQueuePeonBroker1 = new CuratorLoadQueuePeon(
- curator,
- loadPathBroker1,
- objectMapper,
- Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker1_scheduled-%d"),
- Execs.singleThreaded("coordinator_test_load_queue_peon_broker1-%d"),
- LOAD_TIMEOUT
- );
-
- final LoadQueuePeon loadQueuePeonBroker2 = new CuratorLoadQueuePeon(
- curator,
- loadPathBroker2,
- objectMapper,
- Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker2_scheduled-%d"),
- Execs.singleThreaded("coordinator_test_load_queue_peon_broker2-%d"),
- LOAD_TIMEOUT
- );
-
- final LoadQueuePeon loadQueuePeonPoenServer = new CuratorLoadQueuePeon(
- curator,
- loadPathPeon,
- objectMapper,
- Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_peon_scheduled-%d"),
- Execs.singleThreaded("coordinator_test_load_queue_peon_peon-%d"),
- LOAD_TIMEOUT
- );
- final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
- curator,
- loadPathCold,
- true,
- true,
- Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
- );
- final PathChildrenCache pathChildrenCacheBroker1 = new PathChildrenCache(
- curator,
- loadPathBroker1,
- true,
- true,
- Execs.singleThreaded("coordinator_test_path_children_cache_broker1-%d")
- );
- final PathChildrenCache pathChildrenCacheBroker2 = new PathChildrenCache(
- curator,
- loadPathBroker2,
- true,
- true,
- Execs.singleThreaded("coordinator_test_path_children_cache_broker2-%d")
- );
- final PathChildrenCache pathChildrenCachePeon = new PathChildrenCache(
- curator,
- loadPathPeon,
- true,
- true,
- Execs.singleThreaded("coordinator_test_path_children_cache_peon-%d")
- );
-
+ final LoadQueuePeon loadQueuePeonHot = createImmediateLoadPeonFor(hotServer);
+ final LoadQueuePeon loadQueuePeonCold = createImmediateLoadPeonFor(coldServer);
+ final LoadQueuePeon loadQueuePeonBroker1 = createImmediateLoadPeonFor(brokerServer1);
+ final LoadQueuePeon loadQueuePeonBroker2 = createImmediateLoadPeonFor(brokerServer2);
+ final LoadQueuePeon loadQueuePeonPoenServer = createImmediateLoadPeonFor(peonServer);
setupPeons(ImmutableMap.of(
- "hot", loadQueuePeon,
+ "hot", loadQueuePeonHot,
"cold", loadQueuePeonCold,
"broker1", loadQueuePeonBroker1,
"broker2", loadQueuePeonBroker2,
"peon", loadQueuePeonPoenServer
));
+ loadQueuePeonHot.start();
loadQueuePeonCold.start();
loadQueuePeonBroker1.start();
loadQueuePeonBroker2.start();
loadQueuePeonPoenServer.start();
- pathChildrenCache.start();
- pathChildrenCacheCold.start();
- pathChildrenCacheBroker1.start();
- pathChildrenCacheBroker2.start();
- pathChildrenCachePeon.start();
DruidDataSource druidDataSource = new DruidDataSource(dataSource, Collections.emptyMap());
- dataSegments.values().forEach(druidDataSource::addSegment);
+ dataSegments.forEach(druidDataSource::addSegment);
setupSegmentsMetadataMock(druidDataSource);
@@ -563,23 +421,10 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
- final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCache, dataSegments, hotServer);
- final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheCold, dataSegments, coldServer);
- final CountDownLatch assignSegmentLatchBroker1 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker1, dataSegments, brokerServer1);
- final CountDownLatch assignSegmentLatchBroker2 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker2, dataSegments, brokerServer2);
- final CountDownLatch assignSegmentLatchPeon = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCachePeon, dataSegments, peonServer);
- assignSegmentLatchHot.await();
- assignSegmentLatchCold.await();
- assignSegmentLatchBroker1.await();
- assignSegmentLatchBroker2.await();
- assignSegmentLatchPeon.await();
-
- final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
- serviceEmitter.latch = coordinatorRunLatch;
- coordinatorRunLatch.await();
+ serviceEmitter.coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus());
- Assert.assertEquals(new HashSet<>(dataSegments.values()), coordinator.getBroadcastSegments());
+ Assert.assertEquals(dataSegments, coordinator.getBroadcastSegments());
// Under-replicated counts are updated only after the next coordinator run
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
@@ -875,7 +720,6 @@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception
// Setup SegmentsMetadataManager
DruidDataSource[] dataSources = {
new DruidDataSource(dataSource, Collections.emptyMap())
-
};
final DataSegment dataSegment = new DataSegment(
dataSource,
@@ -908,11 +752,16 @@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception
EasyMock.replay(immutableDruidDataSource);
// Setup ServerInventoryView
- druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, hotTier, 0);
- DruidServer druidServer2 = new DruidServer("server2", "localhost", null, 5L, ServerType.HISTORICAL, coldTier, 0);
- setupPeons(ImmutableMap.of("server1", loadQueuePeon, "server2", loadQueuePeon));
+ final DruidServer druidServer1 = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, hotTier, 0);
+ final DruidServer druidServer2 = new DruidServer("server2", "localhost", null, 5L, ServerType.HISTORICAL, coldTier, 0);
+
+ // For the hot server, use a load queue peon that does not perform an immediate load
+ setupPeons(ImmutableMap.of(
+ "server1", new TestLoadQueuePeon(),
+ "server2", createImmediateLoadPeonFor(druidServer2))
+ );
EasyMock.expect(serverInventoryView.getInventory()).andReturn(
- ImmutableList.of(druidServer, druidServer2)
+ ImmutableList.of(druidServer1, druidServer2)
).atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(serverInventoryView, loadQueueTaskMaster);
@@ -925,11 +774,8 @@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception
// This coordinator should be leader by now
Assert.assertTrue(coordinator.isLeader());
Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());
- pathChildrenCache.start();
- final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
- serviceEmitter.latch = coordinatorRunLatch;
- coordinatorRunLatch.await();
+ serviceEmitter.coordinatorRunLatch.await();
Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource =
coordinator.getDatasourceToUnavailableSegmentCount();
@@ -980,33 +826,6 @@ public void testSimulateRunWithEmptyDatasourceCompactionConfigs()
Assert.assertEquals(Collections.emptyMap(), result.getCompactionStates());
}
- private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
- int latchCount,
- PathChildrenCache pathChildrenCache,
- Map<String, DataSegment> segments,
- DruidServer server
- )
- {
- final CountDownLatch countDownLatch = new CountDownLatch(latchCount);
- pathChildrenCache.getListenable().addListener(
- (CuratorFramework client, PathChildrenCacheEvent event) -> {
- if (CuratorUtils.isChildAdded(event)) {
- DataSegment segment = findSegmentRelatedToCuratorEvent(segments, event);
- if (segment != null && server.getSegment(segment.getId()) == null) {
- if (countDownLatch.getCount() > 0) {
- server.addDataSegment(segment);
- curator.delete().guaranteed().forPath(event.getData().getPath());
- countDownLatch.countDown();
- } else {
- Assert.fail("The segment path " + event.getData().getPath() + " is not expected");
- }
- }
- }
- }
- );
- return countDownLatch;
- }
-
private void setupSegmentsMetadataMock(DruidDataSource dataSource)
{
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();
@@ -1041,21 +860,6 @@ private void setupSegmentsMetadataMock(DruidDataSource dataSource)
EasyMock.replay(this.dataSourcesSnapshot);
}
- @Nullable
- private static DataSegment findSegmentRelatedToCuratorEvent(
- Map<String, DataSegment> dataSegments,
- PathChildrenCacheEvent event
- )
- {
- return dataSegments
- .entrySet()
- .stream()
- .filter(x -> event.getData().getPath().contains(x.getKey()))
- .map(Map.Entry::getValue)
- .findFirst()
- .orElse(null);
- }
-
private void setupPeons(Map<String, LoadQueuePeon> peonMap)
{
loadQueueTaskMaster.resetPeonsForNewServers(EasyMock.anyObject());
@@ -1071,6 +875,18 @@ private void setupPeons(Map<String, LoadQueuePeon> peonMap)
() -> peonMap.get(((ImmutableDruidServer) EasyMock.getCurrentArgument(0)).getName())
).anyTimes();
}
+
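+ /**
+ * Creates a test peon that immediately "loads" any requested segment by adding it
+ * to the given server's inventory before delegating to the base test peon.
+ */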
+ private LoadQueuePeon createImmediateLoadPeonFor(DruidServer server)
+ {
+ return new TestLoadQueuePeon() {
+ @Override
+ public void loadSegment(DataSegment segment, SegmentAction action, @Nullable LoadPeonCallback callback)
+ {
+ server.addDataSegment(segment);
+ super.loadSegment(segment, action, callback);
+ }
+ };
+ }
private static class TestDruidLeaderSelector implements DruidLeaderSelector
{
@@ -1113,7 +929,7 @@ public void unregisterListener()
private static class LatchableServiceEmitter extends ServiceEmitter
{
- private CountDownLatch latch;
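+ // Fires after two runs of the HistoricalManagementDuties group, since some results
+ // (e.g. under-replication counts) are updated only on a subsequent coordinator run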
+ private final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
private LatchableServiceEmitter()
{
@@ -1123,8 +939,15 @@ private LatchableServiceEmitter()
@Override
public void emit(Event event)
{
- if (latch != null && "segment/count".equals(event.toMap().get("metric"))) {
- latch.countDown();
+ if (event instanceof ServiceMetricEvent) {
+ final ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
+
+ // Count down when the historical management duties group has finished
+ String dutyGroupName = (String) metricEvent.getUserDims().get("dutyGroup");
+ if (Stats.CoordinatorRun.GROUP_RUN_TIME.getMetricName().equals(metricEvent.getMetric())
+ && "HistoricalManagementDuties".equals(dutyGroupName)) {
+ coordinatorRunLatch.countDown();
+ }
}
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java
deleted file mode 100644
index 4de59af1f2d3..000000000000
--- a/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.loading;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.segment.TestHelper;
-import org.apache.druid.server.coordination.DataSegmentChangeCallback;
-import org.apache.druid.server.coordination.DataSegmentChangeHandler;
-import org.apache.druid.server.coordination.DataSegmentChangeRequest;
-import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
-import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.joda.time.Duration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-public class CuratorLoadQueuePeonTest extends CuratorTestBase
-{
- private static final String LOAD_QUEUE_PATH = "/druid/loadqueue/localhost:1234";
-
- private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
-
- private CuratorLoadQueuePeon loadQueuePeon;
- private PathChildrenCache loadQueueCache;
-
- @Before
- public void setUp() throws Exception
- {
- setupServerAndCurator();
- curator.start();
- curator.blockUntilConnected();
- curator.create().creatingParentsIfNeeded().forPath(LOAD_QUEUE_PATH);
-
- loadQueueCache = new PathChildrenCache(
- curator,
- LOAD_QUEUE_PATH,
- true,
- true,
- Execs.singleThreaded("load_queue_cache-%d")
- );
- }
-
- @Test
- public void testMultipleLoadDropSegments() throws Exception
- {
- loadQueuePeon = new CuratorLoadQueuePeon(
- curator,
- LOAD_QUEUE_PATH,
- jsonMapper,
- Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
- Execs.singleThreaded("test_load_queue_peon-%d"),
- Duration.standardMinutes(15)
- );
-
- loadQueuePeon.start();
-
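- // Latches tracking, per segment, the appearance of a load/drop request in
- // ZK and the completion callback fired by the peon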
- ConcurrentMap<SegmentId, CountDownLatch> loadRequestSignals = new ConcurrentHashMap<>(5);
- ConcurrentMap<SegmentId, CountDownLatch> dropRequestSignals = new ConcurrentHashMap<>(5);
- ConcurrentMap<SegmentId, CountDownLatch> segmentLoadedSignals = new ConcurrentHashMap<>(5);
- ConcurrentMap<SegmentId, CountDownLatch> segmentDroppedSignals = new ConcurrentHashMap<>(5);
-
- final List<DataSegment> segmentToDrop = Lists.transform(
- ImmutableList.of(
- "2014-10-26T00:00:00Z/P1D",
- "2014-10-25T00:00:00Z/P1D",
- "2014-10-24T00:00:00Z/P1D",
- "2014-10-23T00:00:00Z/P1D",
- "2014-10-22T00:00:00Z/P1D"
- ), new Function<>()
- {
- @Override
- public DataSegment apply(String intervalStr)
- {
- DataSegment dataSegment = dataSegmentWithInterval(intervalStr);
- return dataSegment;
- }
- }
- );
-
- final CountDownLatch[] dropRequestLatches = new CountDownLatch[5];
- final CountDownLatch[] dropSegmentLatches = new CountDownLatch[5];
- for (int i = 0; i < 5; i++) {
- dropRequestLatches[i] = new CountDownLatch(1);
- dropSegmentLatches[i] = new CountDownLatch(1);
- }
- int i = 0;
- for (DataSegment s : segmentToDrop) {
- dropRequestSignals.put(s.getId(), dropRequestLatches[i]);
- segmentDroppedSignals.put(s.getId(), dropSegmentLatches[i++]);
- }
-
- final List<DataSegment> segmentToLoad = Lists.transform(
- ImmutableList.of(
- "2014-10-27T00:00:00Z/P1D",
- "2014-10-29T00:00:00Z/P1M",
- "2014-10-31T00:00:00Z/P1D",
- "2014-10-30T00:00:00Z/P1D",
- "2014-10-28T00:00:00Z/P1D"
- ), new Function<>()
- {
- @Override
- public DataSegment apply(String intervalStr)
- {
- DataSegment dataSegment = dataSegmentWithInterval(intervalStr);
- loadRequestSignals.put(dataSegment.getId(), new CountDownLatch(1));
- segmentLoadedSignals.put(dataSegment.getId(), new CountDownLatch(1));
- return dataSegment;
- }
- }
- );
-
- final CountDownLatch[] loadRequestLatches = new CountDownLatch[5];
- final CountDownLatch[] segmentLoadedLatches = new CountDownLatch[5];
- for (i = 0; i < 5; i++) {
- loadRequestLatches[i] = new CountDownLatch(1);
- segmentLoadedLatches[i] = new CountDownLatch(1);
- }
- i = 0;
- for (DataSegment s : segmentToLoad) {
- loadRequestSignals.put(s.getId(), loadRequestLatches[i]);
- segmentLoadedSignals.put(s.getId(), segmentLoadedLatches[i++]);
- }
-
- // segment with latest interval should be loaded first
- final List<DataSegment> expectedLoadOrder = Lists.transform(
- ImmutableList.of(
- "2014-10-29T00:00:00Z/P1M",
- "2014-10-31T00:00:00Z/P1D",
- "2014-10-30T00:00:00Z/P1D",
- "2014-10-28T00:00:00Z/P1D",
- "2014-10-27T00:00:00Z/P1D"
- ), intervalStr -> dataSegmentWithInterval(intervalStr)
- );
-
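- // Count down the matching request latch whenever the historical-side handler
- // receives a load or drop request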
- final DataSegmentChangeHandler handler = new DataSegmentChangeHandler()
- {
- @Override
- public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
- {
- loadRequestSignals.get(segment.getId()).countDown();
- }
-
- @Override
- public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
- {
- dropRequestSignals.get(segment.getId()).countDown();
- }
- };
-
- loadQueueCache.getListenable().addListener(
- (client, event) -> {
- if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
- DataSegmentChangeRequest request = jsonMapper.readValue(
- event.getData().getData(),
- DataSegmentChangeRequest.class
- );
- request.go(handler, null);
- }
- }
- );
- loadQueueCache.start();
-
- for (final DataSegment segment : segmentToDrop) {
- loadQueuePeon.dropSegment(
- segment,
- success -> segmentDroppedSignals.get(segment.getId()).countDown()
- );
- }
-
- for (final DataSegment segment : segmentToLoad) {
- loadQueuePeon.loadSegment(
- segment,
- SegmentAction.LOAD,
- success -> segmentLoadedSignals.get(segment.getId()).countDown()
- );
- }
-
- Assert.assertEquals(6000, loadQueuePeon.getSizeOfSegmentsToLoad());
- Assert.assertEquals(5, loadQueuePeon.getSegmentsToLoad().size());
- Assert.assertEquals(5, loadQueuePeon.getSegmentsToDrop().size());
- Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size());
-
- for (DataSegment segment : segmentToDrop) {
- String dropRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
- Assert.assertTrue(
- "Latch not counted down for " + dropRequestSignals.get(segment.getId()),
- dropRequestSignals.get(segment.getId()).await(10, TimeUnit.SECONDS)
- );
- Assert.assertNotNull(
- "Path " + dropRequestPath + " doesn't exist",
- curator.checkExists().forPath(dropRequestPath)
- );
- Assert.assertEquals(
- segment,
- ((SegmentChangeRequestDrop) jsonMapper.readValue(
- curator.getData()
- .decompressed()
- .forPath(dropRequestPath), DataSegmentChangeRequest.class
- )).getSegment()
- );
-
- // simulate completion of drop request by historical
- curator.delete().guaranteed().forPath(dropRequestPath);
- Assert.assertTrue(timing.forWaiting().awaitLatch(segmentDroppedSignals.get(segment.getId())));
- }
-
- for (DataSegment segment : expectedLoadOrder) {
- String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
- Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignals.get(segment.getId())));
- Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
- Assert.assertEquals(
- segment,
- ((SegmentChangeRequestLoad) jsonMapper
- .readValue(curator.getData().decompressed().forPath(loadRequestPath), DataSegmentChangeRequest.class))
- .getSegment()
- );
-
- // simulate completion of load request by historical
- curator.delete().guaranteed().forPath(loadRequestPath);
- Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignals.get(segment.getId())));
- }
- }
-
- @Test
- public void testFailAssignForNonTimeoutFailures() throws Exception
- {
- final DataSegment segment = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
-
- final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
-
- loadQueuePeon = new CuratorLoadQueuePeon(
- curator,
- LOAD_QUEUE_PATH,
- // This will fail inside SegmentChangeProcessor.run()
- null,
- Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
- Execs.singleThreaded("test_load_queue_peon-%d"),
- // set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly
- new Duration(1)
- );
-
- loadQueuePeon.start();
-
- loadQueueCache.start();
-
- loadQueuePeon.loadSegment(
- segment,
- SegmentAction.LOAD,
- success -> segmentLoadedSignal.countDown()
- );
-
- Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal));
- Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
- Assert.assertEquals(0L, loadQueuePeon.getSizeOfSegmentsToLoad());
- Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size());
-
- }
-
- @Test
- public void testFailAssignForLoadDropTimeout() throws Exception
- {
- final DataSegment segment = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
-
- final CountDownLatch loadRequestSignal = new CountDownLatch(1);
- final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
- final CountDownLatch delayedSegmentLoadedSignal = new CountDownLatch(2);
- final CountDownLatch loadRequestRemoveSignal = new CountDownLatch(1);
-
- loadQueuePeon = new CuratorLoadQueuePeon(
- curator,
- LOAD_QUEUE_PATH,
- jsonMapper,
- Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
- Execs.singleThreaded("test_load_queue_peon-%d"),
- // The timeout here was set to 1ms when this test was acting flaky. A cursory glance makes me wonder if
- // there's a race where the timeout actually happens before other code can run; a 1ms timeout seems aggressive.
- // 100ms is a small price to pay if it removes the flakiness.
- new Duration(100)
- );
-
- loadQueuePeon.start();
-
- loadQueueCache.getListenable().addListener(
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
- {
- switch (event.getType()) {
- case CHILD_ADDED:
- loadRequestSignal.countDown();
- break;
- case CHILD_REMOVED:
- loadRequestRemoveSignal.countDown();
- break;
- default:
- // pass
- }
- }
- }
- );
- loadQueueCache.start();
-
- loadQueuePeon.loadSegment(
- segment,
- SegmentAction.LOAD,
- success -> {
- segmentLoadedSignal.countDown();
- delayedSegmentLoadedSignal.countDown();
- }
- );
-
- String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
-
- Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal));
- Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
- Assert.assertEquals(
- segment,
- ((SegmentChangeRequestLoad) jsonMapper
- .readValue(curator.getData().decompressed().forPath(loadRequestPath), DataSegmentChangeRequest.class))
- .getSegment()
- );
-
- // the load request is left incomplete here, so the peon times it out and invokes the callback
- Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal));
- Assert.assertEquals(1, loadQueuePeon.getSegmentsToLoad().size());
- Assert.assertEquals(1200L, loadQueuePeon.getSizeOfSegmentsToLoad());
- Assert.assertEquals(1, loadQueuePeon.getTimedOutSegments().size());
-
- // simulate completion of load request by historical after time out on coordinator
- curator.delete().guaranteed().forPath(loadRequestPath);
- Assert.assertTrue(timing.forWaiting().awaitLatch(delayedSegmentLoadedSignal));
- Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestRemoveSignal));
- Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
- Assert.assertEquals(0L, loadQueuePeon.getSizeOfSegmentsToLoad());
- Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size());
-
- }
-
- private DataSegment dataSegmentWithInterval(String intervalStr)
- {
- return DataSegment.builder()
- .dataSource("test_load_queue_peon")
- .interval(Intervals.of(intervalStr))
- .loadSpec(ImmutableMap.of())
- .version("2015-05-27T03:38:35.683Z")
- .dimensions(ImmutableList.of())
- .metrics(ImmutableList.of())
- .shardSpec(NoneShardSpec.instance())
- .binaryVersion(9)
- .size(1200)
- .build();
- }
-
- @After
- public void tearDown() throws Exception
- {
- loadQueueCache.close();
- loadQueuePeon.stop();
- tearDownServerAndCurator();
- }
-}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
index 9e8270832845..cd925ccfa724 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
@@ -74,7 +74,6 @@ of the coordinator in these situations.
- It should not be used to verify the absolute values of execution latencies, e.g. the time taken to compute the
balancing cost of a segment. But the relative values can still be a good indicator while doing comparisons between,
say, two balancing strategies.
-- It does not support simulation of the zk-based `CuratorLoadQueuePeon`.
## Usage