diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java index 4c053d89455d..8739fbed9b55 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.joda.time.DateTime; +import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Map; @@ -53,7 +54,9 @@ public RabbitStreamIndexTaskIOConfig( @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, - @JsonProperty("uri") String uri) + @JsonProperty("uri") String uri, + @JsonProperty("taskDuration") Duration taskDuration + ) { super( taskGroupId, @@ -63,7 +66,9 @@ public RabbitStreamIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - inputFormat); + inputFormat, + taskDuration + ); this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS; this.uri = uri; diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java index 222022e3e66f..d0c1045141ca 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java @@ -59,7 +59,6 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -202,7 +201,9 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( minimumMessageTime, maximumMessageTime, ioConfig.getInputFormat(), - rabbitConfig.getUri()); + rabbitConfig.getUri(), + ioConfig.getTaskDuration() + ); } @Override diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java index 82c9ad71c973..3d21d2d8aa46 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides; import org.joda.time.DateTime; +import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Map; @@ -63,7 +64,8 @@ public KafkaIndexTaskIOConfig( @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, @JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides, - @JsonProperty("multiTopic") @Nullable Boolean multiTopic + @JsonProperty("multiTopic") @Nullable Boolean multiTopic, + @JsonProperty("taskDuration") Duration taskDuration ) { super( @@ -76,7 +78,8 @@ public KafkaIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - inputFormat + inputFormat, + taskDuration ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); @@ -107,7 +110,8 @@ public KafkaIndexTaskIOConfig( DateTime minimumMessageTime, DateTime maximumMessageTime, InputFormat inputFormat, - KafkaConfigOverrides configOverrides + KafkaConfigOverrides configOverrides, + Duration taskDuration ) { this( @@ -124,7 +128,8 @@ public KafkaIndexTaskIOConfig( maximumMessageTime, inputFormat, configOverrides, - KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC + KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC, + taskDuration ); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 9e1e09850db0..cff3e508aa0c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -219,7 +219,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( maximumMessageTime, ioConfig.getInputFormat(), kafkaIoConfig.getConfigOverrides(), - kafkaIoConfig.isMultiTopic() + kafkaIoConfig.isMultiTopic(), + ioConfig.getTaskDuration() ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 23bdeb14acb8..854d9234ced9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -120,6 +120,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; @@ -357,7 +358,8 @@ public void testRunAfterDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); Assert.assertTrue(task.supportsQueries()); @@ -413,7 +415,8 @@ public void testIngestNullColumnAfterDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -461,7 +464,8 @@ public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNot null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false); @@ -496,7 +500,8 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception null, null, null, - null + null, + Duration.standardHours(2) ) ); @@ -537,7 +542,8 @@ public void testRunBeforeDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -588,7 +594,8 @@ public void testRunAfterDataInsertedLiveReport() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -665,7 +672,8 @@ public void testIncrementalHandOff() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -768,7 +776,8 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -894,7 +903,8 @@ public void testTimeBasedIncrementalHandOff() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -972,7 +982,8 @@ public void testCheckpointResetWithSameEndOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -1035,7 +1046,8 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final KafkaIndexTask staleReplica = createTask( @@ -1051,7 +1063,8 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1117,7 +1130,8 @@ public void testRunWithMinimumMessageTime() throws Exception DateTimes.of("2010"), null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1165,7 +1179,8 @@ public void testRunWithMaximumMessageTime() throws Exception null, DateTimes.of("2010"), INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1222,7 +1237,8 @@ public void testRunWithTransformSpec() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1293,7 +1309,8 @@ public void testKafkaRecordEntityInputFormat() throws Exception null, null, new TestKafkaInputFormat(INPUT_FORMAT), - null + null, + Duration.standardHours(2) ) ); Assert.assertTrue(task.supportsQueries()); @@ -1365,7 +1382,8 @@ public void testKafkaInputFormat() throws Exception null, null, KAFKA_INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); Assert.assertTrue(task.supportsQueries()); @@ -1416,7 +1434,8 @@ public void testRunOnNothing() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1451,7 +1470,8 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1497,7 +1517,8 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1548,7 +1569,8 @@ public void testReportParseExceptions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1586,7 +1608,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1678,7 +1701,8 @@ public void testMultipleParseExceptionsFailure() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1744,7 +1768,8 @@ public void testRunReplicas() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final KafkaIndexTask task2 = createTask( @@ -1760,7 +1785,8 @@ public void testRunReplicas() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1808,7 +1834,8 @@ public void testRunConflicting() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final KafkaIndexTask task2 = createTask( @@ -1824,7 +1851,8 @@ public void testRunConflicting() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1874,7 +1902,8 @@ public void testRunConflictingWithoutTransactions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final KafkaIndexTask task2 = createTask( @@ -1890,7 +1919,8 @@ public void testRunConflictingWithoutTransactions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1938,7 +1968,8 @@ public void testRunOneTaskTwoPartitions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1984,7 +2015,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final KafkaIndexTask task2 = createTask( @@ -2000,7 +2032,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2050,7 +2083,8 @@ public void testRestore() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2084,7 +2118,8 @@ public void testRestore() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2134,7 +2169,8 @@ public void testRestoreAfterPersistingSequences() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2177,7 +2213,8 @@ public void testRestoreAfterPersistingSequences() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2228,7 +2265,8 @@ public void testRunWithPauseAndResume() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2299,7 +2337,8 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2336,7 +2375,8 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2383,7 +2423,8 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ), context ); @@ -2427,7 +2468,8 @@ public void testRunWithDuplicateRequest() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2466,7 +2508,8 @@ public void testRunTransactionModeRollback() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2547,7 +2590,8 @@ public void testRunUnTransactionMode() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2607,7 +2651,8 @@ public void testCanStartFromLaterThanEarliestOffset() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -2630,7 +2675,8 @@ public void testRunWithoutDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2676,7 +2722,8 @@ public void testSerde() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2708,7 +2755,8 @@ public void testCorrectInputSources() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2970,7 +3018,8 @@ public void testMultipleLinesJSONText() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -3031,7 +3080,8 @@ public void testParseExceptionsInIteratorConstructionSuccess() throws Exception null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2) ) ); @@ -3103,7 +3153,8 @@ public void testNoParseExceptionsTaskSucceeds() throws Exception null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2) ) ); @@ -3177,7 +3228,8 @@ public void testParseExceptionsBeyondThresholdTaskFails() throws Exception null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2) ) ); @@ -3229,7 +3281,8 @@ public void testCompletionReportPartitionStats() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -3283,7 +3336,8 @@ public void testCompletionReportMultiplePartitionStats() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 968378894964..e04b1f030f8b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -107,6 +107,7 @@ import org.easymock.EasyMockSupport; import org.easymock.IAnswer; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; @@ -496,7 +497,8 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ), new KafkaIndexTaskTuningConfig( null, @@ -5641,7 +5643,8 @@ private KafkaIndexTask createKafkaIndexTask( minimumMessageTime, maximumMessageTime, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ), Collections.emptyMap(), OBJECT_MAPPER diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index 881d68ba8968..065bf9636ce8 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.joda.time.DateTime; +import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Set; @@ -78,7 +79,8 @@ public KinesisIndexTaskIOConfig( @JsonProperty("endpoint") String endpoint, @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, - @JsonProperty("awsExternalId") String awsExternalId + @JsonProperty("awsExternalId") String awsExternalId, + @JsonProperty("taskDuration") Duration taskDuration ) { super( @@ -89,7 +91,8 @@ public KinesisIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - inputFormat + inputFormat, + taskDuration ); Preconditions.checkArgument( getEndSequenceNumbers().getPartitionSequenceNumberMap() @@ -117,7 +120,8 @@ public KinesisIndexTaskIOConfig( String endpoint, Integer fetchDelayMillis, String awsAssumedRoleArn, - String awsExternalId + String awsExternalId, + Duration taskDuration ) { this( @@ -135,7 +139,8 @@ public KinesisIndexTaskIOConfig( endpoint, fetchDelayMillis, awsAssumedRoleArn, - awsExternalId + awsExternalId, + taskDuration ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 2f00c8c16cc9..16ff751a4891 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -145,7 +145,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( ioConfig.getEndpoint(), ioConfig.getFetchDelayMillis(), ioConfig.getAwsAssumedRoleArn(), - ioConfig.getAwsExternalId() + ioConfig.getAwsExternalId(), + ioConfig.getTaskDuration() ); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java index 3162b2ea0eee..a917d9a43a9c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.indexing.IOConfig; import org.hamcrest.CoreMatchers; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -267,7 +268,8 @@ public void testDeserializeToOldIoConfig() throws IOException "endpoint", 2000, "awsAssumedRoleArn", - "awsExternalId" + "awsExternalId", + Duration.standardHours(2) ); final byte[] json = mapper.writeValueAsBytes(currentConfig); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index e84581af6013..5648255deddf 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -40,6 +40,7 @@ import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -94,7 +95,8 @@ public class KinesisIndexTaskSerdeTest "endpoint", null, null, - null + null, + Duration.standardHours(2) ); private static final String ACCESS_KEY = "test-access-key"; private static final String SECRET_KEY = "test-secret-key"; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 2ef391484008..757f6735a700 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -89,6 +89,7 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; @@ -785,7 +786,8 @@ public void testRunWithMinimumMessageTime() throws Exception "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ) ); @@ -847,7 +849,8 @@ public void testRunWithMaximumMessageTime() throws Exception "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ) ); @@ -1946,7 +1949,8 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ), context ); @@ -2108,7 +2112,8 @@ public void testSequencesFromContext() throws IOException "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ), context ); @@ -2309,7 +2314,8 @@ private KinesisIndexTask createTask( "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ), null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 122a8e1c5ae8..aec9cae152fd 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -94,6 +94,7 @@ import org.easymock.EasyMockSupport; import org.easymock.IAnswer; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -5563,7 +5564,8 @@ private KinesisIndexTask createKinesisIndexTask( "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ), Collections.emptyMap(), false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 41cd084cd960..d43b83d78d07 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever; @@ -244,32 +243,6 @@ public StreamAppenderatorDriver newDriver( ); } - public boolean withinMinMaxRecordTime(final InputRow row) - { - final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent() - && ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp()); - - final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent() - && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp()); - - if (log.isDebugEnabled()) { - if (beforeMinimumMessageTime) { - log.debug( - "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", - row.getTimestamp(), - ioConfig.getMinimumMessageTime().get() - ); - } else if (afterMaximumMessageTime) { - log.debug( - "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", - row.getTimestamp(), - ioConfig.getMaximumMessageTime().get() - ); - } - } - return !beforeMinimumMessageTime && !afterMaximumMessageTime; - } - @Override public String getTaskAllocatorId() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index 6526bb81b1e3..b916affe65f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.segment.indexing.IOConfig; import org.joda.time.DateTime; +import org.joda.time.Duration; import javax.annotation.Nullable; @@ -42,6 +43,7 @@ public abstract class SeekableStreamIndexTaskIOConfig minimumMessageTime; private final Optional maximumMessageTime; private final InputFormat inputFormat; + private final Duration taskDuration; public SeekableStreamIndexTaskIOConfig( @Nullable final Integer taskGroupId, // can be null for backward compabitility @@ -51,7 +53,8 @@ public SeekableStreamIndexTaskIOConfig( final Boolean useTransaction, final DateTime minimumMessageTime, final DateTime maximumMessageTime, - @Nullable final InputFormat inputFormat + @Nullable final InputFormat inputFormat, + final Duration taskDuration ) { this.taskGroupId = taskGroupId; @@ -62,6 +65,7 @@ public SeekableStreamIndexTaskIOConfig( this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); this.inputFormat = inputFormat; + this.taskDuration = taskDuration; Preconditions.checkArgument( startSequenceNumbers.getStream().equals(endSequenceNumbers.getStream()), @@ -134,4 +138,9 @@ public InputFormat getInputFormat() { return inputFormat; } + + public Duration getTaskDuration() + { + return taskDuration; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index ab437eb7a60a..07902f8e35d6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -76,6 +77,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -246,6 +248,9 @@ public enum Status private final Map partitionsThroughput = new HashMap<>(); + private volatile Optional minMessageTime; + private volatile Optional maxMessageTime; + public SeekableStreamIndexTaskRunner( final SeekableStreamIndexTask task, @Nullable final InputRowParser parser, @@ -267,6 +272,16 @@ public SeekableStreamIndexTaskRunner( this.ingestionState = IngestionState.NOT_STARTED; this.lockGranularityToUse = lockGranularityToUse; + minMessageTime = ioConfig.getMinimumMessageTime(); + maxMessageTime = ioConfig.getMaximumMessageTime(); + + Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d") + .scheduleWithFixedDelay( + this::addTaskDurationToMinMaxTimes, + ioConfig.getTaskDuration().getStandardMinutes(), + ioConfig.getTaskDuration().getStandardMinutes(), + TimeUnit.MINUTES + ); resetNextCheckpointTime(); } @@ -388,7 +403,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception inputRowSchema, task.getDataSchema().getTransformSpec(), toolbox.getIndexingTmpDir(), - row -> row != null && task.withinMinMaxRecordTime(row), + row -> row != null && withinMinMaxRecordTime(row), rowIngestionMeters, parseExceptionHandler ); @@ -2092,4 +2107,47 @@ protected abstract void possiblyResetDataSourceMetadata( protected abstract boolean isEndOffsetExclusive(); protected abstract TypeReference>> getSequenceMetadataTypeReference(); + + + private void addTaskDurationToMinMaxTimes() + { + if (minMessageTime.isPresent()) { + minMessageTime = Optional.of(minMessageTime.get() + .plusMinutes(ioConfig.getTaskDuration() + .toStandardMinutes() + .getMinutes())); + } + + if (maxMessageTime.isPresent()) { + maxMessageTime = Optional.of(maxMessageTime.get() + .plusMinutes(ioConfig.getTaskDuration() + .toStandardMinutes() + .getMinutes())); + } + } + + public boolean withinMinMaxRecordTime(final InputRow row) + { + final boolean beforeMinimumMessageTime = minMessageTime.isPresent() && + minMessageTime.get().isAfter(row.getTimestamp()); + final boolean afterMaximumMessageTime = maxMessageTime.isPresent() && + maxMessageTime.get().isBefore(row.getTimestamp()); + + if (log.isDebugEnabled()) { + if (beforeMinimumMessageTime) { + log.debug( + "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", + row.getTimestamp(), + ioConfig.getMinimumMessageTime().get() + ); + } else if (afterMaximumMessageTime) { + log.debug( + "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", + row.getTimestamp(), + ioConfig.getMaximumMessageTime().get() + ); + } + } + return !beforeMinimumMessageTime && !afterMaximumMessageTime; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java index 8b1bd4fb0963..c0ed6984f19c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java @@ -44,6 +44,7 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.ResourceType; import org.easymock.EasyMock; +import org.joda.time.Duration; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -380,7 +381,8 @@ public TestSeekableStreamIndexTaskIOConfig() false, DateTimes.nowUtc().minusDays(2), DateTimes.nowUtc(), - new CsvInputFormat(null, null, true, null, 0, null) + new CsvInputFormat(null, null, true, null, 0, null), + Duration.standardHours(2) ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java new file mode 100644 index 000000000000..847b5f29cc15 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.server.security.AuthorizerMapper; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +@RunWith(MockitoJUnitRunner.class) +public class SeekableStreamIndexTaskRunnerTest +{ + @Mock + private InputRow row; + + @Mock + private SeekableStreamIndexTask task; + + @Test + public void testWithinMinMaxTime() + { + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2") + ) + ); + DataSchema schema = + DataSchema.builder() + .withDataSource("datasource") + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(dimensionsSpec) + .withGranularity( + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) + ) + .build(); + + SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class); + SeekableStreamIndexTaskIOConfig ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class); + SeekableStreamStartSequenceNumbers sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class); + SeekableStreamEndSequenceNumbers endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class); + + DateTime now = DateTimes.nowUtc(); + + Mockito.when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(2)); + Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().plusHours(2))); + Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().minusHours(2))); + Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); + Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers); + Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers); + + Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of()); + Mockito.when(sequenceNumbers.getStream()).thenReturn("test"); + + Mockito.when(task.getDataSchema()).thenReturn(schema); + Mockito.when(task.getIOConfig()).thenReturn(ioConfig); + Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig); + + TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK); + + Mockito.when(row.getTimestamp()).thenReturn(now); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1)); + Assert.assertFalse(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1)); + Assert.assertFalse(runner.withinMinMaxRecordTime(row)); + } + + @Test + public void testWithinMinMaxTimeNotPopulated() + { + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2") + ) + ); + DataSchema schema = + DataSchema.builder() + .withDataSource("datasource") + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(dimensionsSpec) + .withGranularity( + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) + ) + .build(); + + SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class); + SeekableStreamIndexTaskIOConfig ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class); + SeekableStreamStartSequenceNumbers sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class); + SeekableStreamEndSequenceNumbers endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class); + + DateTime now = DateTimes.nowUtc(); + + Mockito.when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(2)); + Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.absent()); + Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.absent()); + Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); + Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers); + Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers); + + Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of()); + Mockito.when(sequenceNumbers.getStream()).thenReturn("test"); + + Mockito.when(task.getDataSchema()).thenReturn(schema); + Mockito.when(task.getIOConfig()).thenReturn(ioConfig); + Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig); + TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK); + + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + } + + static class TestasbleSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner + { + public TestasbleSeekableStreamIndexTaskRunner( + SeekableStreamIndexTask task, + @Nullable InputRowParser parser, + AuthorizerMapper authorizerMapper, + LockGranularity lockGranularityToUse + ) + { + super(task, parser, authorizerMapper, lockGranularityToUse); + } + + @Override + protected boolean isEndOfShard(Object seqNum) + { + return false; + } + + @Nullable + @Override + protected TreeMap getCheckPointsFromContext(TaskToolbox toolbox, String checkpointsString) + throws IOException + { + return null; + } + + @Override + protected Object getNextStartOffset(Object sequenceNumber) + { + return null; + } + + @Override + protected SeekableStreamEndSequenceNumbers deserializePartitionsFromMetadata(ObjectMapper mapper, Object object) + { + return null; + } + + @Override + protected List getRecords(RecordSupplier recordSupplier, TaskToolbox toolbox) + throws Exception + { + return null; + } + + @Override + protected SeekableStreamDataSourceMetadata createDataSourceMetadata(SeekableStreamSequenceNumbers partitions) + { + return null; + } + + @Override + protected OrderedSequenceNumber createSequenceNumber(Object sequenceNumber) + { + return null; + } + + @Override + protected boolean isEndOffsetExclusive() + { + return false; + } + + @Override + protected TypeReference> getSequenceMetadataTypeReference() + { + return null; + } + + @Override + protected void possiblyResetDataSourceMetadata(TaskToolbox toolbox, RecordSupplier recordSupplier, Set assignment) + { + + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 3281360f5806..4ce4a95a7650 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -196,7 +196,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration() ) { }; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index af66ce3b8b97..6edf77f53c71 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2807,7 +2807,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration() ) { }; @@ -3166,7 +3167,8 @@ private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt( true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration() ) { };