diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 24d919918f47..fe851a183a26 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -358,6 +358,7 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception Thread.sleep(1 * 1000); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScale); + autoscaler.stop(); } @Test @@ -435,6 +436,7 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception Thread.sleep(1 * 1000); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountAfterScale); + autoscaler.stop(); } @Test diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 1da5b4fbb9cc..86c4ba385bdd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -422,13 +422,19 @@ public boolean equals(Object obj) // change taskCount without resubmitting. private class DynamicAllocationTasksNotice implements Notice { - Callable scaleAction; + Callable computeDesiredTaskCount; ServiceEmitter emitter; + Runnable onSuccessfulScale; private static final String TYPE = "dynamic_allocation_tasks_notice"; - DynamicAllocationTasksNotice(Callable scaleAction, ServiceEmitter emitter) + DynamicAllocationTasksNotice( + Callable computeDesiredTaskCount, + Runnable onSuccessfulScale, + ServiceEmitter emitter + ) { - this.scaleAction = scaleAction; + this.computeDesiredTaskCount = computeDesiredTaskCount; + this.onSuccessfulScale = onSuccessfulScale; this.emitter = emitter; } @@ -470,7 +476,7 @@ public void handle() return; } } - final Integer desiredTaskCount = scaleAction.call(); + final Integer desiredTaskCount = computeDesiredTaskCount.call(); ServiceMetricEvent.Builder event = ServiceMetricEvent.builder() .setDimension(DruidMetrics.DATASOURCE, dataSource) .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()); @@ -500,6 +506,7 @@ public void handle() boolean allocationSuccess = changeTaskCount(desiredTaskCount); if (allocationSuccess) { + onSuccessfulScale.run(); dynamicTriggerLastRunTime = nowTime; } } @@ -1260,9 +1267,13 @@ public void tryInit() } } - public Runnable buildDynamicAllocationTask(Callable scaleAction, ServiceEmitter emitter) + public Runnable buildDynamicAllocationTask( + Callable scaleAction, + Runnable onSuccessfulScale, + ServiceEmitter emitter + ) { - return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter)); + return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter)); } private Runnable buildRunTask() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 22e36841199b..648d8a655e92 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -86,10 +86,6 @@ public void start() int desiredTaskCount = -1; try { desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue)); - - if (desiredTaskCount != -1) { - lagMetricsQueue.clear(); - } } catch (Exception ex) { log.warn(ex, "Exception while computing desired task count for [%s]", dataSource); @@ -100,6 +96,19 @@ public void start() return desiredTaskCount; }; + Runnable onSuccessfulScale = () -> { + LOCK.lock(); + try { + lagMetricsQueue.clear(); + } + catch (Exception ex) { + log.warn(ex, "Exception while clearing lags for [%s]", dataSource); + } + finally { + LOCK.unlock(); + } + }; + lagComputationExec.scheduleAtFixedRate( computeAndCollectLag(), lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up @@ -107,7 +116,7 @@ public void start() TimeUnit.MILLISECONDS ); allocationExec.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask(scaleAction, emitter), + supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter), lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig .getLagCollectionRangeMillis(), lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),