KAFKA-16014: Add RemoteLogMetadataCount metric (apache#15026)
Reviewers: Christo Lolov <[email protected]>, Kamal Chandraprakash <[email protected]>, Satish Duggana <[email protected]>
showuon authored Dec 20, 2023
1 parent db8af51 commit 4e11de0
Showing 6 changed files with 101 additions and 25 deletions.
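Before the diff: this commit adds a per-topic RemoteLogMetadataCount gauge under kafka.server:type=BrokerTopicMetrics. A minimal sketch of reading that gauge over JMX follows; the service URL, port, and topic name are placeholders, and the Value attribute assumes the standard Yammer JMX reporter used by the broker.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RemoteLogMetadataCountReader {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint: point this at a broker started with JMX enabled.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Per-topic gauge added by this commit; "my-topic" is a placeholder.
            ObjectName gauge = new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,name=RemoteLogMetadataCount,topic=my-topic");
            System.out.println("RemoteLogMetadataCount = " + conn.getAttribute(gauge, "Value"));
        }
    }
}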
24 changes: 16 additions & 8 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -342,7 +342,10 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
leaderPartitions.forEach(this::cacheTopicPartitionIds);
followerPartitions.forEach(this::cacheTopicPartitionIds);
followerPartitions.forEach(
topicIdPartition -> brokerTopicStats.topicStats(topicIdPartition.topic()).removeRemoteCopyBytesLag(topicIdPartition.partition()));
topicIdPartition -> {
brokerTopicStats.topicStats(topicIdPartition.topic()).removeRemoteCopyBytesLag(topicIdPartition.partition());
brokerTopicStats.topicStats(topicIdPartition.topic()).removeRemoteLogMetadataCount(topicIdPartition.partition());
});

remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
followerPartitions.forEach(topicIdPartition ->
@@ -377,6 +380,7 @@ public void stopPartitions(Set<StopPartition> stopPartitions,
}

brokerTopicStats.topicStats(tp.topic()).removeRemoteCopyBytesLag(tp.partition());
brokerTopicStats.topicStats(tp.topic()).removeRemoteLogMetadataCount(tp.partition());

if (stopPartition.deleteRemoteLog()) {
LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
@@ -968,13 +972,6 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
return;
}

// Cleanup remote log segments and update the log start offset if applicable.
final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
if (!segmentMetadataIter.hasNext()) {
logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition);
return;
}

final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
if (!logOptional.isPresent()) {
logger.debug("No UnifiedLog instance available for partition: {}", topicIdPartition);
@@ -988,13 +985,24 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
return;
}

// Cleanup remote log segments and update the log start offset if applicable.
final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
if (!segmentMetadataIter.hasNext()) {
brokerTopicStats.topicStats(topicIdPartition.topic()).recordRemoteLogMetadataCount(topicIdPartition.partition(), 0);
logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition);
return;
}

final Set<Integer> epochsSet = new HashSet<>();
int metadataCount = 0;
// Good to have an API from RLMM to get all the remote leader epochs of all the segments of a partition
// instead of going through all the segments and building it here.
while (segmentMetadataIter.hasNext()) {
RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
metadataCount++;
}
brokerTopicStats.topicStats(topicIdPartition.topic()).recordRemoteLogMetadataCount(topicIdPartition.partition(), metadataCount);

// All the leader epochs in sorted order that exist in remote storage
final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
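As context for the hunk above: the cleanup path now counts metadata entries in the same pass that collects segment leader epochs, then records the count into the per-partition gauge. A minimal standalone sketch of that single-pass aggregation, with a stub type standing in for RemoteLogSegmentMetadata (the stub and helper names are illustrative, not the actual API):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Stub standing in for RemoteLogSegmentMetadata; only the accessor used here.
class SegmentMetadataStub {
    private final Map<Integer, Long> segmentLeaderEpochs;
    SegmentMetadataStub(Map<Integer, Long> segmentLeaderEpochs) {
        this.segmentLeaderEpochs = segmentLeaderEpochs;
    }
    Map<Integer, Long> segmentLeaderEpochs() { return segmentLeaderEpochs; }
}

public class MetadataCountSketch {
    // Single pass: collect every leader epoch and count metadata entries,
    // mirroring the loop in cleanupExpiredRemoteLogSegments above.
    static long countAndCollectEpochs(Iterator<SegmentMetadataStub> iter, Set<Integer> epochsOut) {
        long metadataCount = 0;
        while (iter.hasNext()) {
            SegmentMetadataStub md = iter.next();
            epochsOut.addAll(md.segmentLeaderEpochs().keySet());
            metadataCount++;
        }
        return metadataCount;
    }

    public static void main(String[] args) {
        Map<Integer, Long> epochs = new HashMap<>();
        epochs.put(0, 0L);
        epochs.put(1, 150L);
        Set<Integer> collected = new HashSet<>();
        long count = countAndCollectEpochs(
            Arrays.asList(new SegmentMetadataStub(epochs)).iterator(), collected);
        System.out.println(count + " entry, epochs " + collected); // 1 entry, epochs [0, 1]
    }
}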
20 changes: 17 additions & 3 deletions core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -289,14 +289,14 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf

def gauge(): Gauge[Long] = gaugeLock synchronized {
if (gaugeObject == null) {
gaugeObject = metricsGroup.newGauge(metricType, () => brokerTopicAggregatedMetric.value())
gaugeObject = metricsGroup.newGauge(metricType, () => brokerTopicAggregatedMetric.value(), tags)
}
return gaugeObject
}

def close(): Unit = gaugeLock synchronized {
if (gaugeObject != null) {
metricsGroup.removeMetric(metricType)
metricsGroup.removeMetric(metricType, tags)
brokerTopicAggregatedMetric.close()
gaugeObject = null
}
@@ -348,7 +348,8 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
).asJava)
metricGaugeTypeMap.putAll(Map(
RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName, new BrokerTopicAggregatedMetric),
RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName, new BrokerTopicAggregatedMetric),
RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName, new BrokerTopicAggregatedMetric),
RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName, new BrokerTopicAggregatedMetric)
).asJava)
})

@@ -413,6 +414,18 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf

def remoteCopyBytesLag: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value()

def recordRemoteLogMetadataCount(partition: Int, count: Long): Unit = {
val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric
brokerTopicAggregatedMetric.setPartitionMetricValue(partition, count)
}

def removeRemoteLogMetadataCount(partition: Int): Unit = {
val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric
brokerTopicAggregatedMetric.removePartition(partition)
}

def remoteLogMetadataCount: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric.value()

def recordRemoteLogSizeComputationTime(partition: Int, timeSpent: Long): Unit = {
val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
brokerTopicAggregatedMetric.setPartitionMetricValue(partition, timeSpent)
@@ -552,6 +565,7 @@ class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = java.util.Op
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName)
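The GaugeWrapper changes above register each topic-level gauge with its tags and delegate to a BrokerTopicAggregatedMetric that keeps one value per partition; the unit test further down records 1, 2, and 3 across three partitions and expects the topic gauge to read 6, i.e. the sum. A minimal sketch of that aggregation under that assumption (the real class also manages the Yammer metric lifecycle):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: latest recorded value per partition, summed on read.
public class PartitionAggregatedGaugeSketch {
    private final ConcurrentMap<Integer, Long> perPartition = new ConcurrentHashMap<>();

    void setPartitionMetricValue(int partition, long value) {
        perPartition.put(partition, value);
    }

    void removePartition(int partition) {
        perPartition.remove(partition);
    }

    long value() {
        return perPartition.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        PartitionAggregatedGaugeSketch g = new PartitionAggregatedGaugeSketch();
        g.setPartitionMetricValue(0, 1);
        g.setPartitionMetricValue(1, 2);
        g.setPartitionMetricValue(2, 3);
        System.out.println(g.value()); // 6, matching testRemoteLogMetadataCount below
    }
}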
49 changes: 40 additions & 9 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -110,6 +110,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
@@ -204,6 +205,7 @@ void setUp() throws Exception {
topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId());
Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100");
remoteLogManagerConfig = createRLMConfig(props);
brokerTopicStats = new BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));

@@ -640,9 +642,10 @@ void testCustomMetadataSizeExceedsLimit() throws Exception {
}

@Test
void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exception {
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;
int segmentCount = 3;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());

// leader epoch preparation
@@ -675,6 +678,12 @@ void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(250L);
when(mockLog.logEndOffset()).thenReturn(500L);
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", 100L);
logProps.put("retention.ms", -1L);
LogConfig logConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(logConfig);

OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
@@ -690,21 +699,43 @@ void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);

CountDownLatch latch = new CountDownLatch(1);
CountDownLatch copyLogSegmentLatch = new CountDownLatch(1);
doAnswer(ans -> {
// waiting for verification
latch.await();
return null;
copyLogSegmentLatch.await(5000, TimeUnit.MILLISECONDS);
return Optional.empty();
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));

CountDownLatch remoteLogMetadataCountLatch = new CountDownLatch(1);
doAnswer(ans -> {
remoteLogMetadataCountLatch.await(5000, TimeUnit.MILLISECONDS);
return null;
}).when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));

Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
Partition mockFollowerPartition = mockPartition(followerTopicIdPartition);

// before running tasks, the remote log manager tasks should be all idle
List<RemoteLogSegmentMetadata> list = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// return 3 metadata entries first, then an empty iterator to simulate that all segments have been deleted
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(list.iterator()).thenReturn(Collections.emptyIterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)).thenReturn(list.iterator()).thenReturn(list.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1)).thenReturn(list.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 2)).thenReturn(list.iterator());

// before running tasks, the remote log manager tasks should be all idle and the remote log metadata count should be 0
assertEquals(1.0, (double) yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent"));
assertEquals(0, safeLongYammerMetricValue("RemoteLogMetadataCount,topic=" + leaderTopicIdPartition.topic()));
remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.singleton(mockFollowerPartition), topicIds);
assertTrue((double) yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") < 1.0);
// unlock copyLogSegmentData
latch.countDown();

copyLogSegmentLatch.countDown();

// Now, the `RemoteLogMetadataCount` should be set to the expected value
TestUtils.waitForCondition(() -> safeLongYammerMetricValue("RemoteLogMetadataCount,topic=" + leaderTopicIdPartition.topic()) == segmentCount,
"Didn't show the expected RemoteLogMetadataCount metric value.");
remoteLogMetadataCountLatch.countDown();

TestUtils.waitForCondition(() -> safeLongYammerMetricValue("RemoteLogMetadataCount,topic=" + leaderTopicIdPartition.topic()) == 0,
"Didn't reset to 0 for RemoteLogMetadataCount metric value when no remote log metadata.");
}
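The assertions above poll until the gauge converges rather than asserting immediately, because the RLM task thread updates the metric asynchronously (which is also why setUp lowers the task interval to 100 ms). A minimal sketch of a wait-for-condition helper in the spirit of TestUtils.waitForCondition, with an illustrative poll interval:

import java.util.function.BooleanSupplier;

public class WaitForConditionSketch {
    // Poll until the condition holds or the deadline passes, then fail with the message.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            Thread.sleep(100); // illustrative poll interval
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        waitForCondition(() -> System.currentTimeMillis() - start > 300, 5000, "never converged");
        System.out.println("condition met");
    }
}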

@Test
@@ -793,7 +824,7 @@ void testRemoteLogManagerRemoteMetrics() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
doAnswer(ans -> Optional.empty()).doAnswer(ans -> {
// waiting for verification
latch.await();
latch.await(5000, TimeUnit.MILLISECONDS);
return Optional.empty();
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));

24 changes: 21 additions & 3 deletions core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
@@ -195,7 +195,9 @@ class KafkaRequestHandlerTest {
brokerTopicStats.topicStats(topic)
val gaugeMetrics = Set(
RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName,
RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName)
RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName,
RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName)

RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => {
if (systemRemoteStorageEnabled) {
if (!gaugeMetrics.contains(metric.getName)) {
@@ -209,9 +211,9 @@
})
gaugeMetrics.foreach(metricName => {
if (systemRemoteStorageEnabled) {
assertTrue(brokerTopicStats.topicStats(topic).metricGaugeMap.contains(metricName), metricName)
assertTrue(brokerTopicStats.topicStats(topic).metricGaugeMap.contains(metricName), "The metric is missing:" + metricName)
} else {
assertFalse(brokerTopicStats.topicStats(topic).metricGaugeMap.contains(metricName), metricName)
assertFalse(brokerTopicStats.topicStats(topic).metricGaugeMap.contains(metricName), "The metric should not appear: " + metricName)
}
})
}
@@ -321,4 +323,20 @@
assertEquals(0, brokerTopicMetrics.remoteCopyBytesLag)
}

@Test
def testRemoteLogMetadataCount(): Unit = {
val brokerTopicMetrics = setupBrokerTopicMetrics()

assertEquals(0, brokerTopicMetrics.remoteLogMetadataCount)
brokerTopicMetrics.recordRemoteLogMetadataCount(0, 1)
assertEquals(1, brokerTopicMetrics.remoteLogMetadataCount)

brokerTopicMetrics.recordRemoteLogMetadataCount(1, 2)
brokerTopicMetrics.recordRemoteLogMetadataCount(2, 3)
assertEquals(6, brokerTopicMetrics.remoteLogMetadataCount)

brokerTopicMetrics.close()

assertEquals(0, brokerTopicMetrics.remoteLogMetadataCount)
}
}
@@ -3992,7 +3992,7 @@ class ReplicaManagerTest {
doAnswer(_ => {
queueLatch.countDown()
// wait until verification completed
doneLatch.await()
doneLatch.await(5000, TimeUnit.MILLISECONDS)
new FetchDataInfo(new LogOffsetMetadata(startOffset), mock(classOf[Records]))
}).when(spyRLM).read(any())

@@ -4001,7 +4001,7 @@
replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)

// wait until at least 2 task submitted to use all the available threads
queueLatch.await()
queueLatch.await(5000, TimeUnit.MILLISECONDS)
// RemoteLogReader should not be all idle
assertTrue(yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double] < 1.0)
// RemoteLogReader should queue some tasks
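The hunks above replace unbounded latch.await() calls with a 5-second bound; if the latch is never counted down, the bounded variant returns false instead of hanging the test forever. A minimal sketch of the difference:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedAwaitSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // Unbounded: latch.await() would block this thread forever here.
        // Bounded: returns false once the timeout elapses, so the test can fail fast.
        boolean released = latch.await(5000, TimeUnit.MILLISECONDS);
        System.out.println(released ? "released" : "timed out"); // prints "timed out"
    }
}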
@@ -44,6 +44,7 @@ public class RemoteStorageMetrics {
private static final String FAILED_REMOTE_FETCH_PER_SEC = "RemoteFetchErrorsPerSec";
private static final String FAILED_REMOTE_COPY_PER_SEC = "RemoteCopyErrorsPerSec";
private static final String REMOTE_COPY_LAG_BYTES = "RemoteCopyLagBytes";
private static final String REMOTE_LOG_METADATA_COUNT = "RemoteLogMetadataCount";
private static final String REMOTE_LOG_SIZE_COMPUTATION_TIME = "RemoteLogSizeComputationTime";
private static final String FAILED_REMOTE_DELETE_PER_SEC = "RemoteDeleteErrorsPerSec";
private static final String FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC = "BuildRemoteLogAuxStateErrorsPerSec";
@@ -70,6 +71,8 @@ public class RemoteStorageMetrics {
"kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_COPY_PER_SEC);
public final static MetricName REMOTE_COPY_LOG_BYTES_METRIC = getMetricName(
"kafka.server", "BrokerTopicMetrics", REMOTE_COPY_LAG_BYTES);
public final static MetricName REMOTE_LOG_METADATA_COUNT_METRIC = getMetricName(
"kafka.server", "BrokerTopicMetrics", REMOTE_LOG_METADATA_COUNT);
public final static MetricName REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC = getMetricName(
"kafka.server", "BrokerTopicMetrics", REMOTE_LOG_SIZE_COMPUTATION_TIME);
public final static MetricName FAILED_REMOTE_DELETE_PER_SEC_METRIC = getMetricName(
@@ -100,6 +103,7 @@ public static Set<MetricName> allMetrics() {
metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC);
metrics.add(REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC);

return metrics;
@@ -117,6 +121,7 @@ public static Set<MetricName> brokerTopicStatsMetrics() {
metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
metrics.add(REMOTE_COPY_LOG_BYTES_METRIC);
metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC);
metrics.add(REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC);
metrics.add(FAILED_REMOTE_DELETE_PER_SEC_METRIC);
metrics.add(FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC);
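The new gauge is registered through the same getMetricName helper as the existing remote-storage metrics, which is what produces the kafka.server:type=BrokerTopicMetrics,name=RemoteLogMetadataCount MBean queried in the sketch at the top. A minimal sketch of what that helper amounts to, assuming Yammer's (group, type, name) MetricName constructor:

import com.yammer.metrics.core.MetricName;

public class MetricNameSketch {
    // Illustrative: mirrors the (group, type, name) convention used above.
    static MetricName getMetricName(String group, String type, String name) {
        return new MetricName(group, type, name);
    }

    public static void main(String[] args) {
        MetricName n = getMetricName("kafka.server", "BrokerTopicMetrics", "RemoteLogMetadataCount");
        // Yammer derives the MBean name from these three parts.
        System.out.println(n.getMBeanName());
    }
}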
