Skip to content

Commit

Permalink
add requestbasedmetarepository to dvc
Browse files Browse the repository at this point in the history
  • Loading branch information
pthirun committed Feb 4, 2025
1 parent fe1500f commit ba43f3b
Show file tree
Hide file tree
Showing 10 changed files with 519 additions and 134 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.linkedin.davinci.repository;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID;
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME;
import static java.lang.Thread.currentThread;

import com.linkedin.davinci.stats.NativeMetadataRepositoryStats;
Expand All @@ -25,17 +23,11 @@
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.system.store.MetaStoreDataType;
import com.linkedin.venice.systemstore.schemas.StoreClusterConfig;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -70,7 +62,7 @@ public abstract class NativeMetadataRepository
private final Map<String, StoreConfig> storeConfigMap = new VeniceConcurrentHashMap<>();
// Local cache for key/value schemas. SchemaData supports one key schema per store only, which may need to be changed
// for key schema evolvability.
private final Map<String, SchemaData> schemaMap = new VeniceConcurrentHashMap<>();
protected final Map<String, SchemaData> schemaMap = new VeniceConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final Set<StoreDataChangedListener> listeners = new CopyOnWriteArraySet<>();
private final AtomicLong totalStoreReadQuota = new AtomicLong();
Expand Down Expand Up @@ -125,11 +117,20 @@ public static NativeMetadataRepository getInstance(
ClientConfig clientConfig,
VeniceProperties backendConfig,
ICProvider icProvider) {

NativeMetadataRepository nativeMetadataRepository;
if (clientConfig.isUseRequestBasedMetaRepository()) {
nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig);
} else {
nativeMetadataRepository = new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
}

LOGGER.info(
"Initializing {} with {}",
NativeMetadataRepository.class.getSimpleName(),
ThinClientMetaStoreBasedRepository.class.getSimpleName());
return new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
nativeMetadataRepository.getClass().getSimpleName());

return nativeMetadataRepository;
}

@Override
Expand Down Expand Up @@ -171,20 +172,14 @@ public boolean hasStore(String storeName) {
@Override
public Store refreshOneStore(String storeName) {
try {
getAndSetStoreConfigFromSystemStore(storeName);
StoreConfig storeConfig = storeConfigMap.get(storeName);
StoreConfig storeConfig = cacheStoreConfigFromRemote(storeName);
if (storeConfig == null) {
throw new VeniceException("StoreConfig is missing unexpectedly for store: " + storeName);
}
Store newStore = getStoreFromSystemStore(storeName, storeConfig.getCluster());
// isDeleting check to detect deleted store is only supported by meta system store based implementation.
if (newStore != null && !storeConfig.isDeleting()) {
putStore(newStore);
getAndCacheSchemaDataFromSystemStore(storeName);
nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis());
} else {
removeStore(storeName);
}
Store newStore = fetchStoreFromRemote(storeName, storeConfig.getCluster());
putStore(newStore);
getAndCacheSchemaData(storeName);
nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis());
return newStore;
} catch (ServiceDiscoveryException | MissingKeyInStoreMetadataException e) {
throw new VeniceNoStoreException(storeName, e);
Expand Down Expand Up @@ -393,74 +388,17 @@ public void clear() {
* Get the store cluster config from system store and update the local cache with it. Different implementation will
* get the data differently but should all populate the store cluster config map.
*/
protected void getAndSetStoreConfigFromSystemStore(String storeName) {
storeConfigMap.put(storeName, getStoreConfigFromSystemStore(storeName));
protected StoreConfig cacheStoreConfigFromRemote(String storeName) {
  // Fetch the cluster config from the remote source, remember it locally, then hand it back.
  final StoreConfig remoteConfig = fetchStoreConfigFromRemote(storeName);
  storeConfigMap.put(storeName, remoteConfig);
  return remoteConfig;
}

protected abstract StoreConfig getStoreConfigFromSystemStore(String storeName);
/**
 * Fetches the {@link StoreConfig} for the given store from the remote metadata source.
 * Each subclass decides how the config is obtained (e.g. meta system store vs. request-based endpoint).
 *
 * @param storeName name of the store
 * @return the store's cluster config; callers treat a null return as "store missing" — TODO confirm nullability contract
 */
protected abstract StoreConfig fetchStoreConfigFromRemote(String storeName);

protected abstract Store getStoreFromSystemStore(String storeName, String clusterName);
/**
 * Fetches the {@link Store} metadata for the given store from the remote metadata source.
 *
 * @param storeName name of the store
 * @param clusterName cluster that owns the store (taken from its {@link StoreConfig})
 * @return the freshly fetched store
 */
protected abstract Store fetchStoreFromRemote(String storeName, String clusterName);

protected abstract StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key);

/**
 * Helper with common code for retrieving a StoreConfig from the meta system store.
 * Reads the STORE_CLUSTER_CONFIG record keyed by the store name and wraps it in a {@link StoreConfig}.
 *
 * @param storeName name of the store to look up
 * @return the store's cluster config built from the meta system store record
 */
protected StoreConfig getStoreConfigFromMetaSystemStore(String storeName) {
StoreClusterConfig clusterConfig = getStoreMetaValue(
storeName,
MetaStoreDataType.STORE_CLUSTER_CONFIG
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName))).storeClusterConfig;
return new StoreConfig(clusterConfig);
}

/**
 * Helper with common code for retrieving SchemaData from the meta system store.
 * Initializes the per-store {@link SchemaData} with the key schema on first sight, then merges in
 * any value schemas that are not already cached. Value schemas that arrive as empty strings are
 * re-fetched individually from the STORE_VALUE_SCHEMA key space (schemas too large for one K/V).
 *
 * @param storeName name of the store whose schemas are fetched
 * @return the (possibly pre-existing) SchemaData with all known value schemas merged in
 */
protected SchemaData getSchemaDataFromMetaSystemStore(String storeName) {
SchemaData schemaData = schemaMap.get(storeName);
SchemaEntry keySchema;
if (schemaData == null) {
// Retrieve the key schema and initialize SchemaData only if it's not cached yet.
StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
Map<CharSequence, CharSequence> keySchemaMap =
getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap;
if (keySchemaMap.isEmpty()) {
throw new VeniceException("No key schema found for store: " + storeName);
}
// SchemaData supports a single key schema per store; take the first (only) entry.
Map.Entry<CharSequence, CharSequence> keySchemaEntry = keySchemaMap.entrySet().iterator().next();
keySchema =
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString());
schemaData = new SchemaData(storeName, keySchema);
}
StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
Map<CharSequence, CharSequence> valueSchemaMap =
getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap;
// Check the value schema string, if it's empty then try to query the other key space for individual value schema.
for (Map.Entry<CharSequence, CharSequence> entry: valueSchemaMap.entrySet()) {
// Check if we already have the corresponding value schema
int valueSchemaId = Integer.parseInt(entry.getKey().toString());
if (schemaData.getValueSchema(valueSchemaId) != null) {
continue;
}
if (entry.getValue().toString().isEmpty()) {
// The value schemas might be too large to be stored in a single K/V.
// Double-brace map builds the composite key {storeName, schemaId} for the per-schema lookup.
StoreMetaKey individualValueSchemaKey =
MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap<String, String>() {
{
put(KEY_STRING_STORE_NAME, storeName);
put(KEY_STRING_SCHEMA_ID, entry.getKey().toString());
}
});
// Empty string is not a valid value schema therefore it's safe to throw exceptions if we also cannot find it in
// the individual value schema key space.
String valueSchema =
getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString();
schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema));
} else {
schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString()));
}
}
return schemaData;
}
/**
 * Retrieves the full {@link SchemaData} (key schema plus value schemas) for the given store from
 * the implementation's metadata source; implementations may serve this from an internal cache.
 *
 * @param storeName name of the store
 * @return the store's schema data
 */
protected abstract SchemaData getSchemaData(String storeName);

protected Store putStore(Store newStore) {
// Workaround to make old metadata compatible with new fields
Expand Down Expand Up @@ -516,11 +454,11 @@ protected void notifyStoreChanged(Store store) {
}
}

protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) {
/**
 * Fetches the schema data for a known store and refreshes the local schema cache with it.
 *
 * @param storeName name of the store
 * @return the freshly fetched schema data
 * @throws VeniceNoStoreException if the store is not present in the local store cache
 */
protected SchemaData getAndCacheSchemaData(String storeName) {
  if (hasStore(storeName)) {
    SchemaData freshSchemaData = getSchemaData(storeName);
    schemaMap.put(storeName, freshSchemaData);
    return freshSchemaData;
  }
  throw new VeniceNoStoreException(storeName);
}
Expand All @@ -532,7 +470,7 @@ protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) {
/**
 * Read-through accessor for the schema cache: returns the cached entry when present,
 * otherwise fetches and caches it before returning.
 *
 * @throws VeniceNoStoreException if the store does not exist
 */
private SchemaData getSchemaDataFromReadThroughCache(String storeName) throws VeniceNoStoreException {
  SchemaData cached = schemaMap.get(storeName);
  return cached != null ? cached : getAndCacheSchemaData(storeName);
}
Expand All @@ -545,8 +483,6 @@ protected SchemaEntry getValueSchemaInternally(String storeName, int id) {
return schemaData.getValueSchema(id);
}

protected abstract SchemaData getSchemaDataFromSystemStore(String storeName);

/**
* This function is used to remove schema entry for the given store from local cache,
* and related listeners as well.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package com.linkedin.davinci.repository;

import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.D2ServiceDiscovery;
import com.linkedin.venice.client.store.transport.D2TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.meta.QueryAction;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.metadata.response.StorePropertiesResponseRecord;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.systemstore.schemas.StoreClusterConfig;
import com.linkedin.venice.systemstore.schemas.StoreProperties;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
import org.apache.avro.Schema;


/**
 * A {@link NativeMetadataRepository} implementation that retrieves store metadata on demand through
 * the request-based STORE_PROPERTIES endpoint instead of consuming the meta system store.
 *
 * <p>The cluster serving a store is resolved via D2 service discovery; one {@link D2TransportClient}
 * is cached per server D2 service, and the key/value schemas returned by the endpoint are cached
 * per store.</p>
 */
public class RequestBasedMetaRepository extends NativeMetadataRepository {
  // serverD2ServiceName (cluster) -> transport client used to issue metadata requests to that cluster
  private final Map<String, D2TransportClient> d2TransportClientMap = new VeniceConcurrentHashMap<>();

  // storeName -> cached key/value schemas fetched from the STORE_PROPERTIES endpoint
  private final Map<String, SchemaData> storeSchemaMap = new VeniceConcurrentHashMap<>();

  // Client used exclusively for D2 service discovery (mapping a store to its serving cluster).
  private final D2TransportClient d2DiscoveryTransportClient;
  private D2ServiceDiscovery d2ServiceDiscovery;

  public RequestBasedMetaRepository(ClientConfig clientConfig, VeniceProperties backendConfig) {
    super(clientConfig, backendConfig);
    this.d2ServiceDiscovery = new D2ServiceDiscovery();
    this.d2DiscoveryTransportClient =
        new D2TransportClient(clientConfig.getD2ServiceName(), clientConfig.getD2Client());
  }

  @Override
  public void clear() {
    super.clear();
    // Drop cached transport clients and schemas so subsequent lookups rebuild them from scratch.
    d2TransportClientMap.clear();
    storeSchemaMap.clear();
  }

  @Override
  protected StoreConfig fetchStoreConfigFromRemote(String storeName) {
    // Build a StoreConfig from D2 discovery: the cluster is the server D2 service that owns the store.
    D2TransportClient d2TransportClient = getD2TransportClient(storeName);

    StoreClusterConfig storeClusterConfig = new StoreClusterConfig();
    storeClusterConfig.setStoreName(storeName);
    storeClusterConfig.setCluster(d2TransportClient.getServiceName());

    return new StoreConfig(storeClusterConfig);
  }

  @Override
  protected Store fetchStoreFromRemote(String storeName, String clusterName) {
    // Always hit the remote endpoint (bypasses the local store cache); schemas are cached as a side effect.
    StorePropertiesResponseRecord record = fetchAndCacheStorePropertiesResponseRecord(storeName);
    StoreProperties storeProperties = record.storeMetaValue.storeProperties;
    return new ZKStore(storeProperties);
  }

  @Override
  protected SchemaData getSchemaData(String storeName) {
    SchemaData schemaData = storeSchemaMap.get(storeName);
    if (schemaData == null) {
      // Cache miss: fetching the store properties populates storeSchemaMap via cacheStoreSchema.
      fetchAndCacheStorePropertiesResponseRecord(storeName);
      schemaData = storeSchemaMap.get(storeName);
    }
    return schemaData;
  }

  /**
   * Fetches the store properties (store metadata plus schemas) from the server's request-based
   * metadata endpoint, deserializes the Avro response, and caches the contained schemas.
   *
   * @param storeName name of the store to fetch
   * @return the deserialized {@link StorePropertiesResponseRecord}
   * @throws RuntimeException if the request fails or the calling thread is interrupted
   */
  protected StorePropertiesResponseRecord fetchAndCacheStorePropertiesResponseRecord(String storeName) {
    // NOTE(review): concurrent callers may issue duplicate requests for the same store; a per-store
    // mutex would avoid the redundant fetch but does not appear required for correctness — confirm.

    // Build the request path; append the largest value schema id we already hold so the server
    // can avoid re-sending schemas the client has cached.
    int maxValueSchemaId = getMaxValueSchemaId(storeName);
    D2TransportClient d2TransportClient = getD2TransportClient(storeName);
    String requestBasedStorePropertiesURL = QueryAction.STORE_PROPERTIES.toString().toLowerCase() + "/" + storeName;
    if (maxValueSchemaId > SchemaData.UNKNOWN_SCHEMA_ID) {
      requestBasedStorePropertiesURL += "/" + maxValueSchemaId;
    }

    TransportClientResponse response;
    try {
      response = d2TransportClient.get(requestBasedStorePropertiesURL).get();
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers higher up the stack can observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(
          "Interrupted while trying to send store properties request to " + requestBasedStorePropertiesURL,
          e);
    } catch (Exception e) {
      // Chain the cause instead of flattening it into the message so the full stack trace survives.
      throw new RuntimeException(
          "Encountered exception while trying to send store properties request to " + requestBasedStorePropertiesURL,
          e);
    }

    // Deserialize the Avro payload with the record's own writer schema.
    Schema writerSchema = StorePropertiesResponseRecord.SCHEMA$;
    RecordDeserializer<StorePropertiesResponseRecord> recordDeserializer = FastSerializerDeserializerFactory
        .getFastAvroSpecificDeserializer(writerSchema, StorePropertiesResponseRecord.class);
    StorePropertiesResponseRecord record = recordDeserializer.deserialize(response.getBody());

    // Cache the key/value schemas carried by the response.
    cacheStoreSchema(storeName, record);

    return record;
  }

  /**
   * Resolves (and caches) the transport client for the cluster serving the given store.
   * Synchronized so discovery and client creation happen one caller at a time.
   */
  D2TransportClient getD2TransportClient(String storeName) {
    synchronized (this) {
      // Discover which server D2 service hosts this store, then reuse/create its client atomically.
      String serverD2ServiceName =
          d2ServiceDiscovery.find(d2DiscoveryTransportClient, storeName, true).getServerD2Service();
      return d2TransportClientMap
          .computeIfAbsent(serverD2ServiceName, serviceName -> new D2TransportClient(serviceName, clientConfig.getD2Client()));
    }
  }

  /** @return the largest cached value schema id for the store, or {@link SchemaData#UNKNOWN_SCHEMA_ID} if none cached. */
  private int getMaxValueSchemaId(String storeName) {
    SchemaData schemaData = schemaMap.get(storeName);
    return schemaData == null ? SchemaData.UNKNOWN_SCHEMA_ID : schemaData.getMaxValueSchemaId();
  }

  /** Merges the key/value schemas from the response record into the per-store schema cache. */
  private void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) {
    SchemaData schemaData = storeSchemaMap.computeIfAbsent(storeName, name -> {
      // First sighting of this store: initialize SchemaData with the (single) key schema.
      Map.Entry<CharSequence, CharSequence> keySchemaEntry =
          record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next();
      return new SchemaData(
          name,
          new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()));
    });

    // Add every value schema present in the response.
    for (Map.Entry<CharSequence, CharSequence> entry: record.getStoreMetaValue()
        .getStoreValueSchemas()
        .getValueSchemaMap()
        .entrySet()) {
      schemaData
          .addValueSchema(new SchemaEntry(Integer.parseInt(entry.getKey().toString()), entry.getValue().toString()));
    }
  }
}
Loading

0 comments on commit ba43f3b

Please sign in to comment.