diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java index 6a8c6c149051..4ed2493da550 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java @@ -43,16 +43,20 @@ public class KafkaConsumerMonitor extends AbstractMonitor ImmutableMap.builder() .put("bytes-consumed-total", "kafka/consumer/bytesConsumed") .put("records-consumed-total", "kafka/consumer/recordsConsumed") + .put("io-wait-time-ns-total", "kafka/consumer/io/time") .build(); private static final String TOPIC_TAG = "topic"; + private static final String DATASOURCE_TAG = "datasource"; private static final Set TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG); private final KafkaConsumer consumer; + private final String datasource; private final Map counters = new HashMap<>(); - public KafkaConsumerMonitor(final KafkaConsumer consumer) + public KafkaConsumerMonitor(final KafkaConsumer consumer, String datasource) { this.consumer = consumer; + this.datasource = datasource; } @Override @@ -71,6 +75,9 @@ public boolean doMonitor(final ServiceEmitter emitter) if (newValue != priorValue) { final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic); + if (datasource != null) { + builder.setDimension(DATASOURCE_TAG, datasource); + } emitter.emit(builder.setMetric(METRICS.get(metricName.name()), newValue - priorValue)); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index e5ff77467cdb..2834f2d31ce5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -111,7 +111,8 @@ protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) final KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides(), - kafkaIndexTaskIOConfig.isMultiTopic()); + kafkaIndexTaskIOConfig.isMultiTopic(), dataSchema.getDataSource() + ); if (toolbox.getMonitorScheduler() != null) { toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor()); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index a88300552e2a..5b8ea4f410e0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.serialization.Deserializer; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Type; @@ -78,21 +79,23 @@ public KafkaRecordSupplier( Map consumerProperties, ObjectMapper sortingMapper, KafkaConfigOverrides configOverrides, - boolean multiTopic + boolean multiTopic, + @Nullable String datasource ) { - this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic); + this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic, datasource); } @VisibleForTesting public KafkaRecordSupplier( KafkaConsumer consumer, - boolean multiTopic + boolean multiTopic, + @Nullable String datasource ) { this.consumer = consumer; this.multiTopic = multiTopic; - this.monitor = new KafkaConsumerMonitor(consumer); + this.monitor = new KafkaConsumerMonitor(consumer, datasource); } @Override diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java index e0683fb605bb..e941bff3885f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java @@ -74,7 +74,7 @@ protected KafkaRecordSupplier createRecordSupplier() KafkaSupervisorIOConfig kafkaSupervisorIOConfig = (KafkaSupervisorIOConfig) ioConfig; return new KafkaRecordSupplier(props, objectMapper, kafkaSupervisorIOConfig.getConfigOverrides(), - kafkaSupervisorIOConfig.isMultiTopic() + kafkaSupervisorIOConfig.isMultiTopic(), null ); } finally { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index aebacecff662..9e1e09850db0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -137,7 +137,8 @@ protected RecordSupplier setupReco spec.getIoConfig().getConsumerProperties(), sortingMapper, spec.getIoConfig().getConfigOverrides(), - spec.getIoConfig().isMultiTopic() + spec.getIoConfig().isMultiTopic(), + spec.getId() ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index bfd81464ba2f..a12357f9dfc5 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -73,6 +73,7 @@ public class KafkaRecordSupplierTest private static KafkaTopicPartition PARTITION_1 = new KafkaTopicPartition(false, null, 1); private static String TOPIC = "topic"; + private static String STREAM = "stream"; private static int TOPIC_POS_FIX = 0; private static TestingCluster ZK_SERVER; private static TestBroker KAFKA_SERVER; @@ -239,7 +240,7 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -264,7 +265,7 @@ public void testMultiTopicSupplierSetup() throws ExecutionException, Interrupted insertData(); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true, STREAM); String stream = Pattern.quote(TOPIC) + "|" + Pattern.quote(otherTopic); Set partitions = recordSupplier.getPartitionIds(stream); @@ -300,7 +301,8 @@ public void testSupplierSetupCustomDeserializer() throws ExecutionException, Int properties, OBJECT_MAPPER, null, - false + false, + STREAM ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -328,7 +330,8 @@ public void testSupplierSetupCustomDeserializerRequiresParameter() properties, OBJECT_MAPPER, null, - false + false, + STREAM ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated @@ -347,7 +350,8 @@ public void testSupplierSetupCustomDeserializerRequiresParameterButMissingIt() properties, OBJECT_MAPPER, null, - false + false, + STREAM ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated @@ -374,7 +378,8 @@ public void testPollCustomDeserializer() throws InterruptedException, ExecutionE properties, OBJECT_MAPPER, null, - false + false, + STREAM ); recordSupplier.assign(partitions); @@ -412,7 +417,8 @@ public void testPoll() throws InterruptedException, ExecutionException KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, - false + false, + STREAM ); final Monitor monitor = recordSupplier.monitor(); @@ -463,7 +469,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -533,7 +539,7 @@ public void testSeek() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -576,7 +582,7 @@ public void testSeekToLatest() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -609,7 +615,7 @@ public void testSeekUnassigned() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); recordSupplier.assign(partitions); @@ -635,7 +641,7 @@ public void testPosition() throws ExecutionException, InterruptedException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -670,7 +676,7 @@ public void testPosition() throws ExecutionException, InterruptedException public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -682,7 +688,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShoul public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -694,7 +700,7 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetSho public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -706,7 +712,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldR public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 7926a0568fd9..968378894964 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -5777,7 +5777,7 @@ protected RecordSupplier setupReco Deserializer valueDeserializerObject = new ByteArrayDeserializer(); return new KafkaRecordSupplier( new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject), - getIoConfig().isMultiTopic() + getIoConfig().isMultiTopic(), null ); }