Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supervisor: make rejection periods work with stopTasksCount #17442

Merged
merged 13 commits into from
Nov 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public RabbitStreamIndexTaskIOConfig(
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("uri") String uri)
@JsonProperty("uri") String uri,
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
)
{
super(
taskGroupId,
Expand All @@ -63,7 +65,9 @@ public RabbitStreamIndexTaskIOConfig(
useTransaction,
minimumMessageTime,
maximumMessageTime,
inputFormat);
inputFormat,
refreshRejectionPeriodsInMinutes
);

this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
this.uri = uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -202,7 +201,9 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
minimumMessageTime,
maximumMessageTime,
ioConfig.getInputFormat(),
rabbitConfig.getUri());
rabbitConfig.getUri(),
ioConfig.getTaskDuration().getStandardMinutes()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.JsonInputFormat;
Expand All @@ -37,6 +38,7 @@
import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory;
import org.apache.druid.indexing.rabbitstream.RabbitStreamRecordSupplier;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
Expand Down Expand Up @@ -366,4 +368,49 @@ public void testReportPayload()
Assert.assertEquals(30 * 60, payload.getDurationSeconds());
}

@Test
public void testCreateTaskIOConfig()
{
supervisor = getSupervisor(
1,
1,
false,
"PT30M",
null,
null,
RabbitStreamSupervisorTest.dataSchema,
tuningConfig
);

SeekableStreamIndexTaskIOConfig ioConfig = supervisor.createTaskIoConfig(
1,
ImmutableMap.of(),
ImmutableMap.of(),
"test",
null,
null,
ImmutableSet.of(),
new RabbitStreamSupervisorIOConfig(
STREAM, // stream
URI, // uri
INPUT_FORMAT, // inputFormat
1, // replicas
1, // taskCount
new Period("PT30M"), // taskDuration
null, // consumerProperties
null, // autoscalerConfig
400L, // poll timeout
new Period("P1D"), // start delat
new Period("PT30M"), // period
new Period("PT30S"), // completiontimeout
false, // useearliest
null, // latemessagerejection
null, // early message rejection
null, // latemessagerejectionstartdatetime
1
)
);

Assert.assertEquals(30L, ioConfig.getRefreshRejectionPeriodsInMinutes().longValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public KafkaIndexTaskIOConfig(
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides,
@JsonProperty("multiTopic") @Nullable Boolean multiTopic
@JsonProperty("multiTopic") @Nullable Boolean multiTopic,
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
)
{
super(
Expand All @@ -76,7 +77,8 @@ public KafkaIndexTaskIOConfig(
useTransaction,
minimumMessageTime,
maximumMessageTime,
inputFormat
inputFormat,
refreshRejectionPeriodsInMinutes
);

this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
Expand Down Expand Up @@ -107,7 +109,8 @@ public KafkaIndexTaskIOConfig(
DateTime minimumMessageTime,
DateTime maximumMessageTime,
InputFormat inputFormat,
KafkaConfigOverrides configOverrides
KafkaConfigOverrides configOverrides,
Long refreshRejectionPeriodsInMinutes
)
{
this(
Expand All @@ -124,7 +127,8 @@ public KafkaIndexTaskIOConfig(
maximumMessageTime,
inputFormat,
configOverrides,
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC,
refreshRejectionPeriodsInMinutes
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
maximumMessageTime,
ioConfig.getInputFormat(),
kafkaIoConfig.getConfigOverrides(),
kafkaIoConfig.isMultiTopic()
kafkaIoConfig.isMultiTopic(),
ioConfig.getTaskDuration().getStandardMinutes()
);
}

Expand Down
Loading
Loading