Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supervisor: make rejection periods work with stopTasksCount #17442

Merged
merged 13 commits into from
Nov 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.util.Map;
Expand All @@ -55,7 +54,7 @@ public RabbitStreamIndexTaskIOConfig(
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("uri") String uri,
@JsonProperty("taskDuration") Duration taskDuration
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
)
{
super(
Expand All @@ -67,7 +66,7 @@ public RabbitStreamIndexTaskIOConfig(
minimumMessageTime,
maximumMessageTime,
inputFormat,
taskDuration
refreshRejectionPeriodsInMinutes
);

this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
maximumMessageTime,
ioConfig.getInputFormat(),
rabbitConfig.getUri(),
ioConfig.getTaskDuration()
ioConfig.getTaskDuration().getStandardMinutes()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory;
import org.apache.druid.indexing.rabbitstream.RabbitStreamRecordSupplier;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
Expand All @@ -51,7 +52,6 @@
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMockSupport;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -382,7 +382,7 @@ public void testCreateTaskIOConfig()
tuningConfig
);

Assert.assertEquals(supervisor.createTaskIoConfig(
SeekableStreamIndexTaskIOConfig ioConfig = supervisor.createTaskIoConfig(
1,
ImmutableMap.of(),
ImmutableMap.of(),
Expand All @@ -409,6 +409,8 @@ public void testCreateTaskIOConfig()
null, // latemessagerejectionstartdatetime
1
)
).getTaskDuration(), Duration.standardMinutes(30));
);

Assert.assertEquals(30L, ioConfig.getRefreshRejectionPeriodsInMinutes().longValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,16 @@ public class KafkaConsumerMonitor extends AbstractMonitor
ImmutableMap.<String, String>builder()
.put("bytes-consumed-total", "kafka/consumer/bytesConsumed")
.put("records-consumed-total", "kafka/consumer/recordsConsumed")
.put("io-wait-time-ns-total", "kafka/consumer/io/time")
.build();
private static final String TOPIC_TAG = "topic";
private static final String DATASOURCE_TAG = "datasource";
private static final Set<String> TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG);

private final KafkaConsumer<?, ?> consumer;
private final String datasource;
private final Map<String, AtomicLong> counters = new HashMap<>();

public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer, String datasource)
public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
{
this.consumer = consumer;
this.datasource = datasource;
}

@Override
Expand All @@ -75,9 +71,6 @@ public boolean doMonitor(final ServiceEmitter emitter)
if (newValue != priorValue) {
final ServiceMetricEvent.Builder builder =
new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic);
if (datasource != null) {
builder.setDimension(DATASOURCE_TAG, datasource);
}
emitter.emit(builder.setMetric(METRICS.get(metricName.name()), newValue - priorValue));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)

final KafkaRecordSupplier recordSupplier =
new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides(),
kafkaIndexTaskIOConfig.isMultiTopic(), dataSchema.getDataSource()
);
kafkaIndexTaskIOConfig.isMultiTopic());

if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.util.Map;
Expand Down Expand Up @@ -65,7 +64,7 @@ public KafkaIndexTaskIOConfig(
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides,
@JsonProperty("multiTopic") @Nullable Boolean multiTopic,
@JsonProperty("taskDuration") Duration taskDuration
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
)
{
super(
Expand All @@ -79,7 +78,7 @@ public KafkaIndexTaskIOConfig(
minimumMessageTime,
maximumMessageTime,
inputFormat,
taskDuration
refreshRejectionPeriodsInMinutes
);

this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
Expand Down Expand Up @@ -111,7 +110,7 @@ public KafkaIndexTaskIOConfig(
DateTime maximumMessageTime,
InputFormat inputFormat,
KafkaConfigOverrides configOverrides,
Duration taskDuration
Long refreshRejectionPeriodsInMinutes
)
{
this(
Expand All @@ -129,7 +128,7 @@ public KafkaIndexTaskIOConfig(
inputFormat,
configOverrides,
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC,
taskDuration
refreshRejectionPeriodsInMinutes
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.kafka.common.serialization.Deserializer;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -79,23 +78,21 @@ public KafkaRecordSupplier(
Map<String, Object> consumerProperties,
ObjectMapper sortingMapper,
KafkaConfigOverrides configOverrides,
boolean multiTopic,
@Nullable String datasource
boolean multiTopic
)
{
this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic, datasource);
this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic);
}

@VisibleForTesting
public KafkaRecordSupplier(
KafkaConsumer<byte[], byte[]> consumer,
boolean multiTopic,
@Nullable String datasource
boolean multiTopic
)
{
this.consumer = consumer;
this.multiTopic = multiTopic;
this.monitor = new KafkaConsumerMonitor(consumer, datasource);
this.monitor = new KafkaConsumerMonitor(consumer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected KafkaRecordSupplier createRecordSupplier()
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = (KafkaSupervisorIOConfig) ioConfig;

return new KafkaRecordSupplier(props, objectMapper, kafkaSupervisorIOConfig.getConfigOverrides(),
kafkaSupervisorIOConfig.isMultiTopic(), null
kafkaSupervisorIOConfig.isMultiTopic()
);
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> setupReco
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
spec.getIoConfig().getConfigOverrides(),
spec.getIoConfig().isMultiTopic(),
spec.getId()
spec.getIoConfig().isMultiTopic()
);
}

Expand Down Expand Up @@ -220,7 +219,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
ioConfig.getInputFormat(),
kafkaIoConfig.getConfigOverrides(),
kafkaIoConfig.isMultiTopic(),
ioConfig.getTaskDuration()
ioConfig.getTaskDuration().getStandardMinutes()
);
}

Expand Down
Loading
Loading