Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supervisor: make rejection periods work with stopTasksCount #17442

Merged
merged 13 commits into from
Nov 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.util.Map;
Expand All @@ -53,7 +54,9 @@ public RabbitStreamIndexTaskIOConfig(
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("uri") String uri)
@JsonProperty("uri") String uri,
@JsonProperty("taskDuration") Duration taskDuration
)
{
super(
taskGroupId,
Expand All @@ -63,7 +66,9 @@ public RabbitStreamIndexTaskIOConfig(
useTransaction,
minimumMessageTime,
maximumMessageTime,
inputFormat);
inputFormat,
taskDuration
);

this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
this.uri = uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -202,7 +201,9 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
minimumMessageTime,
maximumMessageTime,
ioConfig.getInputFormat(),
rabbitConfig.getUri());
rabbitConfig.getUri(),
ioConfig.getTaskDuration()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,20 @@ public class KafkaConsumerMonitor extends AbstractMonitor
ImmutableMap.<String, String>builder()
.put("bytes-consumed-total", "kafka/consumer/bytesConsumed")
.put("records-consumed-total", "kafka/consumer/recordsConsumed")
.put("io-wait-time-ns-total", "kafka/consumer/io/time")
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
.build();
private static final String TOPIC_TAG = "topic";
private static final String DATASOURCE_TAG = "datasource";
private static final Set<String> TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG);

private final KafkaConsumer<?, ?> consumer;
private final String datasource;
private final Map<String, AtomicLong> counters = new HashMap<>();

public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer, String datasource)
{
this.consumer = consumer;
this.datasource = datasource;
}

@Override
Expand All @@ -71,6 +75,9 @@ public boolean doMonitor(final ServiceEmitter emitter)
if (newValue != priorValue) {
final ServiceMetricEvent.Builder builder =
new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic);
if (datasource != null) {
builder.setDimension(DATASOURCE_TAG, datasource);
}
emitter.emit(builder.setMetric(METRICS.get(metricName.name()), newValue - priorValue));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)

final KafkaRecordSupplier recordSupplier =
new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides(),
kafkaIndexTaskIOConfig.isMultiTopic());
kafkaIndexTaskIOConfig.isMultiTopic(), dataSchema.getDataSource()
);

if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.util.Map;
Expand Down Expand Up @@ -63,7 +64,8 @@ public KafkaIndexTaskIOConfig(
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides,
@JsonProperty("multiTopic") @Nullable Boolean multiTopic
@JsonProperty("multiTopic") @Nullable Boolean multiTopic,
@JsonProperty("taskDuration") Duration taskDuration
)
{
super(
Expand All @@ -76,7 +78,8 @@ public KafkaIndexTaskIOConfig(
useTransaction,
minimumMessageTime,
maximumMessageTime,
inputFormat
inputFormat,
taskDuration
);

this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
Expand Down Expand Up @@ -107,7 +110,8 @@ public KafkaIndexTaskIOConfig(
DateTime minimumMessageTime,
DateTime maximumMessageTime,
InputFormat inputFormat,
KafkaConfigOverrides configOverrides
KafkaConfigOverrides configOverrides,
Duration taskDuration
)
{
this(
Expand All @@ -124,7 +128,8 @@ public KafkaIndexTaskIOConfig(
maximumMessageTime,
inputFormat,
configOverrides,
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC,
taskDuration
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.common.serialization.Deserializer;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -78,21 +79,23 @@ public KafkaRecordSupplier(
Map<String, Object> consumerProperties,
ObjectMapper sortingMapper,
KafkaConfigOverrides configOverrides,
boolean multiTopic
boolean multiTopic,
@Nullable String datasource
)
{
this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic);
this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic, datasource);
}

@VisibleForTesting
public KafkaRecordSupplier(
KafkaConsumer<byte[], byte[]> consumer,
boolean multiTopic
boolean multiTopic,
@Nullable String datasource
)
{
this.consumer = consumer;
this.multiTopic = multiTopic;
this.monitor = new KafkaConsumerMonitor(consumer);
this.monitor = new KafkaConsumerMonitor(consumer, datasource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected KafkaRecordSupplier createRecordSupplier()
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = (KafkaSupervisorIOConfig) ioConfig;

return new KafkaRecordSupplier(props, objectMapper, kafkaSupervisorIOConfig.getConfigOverrides(),
kafkaSupervisorIOConfig.isMultiTopic()
kafkaSupervisorIOConfig.isMultiTopic(), null
);
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
spec.getIoConfig().getConfigOverrides(),
spec.getIoConfig().isMultiTopic()
spec.getIoConfig().isMultiTopic(),
Fixed Show fixed Hide fixed
spec.getId()
);
}

Expand Down Expand Up @@ -218,7 +219,8 @@
maximumMessageTime,
ioConfig.getInputFormat(),
kafkaIoConfig.getConfigOverrides(),
kafkaIoConfig.isMultiTopic()
kafkaIoConfig.isMultiTopic(),
ioConfig.getTaskDuration()
);
}

Expand Down
Loading
Loading