Skip to content

Commit

Permalink
Check if supervisor could be idle on startup (#16844)
Browse files Browse the repository at this point in the history
Fixes #13936 

In cases where a supervisor is idle and the overlord is restarted for some reason, the supervisor would
start spinning tasks again. In clusters where there are many low throughput streams, this would spike
the task count unnecessarily.

This commit compares the latest stream offset with the ones in metadata during the startup of supervisor
and sets it to idle state if they match.
  • Loading branch information
adithyachakilam authored Aug 9, 2024
1 parent 3d6cedb commit a7dd436
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,10 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
autoscaler.start();
supervisor.runInternal();
Thread.sleep(1000);
supervisor.runInternal();
verifyAll();

int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScale);
Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());


KafkaIndexTask task = captured.getValue();
Assert.assertEquals(KafkaSupervisorTest.dataSchema, task.getDataSchema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3640,12 +3640,21 @@ private void checkIfStreamInactiveAndTurnSupervisorIdle()
return;
}

Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream = getLatestSequencesFromStream();
final long nowTime = DateTimes.nowUtc().getMillis();
// if it is the first run and there is no lag observed when compared to the offsets from metadata storage stay idle
if (!stateManager.isAtLeastOneSuccessfulRun()) {
// Set previous sequences to the current offsets in metadata store
previousSequencesFromStream.clear();
previousSequencesFromStream.putAll(getOffsetsFromMetadataStorage());

// Force update partition lag since the reporting thread might not have run yet
updatePartitionLagFromStream();
}

Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream = getLatestSequencesFromStream();
final boolean idle;
final long idleTime;
if (lastActiveTimeMillis > 0
&& previousSequencesFromStream.equals(latestSequencesFromStream)
if (previousSequencesFromStream.equals(latestSequencesFromStream)
&& computeTotalLag() == 0) {
idleTime = nowTime - lastActiveTimeMillis;
idle = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,8 @@ public Duration getEmissionDuration()

replayAll();

SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
supervisor.setStreamOffsets(ImmutableMap.of("0", "10"));
supervisor.start();

Assert.assertTrue(supervisor.stateManager.isHealthy());
Expand Down Expand Up @@ -757,6 +757,93 @@ public Duration getEmissionDuration()
verifyAll();
}

@Test
public void testIdleOnStartUpAndTurnsToRunningAfterLagUpdates()
{
Map<String, String> initialOffsets = ImmutableMap.of("0", "10");
Map<String, String> laterOffsets = ImmutableMap.of("0", "20");

EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
initialOffsets
)
)
).anyTimes();
EasyMock.reset(spec);
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
1,
new Period("PT1H"),
new Period("PT1S"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
null,
new IdleConfig(true, 200L),
null
)
{
}).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()
{
@Override
public Duration getEmissionDuration()
{
return new Period("PT1S").toStandardDuration();
}
}).anyTimes();
EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(recordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(true)
.anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
replayAll();

TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

supervisor.start();

Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

supervisor.setStreamOffsets(initialOffsets);
supervisor.runInternal();

Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

supervisor.setStreamOffsets(laterOffsets);
supervisor.runInternal();

Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
}

@Test
public void testCreatingTasksFailRecoveryFail()
{
Expand Down Expand Up @@ -2872,6 +2959,8 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()

private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
{
Map<String, String> streamOffsets = new HashMap<>();

@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
Expand All @@ -2883,6 +2972,17 @@ public LagStats computeLagStats()
{
return new LagStats(0, 0, 0);
}

@Override
protected Map<String, String> getLatestSequencesFromStream()
{
return streamOffsets;
}

public void setStreamOffsets(Map<String, String> streamOffsets)
{
this.streamOffsets = streamOffsets;
}
}

private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,26 @@ protected void doTestIndexDataWithIdleConfigEnabled(@Nullable Boolean transactio
"wait for no more creation of indexing tasks"
);

indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
indexer.submitSupervisor(taskSpec);

ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.IDLE.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
true,
10000,
30,
"Waiting for supervisor to be idle"
);
ITRetryUtil.retryUntil(
() -> indexer.getRunningTasks()
.stream()
.noneMatch(taskResponseObject -> taskResponseObject.getId().contains(dataSource)),
true,
1000,
10,
"wait for no more creation of indexing tasks"
);

// Start generating remainning half of the data
numWritten += streamGenerator.run(
generatedTestConfig.getStreamName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,10 @@ public synchronized void maybeSetState(State proposedState)
return;
}

// if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) or IDLE state but haven't had a successful run
// if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) but haven't had a successful run
// yet, refuse to switch and prefer the more specific states used for first run (CONNECTING_TO_STREAM,
// DISCOVERING_INITIAL_TASKS, CREATING_TASKS, etc.)
if ((healthySteadyState.equals(proposedState) || BasicState.IDLE.equals(proposedState))
&& !atLeastOneSuccessfulRun) {
if (healthySteadyState.equals(proposedState) && !atLeastOneSuccessfulRun) {
return;
}

Expand Down

0 comments on commit a7dd436

Please sign in to comment.