Skip to content

Commit

Permalink
Add delay before the peon drops the segments after publishing them (#…
Browse files Browse the repository at this point in the history
…15373)

Currently, in the realtime ingestion (Kafka/Kinesis) case, after publishing the segments and upon acknowledgement from the coordinator that the segments have already been loaded on some historicals, the peon unannounces the segments (that is, it tells the whole cluster that the segments are no longer on this peon) and drops the segments from the cache and the sink timeline in one shot.

In-flight queries from brokers that still think the segments are on the peon can hit a NullPointerException while the peon is unsetting the hydrants in the sinks.

The fix makes the peon wait for a configurable delay after it unannounces the segments before it drops them, removes them from the cache, and so on.

This delayed approach is similar to how the historicals handle segments moving out.
  • Loading branch information
kaisun2000 authored Jan 2, 2024
1 parent cce5394 commit a5e9b14
Show file tree
Hide file tree
Showing 23 changed files with 319 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
Expand All @@ -78,6 +79,7 @@
*/
public class TaskToolbox
{
private final SegmentLoaderConfig segmentLoaderConfig;
private final TaskConfig config;
private final DruidNode taskExecutorNode;
private final TaskActionClient taskActionClient;
Expand Down Expand Up @@ -130,6 +132,7 @@ public class TaskToolbox
private final String attemptId;

public TaskToolbox(
SegmentLoaderConfig segmentLoaderConfig,
TaskConfig config,
DruidNode taskExecutorNode,
TaskActionClient taskActionClient,
Expand Down Expand Up @@ -171,6 +174,7 @@ public TaskToolbox(
String attemptId
)
{
this.segmentLoaderConfig = segmentLoaderConfig;
this.config = config;
this.taskExecutorNode = taskExecutorNode;
this.taskActionClient = taskActionClient;
Expand Down Expand Up @@ -213,6 +217,11 @@ public TaskToolbox(
this.attemptId = attemptId;
}

/**
 * Returns the {@link SegmentLoaderConfig} this toolbox was constructed with.
 *
 * @return the segment loader config; may be {@code null}, since some callers
 *         (e.g. test factories) construct the toolbox without one
 */
public SegmentLoaderConfig getSegmentLoaderConfig()
{
  return this.segmentLoaderConfig;
}

public TaskConfig getConfig()
{
return config;
Expand Down Expand Up @@ -504,6 +513,7 @@ public static RuntimeInfo createAdjustedRuntimeInfo(

public static class Builder
{
private SegmentLoaderConfig segmentLoaderConfig;
private TaskConfig config;
private DruidNode taskExecutorNode;
private TaskActionClient taskActionClient;
Expand Down Expand Up @@ -550,6 +560,7 @@ public Builder()

public Builder(TaskToolbox other)
{
this.segmentLoaderConfig = other.segmentLoaderConfig;
this.config = other.config;
this.taskExecutorNode = other.taskExecutorNode;
this.taskActionClient = other.taskActionClient;
Expand Down Expand Up @@ -589,6 +600,12 @@ public Builder(TaskToolbox other)
this.shuffleClient = other.shuffleClient;
}

/**
 * Sets the {@link SegmentLoaderConfig} that the built {@link TaskToolbox} will expose.
 *
 * <p>This method shares its name with the {@code config(TaskConfig)} overload; the
 * overload is selected by the static type of the argument.
 * NOTE(review): a bare {@code null} literal would presumably be ambiguous between the
 * two overloads and need a cast -- confirm the two config types are unrelated.
 *
 * @param segmentLoaderConfig the segment loader config to store in this builder
 * @return this builder, to allow call chaining
 */
public Builder config(final SegmentLoaderConfig segmentLoaderConfig)
{
this.segmentLoaderConfig = segmentLoaderConfig;
return this;
}

public Builder config(final TaskConfig config)
{
this.config = config;
Expand Down Expand Up @@ -826,6 +843,7 @@ public Builder attemptId(final String attemptId)
public TaskToolbox build()
{
return new TaskToolbox(
segmentLoaderConfig,
config,
taskExecutorNode,
taskActionClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
Expand All @@ -72,6 +73,7 @@
*/
public class TaskToolboxFactory
{
private final SegmentLoaderConfig segmentLoaderConfig;
private final TaskConfig config;
private final DruidNode taskExecutorNode;
private final TaskActionClientFactory taskActionClientFactory;
Expand Down Expand Up @@ -115,6 +117,7 @@ public class TaskToolboxFactory

@Inject
public TaskToolboxFactory(
SegmentLoaderConfig segmentLoadConfig,
TaskConfig config,
@Parent DruidNode taskExecutorNode,
TaskActionClientFactory taskActionClientFactory,
Expand Down Expand Up @@ -155,6 +158,7 @@ public TaskToolboxFactory(
@AttemptId String attemptId
)
{
this.segmentLoaderConfig = segmentLoadConfig;
this.config = config;
this.taskExecutorNode = taskExecutorNode;
this.taskActionClientFactory = taskActionClientFactory;
Expand Down Expand Up @@ -210,6 +214,7 @@ public TaskToolbox build(TaskConfig config, Task task)
final File taskWorkDir = config.getTaskWorkDir(task.getId());
return new TaskToolbox.Builder()
.config(config)
.config(segmentLoaderConfig)
.taskExecutorNode(taskExecutorNode)
.taskActionClient(taskActionClientFactory.create(task))
.emitter(emitter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ private Appenderator newAppenderator(
)
{
return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask(
null,
getId(),
dataSchema,
tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public Appenderator newAppenderator(
)
{
return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask(
toolbox.getSegmentLoaderConfig(),
getId(),
dataSchema,
tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
Expand Down Expand Up @@ -94,6 +95,7 @@ public class TaskToolboxTest
private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
private Cache mockCache = EasyMock.createMock(Cache.class);
private CacheConfig mockCacheConfig = EasyMock.createMock(CacheConfig.class);
private SegmentLoaderConfig segmentLoaderConfig = EasyMock.createMock(SegmentLoaderConfig.class);

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
Expand All @@ -115,6 +117,7 @@ public void setUp() throws IOException
.build();

taskToolbox = new TaskToolboxFactory(
segmentLoaderConfig,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
mockTaskActionClientFactory,
Expand Down Expand Up @@ -162,6 +165,12 @@ public void testGetDataSegmentArchiver()
Assert.assertEquals(mockDataSegmentArchiver, taskToolbox.build(task).getDataSegmentArchiver());
}

@Test
public void testGetSegmentLoaderConfig()
{
  // The toolbox built by the factory must expose the config the factory was created with.
  final SegmentLoaderConfig actual = taskToolbox.build(task).getSegmentLoaderConfig();
  Assert.assertEquals(segmentLoaderConfig, actual);
}

@Test
public void testGetSegmentAnnouncer()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,7 @@ public void close()
};
final TestUtils testUtils = new TestUtils();
taskToolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ public void close()
};
final TestUtils testUtils = new TestUtils();
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
null, // taskExecutorNode
taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
Expand All @@ -50,6 +51,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager

@Override
public Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
Expand All @@ -72,6 +74,7 @@ public Appenderator createRealtimeAppenderatorForTask(
)
{
realtimeAppenderator = Appenderators.createRealtime(
segmentLoaderConfig,
taskId,
schema,
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void setup() throws IOException
final ServiceEmitter emitter = new NoopServiceEmitter();
EmittingLogger.registerEmitter(emitter);
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
null,
EasyMock.createMock(TaskActionClientFactory.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory(
.build();

return new TaskToolboxFactory(
null,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
tac,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public TestTaskToolboxFactory(
)
{
super(
null,
bob.config,
bob.taskExecutorNode,
bob.taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ public void close()
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);

toolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
null, // taskExecutorNode
taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ private WorkerTaskManager createWorkerTaskManager()
jsonMapper,
new TestTaskRunner(
new TaskToolboxFactory(
null,
taskConfig,
null,
taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ private WorkerTaskMonitor createTaskMonitor()
jsonMapper,
new SingleTaskBackgroundRunner(
new TaskToolboxFactory(
null,
taskConfig,
null,
taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
Expand All @@ -43,6 +44,7 @@
public class Appenderators
{
public static Appenderator createRealtime(
SegmentLoaderConfig segmentLoaderConfig,
String id,
DataSchema schema,
AppenderatorConfig config,
Expand All @@ -65,6 +67,7 @@ public static Appenderator createRealtime(
)
{
return new StreamAppenderator(
segmentLoaderConfig,
id,
schema,
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
Expand Down Expand Up @@ -64,6 +65,7 @@ public interface AppenderatorsManager
* used for query processing.
*/
Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public Appenderator build(
{
final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
return Appenderators.createRealtime(
null,
schema.getDataSource(),
schema,
config.withBasePersistDirectory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
Expand All @@ -55,6 +56,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag

@Override
public Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
Expand All @@ -61,6 +62,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager

@Override
public Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
Expand Down Expand Up @@ -88,6 +90,7 @@ public Appenderator createRealtimeAppenderatorForTask(
throw new ISE("A batch appenderator was already created for this peon's task.");
} else {
realtimeAppenderator = Appenderators.createRealtime(
segmentLoaderConfig,
taskId,
schema,
config,
Expand Down
Loading

0 comments on commit a5e9b14

Please sign in to comment.