Store remote segment metadata in RocksDB #66

Open · wants to merge 5 commits into tiered-storage

Changes from 3 commits
1 change: 1 addition & 0 deletions build.gradle
@@ -1050,6 +1050,7 @@ project(':clients') {
compile libs.lz4
compile libs.snappy
compile libs.slf4jApi
+ compile libs.rocksDBJni

compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
compileOnly libs.jacksonJDK8Datatypes
20 changes: 20 additions & 0 deletions MetadataStore.java
@@ -0,0 +1,20 @@
package org.apache.kafka.common.log.remote.metadata.storage;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.common.log.remote.storage.RemoteLogSegmentMetadata;

import java.io.IOException;
import java.util.NavigableMap;

public interface MetadataStore {
void flush() throws IOException;

void load() throws IOException;

void update(TopicPartition tp, RemoteLogSegmentMetadata metadata);

NavigableMap<Long, RemoteLogSegmentId> getSegmentIds(TopicPartition tp);

RemoteLogSegmentMetadata getMetaData(RemoteLogSegmentId id);
}
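None of the MetadataStore implementations are included in the three commits shown here. As a rough sketch of what the file-backed fallback could look like, the hypothetical SimpleMetadataStore below keeps the same map bookkeeping that this PR removes from RLMMWithTopicStorage and persists it through the existing CommittedLogMetadataStore read/write calls; the constructor signature and field names are assumptions.

    package org.apache.kafka.common.log.remote.metadata.storage;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.log.remote.storage.RemoteLogSegmentId;
    import org.apache.kafka.common.log.remote.storage.RemoteLogSegmentMetadata;

    import java.io.File;
    import java.io.IOException;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    // Hypothetical sketch only; not part of the commits in this diff.
    public class SimpleMetadataStore implements MetadataStore {
        private static final String COMMITTED_LOG_METADATA_FILE_NAME = "_rlmm_committed_metadata_log";

        // The same bookkeeping structures this PR removes from RLMMWithTopicStorage.
        private final Map<TopicPartition, NavigableMap<Long, RemoteLogSegmentId>> partitionsWithSegmentIds =
                new ConcurrentHashMap<>();
        private final Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> idWithSegmentMetadata =
                new ConcurrentSkipListMap<>();
        private final CommittedLogMetadataStore committed;

        public SimpleMetadataStore(String logDir, Map<String, ?> configs) {
            committed = new CommittedLogMetadataStore(new File(logDir, COMMITTED_LOG_METADATA_FILE_NAME));
        }

        @Override
        public void load() throws IOException {
            // Mirrors the loadMetadataStore() logic removed further down:
            // replay every committed entry into the in-memory maps.
            for (RemoteLogSegmentMetadata entry : committed.read()) {
                update(entry.remoteLogSegmentId().topicPartition(), entry);
            }
        }

        @Override
        public void flush() throws IOException {
            committed.write(idWithSegmentMetadata.values());
        }

        @Override
        public void update(TopicPartition tp, RemoteLogSegmentMetadata metadata) {
            NavigableMap<Long, RemoteLogSegmentId> map =
                    partitionsWithSegmentIds.computeIfAbsent(tp, t -> new ConcurrentSkipListMap<>());
            if (metadata.markedForDeletion()) {
                idWithSegmentMetadata.remove(metadata.remoteLogSegmentId());
                map.remove(metadata.startOffset());
            } else {
                map.put(metadata.startOffset(), metadata.remoteLogSegmentId());
                idWithSegmentMetadata.put(metadata.remoteLogSegmentId(), metadata);
            }
        }

        @Override
        public NavigableMap<Long, RemoteLogSegmentId> getSegmentIds(TopicPartition tp) {
            return partitionsWithSegmentIds.get(tp);
        }

        @Override
        public RemoteLogSegmentMetadata getMetaData(RemoteLogSegmentId id) {
            return idWithSegmentMetadata.get(id);
        }
    }

Hiding the maps behind MetadataStore is what lets RLMMWithTopicStorage, in the hunks below, swap between this store and the RocksDB-backed one without touching its lookup logic.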
RLMMWithTopicStorage.java
@@ -66,7 +66,7 @@

/**
* This is an implementation of {@link RemoteLogMetadataManager} based on internal topic storage.
- *
+ * <p>
* This may be moved to core module as it is part of cluster running on brokers.
*/
public class RLMMWithTopicStorage implements RemoteLogMetadataManager, RemoteLogSegmentMetadataUpdater {
@@ -75,31 +75,26 @@ public class RLMMWithTopicStorage implements RemoteLogMetadataManager, RemoteLog

public static final String REMOTE_LOG_METADATA_TOPIC_NAME = Topic.REMOTE_LOG_METADATA_TOPIC_NAME;
public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP =
"remote.log.metadata.topic.replication.factor";
"remote.log.metadata.topic.replication.factor";
public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = "remote.log.metadata.topic.partitions";

public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS_PROP =
"remote.log.metadata.topic.retention.ms";
public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS = 365 * 24 * 60 * 60 * 1000L;
public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;

- private static final String COMMITTED_LOG_METADATA_FILE_NAME = "_rlmm_committed_metadata_log";

private static final int PUBLISH_TIMEOUT_SECS = 120;
public static final String REMOTE_LOG_METADATA_CLIENT_PREFIX = "__remote_log_metadata_client";

protected int noOfMetadataTopicPartitions;
- private ConcurrentSkipListMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> idWithSegmentMetadata =
- new ConcurrentSkipListMap<>();
- private Map<TopicPartition, NavigableMap<Long, RemoteLogSegmentId>> partitionsWithSegmentIds =
- new ConcurrentHashMap<>();
+ private MetadataStore metadataStore;
private KafkaProducer<String, Object> producer;
private AdminClient adminClient;
private KafkaConsumer<String, RemoteLogSegmentMetadata> consumer;
private String logDir;
private volatile Map<String, ?> configs;

- private CommittedLogMetadataStore committedLogMetadataStore;
private ConsumerTask consumerTask;
private volatile boolean initialized;
private boolean configured;
@@ -128,7 +123,7 @@ public void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMet

private void publishMessageToPartition(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
log.info("Publishing messages to remote log metadata topic for remote log segment metadata [{}]",
remoteLogSegmentMetadata);

RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
int partitionNo = metadataPartitionFor(remoteLogSegmentId.topicPartition());
@@ -164,15 +159,15 @@ private void waitTillConsumerCatchesUp(RecordMetadata recordMetadata) throws Int
// if the current assignment does not have the subscription for this partition then return immediately.
if (!consumerTask.assignedPartition(partition)) {
log.warn("This consumer is not subscribed to the target partition [{}] on which message is produced.",
partition);
return;
}

final long offset = recordMetadata.offset();
final long sleepTimeMs = 1000L;
while (consumerTask.committedOffset(partition) < offset) {
log.debug("Did not receive the messages till the expected offset [{}] for partition [{}], Sleeping for [{}]",
offset, partition, sleepTimeMs);
Utils.sleep(sleepTimeMs);
}
}
@@ -181,7 +176,7 @@ private void waitTillConsumerCatchesUp(RecordMetadata recordMetadata) throws Int
public RemoteLogSegmentMetadata remoteLogSegmentMetadata(TopicPartition topicPartition, long offset, int epochForOffset) throws RemoteStorageException {
ensureInitialized();

- NavigableMap<Long, RemoteLogSegmentId> remoteLogSegmentIdMap = partitionsWithSegmentIds.get(topicPartition);
+ NavigableMap<Long, RemoteLogSegmentId> remoteLogSegmentIdMap = metadataStore.getSegmentIds(topicPartition);
if (remoteLogSegmentIdMap == null) {
return null;
}
@@ -193,15 +188,15 @@ public RemoteLogSegmentMetadata remoteLogSegmentMetadata(TopicPartition topicPar
return null;
}

- RemoteLogSegmentMetadata remoteLogSegmentMetadata = idWithSegmentMetadata.get(entry.getValue());
+ RemoteLogSegmentMetadata remoteLogSegmentMetadata = metadataStore.getMetaData(entry.getValue());
// look forward for the segment which has the target offset, if it does not exist then return the highest
// offset segment available.
while (remoteLogSegmentMetadata != null && remoteLogSegmentMetadata.endOffset() < offset) {
entry = remoteLogSegmentIdMap.higherEntry(entry.getKey());
if (entry == null) {
break;
}
- remoteLogSegmentMetadata = idWithSegmentMetadata.get(entry.getValue());
+ remoteLogSegmentMetadata = metadataStore.getMetaData(entry.getValue());
}

return remoteLogSegmentMetadata;
@@ -211,15 +206,15 @@ public RemoteLogSegmentMetadata remoteLogSegmentMetadata(TopicPartition topicPar
public Optional<Long> earliestLogOffset(TopicPartition topicPartition, int leaderEpoch) throws RemoteStorageException {
ensureInitialized();

- NavigableMap<Long, RemoteLogSegmentId> map = partitionsWithSegmentIds.get(topicPartition);
+ NavigableMap<Long, RemoteLogSegmentId> map = metadataStore.getSegmentIds(topicPartition);

return map == null || map.isEmpty() ? Optional.empty() : Optional.of(map.firstEntry().getKey());
}

public Optional<Long> highestLogOffset(TopicPartition topicPartition, int leaderEpoch) throws RemoteStorageException {
ensureInitialized();

- NavigableMap<Long, RemoteLogSegmentId> map = partitionsWithSegmentIds.get(topicPartition);
+ NavigableMap<Long, RemoteLogSegmentId> map = metadataStore.getSegmentIds(topicPartition);

return map == null || map.isEmpty() ? Optional.empty() : Optional.of(map.lastEntry().getKey());
}
@@ -236,15 +231,15 @@ public void deleteRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSeg
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset) {
ensureInitialized();

- NavigableMap<Long, RemoteLogSegmentId> map = partitionsWithSegmentIds.get(topicPartition);
+ NavigableMap<Long, RemoteLogSegmentId> map = metadataStore.getSegmentIds(topicPartition);
if (map == null) {
return Collections.emptyIterator();
}

return map.tailMap(minOffset, true).values().stream()
- .filter(id -> idWithSegmentMetadata.get(id) != null)
- .map(remoteLogSegmentId -> idWithSegmentMetadata.get(remoteLogSegmentId))
- .collect(Collectors.toList()).iterator();
+ .filter(id -> metadataStore.getMetaData(id) != null)
+ .map(remoteLogSegmentId -> metadataStore.getMetaData(remoteLogSegmentId))
+ .collect(Collectors.toList()).iterator();
}

@Override
@@ -256,7 +251,7 @@ public void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions,
ensureInitialized();

log.info("Received leadership notifications with leader partitions {} and follower partitions {}",
leaderPartitions, followerPartitions);

final HashSet<TopicPartition> allPartitions = new HashSet<>(leaderPartitions);
allPartitions.addAll(followerPartitions);
@@ -277,17 +272,20 @@ private synchronized void initializeResources() {

if (configs.get(BOOTSTRAP_SERVERS_CONFIG) == null) {
throw new InvalidConfigurationException("Broker endpoint must be configured for the remote log " +
"metadata manager.");
"metadata manager.");
}

//create clients
createAdminClient();
createProducer();
createConsumer();

- // todo-tier use rocksdb
- //load the stored data
- loadMetadataStore();
+ //load the stored data from local storage
+ try {
+ metadataStore.load();
+ } catch (IOException e) {
+ throw new KafkaException("Error occurred while loading remote log metadata file.", e);
+ }

initConsumerThread();

@@ -314,8 +312,7 @@ public void close() throws IOException {
} catch (Exception ex) { /* ignore */}

Utils.closeQuietly(consumerTask, "ConsumerTask");
- idWithSegmentMetadata = new ConcurrentSkipListMap<>();
- partitionsWithSegmentIds = new ConcurrentHashMap<>();
+ metadataStore.flush();
}

public int noOfMetadataTopicPartitions() {
@@ -337,13 +334,26 @@ public synchronized void configure(Map<String, ?> configs) {
(propVal == null) ? DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS : Integer.parseInt(propVal.toString());
log.info("No of remote log metadata topic partitions: [{}]", noOfMetadataTopicPartitions);


logDir = (String) configs.get("log.dir");
if (logDir == null || logDir.trim().isEmpty()) {
throw new IllegalArgumentException("log.dir can not be null or empty");
}

- File metadataLogFile = new File(logDir, COMMITTED_LOG_METADATA_FILE_NAME);
- committedLogMetadataStore = new CommittedLogMetadataStore(metadataLogFile);
+ boolean loadRocksDB = false;
+ try {
+ // Try to load the native libraries of RocksDB
+ // If the platform is not supported by RocksDB, this will fail.
+ RocksDBMetadataStore.loadLibrary();
+ loadRocksDB = true;
+ } catch (Exception e) {
+ log.warn("Failed to load RocksDB native libraries. Will use simple file metadata store instead.", e);
+ }
+
+ if (loadRocksDB)
+ metadataStore = new RocksDBMetadataStore(logDir, configs);
+ else
+ metadataStore = new SimpleMetadataStore(logDir, configs);

configured = true;

@@ -352,24 +362,11 @@ public synchronized void configure(Map<String, ?> configs) {
log.info("RLMMWithTopicStorage is initialized: {}", this);
}

- private void loadMetadataStore() {
- try {
- final Collection<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = committedLogMetadataStore.read();
- for (RemoteLogSegmentMetadata entry : remoteLogSegmentMetadatas) {
- partitionsWithSegmentIds.computeIfAbsent(entry.remoteLogSegmentId().topicPartition(),
- k -> new ConcurrentSkipListMap<>()).put(entry.startOffset(), entry.remoteLogSegmentId());
- idWithSegmentMetadata.put(entry.remoteLogSegmentId(), entry);
- }
- } catch (IOException e) {
- throw new KafkaException("Error occurred while loading remote log metadata file.", e);
- }
- }

public void syncLogMetadataDataFile() throws IOException {
ensureInitialized();

- // idWithSegmentMetadata and partitionsWithSegmentIds are not going to be modified while this is being done.
- committedLogMetadataStore.write(idWithSegmentMetadata.values());
+ metadataStore.flush();
}

private void initConsumerThread() {
@@ -388,16 +385,7 @@ private void initConsumerThread() {
public void updateRemoteLogSegmentMetadata(TopicPartition tp, RemoteLogSegmentMetadata metadata) {
ensureInitialized();

- final NavigableMap<Long, RemoteLogSegmentId> map = partitionsWithSegmentIds
- .computeIfAbsent(tp, topicPartition -> new ConcurrentSkipListMap<>());
- if (metadata.markedForDeletion()) {
- idWithSegmentMetadata.remove(metadata.remoteLogSegmentId());
- // todo-tier check for concurrent updates when leader/follower switches occur
- map.remove(metadata.startOffset());
- } else {
- map.put(metadata.startOffset(), metadata.remoteLogSegmentId());
- idWithSegmentMetadata.put(metadata.remoteLogSegmentId(), metadata);
- }
+ metadataStore.update(tp, metadata);
}

public int metadataPartitionFor(TopicPartition tp) {
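For the RocksDB side of the fallback in configure(), here is a minimal sketch of how a RocksDB-backed store could satisfy the same interface. The actual RocksDBMetadataStore is not in these commits, so the key layout, directory name, and Java-serialization value format are all illustrative assumptions. Every update is written through to RocksDB, and load() rebuilds the in-memory NavigableMap index by scanning the database, so RocksDB's own key ordering is never relied on.

    package org.apache.kafka.common.log.remote.metadata.storage;

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.log.remote.storage.RemoteLogSegmentId;
    import org.apache.kafka.common.log.remote.storage.RemoteLogSegmentMetadata;
    import org.rocksdb.*;

    import java.io.*;
    import java.nio.charset.StandardCharsets;
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    // Hypothetical sketch only; the real RocksDBMetadataStore is not part of these commits.
    public class RocksDBMetadataStore implements MetadataStore {
        // In-memory index rebuilt by load(); a plain "<topic-partition>:<startOffset>" string key suffices.
        private final Map<TopicPartition, NavigableMap<Long, RemoteLogSegmentId>> partitionsWithSegmentIds = new ConcurrentHashMap<>();
        private final Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> idWithSegmentMetadata = new ConcurrentHashMap<>();
        private final RocksDB db;

        // Called from configure() before constructing the store; throws when no
        // native RocksDB build exists for the platform, triggering the fallback.
        public static void loadLibrary() {
            RocksDB.loadLibrary();
        }

        public RocksDBMetadataStore(String logDir, Map<String, ?> configs) {
            try (Options options = new Options().setCreateIfMissing(true)) {
                db = RocksDB.open(options, new File(logDir, "_rlmm_rocksdb").getAbsolutePath());
            } catch (RocksDBException e) {
                throw new KafkaException("Failed to open RocksDB metadata store", e);
            }
        }

        @Override
        public void update(TopicPartition tp, RemoteLogSegmentMetadata metadata) {
            NavigableMap<Long, RemoteLogSegmentId> map =
                    partitionsWithSegmentIds.computeIfAbsent(tp, t -> new ConcurrentSkipListMap<>());
            byte[] key = (tp + ":" + metadata.startOffset()).getBytes(StandardCharsets.UTF_8);
            try {
                if (metadata.markedForDeletion()) {
                    idWithSegmentMetadata.remove(metadata.remoteLogSegmentId());
                    map.remove(metadata.startOffset());
                    db.delete(key);
                } else {
                    map.put(metadata.startOffset(), metadata.remoteLogSegmentId());
                    idWithSegmentMetadata.put(metadata.remoteLogSegmentId(), metadata);
                    db.put(key, serialize(metadata));
                }
            } catch (RocksDBException e) {
                throw new KafkaException(e);
            }
        }

        @Override
        public void load() throws IOException {
            // Rebuild the in-memory index by scanning every record in the database.
            try (RocksIterator it = db.newIterator()) {
                for (it.seekToFirst(); it.isValid(); it.next()) {
                    RemoteLogSegmentMetadata md = deserialize(it.value());
                    partitionsWithSegmentIds.computeIfAbsent(md.remoteLogSegmentId().topicPartition(),
                            t -> new ConcurrentSkipListMap<>()).put(md.startOffset(), md.remoteLogSegmentId());
                    idWithSegmentMetadata.put(md.remoteLogSegmentId(), md);
                }
            }
        }

        @Override
        public void flush() throws IOException {
            try (FlushOptions options = new FlushOptions().setWaitForFlush(true)) {
                db.flush(options);
            } catch (RocksDBException e) {
                throw new IOException(e);
            }
        }

        @Override
        public NavigableMap<Long, RemoteLogSegmentId> getSegmentIds(TopicPartition tp) {
            return partitionsWithSegmentIds.get(tp);
        }

        @Override
        public RemoteLogSegmentMetadata getMetaData(RemoteLogSegmentId id) {
            return idWithSegmentMetadata.get(id);
        }

        // Assumes RemoteLogSegmentMetadata is java.io.Serializable; a real store
        // would ship a purpose-built serde instead of Java serialization.
        private static byte[] serialize(RemoteLogSegmentMetadata md) {
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(md);
                oos.flush();
                return bos.toByteArray();
            } catch (IOException e) {
                throw new KafkaException(e);
            }
        }

        private static RemoteLogSegmentMetadata deserialize(byte[] bytes) throws IOException {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (RemoteLogSegmentMetadata) ois.readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }
        }
    }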