Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delay before the peon drops the segments after publishing them #15373

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
Expand All @@ -78,6 +79,7 @@
*/
public class TaskToolbox
{
private final SegmentLoaderConfig segmentLoaderConfig;
private final TaskConfig config;
private final DruidNode taskExecutorNode;
private final TaskActionClient taskActionClient;
Expand Down Expand Up @@ -130,6 +132,7 @@ public class TaskToolbox
private final String attemptId;

public TaskToolbox(
SegmentLoaderConfig segmentLoaderConfig,
TaskConfig config,
DruidNode taskExecutorNode,
TaskActionClient taskActionClient,
Expand Down Expand Up @@ -171,6 +174,7 @@ public TaskToolbox(
String attemptId
)
{
this.segmentLoaderConfig = segmentLoaderConfig;
this.config = config;
this.taskExecutorNode = taskExecutorNode;
this.taskActionClient = taskActionClient;
Expand Down Expand Up @@ -213,6 +217,11 @@ public TaskToolbox(
this.attemptId = attemptId;
}

public SegmentLoaderConfig getSegmentLoaderConfig()
{
return segmentLoaderConfig;
}

public TaskConfig getConfig()
{
return config;
Expand Down Expand Up @@ -504,6 +513,7 @@ public static RuntimeInfo createAdjustedRuntimeInfo(

public static class Builder
{
private SegmentLoaderConfig segmentLoaderConfig;
private TaskConfig config;
private DruidNode taskExecutorNode;
private TaskActionClient taskActionClient;
Expand Down Expand Up @@ -550,6 +560,7 @@ public Builder()

public Builder(TaskToolbox other)
{
this.segmentLoaderConfig = other.segmentLoaderConfig;
this.config = other.config;
this.taskExecutorNode = other.taskExecutorNode;
this.taskActionClient = other.taskActionClient;
Expand Down Expand Up @@ -589,6 +600,12 @@ public Builder(TaskToolbox other)
this.shuffleClient = other.shuffleClient;
}

public Builder config(final SegmentLoaderConfig segmentLoaderConfig)
{
this.segmentLoaderConfig = segmentLoaderConfig;
return this;
}

public Builder config(final TaskConfig config)
{
this.config = config;
Expand Down Expand Up @@ -826,6 +843,7 @@ public Builder attemptId(final String attemptId)
public TaskToolbox build()
{
return new TaskToolbox(
segmentLoaderConfig,
config,
taskExecutorNode,
taskActionClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
Expand All @@ -72,6 +73,7 @@
*/
public class TaskToolboxFactory
{
private final SegmentLoaderConfig segmentLoaderConfig;
private final TaskConfig config;
private final DruidNode taskExecutorNode;
private final TaskActionClientFactory taskActionClientFactory;
Expand Down Expand Up @@ -115,6 +117,7 @@ public class TaskToolboxFactory

@Inject
public TaskToolboxFactory(
SegmentLoaderConfig segmentLoadConfig,
TaskConfig config,
@Parent DruidNode taskExecutorNode,
TaskActionClientFactory taskActionClientFactory,
Expand Down Expand Up @@ -155,6 +158,7 @@ public TaskToolboxFactory(
@AttemptId String attemptId
)
{
this.segmentLoaderConfig = segmentLoadConfig;
this.config = config;
this.taskExecutorNode = taskExecutorNode;
this.taskActionClientFactory = taskActionClientFactory;
Expand Down Expand Up @@ -210,6 +214,7 @@ public TaskToolbox build(TaskConfig config, Task task)
final File taskWorkDir = config.getTaskWorkDir(task.getId());
return new TaskToolbox.Builder()
.config(config)
.config(segmentLoaderConfig)
.taskExecutorNode(taskExecutorNode)
.taskActionClient(taskActionClientFactory.create(task))
.emitter(emitter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ private Appenderator newAppenderator(
)
{
return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask(
null,
getId(),
dataSchema,
tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public Appenderator newAppenderator(
)
{
return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask(
toolbox.getSegmentLoaderConfig(),
getId(),
dataSchema,
tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
Expand Down Expand Up @@ -94,6 +95,7 @@ public class TaskToolboxTest
private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
private Cache mockCache = EasyMock.createMock(Cache.class);
private CacheConfig mockCacheConfig = EasyMock.createMock(CacheConfig.class);
private SegmentLoaderConfig segmentLoaderConfig = EasyMock.createMock(SegmentLoaderConfig.class);

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
Expand All @@ -115,6 +117,7 @@ public void setUp() throws IOException
.build();

taskToolbox = new TaskToolboxFactory(
segmentLoaderConfig,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
mockTaskActionClientFactory,
Expand Down Expand Up @@ -162,6 +165,12 @@ public void testGetDataSegmentArchiver()
Assert.assertEquals(mockDataSegmentArchiver, taskToolbox.build(task).getDataSegmentArchiver());
}

@Test
public void testGetSegmentLoaderConfig()
{
Assert.assertEquals(segmentLoaderConfig, taskToolbox.build(task).getSegmentLoaderConfig());
}

@Test
public void testGetSegmentAnnouncer()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,7 @@ public void close()
};
final TestUtils testUtils = new TestUtils();
taskToolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ public void close()
};
final TestUtils testUtils = new TestUtils();
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
null, // taskExecutorNode
taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
Expand All @@ -50,6 +51,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager

@Override
public Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
Expand All @@ -72,6 +74,7 @@ public Appenderator createRealtimeAppenderatorForTask(
)
{
realtimeAppenderator = Appenderators.createRealtime(
segmentLoaderConfig,
taskId,
schema,
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void setup() throws IOException
final ServiceEmitter emitter = new NoopServiceEmitter();
EmittingLogger.registerEmitter(emitter);
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
null,
EasyMock.createMock(TaskActionClientFactory.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory(
.build();

return new TaskToolboxFactory(
null,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
tac,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public TestTaskToolboxFactory(
)
{
super(
null,
bob.config,
bob.taskExecutorNode,
bob.taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ public void close()
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);

toolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
null, // taskExecutorNode
taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ private WorkerTaskManager createWorkerTaskManager()
jsonMapper,
new TestTaskRunner(
new TaskToolboxFactory(
null,
taskConfig,
null,
taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ private WorkerTaskMonitor createTaskMonitor()
jsonMapper,
new SingleTaskBackgroundRunner(
new TaskToolboxFactory(
null,
taskConfig,
null,
taskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
Expand All @@ -43,6 +44,7 @@
public class Appenderators
{
public static Appenderator createRealtime(
SegmentLoaderConfig segmentLoaderConfig,
String id,
DataSchema schema,
AppenderatorConfig config,
Expand All @@ -65,6 +67,7 @@ public static Appenderator createRealtime(
)
{
return new StreamAppenderator(
segmentLoaderConfig,
id,
schema,
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
Expand Down Expand Up @@ -64,6 +65,7 @@ public interface AppenderatorsManager
* used for query processing.
*/
Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public Appenderator build(
{
final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
return Appenderators.createRealtime(
null,
schema.getDataSource(),
schema,
config.withBasePersistDirectory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
Expand All @@ -55,6 +56,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag

@Override
public Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
Expand All @@ -61,6 +62,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager

@Override
public Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
Expand Down Expand Up @@ -88,6 +90,7 @@ public Appenderator createRealtimeAppenderatorForTask(
throw new ISE("A batch appenderator was already created for this peon's task.");
} else {
realtimeAppenderator = Appenderators.createRealtime(
segmentLoaderConfig,
taskId,
schema,
config,
Expand Down
Loading
Loading