Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supervisor: make rejection periods work with stopTasksCount #17442

Merged
merged 13 commits into from
Nov 18, 2024
2 changes: 1 addition & 1 deletion docs/ingestion/supervisor.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka I/O confi
|`completionTimeout`|ISO 8601 period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|No|`PT30M`|
|`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to `2016-01-01T11:00Z` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.|No||
|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No||
|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task duration.|No||
|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover.|No||

#### Task autoscaler

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public RabbitStreamIndexTaskIOConfig(
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("uri") String uri)
@JsonProperty("uri") String uri,
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
)
{
super(
taskGroupId,
Expand All @@ -63,7 +65,9 @@ public RabbitStreamIndexTaskIOConfig(
useTransaction,
minimumMessageTime,
maximumMessageTime,
inputFormat);
inputFormat,
refreshRejectionPeriodsInMinutes
);

this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
this.uri = uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -202,7 +201,9 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
minimumMessageTime,
maximumMessageTime,
ioConfig.getInputFormat(),
rabbitConfig.getUri());
rabbitConfig.getUri(),
ioConfig.getTaskDuration().getStandardMinutes()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.JsonInputFormat;
Expand All @@ -37,6 +38,7 @@
import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory;
import org.apache.druid.indexing.rabbitstream.RabbitStreamRecordSupplier;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
Expand Down Expand Up @@ -366,4 +368,49 @@ public void testReportPayload()
Assert.assertEquals(30 * 60, payload.getDurationSeconds());
}

/**
 * Builds a supervisor, asks it for a task IO config using a supervisor IO
 * config whose task duration is PT30M, and checks that the resulting
 * rejection-period refresh interval is reported as 30 minutes.
 */
@Test
public void testCreateTaskIOConfig()
{
  supervisor = getSupervisor(1, 1, false, "PT30M", null, null, RabbitStreamSupervisorTest.dataSchema, tuningConfig);

  // Supervisor-level IO config handed to createTaskIoConfig below.
  final RabbitStreamSupervisorIOConfig supervisorIoConfig = new RabbitStreamSupervisorIOConfig(
      STREAM,               // stream
      URI,                  // uri
      INPUT_FORMAT,         // input format
      1,                    // replicas
      1,                    // task count
      new Period("PT30M"),  // task duration
      null,                 // consumer properties
      null,                 // autoscaler config
      400L,                 // poll timeout
      new Period("P1D"),    // start delay
      new Period("PT30M"),  // period
      new Period("PT30S"),  // completion timeout
      false,                // use earliest
      null,                 // late message rejection period
      null,                 // early message rejection period
      null,                 // late message rejection start date time
      1
  );

  final SeekableStreamIndexTaskIOConfig taskIoConfig = supervisor.createTaskIoConfig(
      1,
      ImmutableMap.of(),
      ImmutableMap.of(),
      "test",
      null,
      null,
      ImmutableSet.of(),
      supervisorIoConfig
  );

  final long refreshMinutes = taskIoConfig.getRefreshRejectionPeriodsInMinutes().longValue();
  Assert.assertEquals(30L, refreshMinutes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public KafkaIndexTaskIOConfig(
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides,
@JsonProperty("multiTopic") @Nullable Boolean multiTopic
@JsonProperty("multiTopic") @Nullable Boolean multiTopic,
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
)
{
super(
Expand All @@ -76,7 +77,8 @@ public KafkaIndexTaskIOConfig(
useTransaction,
minimumMessageTime,
maximumMessageTime,
inputFormat
inputFormat,
refreshRejectionPeriodsInMinutes
);

this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
Expand Down Expand Up @@ -107,7 +109,8 @@ public KafkaIndexTaskIOConfig(
DateTime minimumMessageTime,
DateTime maximumMessageTime,
InputFormat inputFormat,
KafkaConfigOverrides configOverrides
KafkaConfigOverrides configOverrides,
Long refreshRejectionPeriodsInMinutes
)
{
this(
Expand All @@ -124,7 +127,8 @@ public KafkaIndexTaskIOConfig(
maximumMessageTime,
inputFormat,
configOverrides,
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC,
refreshRejectionPeriodsInMinutes
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
maximumMessageTime,
ioConfig.getInputFormat(),
kafkaIoConfig.getConfigOverrides(),
kafkaIoConfig.isMultiTopic()
kafkaIoConfig.isMultiTopic(),
ioConfig.getTaskDuration().getStandardMinutes()
);
}

Expand Down
Loading
Loading