kafka-indexing: Report consumer io time
adithyachakilam committed Nov 3, 2024
1 parent 5fcf420 commit 9988d8f
Showing 7 changed files with 43 additions and 25 deletions.
@@ -43,16 +43,20 @@ public class KafkaConsumerMonitor extends AbstractMonitor
ImmutableMap.<String, String>builder()
.put("bytes-consumed-total", "kafka/consumer/bytesConsumed")
.put("records-consumed-total", "kafka/consumer/recordsConsumed")
.put("io-wait-time-ns-total", "kafka/consumer/io/time")
.build();
private static final String TOPIC_TAG = "topic";
+ private static final String DATASOURCE_TAG = "datasource";
private static final Set<String> TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG);

private final KafkaConsumer<?, ?> consumer;
+ private final String datasource;
private final Map<String, AtomicLong> counters = new HashMap<>();

- public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
+ public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer, String datasource)
{
this.consumer = consumer;
+ this.datasource = datasource;
}

@Override
@@ -71,6 +75,9 @@ public boolean doMonitor(final ServiceEmitter emitter)
if (newValue != priorValue) {
final ServiceMetricEvent.Builder builder =
new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic);
+ if (datasource != null) {
+   builder.setDimension(DATASOURCE_TAG, datasource);
+ }
emitter.emit(builder.setMetric(METRICS.get(metricName.name()), newValue - priorValue));
}
}
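
Taken together, these two hunks make the monitor report consumer I/O wait time ("io-wait-time-ns-total", mapped to kafka/consumer/io/time) and tag each emitted value with the ingesting datasource when one is known. For orientation, a minimal sketch of the full doMonitor() loop follows; only the lines shown in the diff are verbatim, and the iteration over consumer.metrics() is an assumed reconstruction from the visible fields:

@Override
public boolean doMonitor(final ServiceEmitter emitter)
{
  // Walk the Kafka client's built-in metrics (scaffolding assumed, not from the diff).
  for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
    final MetricName metricName = entry.getKey();
    // Report only the whitelisted per-topic metrics, now including io-wait-time-ns-total.
    if (METRICS.containsKey(metricName.name()) && TOPIC_METRIC_TAGS.equals(metricName.tags().keySet())) {
      final String topic = metricName.tags().get(TOPIC_TAG);
      final long newValue = ((Number) entry.getValue().metricValue()).longValue();
      final long priorValue = counters
          .computeIfAbsent(metricName.name() + topic, key -> new AtomicLong())
          .getAndSet(newValue);
      if (newValue != priorValue) {
        final ServiceMetricEvent.Builder builder =
            new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic);
        if (datasource != null) {
          builder.setDimension(DATASOURCE_TAG, datasource);
        }
        // Emit the delta since the previous scheduler pass, e.g. as kafka/consumer/io/time.
        emitter.emit(builder.setMetric(METRICS.get(metricName.name()), newValue - priorValue));
      }
    }
  }
  return true;
}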
@@ -111,7 +111,8 @@ protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)

final KafkaRecordSupplier recordSupplier =
new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides(),
- kafkaIndexTaskIOConfig.isMultiTopic());
+ kafkaIndexTaskIOConfig.isMultiTopic(), dataSchema.getDataSource()
+ );

if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
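
For an indexing task, this completes the flow: the datasource comes from dataSchema.getDataSource(), is threaded into KafkaConsumerMonitor via the supplier, and the monitor is registered with the toolbox's scheduler. A condensed, hypothetical wiring sketch (props, configMapper, ioConfig, and toolbox are assumed to be in scope as in newTaskRecordSupplier() above; passing null for the datasource, as the sampler and several tests below do, simply omits the dimension):

final KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
    props,                         // Kafka consumer properties
    configMapper,                  // ObjectMapper used when building the consumer
    ioConfig.getConfigOverrides(), // optional config overrides
    ioConfig.isMultiTopic(),
    dataSchema.getDataSource()     // @Nullable: null drops the datasource dimension
);
if (toolbox.getMonitorScheduler() != null) {
  toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
}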
@@ -45,6 +45,7 @@
import org.apache.kafka.common.serialization.Deserializer;

import javax.annotation.Nonnull;
+ import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
@@ -78,21 +79,23 @@ public KafkaRecordSupplier(
Map<String, Object> consumerProperties,
ObjectMapper sortingMapper,
KafkaConfigOverrides configOverrides,
- boolean multiTopic
+ boolean multiTopic,
+ @Nullable String datasource
)
{
- this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic);
+ this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic, datasource);
}

@VisibleForTesting
public KafkaRecordSupplier(
KafkaConsumer<byte[], byte[]> consumer,
- boolean multiTopic
+ boolean multiTopic,
+ @Nullable String datasource
)
{
this.consumer = consumer;
this.multiTopic = multiTopic;
- this.monitor = new KafkaConsumerMonitor(consumer);
+ this.monitor = new KafkaConsumerMonitor(consumer, datasource);
}

@Override
@@ -74,7 +74,7 @@ protected KafkaRecordSupplier createRecordSupplier()
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = (KafkaSupervisorIOConfig) ioConfig;

return new KafkaRecordSupplier(props, objectMapper, kafkaSupervisorIOConfig.getConfigOverrides(),
- kafkaSupervisorIOConfig.isMultiTopic()
+ kafkaSupervisorIOConfig.isMultiTopic(), null
);
}
finally {
@@ -137,7 +137,8 @@ protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> setupRecordSupplier
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
spec.getIoConfig().getConfigOverrides(),
- spec.getIoConfig().isMultiTopic()
+ spec.getIoConfig().isMultiTopic(),
+ spec.getId()
);
}

@@ -73,6 +73,7 @@ public class KafkaRecordSupplierTest
private static KafkaTopicPartition PARTITION_1 = new KafkaTopicPartition(false, null, 1);

private static String TOPIC = "topic";
+ private static String STREAM = "stream";
private static int TOPIC_POS_FIX = 0;
private static TestingCluster ZK_SERVER;
private static TestBroker KAFKA_SERVER;
@@ -239,7 +240,7 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException
);

KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM);

Assert.assertTrue(recordSupplier.getAssignment().isEmpty());

@@ -264,7 +265,7 @@ public void testMultiTopicSupplierSetup() throws ExecutionException, InterruptedException
insertData();

KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true, STREAM);

String stream = Pattern.quote(TOPIC) + "|" + Pattern.quote(otherTopic);
Set<KafkaTopicPartition> partitions = recordSupplier.getPartitionIds(stream);
@@ -300,7 +301,8 @@ public void testSupplierSetupCustomDeserializer() throws ExecutionException, InterruptedException
properties,
OBJECT_MAPPER,
null,
- false
+ false,
+ STREAM
);

Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
@@ -328,7 +330,8 @@ public void testSupplierSetupCustomDeserializerRequiresParameter()
properties,
OBJECT_MAPPER,
null,
- false
+ false,
+ STREAM
);

Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
@@ -347,7 +350,8 @@ public void testSupplierSetupCustomDeserializerRequiresParameterButMissingIt()
properties,
OBJECT_MAPPER,
null,
- false
+ false,
+ STREAM
);

Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
@@ -374,7 +378,8 @@ public void testPollCustomDeserializer() throws InterruptedException, ExecutionException
properties,
OBJECT_MAPPER,
null,
- false
+ false,
+ STREAM
);

recordSupplier.assign(partitions);
@@ -412,7 +417,8 @@ public void testPoll() throws InterruptedException, ExecutionException
KAFKA_SERVER.consumerProperties(),
OBJECT_MAPPER,
null,
- false
+ false,
+ STREAM
);

final Monitor monitor = recordSupplier.monitor();
@@ -463,7 +469,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionException


KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);

@@ -533,7 +539,7 @@ public void testSeek() throws InterruptedException, ExecutionException
);

KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM);

recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -576,7 +582,7 @@ public void testSeekToLatest() throws InterruptedException, ExecutionException
);

KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM);

recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -609,7 +615,7 @@ public void testSeekUnassigned() throws InterruptedException, ExecutionException
);

KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM);

recordSupplier.assign(partitions);

@@ -635,7 +641,7 @@ public void testPosition() throws ExecutionException, InterruptedException
);

KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM);

recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -670,7 +676,7 @@ public void testPosition() throws ExecutionException, InterruptedException
public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM);
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
@@ -682,7 +688,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM);
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
@@ -694,7 +700,7 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM);
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
@@ -706,7 +712,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM);
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
@@ -5777,7 +5777,7 @@ protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> setupRecordSupplier
Deserializer valueDeserializerObject = new ByteArrayDeserializer();
return new KafkaRecordSupplier(
new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject),
- getIoConfig().isMultiTopic()
+ getIoConfig().isMultiTopic(), null
);
}

