diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 9ebfb96b5677..3e1ac720bb96 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -55,6 +55,7 @@ import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.SegmentCacheManager; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -78,6 +79,7 @@ */ public class TaskToolbox { + private final SegmentLoaderConfig segmentLoaderConfig; private final TaskConfig config; private final DruidNode taskExecutorNode; private final TaskActionClient taskActionClient; @@ -130,6 +132,7 @@ public class TaskToolbox private final String attemptId; public TaskToolbox( + SegmentLoaderConfig segmentLoaderConfig, TaskConfig config, DruidNode taskExecutorNode, TaskActionClient taskActionClient, @@ -171,6 +174,7 @@ public TaskToolbox( String attemptId ) { + this.segmentLoaderConfig = segmentLoaderConfig; this.config = config; this.taskExecutorNode = taskExecutorNode; this.taskActionClient = taskActionClient; @@ -213,6 +217,11 @@ public TaskToolbox( this.attemptId = attemptId; } + public SegmentLoaderConfig getSegmentLoaderConfig() + { + return segmentLoaderConfig; + } + public TaskConfig getConfig() { return config; @@ -504,6 +513,7 @@ public static RuntimeInfo createAdjustedRuntimeInfo( public static class Builder { + private SegmentLoaderConfig segmentLoaderConfig; private TaskConfig config; private DruidNode taskExecutorNode; private TaskActionClient taskActionClient; @@ -550,6 +560,7 @@ public Builder() public Builder(TaskToolbox other) { + this.segmentLoaderConfig = other.segmentLoaderConfig; this.config = other.config; this.taskExecutorNode = other.taskExecutorNode; this.taskActionClient = other.taskActionClient; @@ -589,6 +600,12 @@ public Builder(TaskToolbox other) this.shuffleClient = other.shuffleClient; } + public Builder config(final SegmentLoaderConfig segmentLoaderConfig) + { + this.segmentLoaderConfig = segmentLoaderConfig; + return this; + } + public Builder config(final TaskConfig config) { this.config = config; @@ -826,6 +843,7 @@ public Builder attemptId(final String attemptId) public TaskToolbox build() { return new TaskToolbox( + segmentLoaderConfig, config, taskExecutorNode, taskActionClient, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index 288d89919b98..f2df3ddc3a3a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -56,6 +56,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.DruidNode; @@ -72,6 +73,7 @@ */ public class TaskToolboxFactory { + private final SegmentLoaderConfig segmentLoaderConfig; private final TaskConfig config; private final DruidNode taskExecutorNode; private final TaskActionClientFactory taskActionClientFactory; @@ -115,6 +117,7 @@ public class TaskToolboxFactory @Inject public TaskToolboxFactory( + SegmentLoaderConfig segmentLoadConfig, TaskConfig config, @Parent DruidNode taskExecutorNode, TaskActionClientFactory taskActionClientFactory, @@ -155,6 +158,7 @@ public TaskToolboxFactory( @AttemptId String attemptId ) { + this.segmentLoaderConfig = segmentLoadConfig; this.config = config; this.taskExecutorNode = taskExecutorNode; this.taskActionClientFactory = taskActionClientFactory; @@ -210,6 +214,7 @@ public TaskToolbox build(TaskConfig config, Task task) final File taskWorkDir = config.getTaskWorkDir(task.getId()); return new TaskToolbox.Builder() .config(config) + .config(segmentLoaderConfig) .taskExecutorNode(taskExecutorNode) .taskActionClient(taskActionClientFactory.create(task)) .emitter(emitter) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 3a599dd485be..9e8817fc5ccb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -775,6 +775,7 @@ private Appenderator newAppenderator( ) { return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask( + null, getId(), dataSchema, tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index d74ee5c0be26..c881b3814e3d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -187,6 +187,7 @@ public Appenderator newAppenderator( ) { return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask( + toolbox.getSegmentLoaderConfig(), getId(), dataSchema, tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index c1f7a549d65d..75ac2eeb6701 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -48,6 +48,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager; @@ -94,6 +95,7 @@ public class TaskToolboxTest private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class); private Cache mockCache = EasyMock.createMock(Cache.class); private CacheConfig mockCacheConfig = EasyMock.createMock(CacheConfig.class); + private SegmentLoaderConfig segmentLoaderConfig = EasyMock.createMock(SegmentLoaderConfig.class); @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -115,6 +117,7 @@ public void setUp() throws IOException .build(); taskToolbox = new TaskToolboxFactory( + segmentLoaderConfig, taskConfig, new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), mockTaskActionClientFactory, @@ -162,6 +165,12 @@ public void testGetDataSegmentArchiver() Assert.assertEquals(mockDataSegmentArchiver, taskToolbox.build(task).getDataSegmentArchiver()); } + @Test + public void testGetSegmentLoaderConfig() + { + Assert.assertEquals(segmentLoaderConfig, taskToolbox.build(task).getSegmentLoaderConfig()); + } + @Test public void testGetSegmentAnnouncer() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 5e088724f899..5b400a65111d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1606,6 +1606,7 @@ public void close() }; final TestUtils testUtils = new TestUtils(); taskToolboxFactory = new TaskToolboxFactory( + null, taskConfig, new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), taskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 9881561d61f1..e38f59d6d7a6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -979,6 +979,7 @@ public void close() }; final TestUtils testUtils = new TestUtils(); final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory( + null, taskConfig, null, // taskExecutorNode taskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java index c4d4364f434f..24e7797602ea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.appenderator.Appenderator; import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; @@ -50,6 +51,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager @Override public Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, @@ -72,6 +74,7 @@ public Appenderator createRealtimeAppenderatorForTask( ) { realtimeAppenderator = Appenderators.createRealtime( + segmentLoaderConfig, taskId, schema, config, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 087ae3e1fc16..3e9d776f8c88 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -97,6 +97,7 @@ public void setup() throws IOException final ServiceEmitter emitter = new NoopServiceEmitter(); EmittingLogger.registerEmitter(emitter); final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory( + null, taskConfig, null, EasyMock.createMock(TaskActionClientFactory.class), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 627c161863b6..f8809bb50525 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -619,6 +619,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( .build(); return new TaskToolboxFactory( + null, taskConfig, new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), tac, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java index 5c6afdbb61b1..33dc249fb41d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java @@ -77,6 +77,7 @@ public TestTaskToolboxFactory( ) { super( + null, bob.config, bob.taskExecutorNode, bob.taskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 6be23407a418..2a1a8ac0b08c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -662,6 +662,7 @@ public void close() final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig); toolboxFactory = new TaskToolboxFactory( + null, taskConfig, null, // taskExecutorNode taskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 93c5635492da..fb1eba7644b1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -128,6 +128,7 @@ private WorkerTaskManager createWorkerTaskManager() jsonMapper, new TestTaskRunner( new TaskToolboxFactory( + null, taskConfig, null, taskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index aecfe29ab1fa..94d70545803d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -170,6 +170,7 @@ private WorkerTaskMonitor createTaskMonitor() jsonMapper, new SingleTaskBackgroundRunner( new TaskToolboxFactory( + null, taskConfig, null, taskActionClientFactory, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index b087c91988cb..974f4a9773ec 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; @@ -43,6 +44,7 @@ public class Appenderators { public static Appenderator createRealtime( + SegmentLoaderConfig segmentLoaderConfig, String id, DataSchema schema, AppenderatorConfig config, @@ -65,6 +67,7 @@ public static Appenderator createRealtime( ) { return new StreamAppenderator( + segmentLoaderConfig, id, schema, config, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java index 75e078e1bb94..7d76f14c4c06 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Interval; @@ -64,6 +65,7 @@ public interface AppenderatorsManager * used for query processing. */ Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java index 7698d41c8a04..4f5c5ed5b75c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java @@ -96,6 +96,7 @@ public Appenderator build( { final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters(); return Appenderators.createRealtime( + null, schema.getDataSource(), schema, config.withBasePersistDirectory( diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java index 10939cf5356c..281f053fecb6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java @@ -37,6 +37,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Interval; @@ -55,6 +56,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag @Override public Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java index 82df08c665a9..2370eb98d01f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java @@ -37,6 +37,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Interval; @@ -61,6 +62,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager @Override public Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, @@ -88,6 +90,7 @@ public Appenderator createRealtimeAppenderatorForTask( throw new ISE("A batch appenderator was already created for this peon's task."); } else { realtimeAppenderator = Appenderators.createRealtime( + segmentLoaderConfig, taskId, schema, config, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index f21f67ed5041..83e4f9907097 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -67,6 +67,7 @@ import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.plumber.Sink; @@ -95,6 +96,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -170,6 +173,9 @@ public class StreamAppenderator implements Appenderator private volatile Throwable persistError; + private final SegmentLoaderConfig segmentLoaderConfig; + private ScheduledExecutorService exec; + /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. * @@ -180,6 +186,7 @@ public class StreamAppenderator implements Appenderator * Appenderators. */ StreamAppenderator( + SegmentLoaderConfig segmentLoaderConfig, String id, DataSchema schema, AppenderatorConfig tuningConfig, @@ -196,6 +203,7 @@ public class StreamAppenderator implements Appenderator boolean useMaxMemoryEstimates ) { + this.segmentLoaderConfig = segmentLoaderConfig; this.myId = id; this.schema = Preconditions.checkNotNull(schema, "schema"); this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig"); @@ -221,6 +229,20 @@ public class StreamAppenderator implements Appenderator maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault(); skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck(); this.useMaxMemoryEstimates = useMaxMemoryEstimates; + + this.exec = Executors.newScheduledThreadPool( + 1, + Execs.makeThreadFactory("StreamAppenderSegmentRemoval-%s") + ); + } + + @VisibleForTesting + void setExec(ScheduledExecutorService testExec) + { + if (exec != null) { + exec.shutdown(); + } + exec = testExec; } @Override @@ -1170,6 +1192,10 @@ private void shutdownExecutors() if (intermediateTempExecutor != null) { intermediateTempExecutor.shutdownNow(); } + + if (exec != null) { + exec.shutdownNow(); + } } private void resetNextFlush() @@ -1400,24 +1426,48 @@ public Void apply(@Nullable Object input) .emit(); } - droppingSinks.remove(identifier); - sinkTimeline.remove( - sink.getInterval(), - sink.getVersion(), - identifier.getShardSpec().createChunk(sink) - ); - for (FireHydrant hydrant : sink) { - if (cache != null) { - cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); + Runnable removeRunnable = () -> { + droppingSinks.remove(identifier); + sinkTimeline.remove( + sink.getInterval(), + sink.getVersion(), + identifier.getShardSpec().createChunk(sink) + ); + for (FireHydrant hydrant : sink) { + if (cache != null) { + cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); + } + hydrant.swapSegment(null); } - hydrant.swapSegment(null); - } - if (removeOnDiskData) { - removeDirectory(computePersistDir(identifier)); - } + if (removeOnDiskData) { + removeDirectory(computePersistDir(identifier)); + } + + log.info("Dropped segment[%s].", identifier); + }; - log.info("Dropped segment[%s].", identifier); + if (segmentLoaderConfig == null) { + log.info( + "Unannounced segment[%s]", + identifier + ); + removeRunnable.run(); + } else { + log.info( + "Unannounced segment[%s], scheduling drop in [%d] millisecs", + identifier, + segmentLoaderConfig.getDropSegmentDelayMillis() + ); + // Keep the segments in the cache and sinkTimeline for dropSegmentDelay after unannouncing the segments + // This way, in transit queries which still see the segments in this peon would be able to query the + // segments and not throw NullPtr exceptions. + exec.schedule( + removeRunnable, + segmentLoaderConfig.getDropSegmentDelayMillis(), + TimeUnit.MILLISECONDS + ); + } return null; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 0a728eb890c3..b9be326c8225 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -62,6 +62,7 @@ import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; @@ -149,6 +150,7 @@ public UnifiedIndexerAppenderatorsManager( @Override public Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, @@ -177,6 +179,7 @@ public Appenderator createRealtimeAppenderatorForTask( ); Appenderator appenderator = new StreamAppenderator( + null, taskId, schema, rewriteAppenderatorConfigMemoryLimits(config), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 2e05cb9053fb..bf3458b09759 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -61,6 +61,9 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class StreamAppenderatorTest extends InitializedNullHandlingTest @@ -950,6 +953,101 @@ public void testVerifyRowIngestionMetrics() throws Exception } } + @Test + public void testDelayedDrop() throws Exception + { + class TestScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor + { + ScheduledFuture scheduledFuture; + + public TestScheduledThreadPoolExecutor() + { + super(1); + } + + @Override + public ScheduledFuture schedule( + Runnable command, + long delay, TimeUnit unit + ) + { + ScheduledFuture future = super.schedule(command, delay, unit); + scheduledFuture = future; + return future; + } + + ScheduledFuture getLastScheduledFuture() + { + return scheduledFuture; + } + } + + try ( + final StreamAppenderatorTester tester = + new StreamAppenderatorTester.Builder().maxRowsInMemory(2) + .basePersistDirectory(temporaryFolder.newFolder()) + .enablePushFailure(true) + .withSegmentDropDelayInMilli(1000) + .build()) { + final Appenderator appenderator = tester.getAppenderator(); + TestScheduledThreadPoolExecutor testExec = new TestScheduledThreadPoolExecutor(); + ((StreamAppenderator) appenderator).setExec(testExec); + + appenderator.startJob(); + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil())); + + // Query1: 2000/2001 + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals(ImmutableList.of(Intervals.of("2000/2001"))) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .build(); + + appenderator.drop(IDENTIFIERS.get(0)).get(); + + // segment 0 won't be dropped immediately + final List> results1 = + QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query1", + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)) + ) + ), + results1 + ); + + // segment 0 would eventually be dropped at some time after 1 secs drop delay + testExec.getLastScheduledFuture().get(5000, TimeUnit.MILLISECONDS); + + final List> results = QueryPlus.wrap(query1) + .run(appenderator, ResponseContext.createEmpty()) + .toList(); + List> expectedResults = + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L)) + ) + ); + Assert.assertEquals("query after dropped", expectedResults, results); + } + } + @Test public void testQueryByIntervals() throws Exception { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java index 217c90116c3f..3663af38b012 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java @@ -63,6 +63,7 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; @@ -93,6 +94,7 @@ public class StreamAppenderatorTester implements AutoCloseable private final List pushedSegments = new CopyOnWriteArrayList<>(); public StreamAppenderatorTester( + final int delayInMilli, final int maxRowsInMemory, final long maxSizeInBytes, final File basePersistDirectory, @@ -209,43 +211,93 @@ public Map makeLoadSpec(URI uri) throw new UnsupportedOperationException(); } }; - appenderator = Appenderators.createRealtime( - schema.getDataSource(), - schema, - tuningConfig, - metrics, - dataSegmentPusher, - objectMapper, - indexIO, - indexMerger, - new DefaultQueryRunnerFactoryConglomerate( - ImmutableMap.of( - TimeseriesQuery.class, new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(), - new TimeseriesQueryEngine(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER - ), - ScanQuery.class, new ScanQueryRunnerFactory( - new ScanQueryQueryToolChest( - new ScanQueryConfig(), - new DefaultGenericQueryMetricsFactory() - ), - new ScanQueryEngine(), - new ScanQueryConfig() - ) - ) - ), - new NoopDataSegmentAnnouncer(), - emitter, - new ForwardingQueryProcessingPool(queryExecutor), - NoopJoinableFactory.INSTANCE, - MapCache.create(2048), - new CacheConfig(), - new CachePopulatorStats(), - rowIngestionMeters, - new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), - true - ); + if (delayInMilli <= 0) { + appenderator = Appenderators.createRealtime( + null, + schema.getDataSource(), + schema, + tuningConfig, + metrics, + dataSegmentPusher, + objectMapper, + indexIO, + indexMerger, + new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap.of( + TimeseriesQuery.class, new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + ScanQuery.class, new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ) + ) + ), + new NoopDataSegmentAnnouncer(), + emitter, + new ForwardingQueryProcessingPool(queryExecutor), + NoopJoinableFactory.INSTANCE, + MapCache.create(2048), + new CacheConfig(), + new CachePopulatorStats(), + rowIngestionMeters, + new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), + true + ); + } else { + SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig() + { + @Override + public int getDropSegmentDelayMillis() + { + return delayInMilli; + } + }; + appenderator = Appenderators.createRealtime( + segmentLoaderConfig, + schema.getDataSource(), + schema, + tuningConfig, + metrics, + dataSegmentPusher, + objectMapper, + indexIO, + indexMerger, + new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap.of( + TimeseriesQuery.class, new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + ScanQuery.class, new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ) + ) + ), + new NoopDataSegmentAnnouncer(), + emitter, + new ForwardingQueryProcessingPool(queryExecutor), + NoopJoinableFactory.INSTANCE, + MapCache.create(2048), + new CacheConfig(), + new CachePopulatorStats(), + rowIngestionMeters, + new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), + true + ); + } } private long getDefaultMaxBytesInMemory() @@ -305,6 +357,7 @@ public static class Builder private boolean enablePushFailure; private RowIngestionMeters rowIngestionMeters; private boolean skipBytesInMemoryOverheadCheck; + private int delayInMilli = 0; public Builder maxRowsInMemory(final int maxRowsInMemory) { @@ -342,9 +395,16 @@ public Builder skipBytesInMemoryOverheadCheck(final boolean skipBytesInMemoryOve return this; } + public Builder withSegmentDropDelayInMilli(int delayInMilli) + { + this.delayInMilli = delayInMilli; + return this; + } + public StreamAppenderatorTester build() { return new StreamAppenderatorTester( + delayInMilli, maxRowsInMemory, maxSizeInBytes, Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory"),