From 66560ae02f54fc36ce85865e71bc14c7119c1e09 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Tue, 14 Nov 2023 15:19:55 -0800 Subject: [PATCH 01/11] Add delay before the poen drops segements after publishing stage This is similar to make sure in transit query seeing the segment to be dropped in the peon would not fail with a NullPtr exception. The delay approach is similar to the strategy the historical handles segment moving out. --- .../kafka/KafkaIndexTaskTuningConfig.java | 16 ++++-- .../KafkaSupervisorTuningConfig.java | 11 ++-- .../indexing/kafka/KafkaIndexTaskTest.java | 4 +- .../kafka/KafkaIndexTaskTuningConfigTest.java | 5 +- .../kafka/supervisor/KafkaSupervisorTest.java | 8 ++- ...estModifiedKafkaIndexTaskTuningConfig.java | 4 +- .../kinesis/KinesisIndexTaskTuningConfig.java | 16 ++++-- .../KinesisSupervisorTuningConfig.java | 10 ++-- .../kinesis/KinesisIndexTaskSerdeTest.java | 1 + .../kinesis/KinesisIndexTaskTest.java | 4 +- .../KinesisIndexTaskTuningConfigTest.java | 7 ++- .../supervisor/KinesisSupervisorTest.java | 3 ++ ...tModifiedKinesisIndexTaskTuningConfig.java | 7 ++- .../SeekableStreamIndexTaskTuningConfig.java | 19 ++++++- .../SeekableStreamSupervisorSpecTest.java | 1 + .../SeekableStreamSupervisorStateTest.java | 1 + .../druid/segment/indexing/TuningConfig.java | 3 ++ .../appenderator/AppenderatorConfig.java | 5 ++ .../appenderator/StreamAppenderator.java | 54 ++++++++++++++----- 19 files changed, 138 insertions(+), 41 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java index be3aac9ac25d..51d325ed1b61 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java @@ -51,7 +51,8 @@ public KafkaIndexTaskTuningConfig( @Nullable Period intermediateHandoffPeriod, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions + @Nullable Integer maxSavedParseExceptions, + @Nullable Long dropSegmentDelayMillis ) { super( @@ -74,7 +75,8 @@ public KafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + dropSegmentDelayMillis ); } @@ -97,7 +99,8 @@ private KafkaIndexTaskTuningConfig( @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("dropSegmentDelayMillis") @Nullable Long dropSegmentDelayMillis ) { this( @@ -119,7 +122,8 @@ private KafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + dropSegmentDelayMillis ); } @@ -145,7 +149,8 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), - getMaxSavedParseExceptions() + getMaxSavedParseExceptions(), + getDropSegmentDelayMillis() ); } @@ -171,6 +176,7 @@ public String toString() ", logParseExceptions=" + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + + ", dropSegmentDelayMillis=" + getDropSegmentDelayMillis() + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index c8cc454bf272..33b2696edd9c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -64,6 +64,7 @@ public static KafkaSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -91,7 +92,8 @@ public KafkaSupervisorTuningConfig( @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("dropSegmentDelayMillis") @Nullable Long dropSegmentDelayMillis ) { super( @@ -113,7 +115,8 @@ public KafkaSupervisorTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + dropSegmentDelayMillis ); this.workerThreads = workerThreads; this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES); @@ -199,6 +202,7 @@ public String toString() ", logParseExceptions=" + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + + ", dropSegmentDelayMillis=" + getDropSegmentDelayMillis() + '}'; } @@ -224,7 +228,8 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), - getMaxSavedParseExceptions() + getMaxSavedParseExceptions(), + getDropSegmentDelayMillis() ); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index f1001174266b..7201f3c8bc62 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -205,6 +205,7 @@ public static Iterable constructorFeeder() private boolean logParseExceptions = true; private Integer maxParseExceptions = null; private Integer maxSavedParseExceptions = null; + private Long dropSegmentDelayMillis = null; private boolean resetOffsetAutomatically = false; private boolean doHandoff = true; private Integer maxRowsPerSegment = null; @@ -2851,7 +2852,8 @@ private KafkaIndexTask createTask( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + dropSegmentDelayMillis ); if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) { final TreeMap> checkpoints = new TreeMap<>(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 88d8de80605b..1db050a7491b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -148,6 +148,7 @@ public void testConvert() null, null, null, + null, null ); KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); @@ -187,7 +188,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException null, true, 42, - 42 + 42, + null ); String serialized = mapper.writeValueAsString(base); @@ -236,6 +238,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException true, 42, 42, + null, "extra string" ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 2ff8c5bf91e8..ed96ff6fc5da 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -336,6 +336,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); @@ -489,6 +490,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException null, null, null, + null, null ), null @@ -3999,6 +4001,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ) ); @@ -4037,6 +4040,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ); @@ -4564,7 +4568,8 @@ public SeekableStreamIndexTaskClient build( null, null, null, - 10 + 10, + null ); return new TestableKafkaSupervisor( @@ -4677,6 +4682,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java index fd6cbb48b897..2946a6a655bc 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -55,6 +55,7 @@ public TestModifiedKafkaIndexTaskTuningConfig( @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("dropSegmentDelayMillis") @Nullable Long dropSegmentDelayMillis, @JsonProperty("extra") String extra ) { @@ -77,7 +78,8 @@ public TestModifiedKafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + dropSegmentDelayMillis ); this.extra = extra; } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index 06d8cf537230..5e9061a96990 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -90,7 +90,8 @@ public KinesisIndexTaskTuningConfig( @Nullable Integer maxParseExceptions, @Nullable Integer maxSavedParseExceptions, @Nullable Integer maxRecordsPerPoll, - @Nullable Period intermediateHandoffPeriod + @Nullable Period intermediateHandoffPeriod, + @Nullable Long dropSegmentDelayMillis ) { super( @@ -113,7 +114,8 @@ public KinesisIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + dropSegmentDelayMillis ); this.recordBufferSize = recordBufferSize; this.recordBufferOfferTimeout = recordBufferOfferTimeout == null @@ -154,7 +156,8 @@ private KinesisIndexTaskTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, - @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("dropSegmentDelayMillis") @Nullable Long dropSegmentDelayMillis ) { this( @@ -182,7 +185,8 @@ private KinesisIndexTaskTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + dropSegmentDelayMillis ); } @@ -270,7 +274,8 @@ public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir) getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), - getIntermediateHandoffPeriod() + getIntermediateHandoffPeriod(), + getDropSegmentDelayMillis() ); } @@ -333,6 +338,7 @@ public String toString() ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", maxRecordsPerPoll=" + maxRecordsPerPoll + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + + ", dropSegmentDelayMillis=" + getDropSegmentDelayMillis() + '}'; } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 357bc9e57ca4..6874dda38436 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -74,6 +74,7 @@ public static KinesisSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -109,7 +110,8 @@ public KinesisSupervisorTuningConfig( @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, - @JsonProperty("useListShards") Boolean useListShards + @JsonProperty("useListShards") Boolean useListShards, + @JsonProperty("dropSegmentDelayMillis") Long dropSegmentDelayMillis ) { super( @@ -137,7 +139,8 @@ public KinesisSupervisorTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + dropSegmentDelayMillis ); this.workerThreads = workerThreads; @@ -268,7 +271,8 @@ public KinesisIndexTaskTuningConfig convertToTaskTuningConfig() getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), - getIntermediateHandoffPeriod() + getIntermediateHandoffPeriod(), + getDropSegmentDelayMillis() ); } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 2cba3f54187f..3bb43ba0d3bc 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -76,6 +76,7 @@ public class KinesisIndexTaskSerdeTest null, null, null, + null, null ); private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 69516979f3e1..9ed37712f6a1 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -183,6 +183,7 @@ public static Iterable constructorFeeder() private Integer maxRowsPerSegment = null; private Long maxTotalRows = null; private final Period intermediateHandoffPeriod = null; + private final Long dropSegmentDelayMillis = null; private int maxRecordsPerPoll; @BeforeClass @@ -2362,7 +2363,8 @@ private KinesisIndexTask createTask( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + dropSegmentDelayMillis ); return createTask(taskId, dataSchema, ioConfig, tuningConfig, context); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index cd99521c18e8..2502c6b87f55 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -162,7 +162,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException 500, 500, 6000, - new Period("P3D") + new Period("P3D"), + TuningConfig.DEFAULT_DROP_SEGMENT_DELAY ); String serialized = mapper.writeValueAsString(base); @@ -221,7 +222,8 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException 500, 500, 6000, - new Period("P3D") + new Period("P3D"), + TuningConfig.DEFAULT_DROP_SEGMENT_DELAY ); String serialized = mapper.writeValueAsString(new TestModifiedKinesisIndexTaskTuningConfig(base, "loool")); @@ -312,6 +314,7 @@ public void testConvert() null, null, null, + null, null ); KinesisIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 39d943dbebef..a73873013291 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -211,6 +211,7 @@ public void setupTest() null, null, null, + null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -3978,6 +3979,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ); @@ -5085,6 +5087,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index 70611266efe3..3cc610bbeee2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -61,6 +61,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("dropSegmentDelayMillis") @Nullable Long dropSegmentDelayMillis, @JsonProperty("extra") String extra ) { @@ -89,7 +90,8 @@ public TestModifiedKinesisIndexTaskTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + dropSegmentDelayMillis ); this.extra = extra; } @@ -121,7 +123,8 @@ public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig bas base.getMaxParseExceptions(), base.getMaxSavedParseExceptions(), base.getMaxRecordsPerPollConfigured(), - base.getIntermediateHandoffPeriod() + base.getIntermediateHandoffPeriod(), + base.getDropSegmentDelayMillis() ); this.extra = extra; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index 904f2820af60..07750ae448c4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -61,6 +61,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato private final int maxParseExceptions; private final int maxSavedParseExceptions; + private final long dropSegmentDelayMillis; + public SeekableStreamIndexTaskTuningConfig( @Nullable AppendableIndexSpec appendableIndexSpec, @Nullable Integer maxRowsInMemory, @@ -81,7 +83,8 @@ public SeekableStreamIndexTaskTuningConfig( @Nullable Period intermediateHandoffPeriod, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions + @Nullable Integer maxSavedParseExceptions, + @Nullable Long dropSegmentDelayMillis ) { // Cannot be a static because default basePersistDirectory is unique per-instance @@ -134,6 +137,9 @@ public SeekableStreamIndexTaskTuningConfig( this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; + this.dropSegmentDelayMillis = dropSegmentDelayMillis == null + ? TuningConfig.DEFAULT_DROP_SEGMENT_DELAY + : dropSegmentDelayMillis; } @Override @@ -277,6 +283,13 @@ public boolean isSkipSequenceNumberAvailabilityCheck() return skipSequenceNumberAvailabilityCheck; } + @Override + @JsonProperty + public long getDropSegmentDelayMillis() + { + return dropSegmentDelayMillis; + } + @Override public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir); @@ -302,6 +315,7 @@ public boolean equals(Object o) logParseExceptions == that.logParseExceptions && maxParseExceptions == that.maxParseExceptions && maxSavedParseExceptions == that.maxSavedParseExceptions && + dropSegmentDelayMillis == that.dropSegmentDelayMillis && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && @@ -333,7 +347,8 @@ public int hashCode() skipSequenceNumberAvailabilityCheck, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + dropSegmentDelayMillis ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 9215f24a3858..12eb3c7e10ab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -517,6 +517,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + null, null ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 7a587bb196e1..d67f88ffdc64 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2017,6 +2017,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + null, null ) { diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java index ba638c2c4858..1fadcd5a037e 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java @@ -26,6 +26,8 @@ import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; +import java.util.concurrent.TimeUnit; + /** */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @@ -54,6 +56,7 @@ public interface TuningConfig int DEFAULT_MAX_ROWS_IN_MEMORY_REALTIME = 150_000; boolean DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK = false; long DEFAULT_AWAIT_SEGMENT_AVAILABILITY_TIMEOUT_MILLIS = 0L; + long DEFAULT_DROP_SEGMENT_DELAY = TimeUnit.SECONDS.toMillis(30); /** * The incremental index implementation to use diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index fff3466a94eb..e0798a45c3c5 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -64,6 +64,11 @@ default Long getMaxTotalRows() @Nullable SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory(); + default long getDropSegmentDelayMillis() + { + return TuningConfig.DEFAULT_DROP_SEGMENT_DELAY; + } + default int getMaxColumnsToMerge() { return -1; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index f21f67ed5041..6c482cbe28ae 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -95,6 +95,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -170,6 +172,8 @@ public class StreamAppenderator implements Appenderator private volatile Throwable persistError; + private final ScheduledExecutorService exec; + /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. * @@ -221,6 +225,11 @@ public class StreamAppenderator implements Appenderator maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault(); skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck(); this.useMaxMemoryEstimates = useMaxMemoryEstimates; + + this.exec = Executors.newScheduledThreadPool( + 1, + Execs.makeThreadFactory("StreamAppenderSegmentRemoval-%s") + ); } @Override @@ -1400,24 +1409,41 @@ public Void apply(@Nullable Object input) .emit(); } - droppingSinks.remove(identifier); - sinkTimeline.remove( - sink.getInterval(), - sink.getVersion(), - identifier.getShardSpec().createChunk(sink) + log.info( + "Unannounced segment[%s], scheduling drop in [%d] millisecs", + identifier, + tuningConfig.getDropSegmentDelayMillis() ); - for (FireHydrant hydrant : sink) { - if (cache != null) { - cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); + + Runnable removeRunnable = () -> { + droppingSinks.remove(identifier); + sinkTimeline.remove( + sink.getInterval(), + sink.getVersion(), + identifier.getShardSpec().createChunk(sink) + ); + for (FireHydrant hydrant : sink) { + if (cache != null) { + cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); + } + hydrant.swapSegment(null); } - hydrant.swapSegment(null); - } - if (removeOnDiskData) { - removeDirectory(computePersistDir(identifier)); - } + if (removeOnDiskData) { + removeDirectory(computePersistDir(identifier)); + } + + log.info("Dropped segment[%s].", identifier); + }; - log.info("Dropped segment[%s].", identifier); + // Keep the segments in the cache and sinkTimeline for dropSegmentDelay after unannouncing the segments + // This way, in transit queries which still see the segments in this peon would be able to query the + // segments and not throw NullPtr exceptions. + exec.schedule( + removeRunnable, + tuningConfig.getDropSegmentDelayMillis(), + TimeUnit.MILLISECONDS + ); return null; } From bd66f721a8f4c2dd8553208f1f2b7ce89f0f1139 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Mon, 11 Dec 2023 16:39:16 -0800 Subject: [PATCH 02/11] revert the approaches to add the delay config in ingestion spec based on feedback; will use another approach --- .../kafka/KafkaIndexTaskTuningConfig.java | 16 +++++----------- .../KafkaSupervisorTuningConfig.java | 11 +++-------- .../indexing/kafka/KafkaIndexTaskTest.java | 4 +--- .../kafka/KafkaIndexTaskTuningConfigTest.java | 5 +---- .../kafka/supervisor/KafkaSupervisorTest.java | 8 +------- ...estModifiedKafkaIndexTaskTuningConfig.java | 4 +--- .../kinesis/KinesisIndexTaskTuningConfig.java | 16 +++++----------- .../KinesisSupervisorTuningConfig.java | 10 +++------- .../kinesis/KinesisIndexTaskSerdeTest.java | 1 - .../kinesis/KinesisIndexTaskTest.java | 4 +--- .../KinesisIndexTaskTuningConfigTest.java | 7 ++----- .../supervisor/KinesisSupervisorTest.java | 3 --- ...tModifiedKinesisIndexTaskTuningConfig.java | 7 ++----- .../SeekableStreamIndexTaskTuningConfig.java | 19 ++----------------- .../SeekableStreamSupervisorSpecTest.java | 1 - .../SeekableStreamSupervisorStateTest.java | 1 - .../druid/segment/indexing/TuningConfig.java | 3 --- .../appenderator/AppenderatorConfig.java | 5 ----- 18 files changed, 27 insertions(+), 98 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java index 51d325ed1b61..be3aac9ac25d 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java @@ -51,8 +51,7 @@ public KafkaIndexTaskTuningConfig( @Nullable Period intermediateHandoffPeriod, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions, - @Nullable Long dropSegmentDelayMillis + @Nullable Integer maxSavedParseExceptions ) { super( @@ -75,8 +74,7 @@ public KafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions, - dropSegmentDelayMillis + maxSavedParseExceptions ); } @@ -99,8 +97,7 @@ private KafkaIndexTaskTuningConfig( @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, - @JsonProperty("dropSegmentDelayMillis") @Nullable Long dropSegmentDelayMillis + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions ) { this( @@ -122,8 +119,7 @@ private KafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions, - dropSegmentDelayMillis + maxSavedParseExceptions ); } @@ -149,8 +145,7 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), - getMaxSavedParseExceptions(), - getDropSegmentDelayMillis() + getMaxSavedParseExceptions() ); } @@ -176,7 +171,6 @@ public String toString() ", logParseExceptions=" + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + - ", dropSegmentDelayMillis=" + getDropSegmentDelayMillis() + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 33b2696edd9c..c8cc454bf272 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -64,7 +64,6 @@ public static KafkaSupervisorTuningConfig defaultConfig() null, null, null, - null, null ); } @@ -92,8 +91,7 @@ public KafkaSupervisorTuningConfig( @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, - @JsonProperty("dropSegmentDelayMillis") @Nullable Long dropSegmentDelayMillis + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions ) { super( @@ -115,8 +113,7 @@ public KafkaSupervisorTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions, - dropSegmentDelayMillis + maxSavedParseExceptions ); this.workerThreads = workerThreads; this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES); @@ -202,7 +199,6 @@ public String toString() ", logParseExceptions=" + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + - ", dropSegmentDelayMillis=" + getDropSegmentDelayMillis() + '}'; } @@ -228,8 +224,7 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), - getMaxSavedParseExceptions(), - getDropSegmentDelayMillis() + getMaxSavedParseExceptions() ); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 7201f3c8bc62..f1001174266b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -205,7 +205,6 @@ public static Iterable constructorFeeder() private boolean logParseExceptions = true; private Integer maxParseExceptions = null; private Integer maxSavedParseExceptions = null; - private Long dropSegmentDelayMillis = null; private boolean resetOffsetAutomatically = false; private boolean doHandoff = true; private Integer maxRowsPerSegment = null; @@ -2852,8 +2851,7 @@ private KafkaIndexTask createTask( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions, - dropSegmentDelayMillis + maxSavedParseExceptions ); if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) { final TreeMap> checkpoints = new TreeMap<>(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 1db050a7491b..88d8de80605b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -148,7 +148,6 @@ public void testConvert() null, null, null, - null, null ); KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); @@ -188,8 +187,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException null, true, 42, - 42, - null + 42 ); String serialized = mapper.writeValueAsString(base); @@ -238,7 +236,6 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException true, 42, 42, - null, "extra string" ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 3cce80c04886..b221cdf418cb 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -335,7 +335,6 @@ public SeekableStreamIndexTaskClient build( null, null, null, - null, null ); @@ -489,7 +488,6 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException null, null, null, - null, null ), null @@ -3999,7 +3997,6 @@ public void testIsTaskCurrent() null, null, null, - null, null ) ); @@ -4038,7 +4035,6 @@ public void testIsTaskCurrent() null, null, null, - null, null ); @@ -4665,8 +4661,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, - 10, - null + 10 ); return new TestableKafkaSupervisor( @@ -4779,7 +4774,6 @@ public SeekableStreamIndexTaskClient build( null, null, null, - null, null ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java index 2946a6a655bc..fd6cbb48b897 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -55,7 +55,6 @@ public TestModifiedKafkaIndexTaskTuningConfig( @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, - @JsonProperty("dropSegmentDelayMillis") @Nullable Long dropSegmentDelayMillis, @JsonProperty("extra") String extra ) { @@ -78,8 +77,7 @@ public TestModifiedKafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions, - dropSegmentDelayMillis + maxSavedParseExceptions ); this.extra = extra; } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index 5e9061a96990..06d8cf537230 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -90,8 +90,7 @@ public KinesisIndexTaskTuningConfig( @Nullable Integer maxParseExceptions, @Nullable Integer maxSavedParseExceptions, @Nullable Integer maxRecordsPerPoll, - @Nullable Period intermediateHandoffPeriod, - @Nullable Long dropSegmentDelayMillis + @Nullable Period intermediateHandoffPeriod ) { super( @@ -114,8 +113,7 @@ public KinesisIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions, - dropSegmentDelayMillis + maxSavedParseExceptions ); this.recordBufferSize = recordBufferSize; this.recordBufferOfferTimeout = recordBufferOfferTimeout == null @@ -156,8 +154,7 @@ private KinesisIndexTaskTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, - @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, - @JsonProperty("dropSegmentDelayMillis") @Nullable Long dropSegmentDelayMillis + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod ) { this( @@ -185,8 +182,7 @@ private KinesisIndexTaskTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod, - dropSegmentDelayMillis + intermediateHandoffPeriod ); } @@ -274,8 +270,7 @@ public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir) getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), - getIntermediateHandoffPeriod(), - getDropSegmentDelayMillis() + getIntermediateHandoffPeriod() ); } @@ -338,7 +333,6 @@ public String toString() ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", maxRecordsPerPoll=" + maxRecordsPerPoll + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + - ", dropSegmentDelayMillis=" + getDropSegmentDelayMillis() + '}'; } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 6874dda38436..357bc9e57ca4 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -74,7 +74,6 @@ public static KinesisSupervisorTuningConfig defaultConfig() null, null, null, - null, null ); } @@ -110,8 +109,7 @@ public KinesisSupervisorTuningConfig( @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, - @JsonProperty("useListShards") Boolean useListShards, - @JsonProperty("dropSegmentDelayMillis") Long dropSegmentDelayMillis + @JsonProperty("useListShards") Boolean useListShards ) { super( @@ -139,8 +137,7 @@ public KinesisSupervisorTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod, - dropSegmentDelayMillis + intermediateHandoffPeriod ); this.workerThreads = workerThreads; @@ -271,8 +268,7 @@ public KinesisIndexTaskTuningConfig convertToTaskTuningConfig() getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), - getIntermediateHandoffPeriod(), - getDropSegmentDelayMillis() + getIntermediateHandoffPeriod() ); } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 3bb43ba0d3bc..2cba3f54187f 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -76,7 +76,6 @@ public class KinesisIndexTaskSerdeTest null, null, null, - null, null ); private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 9ed37712f6a1..69516979f3e1 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -183,7 +183,6 @@ public static Iterable constructorFeeder() private Integer maxRowsPerSegment = null; private Long maxTotalRows = null; private final Period intermediateHandoffPeriod = null; - private final Long dropSegmentDelayMillis = null; private int maxRecordsPerPoll; @BeforeClass @@ -2363,8 +2362,7 @@ private KinesisIndexTask createTask( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod, - dropSegmentDelayMillis + intermediateHandoffPeriod ); return createTask(taskId, dataSchema, ioConfig, tuningConfig, context); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 2502c6b87f55..cd99521c18e8 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -162,8 +162,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException 500, 500, 6000, - new Period("P3D"), - TuningConfig.DEFAULT_DROP_SEGMENT_DELAY + new Period("P3D") ); String serialized = mapper.writeValueAsString(base); @@ -222,8 +221,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException 500, 500, 6000, - new Period("P3D"), - TuningConfig.DEFAULT_DROP_SEGMENT_DELAY + new Period("P3D") ); String serialized = mapper.writeValueAsString(new TestModifiedKinesisIndexTaskTuningConfig(base, "loool")); @@ -314,7 +312,6 @@ public void testConvert() null, null, null, - null, null ); KinesisIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 2994477c1309..d907a372d7ed 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -210,7 +210,6 @@ public void setupTest() null, null, null, - null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -3977,7 +3976,6 @@ public void testIsTaskCurrent() null, null, null, - null, null ); @@ -5156,7 +5154,6 @@ public SeekableStreamIndexTaskClient build( null, null, null, - null, null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index 3cc610bbeee2..70611266efe3 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -61,7 +61,6 @@ public TestModifiedKinesisIndexTaskTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, - @JsonProperty("dropSegmentDelayMillis") @Nullable Long dropSegmentDelayMillis, @JsonProperty("extra") String extra ) { @@ -90,8 +89,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod, - dropSegmentDelayMillis + intermediateHandoffPeriod ); this.extra = extra; } @@ -123,8 +121,7 @@ public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig bas base.getMaxParseExceptions(), base.getMaxSavedParseExceptions(), base.getMaxRecordsPerPollConfigured(), - base.getIntermediateHandoffPeriod(), - base.getDropSegmentDelayMillis() + base.getIntermediateHandoffPeriod() ); this.extra = extra; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index 07750ae448c4..904f2820af60 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -61,8 +61,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato private final int maxParseExceptions; private final int maxSavedParseExceptions; - private final long dropSegmentDelayMillis; - public SeekableStreamIndexTaskTuningConfig( @Nullable AppendableIndexSpec appendableIndexSpec, @Nullable Integer maxRowsInMemory, @@ -83,8 +81,7 @@ public SeekableStreamIndexTaskTuningConfig( @Nullable Period intermediateHandoffPeriod, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions, - @Nullable Long dropSegmentDelayMillis + @Nullable Integer maxSavedParseExceptions ) { // Cannot be a static because default basePersistDirectory is unique per-instance @@ -137,9 +134,6 @@ public SeekableStreamIndexTaskTuningConfig( this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; - this.dropSegmentDelayMillis = dropSegmentDelayMillis == null - ? TuningConfig.DEFAULT_DROP_SEGMENT_DELAY - : dropSegmentDelayMillis; } @Override @@ -283,13 +277,6 @@ public boolean isSkipSequenceNumberAvailabilityCheck() return skipSequenceNumberAvailabilityCheck; } - @Override - @JsonProperty - public long getDropSegmentDelayMillis() - { - return dropSegmentDelayMillis; - } - @Override public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir); @@ -315,7 +302,6 @@ public boolean equals(Object o) logParseExceptions == that.logParseExceptions && maxParseExceptions == that.maxParseExceptions && maxSavedParseExceptions == that.maxSavedParseExceptions && - dropSegmentDelayMillis == that.dropSegmentDelayMillis && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && @@ -347,8 +333,7 @@ public int hashCode() skipSequenceNumberAvailabilityCheck, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions, - dropSegmentDelayMillis + maxSavedParseExceptions ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 12eb3c7e10ab..9215f24a3858 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -517,7 +517,6 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, - null, null ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 2a69ff064cb4..cb653c6aa848 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2019,7 +2019,6 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, - null, null ) { diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java index 1fadcd5a037e..ba638c2c4858 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java @@ -26,8 +26,6 @@ import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; -import java.util.concurrent.TimeUnit; - /** */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @@ -56,7 +54,6 @@ public interface TuningConfig int DEFAULT_MAX_ROWS_IN_MEMORY_REALTIME = 150_000; boolean DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK = false; long DEFAULT_AWAIT_SEGMENT_AVAILABILITY_TIMEOUT_MILLIS = 0L; - long DEFAULT_DROP_SEGMENT_DELAY = TimeUnit.SECONDS.toMillis(30); /** * The incremental index implementation to use diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index e0798a45c3c5..fff3466a94eb 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -64,11 +64,6 @@ default Long getMaxTotalRows() @Nullable SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory(); - default long getDropSegmentDelayMillis() - { - return TuningConfig.DEFAULT_DROP_SEGMENT_DELAY; - } - default int getMaxColumnsToMerge() { return -1; From 0ec32fac851058a0374d53c58bff167c887d6d74 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Mon, 11 Dec 2023 17:09:52 -0800 Subject: [PATCH 03/11] add the delay config from SegmentLoaderConfig, the same as historical path --- .../druid/indexing/common/TaskToolbox.java | 18 +++++++++++++ .../indexing/common/TaskToolboxFactory.java | 4 +++ .../AppenderatorDriverRealtimeIndexTask.java | 1 + .../SeekableStreamIndexTask.java | 1 + .../indexing/common/TaskToolboxTest.java | 1 + ...penderatorDriverRealtimeIndexTaskTest.java | 1 + .../common/task/RealtimeIndexTaskTest.java | 1 + .../common/task/TestAppenderatorsManager.java | 3 +++ .../SingleTaskBackgroundRunnerTest.java | 1 + .../indexing/overlord/TaskLifecycleTest.java | 1 + .../overlord/TestTaskToolboxFactory.java | 1 + .../SeekableStreamIndexTaskTestBase.java | 1 + .../worker/WorkerTaskManagerTest.java | 1 + .../worker/WorkerTaskMonitorTest.java | 1 + .../realtime/appenderator/Appenderators.java | 3 +++ .../appenderator/AppenderatorsManager.java | 2 ++ .../DefaultRealtimeAppenderatorFactory.java | 1 + ...DummyForInjectionAppenderatorsManager.java | 2 ++ .../PeonAppenderatorsManager.java | 3 +++ .../appenderator/StreamAppenderator.java | 26 ++++++++++++------- .../UnifiedIndexerAppenderatorsManager.java | 3 +++ .../StreamAppenderatorTester.java | 1 + 22 files changed, 68 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 9ebfb96b5677..3e1ac720bb96 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -55,6 +55,7 @@ import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.SegmentCacheManager; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -78,6 +79,7 @@ */ public class TaskToolbox { + private final SegmentLoaderConfig segmentLoaderConfig; private final TaskConfig config; private final DruidNode taskExecutorNode; private final TaskActionClient taskActionClient; @@ -130,6 +132,7 @@ public class TaskToolbox private final String attemptId; public TaskToolbox( + SegmentLoaderConfig segmentLoaderConfig, TaskConfig config, DruidNode taskExecutorNode, TaskActionClient taskActionClient, @@ -171,6 +174,7 @@ public TaskToolbox( String attemptId ) { + this.segmentLoaderConfig = segmentLoaderConfig; this.config = config; this.taskExecutorNode = taskExecutorNode; this.taskActionClient = taskActionClient; @@ -213,6 +217,11 @@ public TaskToolbox( this.attemptId = attemptId; } + public SegmentLoaderConfig getSegmentLoaderConfig() + { + return segmentLoaderConfig; + } + public TaskConfig getConfig() { return config; @@ -504,6 +513,7 @@ public static RuntimeInfo createAdjustedRuntimeInfo( public static class Builder { + private SegmentLoaderConfig segmentLoaderConfig; private TaskConfig config; private DruidNode taskExecutorNode; private TaskActionClient taskActionClient; @@ -550,6 +560,7 @@ public Builder() public Builder(TaskToolbox other) { + this.segmentLoaderConfig = other.segmentLoaderConfig; this.config = other.config; this.taskExecutorNode = other.taskExecutorNode; this.taskActionClient = other.taskActionClient; @@ -589,6 +600,12 @@ public Builder(TaskToolbox other) this.shuffleClient = other.shuffleClient; } + public Builder config(final SegmentLoaderConfig segmentLoaderConfig) + { + this.segmentLoaderConfig = segmentLoaderConfig; + return this; + } + public Builder config(final TaskConfig config) { this.config = config; @@ -826,6 +843,7 @@ public Builder attemptId(final String attemptId) public TaskToolbox build() { return new TaskToolbox( + segmentLoaderConfig, config, taskExecutorNode, taskActionClient, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index 288d89919b98..9d9fb11529fd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -56,6 +56,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.DruidNode; @@ -72,6 +73,7 @@ */ public class TaskToolboxFactory { + private final SegmentLoaderConfig segmentLoaderConfig; private final TaskConfig config; private final DruidNode taskExecutorNode; private final TaskActionClientFactory taskActionClientFactory; @@ -115,6 +117,7 @@ public class TaskToolboxFactory @Inject public TaskToolboxFactory( + SegmentLoaderConfig segmentLoadConfig, TaskConfig config, @Parent DruidNode taskExecutorNode, TaskActionClientFactory taskActionClientFactory, @@ -155,6 +158,7 @@ public TaskToolboxFactory( @AttemptId String attemptId ) { + this.segmentLoaderConfig = segmentLoadConfig; this.config = config; this.taskExecutorNode = taskExecutorNode; this.taskActionClientFactory = taskActionClientFactory; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 3a599dd485be..9e8817fc5ccb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -775,6 +775,7 @@ private Appenderator newAppenderator( ) { return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask( + null, getId(), dataSchema, tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index d74ee5c0be26..c881b3814e3d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -187,6 +187,7 @@ public Appenderator newAppenderator( ) { return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask( + toolbox.getSegmentLoaderConfig(), getId(), dataSchema, tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index c1f7a549d65d..8d0d918a7ef9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -115,6 +115,7 @@ public void setUp() throws IOException .build(); taskToolbox = new TaskToolboxFactory( + null, taskConfig, new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), mockTaskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 5e088724f899..5b400a65111d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1606,6 +1606,7 @@ public void close() }; final TestUtils testUtils = new TestUtils(); taskToolboxFactory = new TaskToolboxFactory( + null, taskConfig, new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), taskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 9881561d61f1..e38f59d6d7a6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -979,6 +979,7 @@ public void close() }; final TestUtils testUtils = new TestUtils(); final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory( + null, taskConfig, null, // taskExecutorNode taskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java index c4d4364f434f..24e7797602ea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.appenderator.Appenderator; import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; @@ -50,6 +51,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager @Override public Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, @@ -72,6 +74,7 @@ public Appenderator createRealtimeAppenderatorForTask( ) { realtimeAppenderator = Appenderators.createRealtime( + segmentLoaderConfig, taskId, schema, config, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 087ae3e1fc16..3e9d776f8c88 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -97,6 +97,7 @@ public void setup() throws IOException final ServiceEmitter emitter = new NoopServiceEmitter(); EmittingLogger.registerEmitter(emitter); final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory( + null, taskConfig, null, EasyMock.createMock(TaskActionClientFactory.class), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 627c161863b6..f8809bb50525 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -619,6 +619,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( .build(); return new TaskToolboxFactory( + null, taskConfig, new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), tac, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java index 5c6afdbb61b1..33dc249fb41d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java @@ -77,6 +77,7 @@ public TestTaskToolboxFactory( ) { super( + null, bob.config, bob.taskExecutorNode, bob.taskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 6be23407a418..2a1a8ac0b08c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -662,6 +662,7 @@ public void close() final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig); toolboxFactory = new TaskToolboxFactory( + null, taskConfig, null, // taskExecutorNode taskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 93c5635492da..fb1eba7644b1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -128,6 +128,7 @@ private WorkerTaskManager createWorkerTaskManager() jsonMapper, new TestTaskRunner( new TaskToolboxFactory( + null, taskConfig, null, taskActionClientFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index aecfe29ab1fa..94d70545803d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -170,6 +170,7 @@ private WorkerTaskMonitor createTaskMonitor() jsonMapper, new SingleTaskBackgroundRunner( new TaskToolboxFactory( + null, taskConfig, null, taskActionClientFactory, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index b087c91988cb..974f4a9773ec 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; @@ -43,6 +44,7 @@ public class Appenderators { public static Appenderator createRealtime( + SegmentLoaderConfig segmentLoaderConfig, String id, DataSchema schema, AppenderatorConfig config, @@ -65,6 +67,7 @@ public static Appenderator createRealtime( ) { return new StreamAppenderator( + segmentLoaderConfig, id, schema, config, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java index 75e078e1bb94..7d76f14c4c06 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Interval; @@ -64,6 +65,7 @@ public interface AppenderatorsManager * used for query processing. */ Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java index 7698d41c8a04..4f5c5ed5b75c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java @@ -96,6 +96,7 @@ public Appenderator build( { final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters(); return Appenderators.createRealtime( + null, schema.getDataSource(), schema, config.withBasePersistDirectory( diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java index 10939cf5356c..281f053fecb6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java @@ -37,6 +37,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Interval; @@ -55,6 +56,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag @Override public Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java index 82df08c665a9..2370eb98d01f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java @@ -37,6 +37,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Interval; @@ -61,6 +62,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager @Override public Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, @@ -88,6 +90,7 @@ public Appenderator createRealtimeAppenderatorForTask( throw new ISE("A batch appenderator was already created for this peon's task."); } else { realtimeAppenderator = Appenderators.createRealtime( + segmentLoaderConfig, taskId, schema, config, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 6c482cbe28ae..62c9fa3d4467 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -67,6 +67,7 @@ import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.plumber.Sink; @@ -172,6 +173,7 @@ public class StreamAppenderator implements Appenderator private volatile Throwable persistError; + private final SegmentLoaderConfig segmentLoaderConfig; private final ScheduledExecutorService exec; /** @@ -184,6 +186,7 @@ public class StreamAppenderator implements Appenderator * Appenderators. */ StreamAppenderator( + SegmentLoaderConfig segmentLoaderConfig, String id, DataSchema schema, AppenderatorConfig tuningConfig, @@ -200,6 +203,7 @@ public class StreamAppenderator implements Appenderator boolean useMaxMemoryEstimates ) { + this.segmentLoaderConfig = segmentLoaderConfig; this.myId = id; this.schema = Preconditions.checkNotNull(schema, "schema"); this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig"); @@ -1412,7 +1416,7 @@ public Void apply(@Nullable Object input) log.info( "Unannounced segment[%s], scheduling drop in [%d] millisecs", identifier, - tuningConfig.getDropSegmentDelayMillis() + segmentLoaderConfig.getDropSegmentDelayMillis() ); Runnable removeRunnable = () -> { @@ -1436,14 +1440,18 @@ public Void apply(@Nullable Object input) log.info("Dropped segment[%s].", identifier); }; - // Keep the segments in the cache and sinkTimeline for dropSegmentDelay after unannouncing the segments - // This way, in transit queries which still see the segments in this peon would be able to query the - // segments and not throw NullPtr exceptions. - exec.schedule( - removeRunnable, - tuningConfig.getDropSegmentDelayMillis(), - TimeUnit.MILLISECONDS - ); + if (segmentLoaderConfig == null) { + removeRunnable.run(); + } else { + // Keep the segments in the cache and sinkTimeline for dropSegmentDelay after unannouncing the segments + // This way, in transit queries which still see the segments in this peon would be able to query the + // segments and not throw NullPtr exceptions. + exec.schedule( + removeRunnable, + segmentLoaderConfig.getDropSegmentDelayMillis(), + TimeUnit.MILLISECONDS + ); + } return null; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 0a728eb890c3..b9be326c8225 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -62,6 +62,7 @@ import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; @@ -149,6 +150,7 @@ public UnifiedIndexerAppenderatorsManager( @Override public Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, @@ -177,6 +179,7 @@ public Appenderator createRealtimeAppenderatorForTask( ); Appenderator appenderator = new StreamAppenderator( + null, taskId, schema, rewriteAppenderatorConfigMemoryLimits(config), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java index 217c90116c3f..ad168b9557c0 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java @@ -210,6 +210,7 @@ public Map makeLoadSpec(URI uri) } }; appenderator = Appenderators.createRealtime( + null, schema.getDataSource(), schema, tuningConfig, From bbcb70f69371b2083c6d5c2b97a568e7e5ba98a1 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Mon, 18 Dec 2023 23:58:39 -0800 Subject: [PATCH 04/11] add shutdown for exec thread pool and address the null segmentloadconfig issue --- .../appenderator/StreamAppenderator.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 62c9fa3d4467..793ff7dfd27a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -1183,6 +1183,10 @@ private void shutdownExecutors() if (intermediateTempExecutor != null) { intermediateTempExecutor.shutdownNow(); } + + if (exec != null) { + exec.shutdownNow(); + } } private void resetNextFlush() @@ -1413,12 +1417,6 @@ public Void apply(@Nullable Object input) .emit(); } - log.info( - "Unannounced segment[%s], scheduling drop in [%d] millisecs", - identifier, - segmentLoaderConfig.getDropSegmentDelayMillis() - ); - Runnable removeRunnable = () -> { droppingSinks.remove(identifier); sinkTimeline.remove( @@ -1441,8 +1439,17 @@ public Void apply(@Nullable Object input) }; if (segmentLoaderConfig == null) { + log.info( + "Unannounced segment[%s]", + identifier + ); removeRunnable.run(); } else { + log.info( + "Unannounced segment[%s], scheduling drop in [%d] millisecs", + identifier, + segmentLoaderConfig.getDropSegmentDelayMillis() + ); // Keep the segments in the cache and sinkTimeline for dropSegmentDelay after unannouncing the segments // This way, in transit queries which still see the segments in this peon would be able to query the // segments and not throw NullPtr exceptions. From 0e77531a9ce4a5cef0aef4ecf1600f90a1386417 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Tue, 19 Dec 2023 18:07:10 -0800 Subject: [PATCH 05/11] added test cases --- .../appenderator/StreamAppenderatorTest.java | 77 ++++++++++ .../StreamAppenderatorTester.java | 135 +++++++++++++----- 2 files changed, 174 insertions(+), 38 deletions(-) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 2e05cb9053fb..d285c7ec1f86 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -950,6 +950,83 @@ public void testVerifyRowIngestionMetrics() throws Exception } } + @Test(timeout = 10_000L) + public void testDelayedDrop() throws Exception + { + try ( + final StreamAppenderatorTester tester = + new StreamAppenderatorTester.Builder().maxRowsInMemory(2) + .basePersistDirectory(temporaryFolder.newFolder()) + .enablePushFailure(true) + .withSegmentDropDelayInMilli(1000) + .build()) { + final Appenderator appenderator = tester.getAppenderator(); + appenderator.startJob(); + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil())); + + // Query1: 2000/2001 + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals(ImmutableList.of(Intervals.of("2000/2001"))) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .build(); + + appenderator.drop(IDENTIFIERS.get(0)).get(); + + // segment 0 won't be dropped immediately + final List> results1 = + QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query1", + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)) + ) + ), + results1 + ); + + // segment 0 would eventually be dropped at some time after 1 secs drop delay + boolean dropped = false; + int loopCount = 0; + while (true) { + Thread.sleep(1000); + final List> results = QueryPlus.wrap(query1) + .run(appenderator, ResponseContext.createEmpty()) + .toList(); + List> expectedResult = + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L)) + ) + ); + dropped = expectedResult.equals(results); + if (dropped) { + break; + } + loopCount++; + if (loopCount >= 10) { + break; + } + } + Assert.assertTrue("segment dropped after configured delay", dropped); + } + } + @Test public void testQueryByIntervals() throws Exception { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java index ad168b9557c0..3663af38b012 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java @@ -63,6 +63,7 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; @@ -93,6 +94,7 @@ public class StreamAppenderatorTester implements AutoCloseable private final List pushedSegments = new CopyOnWriteArrayList<>(); public StreamAppenderatorTester( + final int delayInMilli, final int maxRowsInMemory, final long maxSizeInBytes, final File basePersistDirectory, @@ -209,44 +211,93 @@ public Map makeLoadSpec(URI uri) throw new UnsupportedOperationException(); } }; - appenderator = Appenderators.createRealtime( - null, - schema.getDataSource(), - schema, - tuningConfig, - metrics, - dataSegmentPusher, - objectMapper, - indexIO, - indexMerger, - new DefaultQueryRunnerFactoryConglomerate( - ImmutableMap.of( - TimeseriesQuery.class, new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(), - new TimeseriesQueryEngine(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER - ), - ScanQuery.class, new ScanQueryRunnerFactory( - new ScanQueryQueryToolChest( - new ScanQueryConfig(), - new DefaultGenericQueryMetricsFactory() - ), - new ScanQueryEngine(), - new ScanQueryConfig() - ) - ) - ), - new NoopDataSegmentAnnouncer(), - emitter, - new ForwardingQueryProcessingPool(queryExecutor), - NoopJoinableFactory.INSTANCE, - MapCache.create(2048), - new CacheConfig(), - new CachePopulatorStats(), - rowIngestionMeters, - new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), - true - ); + if (delayInMilli <= 0) { + appenderator = Appenderators.createRealtime( + null, + schema.getDataSource(), + schema, + tuningConfig, + metrics, + dataSegmentPusher, + objectMapper, + indexIO, + indexMerger, + new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap.of( + TimeseriesQuery.class, new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + ScanQuery.class, new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ) + ) + ), + new NoopDataSegmentAnnouncer(), + emitter, + new ForwardingQueryProcessingPool(queryExecutor), + NoopJoinableFactory.INSTANCE, + MapCache.create(2048), + new CacheConfig(), + new CachePopulatorStats(), + rowIngestionMeters, + new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), + true + ); + } else { + SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig() + { + @Override + public int getDropSegmentDelayMillis() + { + return delayInMilli; + } + }; + appenderator = Appenderators.createRealtime( + segmentLoaderConfig, + schema.getDataSource(), + schema, + tuningConfig, + metrics, + dataSegmentPusher, + objectMapper, + indexIO, + indexMerger, + new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap.of( + TimeseriesQuery.class, new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + ScanQuery.class, new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ) + ) + ), + new NoopDataSegmentAnnouncer(), + emitter, + new ForwardingQueryProcessingPool(queryExecutor), + NoopJoinableFactory.INSTANCE, + MapCache.create(2048), + new CacheConfig(), + new CachePopulatorStats(), + rowIngestionMeters, + new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), + true + ); + } } private long getDefaultMaxBytesInMemory() @@ -306,6 +357,7 @@ public static class Builder private boolean enablePushFailure; private RowIngestionMeters rowIngestionMeters; private boolean skipBytesInMemoryOverheadCheck; + private int delayInMilli = 0; public Builder maxRowsInMemory(final int maxRowsInMemory) { @@ -343,9 +395,16 @@ public Builder skipBytesInMemoryOverheadCheck(final boolean skipBytesInMemoryOve return this; } + public Builder withSegmentDropDelayInMilli(int delayInMilli) + { + this.delayInMilli = delayInMilli; + return this; + } + public StreamAppenderatorTester build() { return new StreamAppenderatorTester( + delayInMilli, maxRowsInMemory, maxSizeInBytes, Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory"), From a012a8a05f450344eb29371e9a7309833f97a0e9 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Tue, 19 Dec 2023 20:34:53 -0800 Subject: [PATCH 06/11] make sure drop config is passed to toolbox --- .../org/apache/druid/indexing/common/TaskToolboxFactory.java | 1 + 1 file changed, 1 insertion(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index 9d9fb11529fd..f2df3ddc3a3a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -214,6 +214,7 @@ public TaskToolbox build(TaskConfig config, Task task) final File taskWorkDir = config.getTaskWorkDir(task.getId()); return new TaskToolbox.Builder() .config(config) + .config(segmentLoaderConfig) .taskExecutorNode(taskExecutorNode) .taskActionClient(taskActionClientFactory.create(task)) .emitter(emitter) From 3fe0a26f3a4189373246d5a0cc99c3c17d6f7cbf Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Tue, 19 Dec 2023 20:51:02 -0800 Subject: [PATCH 07/11] added one more test to cover the TaskToolbox creation with segmentLoadConfig --- .../apache/druid/indexing/common/TaskToolboxTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index 8d0d918a7ef9..134115a56b1b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -48,6 +48,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager; @@ -94,6 +95,7 @@ public class TaskToolboxTest private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class); private Cache mockCache = EasyMock.createMock(Cache.class); private CacheConfig mockCacheConfig = EasyMock.createMock(CacheConfig.class); + private SegmentLoaderConfig segmentLoaderConfig = EasyMock.createMock(SegmentLoaderConfig.class); @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -115,7 +117,7 @@ public void setUp() throws IOException .build(); taskToolbox = new TaskToolboxFactory( - null, + segmentLoaderConfig, taskConfig, new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), mockTaskActionClientFactory, @@ -163,6 +165,11 @@ public void testGetDataSegmentArchiver() Assert.assertEquals(mockDataSegmentArchiver, taskToolbox.build(task).getDataSegmentArchiver()); } + @Test + public void testGetSegmentLoaderConfig() { + Assert.assertEquals(segmentLoaderConfig, taskToolbox.build(task).getSegmentLoaderConfig()); + } + @Test public void testGetSegmentAnnouncer() { From e8f4af9bca8b8fdc629d62503063c003635136b5 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Tue, 19 Dec 2023 22:18:24 -0800 Subject: [PATCH 08/11] fix code style check issue and intellij check --- .../java/org/apache/druid/indexing/common/TaskToolboxTest.java | 3 ++- .../segment/realtime/appenderator/StreamAppenderatorTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index 134115a56b1b..75ac2eeb6701 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -166,7 +166,8 @@ public void testGetDataSegmentArchiver() } @Test - public void testGetSegmentLoaderConfig() { + public void testGetSegmentLoaderConfig() + { Assert.assertEquals(segmentLoaderConfig, taskToolbox.build(task).getSegmentLoaderConfig()); } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index d285c7ec1f86..3bbbd2c43ab0 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -1000,7 +1000,7 @@ public void testDelayedDrop() throws Exception ); // segment 0 would eventually be dropped at some time after 1 secs drop delay - boolean dropped = false; + boolean dropped; int loopCount = 0; while (true) { Thread.sleep(1000); From de46a1aedb12d9228b2eebd25ec538e4b9fc4dfa Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Wed, 20 Dec 2023 14:00:46 -0800 Subject: [PATCH 09/11] as suggested, added a custom drop executor to make test robust --- .../appenderator/StreamAppenderator.java | 11 ++- .../appenderator/StreamAppenderatorTest.java | 75 ++++++++++++------- 2 files changed, 60 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 793ff7dfd27a..83e4f9907097 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -174,7 +174,7 @@ public class StreamAppenderator implements Appenderator private volatile Throwable persistError; private final SegmentLoaderConfig segmentLoaderConfig; - private final ScheduledExecutorService exec; + private ScheduledExecutorService exec; /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. @@ -236,6 +236,15 @@ public class StreamAppenderator implements Appenderator ); } + @VisibleForTesting + void setExec(ScheduledExecutorService testExec) + { + if (exec != null) { + exec.shutdown(); + } + exec = testExec; + } + @Override public String getId() { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 3bbbd2c43ab0..718fe2d51ab0 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -31,6 +31,7 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryPlus; @@ -59,8 +60,14 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class StreamAppenderatorTest extends InitializedNullHandlingTest @@ -950,9 +957,35 @@ public void testVerifyRowIngestionMetrics() throws Exception } } - @Test(timeout = 10_000L) + @Test public void testDelayedDrop() throws Exception { + class TestScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor + { + ScheduledFuture scheduledFuture; + + public TestScheduledThreadPoolExecutor() + { + super(1); + } + + @Override + public ScheduledFuture schedule( + Runnable command, + long delay, TimeUnit unit + ) + { + ScheduledFuture future = super.schedule(command, delay, unit); + scheduledFuture = future; + return future; + } + + ScheduledFuture getLastScheduledFuture() + { + return scheduledFuture; + } + } + try ( final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder().maxRowsInMemory(2) @@ -961,6 +994,9 @@ public void testDelayedDrop() throws Exception .withSegmentDropDelayInMilli(1000) .build()) { final Appenderator appenderator = tester.getAppenderator(); + TestScheduledThreadPoolExecutor testExec = new TestScheduledThreadPoolExecutor(); + ((StreamAppenderator) appenderator).setExec(testExec); + appenderator.startJob(); appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil())); appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil())); @@ -1000,30 +1036,19 @@ public void testDelayedDrop() throws Exception ); // segment 0 would eventually be dropped at some time after 1 secs drop delay - boolean dropped; - int loopCount = 0; - while (true) { - Thread.sleep(1000); - final List> results = QueryPlus.wrap(query1) - .run(appenderator, ResponseContext.createEmpty()) - .toList(); - List> expectedResult = - ImmutableList.of( - new Result<>( - DateTimes.of("2000"), - new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L)) - ) - ); - dropped = expectedResult.equals(results); - if (dropped) { - break; - } - loopCount++; - if (loopCount >= 10) { - break; - } - } - Assert.assertTrue("segment dropped after configured delay", dropped); + testExec.getLastScheduledFuture().get(); + + final List> results = QueryPlus.wrap(query1) + .run(appenderator, ResponseContext.createEmpty()) + .toList(); + List> expectedResults = + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L)) + ) + ); + Assert.assertEquals("query after dropped", expectedResults, results); } } From 740e32b9efd31bfa64f968fd5e527d9a1e8af463 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Wed, 20 Dec 2023 21:42:12 -0800 Subject: [PATCH 10/11] remove the unused imports --- .../segment/realtime/appenderator/StreamAppenderatorTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 718fe2d51ab0..3ca8474459d9 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -31,7 +31,6 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryPlus; @@ -60,11 +59,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; From 711e820c2812a60b456d40c4b0c33d24f9d5eea7 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Wed, 20 Dec 2023 21:47:52 -0800 Subject: [PATCH 11/11] Enhance test by giving a time bound for waiting the drop future. This would avoid blocking test case infinitely if something goes unexpected. --- .../segment/realtime/appenderator/StreamAppenderatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 3ca8474459d9..bf3458b09759 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -1032,7 +1032,7 @@ ScheduledFuture getLastScheduledFuture() ); // segment 0 would eventually be dropped at some time after 1 secs drop delay - testExec.getLastScheduledFuture().get(); + testExec.getLastScheduledFuture().get(5000, TimeUnit.MILLISECONDS); final List> results = QueryPlus.wrap(query1) .run(appenderator, ResponseContext.createEmpty())