From ba43f3ba7f31a1c12cc063eec9ece584145b2899 Mon Sep 17 00:00:00 2001 From: pthirun Date: Wed, 22 Jan 2025 16:43:04 -0800 Subject: [PATCH] add requestbasedmetarepository to dvc --- .../repository/NativeMetadataRepository.java | 118 +++----------- .../RequestBasedMetaRepository.java | 154 ++++++++++++++++++ .../ThinClientMetaStoreBasedRepository.java | 73 ++++++++- .../NativeMetadataRepositoryTest.java | 96 +++++++++-- .../RequestBasedMetaRepositoryTest.java | 80 +++++++++ .../venice/client/store/ClientConfig.java | 14 +- .../linkedin/venice/meta/ReadOnlyStore.java | 2 +- .../venice/endToEnd/MetaSystemStoreTest.java | 71 +++++++- .../ServerReadMetadataRepository.java | 2 +- .../ServerReadMetadataRepositoryTest.java | 43 ++++- 10 files changed, 519 insertions(+), 134 deletions(-) create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java create mode 100644 clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java index 7d595198864..c332cb63206 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java @@ -1,8 +1,6 @@ package com.linkedin.davinci.repository; import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS; -import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID; -import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; import static java.lang.Thread.currentThread; import com.linkedin.davinci.stats.NativeMetadataRepositoryStats; @@ -25,17 +23,11 @@ import com.linkedin.venice.schema.rmd.RmdSchemaEntry; import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.service.ICProvider; -import com.linkedin.venice.system.store.MetaStoreDataType; -import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; -import com.linkedin.venice.systemstore.schemas.StoreMetaKey; -import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -70,7 +62,7 @@ public abstract class NativeMetadataRepository private final Map storeConfigMap = new VeniceConcurrentHashMap<>(); // Local cache for key/value schemas. SchemaData supports one key schema per store only, which may need to be changed // for key schema evolvability. - private final Map schemaMap = new VeniceConcurrentHashMap<>(); + protected final Map schemaMap = new VeniceConcurrentHashMap<>(); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final Set listeners = new CopyOnWriteArraySet<>(); private final AtomicLong totalStoreReadQuota = new AtomicLong(); @@ -125,11 +117,20 @@ public static NativeMetadataRepository getInstance( ClientConfig clientConfig, VeniceProperties backendConfig, ICProvider icProvider) { + + NativeMetadataRepository nativeMetadataRepository; + if (clientConfig.isUseRequestBasedMetaRepository()) { + nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig); + } else { + nativeMetadataRepository = new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider); + } + LOGGER.info( "Initializing {} with {}", NativeMetadataRepository.class.getSimpleName(), - ThinClientMetaStoreBasedRepository.class.getSimpleName()); - return new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider); + nativeMetadataRepository.getClass().getSimpleName()); + + return nativeMetadataRepository; } @Override @@ -171,20 +172,14 @@ public boolean hasStore(String storeName) { @Override public Store refreshOneStore(String storeName) { try { - getAndSetStoreConfigFromSystemStore(storeName); - StoreConfig storeConfig = storeConfigMap.get(storeName); + StoreConfig storeConfig = cacheStoreConfigFromRemote(storeName); if (storeConfig == null) { throw new VeniceException("StoreConfig is missing unexpectedly for store: " + storeName); } - Store newStore = getStoreFromSystemStore(storeName, storeConfig.getCluster()); - // isDeleting check to detect deleted store is only supported by meta system store based implementation. - if (newStore != null && !storeConfig.isDeleting()) { - putStore(newStore); - getAndCacheSchemaDataFromSystemStore(storeName); - nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis()); - } else { - removeStore(storeName); - } + Store newStore = fetchStoreFromRemote(storeName, storeConfig.getCluster()); + putStore(newStore); + getAndCacheSchemaData(storeName); + nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis()); return newStore; } catch (ServiceDiscoveryException | MissingKeyInStoreMetadataException e) { throw new VeniceNoStoreException(storeName, e); @@ -393,74 +388,17 @@ public void clear() { * Get the store cluster config from system store and update the local cache with it. Different implementation will * get the data differently but should all populate the store cluster config map. */ - protected void getAndSetStoreConfigFromSystemStore(String storeName) { - storeConfigMap.put(storeName, getStoreConfigFromSystemStore(storeName)); + protected StoreConfig cacheStoreConfigFromRemote(String storeName) { + StoreConfig storeConfig = fetchStoreConfigFromRemote(storeName); + storeConfigMap.put(storeName, storeConfig); + return storeConfig; } - protected abstract StoreConfig getStoreConfigFromSystemStore(String storeName); + protected abstract StoreConfig fetchStoreConfigFromRemote(String storeName); - protected abstract Store getStoreFromSystemStore(String storeName, String clusterName); + protected abstract Store fetchStoreFromRemote(String storeName, String clusterName); - protected abstract StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key); - - // Helper function with common code for retrieving StoreConfig from meta system store. - protected StoreConfig getStoreConfigFromMetaSystemStore(String storeName) { - StoreClusterConfig clusterConfig = getStoreMetaValue( - storeName, - MetaStoreDataType.STORE_CLUSTER_CONFIG - .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName))).storeClusterConfig; - return new StoreConfig(clusterConfig); - } - - // Helper function with common code for retrieving SchemaData from meta system store. - protected SchemaData getSchemaDataFromMetaSystemStore(String storeName) { - SchemaData schemaData = schemaMap.get(storeName); - SchemaEntry keySchema; - if (schemaData == null) { - // Retrieve the key schema and initialize SchemaData only if it's not cached yet. - StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS - .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); - Map keySchemaMap = - getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap; - if (keySchemaMap.isEmpty()) { - throw new VeniceException("No key schema found for store: " + storeName); - } - Map.Entry keySchemaEntry = keySchemaMap.entrySet().iterator().next(); - keySchema = - new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()); - schemaData = new SchemaData(storeName, keySchema); - } - StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS - .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); - Map valueSchemaMap = - getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap; - // Check the value schema string, if it's empty then try to query the other key space for individual value schema. - for (Map.Entry entry: valueSchemaMap.entrySet()) { - // Check if we already have the corresponding value schema - int valueSchemaId = Integer.parseInt(entry.getKey().toString()); - if (schemaData.getValueSchema(valueSchemaId) != null) { - continue; - } - if (entry.getValue().toString().isEmpty()) { - // The value schemas might be too large to be stored in a single K/V. - StoreMetaKey individualValueSchemaKey = - MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap() { - { - put(KEY_STRING_STORE_NAME, storeName); - put(KEY_STRING_SCHEMA_ID, entry.getKey().toString()); - } - }); - // Empty string is not a valid value schema therefore it's safe to throw exceptions if we also cannot find it in - // the individual value schema key space. - String valueSchema = - getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString(); - schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema)); - } else { - schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString())); - } - } - return schemaData; - } + protected abstract SchemaData getSchemaData(String storeName); protected Store putStore(Store newStore) { // Workaround to make old metadata compatible with new fields @@ -516,11 +454,11 @@ protected void notifyStoreChanged(Store store) { } } - protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) { + protected SchemaData getAndCacheSchemaData(String storeName) { if (!hasStore(storeName)) { throw new VeniceNoStoreException(storeName); } - SchemaData schemaData = getSchemaDataFromSystemStore(storeName); + SchemaData schemaData = getSchemaData(storeName); schemaMap.put(storeName, schemaData); return schemaData; } @@ -532,7 +470,7 @@ protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) { private SchemaData getSchemaDataFromReadThroughCache(String storeName) throws VeniceNoStoreException { SchemaData schemaData = schemaMap.get(storeName); if (schemaData == null) { - schemaData = getAndCacheSchemaDataFromSystemStore(storeName); + schemaData = getAndCacheSchemaData(storeName); } return schemaData; } @@ -545,8 +483,6 @@ protected SchemaEntry getValueSchemaInternally(String storeName, int id) { return schemaData.getValueSchema(id); } - protected abstract SchemaData getSchemaDataFromSystemStore(String storeName); - /** * This function is used to remove schema entry for the given store from local cache, * and related listeners as well. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java new file mode 100644 index 00000000000..24531532945 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java @@ -0,0 +1,154 @@ +package com.linkedin.davinci.repository; + +import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.client.store.D2ServiceDiscovery; +import com.linkedin.venice.client.store.transport.D2TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; +import com.linkedin.venice.meta.QueryAction; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreConfig; +import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.metadata.response.StorePropertiesResponseRecord; +import com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; +import com.linkedin.venice.serializer.RecordDeserializer; +import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; +import com.linkedin.venice.systemstore.schemas.StoreProperties; +import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.Map; +import org.apache.avro.Schema; + + +public class RequestBasedMetaRepository extends NativeMetadataRepository { + + // cluster -> client + private final Map d2TransportClientMap = new VeniceConcurrentHashMap<>(); + + // storeName -> T + private final Map storeSchemaMap = new VeniceConcurrentHashMap<>(); + + private final D2TransportClient d2DiscoveryTransportClient; + private D2ServiceDiscovery d2ServiceDiscovery; + + public RequestBasedMetaRepository(ClientConfig clientConfig, VeniceProperties backendConfig) { + super(clientConfig, backendConfig); + this.d2ServiceDiscovery = new D2ServiceDiscovery(); + this.d2DiscoveryTransportClient = + new D2TransportClient(clientConfig.getD2ServiceName(), clientConfig.getD2Client()); + } + + @Override + public void clear() { + super.clear(); + + // Clear cache + d2TransportClientMap.clear(); + storeSchemaMap.clear(); + } + + @Override + protected StoreConfig fetchStoreConfigFromRemote(String storeName) { + // Create StoreConfig from D2 + D2TransportClient d2TransportClient = getD2TransportClient(storeName); + + StoreClusterConfig storeClusterConfig = new StoreClusterConfig(); + storeClusterConfig.setStoreName(storeName); + storeClusterConfig.setCluster(d2TransportClient.getServiceName()); + + return new StoreConfig(storeClusterConfig); + } + + @Override + protected Store fetchStoreFromRemote(String storeName, String clusterName) { + // Fetch store, bypass cache + StorePropertiesResponseRecord record = fetchAndCacheStorePropertiesResponseRecord(storeName); + StoreProperties storeProperties = record.storeMetaValue.storeProperties; + return new ZKStore(storeProperties); + } + + @Override + protected SchemaData getSchemaData(String storeName) { + if (!storeSchemaMap.containsKey(storeName)) { + // Cache miss + fetchAndCacheStorePropertiesResponseRecord(storeName); + } + return storeSchemaMap.get(storeName); + } + + protected StorePropertiesResponseRecord fetchAndCacheStorePropertiesResponseRecord(String storeName) { + // TODO PRANAV mutex by storeName needed? + + // Request + int maxValueSchemaId = getMaxValueSchemaId(storeName); + D2TransportClient d2TransportClient = getD2TransportClient(storeName); + String requestBasedStorePropertiesURL = QueryAction.STORE_PROPERTIES.toString().toLowerCase() + "/" + storeName; + if (maxValueSchemaId > SchemaData.UNKNOWN_SCHEMA_ID) { + requestBasedStorePropertiesURL += "/" + maxValueSchemaId; + } + + TransportClientResponse response; + try { + response = d2TransportClient.get(requestBasedStorePropertiesURL).get(); + } catch (Exception e) { + throw new RuntimeException( + "Encountered exception while trying to send store properties request to " + requestBasedStorePropertiesURL + + ": " + e); + } + + // Deserialize + Schema writerSchema = StorePropertiesResponseRecord.SCHEMA$; + RecordDeserializer recordDeserializer = FastSerializerDeserializerFactory + .getFastAvroSpecificDeserializer(writerSchema, StorePropertiesResponseRecord.class); + StorePropertiesResponseRecord record = recordDeserializer.deserialize(response.getBody()); + + // Cache + cacheStoreSchema(storeName, record); + + return record; + } + + D2TransportClient getD2TransportClient(String storeName) { + synchronized (this) { + // Get cluster for store + String serverD2ServiceName = + d2ServiceDiscovery.find(d2DiscoveryTransportClient, storeName, true).getServerD2Service(); + if (d2TransportClientMap.containsKey(serverD2ServiceName)) { + return d2TransportClientMap.get(serverD2ServiceName); + } + D2TransportClient d2TransportClient = new D2TransportClient(serverD2ServiceName, clientConfig.getD2Client()); + d2TransportClientMap.put(serverD2ServiceName, d2TransportClient); + return d2TransportClient; + } + } + + private int getMaxValueSchemaId(String storeName) { + if (!schemaMap.containsKey(storeName)) { + return SchemaData.UNKNOWN_SCHEMA_ID; + } + return schemaMap.get(storeName).getMaxValueSchemaId(); + } + + private void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) { + + if (!storeSchemaMap.containsKey(storeName)) { + // New schema data + Map.Entry keySchemaEntry = + record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next(); + SchemaData schemaData = new SchemaData( + storeName, + new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString())); + storeSchemaMap.put(storeName, schemaData); + } + + // Store Value Schemas + for (Map.Entry entry: record.getStoreMetaValue() + .getStoreValueSchemas() + .getValueSchemaMap() + .entrySet()) { + storeSchemaMap.get(storeName) + .addValueSchema(new SchemaEntry(Integer.parseInt(entry.getKey().toString()), entry.getValue().toString())); + } + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java index 637ba1682ed..dd6b86eb6fc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java @@ -1,6 +1,7 @@ package com.linkedin.davinci.repository; import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_CLUSTER_NAME; +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID; import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; import com.linkedin.venice.client.exceptions.ServiceDiscoveryException; @@ -9,13 +10,16 @@ import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.exceptions.MissingKeyInStoreMetadataException; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceRetriableException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.service.ICProvider; import com.linkedin.venice.system.store.MetaStoreDataType; +import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; import com.linkedin.venice.systemstore.schemas.StoreMetaKey; import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.systemstore.schemas.StoreProperties; @@ -63,12 +67,12 @@ public void subscribe(String storeName) throws InterruptedException { } @Override - protected StoreConfig getStoreConfigFromSystemStore(String storeName) { + protected StoreConfig fetchStoreConfigFromRemote(String storeName) { return getStoreConfigFromMetaSystemStore(storeName); } @Override - protected Store getStoreFromSystemStore(String storeName, String clusterName) { + protected Store fetchStoreFromRemote(String storeName, String clusterName) { StoreProperties storeProperties = getStoreMetaValue(storeName, MetaStoreDataType.STORE_PROPERTIES.getStoreMetaKey(new HashMap() { { @@ -79,12 +83,6 @@ protected Store getStoreFromSystemStore(String storeName, String clusterName) { return new ZKStore(storeProperties); } - @Override - protected SchemaData getSchemaDataFromSystemStore(String storeName) { - return getSchemaDataFromMetaSystemStore(storeName); - } - - @Override protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) { final Callable> supplier = () -> getAvroClientForMetaStore(storeName).get(key); Callable> wrappedSupplier = @@ -107,6 +105,56 @@ protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) { return value; } + @Override + protected SchemaData getSchemaData(String storeName) { + SchemaData schemaData = schemaMap.get(storeName); + SchemaEntry keySchema; + if (schemaData == null) { + // Retrieve the key schema and initialize SchemaData only if it's not cached yet. + StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); + Map keySchemaMap = + getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap; + if (keySchemaMap.isEmpty()) { + throw new VeniceException("No key schema found for store: " + storeName); + } + Map.Entry keySchemaEntry = keySchemaMap.entrySet().iterator().next(); + keySchema = + new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()); + schemaData = new SchemaData(storeName, keySchema); + } + StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); + Map valueSchemaMap = + getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap; + // Check the value schema string, if it's empty then try to query the other key space for individual value schema. + for (Map.Entry entry: valueSchemaMap.entrySet()) { + // Check if we already have the corresponding value schema + int valueSchemaId = Integer.parseInt(entry.getKey().toString()); + if (schemaData.getValueSchema(valueSchemaId) != null) { + continue; + } + if (entry.getValue().toString().isEmpty()) { + // The value schemas might be too large to be stored in a single K/V. + StoreMetaKey individualValueSchemaKey = + MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap() { + { + put(KEY_STRING_STORE_NAME, storeName); + put(KEY_STRING_SCHEMA_ID, entry.getKey().toString()); + } + }); + // Empty string is not a valid value schema therefore it's safe to throw exceptions if we also cannot find it in + // the individual value schema key space. + String valueSchema = + getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString(); + schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema)); + } else { + schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString())); + } + } + return schemaData; + } + private AvroSpecificStoreClient getAvroClientForMetaStore(String storeName) { return storeClientMap.computeIfAbsent(storeName, k -> { ClientConfig clonedClientConfig = ClientConfig.cloneConfig(clientConfig) @@ -118,4 +166,13 @@ private AvroSpecificStoreClient getAvroClientForMe return ClientFactory.getAndStartSpecificAvroClient(clonedClientConfig); }); } + + // Helper function with common code for retrieving StoreConfig from meta system store. + protected StoreConfig getStoreConfigFromMetaSystemStore(String storeName) { + StoreClusterConfig clusterConfig = getStoreMetaValue( + storeName, + MetaStoreDataType.STORE_CLUSTER_CONFIG + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName))).storeClusterConfig; + return new StoreConfig(clusterConfig); + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java index 8477945c160..dfed0eb4f51 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java @@ -1,6 +1,8 @@ package com.linkedin.davinci.repository; import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS; +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID; +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; @@ -10,10 +12,12 @@ import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.schemas.TestKeyRecord; import com.linkedin.venice.client.store.schemas.TestValueRecord; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.system.store.MetaStoreDataType; import com.linkedin.venice.systemstore.schemas.StoreKeySchemas; import com.linkedin.venice.systemstore.schemas.StoreMetaKey; @@ -25,6 +29,7 @@ import io.tehuti.Metric; import io.tehuti.metrics.MetricsRepository; import java.time.Clock; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -34,7 +39,8 @@ public class NativeMetadataRepositoryTest { - private ClientConfig clientConfig; + private ClientConfig clientConfigThinClient; + private ClientConfig clientConfigRequestBased; private VeniceProperties backendConfig; private MetricsRepository metricsRepository; private Clock clock; @@ -42,19 +48,22 @@ public class NativeMetadataRepositoryTest { @BeforeMethod public void setUpMocks() { - clientConfig = mock(ClientConfig.class); + clientConfigThinClient = mock(ClientConfig.class); + clientConfigRequestBased = mock(ClientConfig.class); backendConfig = mock(VeniceProperties.class); doReturn(1L).when(backendConfig).getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong()); metricsRepository = new MetricsRepository(); - doReturn(metricsRepository).when(clientConfig).getMetricsRepository(); + doReturn(metricsRepository).when(clientConfigThinClient).getMetricsRepository(); + doReturn(metricsRepository).when(clientConfigRequestBased).getMetricsRepository(); + doReturn(true).when(clientConfigRequestBased).isUseRequestBasedMetaRepository(); clock = mock(Clock.class); doReturn(0L).when(clock).millis(); } @Test - public void testGetInstance() { + public void testGetThinClientInstance() { NativeMetadataRepository nativeMetadataRepository = - NativeMetadataRepository.getInstance(clientConfig, backendConfig); + NativeMetadataRepository.getInstance(clientConfigThinClient, backendConfig); Assert.assertTrue(nativeMetadataRepository instanceof ThinClientMetaStoreBasedRepository); Assert.assertThrows(() -> nativeMetadataRepository.subscribe(STORE_NAME)); @@ -64,9 +73,22 @@ public void testGetInstance() { Assert.assertThrows(() -> nativeMetadataRepository.start()); } + @Test + public void testGetRequestBasedInstance() { + NativeMetadataRepository nativeMetadataRepository = + NativeMetadataRepository.getInstance(clientConfigRequestBased, backendConfig); + Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository); + + Assert.assertThrows(() -> nativeMetadataRepository.subscribe(STORE_NAME)); + nativeMetadataRepository.start(); + nativeMetadataRepository.clear(); + Assert.assertThrows(() -> nativeMetadataRepository.subscribe(STORE_NAME)); + Assert.assertThrows(() -> nativeMetadataRepository.start()); + } + @Test public void testGetSchemaDataFromReadThroughCache() throws InterruptedException { - TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock); + TestNMR nmr = new TestNMR(clientConfigThinClient, backendConfig, clock); nmr.start(); Assert.assertThrows(VeniceNoStoreException.class, () -> nmr.getKeySchema(STORE_NAME)); nmr.subscribe(STORE_NAME); @@ -77,7 +99,7 @@ public void testGetSchemaDataFromReadThroughCache() throws InterruptedException public void testGetSchemaDataEfficiently() throws InterruptedException { doReturn(Long.MAX_VALUE).when(backendConfig) .getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong()); - TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock); + TestNMR nmr = new TestNMR(clientConfigThinClient, backendConfig, clock); nmr.start(); Assert.assertEquals(nmr.keySchemaRequestCount, 0); Assert.assertEquals(nmr.valueSchemasRequestCount, 0); @@ -115,7 +137,7 @@ public void testGetSchemaDataEfficiently() throws InterruptedException { public void testNativeMetadataRepositoryStats() throws InterruptedException { doReturn(Long.MAX_VALUE).when(backendConfig) .getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong()); - TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock); + TestNMR nmr = new TestNMR(clientConfigThinClient, backendConfig, clock); nmr.start(); nmr.subscribe(STORE_NAME); doReturn(1000L).when(clock).millis(); @@ -159,14 +181,14 @@ protected TestNMR(ClientConfig clientConfig, VeniceProperties backendConfig, Clo } @Override - protected StoreConfig getStoreConfigFromSystemStore(String storeName) { + protected StoreConfig fetchStoreConfigFromRemote(String storeName) { StoreConfig storeConfig = mock(StoreConfig.class); when(storeConfig.isDeleting()).thenReturn(false); return storeConfig; } @Override - protected Store getStoreFromSystemStore(String storeName, String clusterName) { + protected Store fetchStoreFromRemote(String storeName, String clusterName) { Store store = mock(Store.class); when(store.getName()).thenReturn(storeName); when(store.getReadQuotaInCU()).thenReturn(1L); @@ -174,6 +196,55 @@ protected Store getStoreFromSystemStore(String storeName, String clusterName) { } @Override + protected SchemaData getSchemaData(String storeName) { + SchemaData schemaData = schemaMap.get(storeName); + SchemaEntry keySchema; + if (schemaData == null) { + // Retrieve the key schema and initialize SchemaData only if it's not cached yet. + StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); + Map keySchemaMap = + getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap; + if (keySchemaMap.isEmpty()) { + throw new VeniceException("No key schema found for store: " + storeName); + } + Map.Entry keySchemaEntry = keySchemaMap.entrySet().iterator().next(); + keySchema = + new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()); + schemaData = new SchemaData(storeName, keySchema); + } + StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); + Map valueSchemaMap = + getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap; + // Check the value schema string, if it's empty then try to query the other key space for individual value schema. + for (Map.Entry entry: valueSchemaMap.entrySet()) { + // Check if we already have the corresponding value schema + int valueSchemaId = Integer.parseInt(entry.getKey().toString()); + if (schemaData.getValueSchema(valueSchemaId) != null) { + continue; + } + if (entry.getValue().toString().isEmpty()) { + // The value schemas might be too large to be stored in a single K/V. + StoreMetaKey individualValueSchemaKey = + MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap() { + { + put(KEY_STRING_STORE_NAME, storeName); + put(KEY_STRING_SCHEMA_ID, entry.getKey().toString()); + } + }); + + // Empty string isn't a valid schema, so it's safe to throw if missing in the key space. + String valueSchema = + getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString(); + schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema)); + } else { + schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString())); + } + } + return schemaData; + } + protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) { StoreMetaValue storeMetaValue = new StoreMetaValue(); MetaStoreDataType metaStoreDataType = MetaStoreDataType.valueOf(key.metadataType); @@ -202,10 +273,5 @@ protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) { } return storeMetaValue; } - - @Override - protected SchemaData getSchemaDataFromSystemStore(String storeName) { - return getSchemaDataFromMetaSystemStore(storeName); - } } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java new file mode 100644 index 00000000000..c4fa9a91d68 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java @@ -0,0 +1,80 @@ +package com.linkedin.davinci.repository; + +import static com.linkedin.venice.ConfigKeys.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import com.linkedin.d2.balancer.D2Client; +import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.client.store.transport.D2TransportClient; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreConfig; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.VeniceProperties; +import io.tehuti.metrics.MetricsRepository; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class RequestBasedMetaRepositoryTest { + private ClientConfig clientConfig; + private VeniceProperties backendConfig; + private MetricsRepository metricsRepository; + private static final String STORE_NAME = "hardware_store"; + + private D2TransportClient d2TransportClient; + private D2Client mockD2Client; + + @BeforeMethod + public void setUpMocks() { + clientConfig = mock(ClientConfig.class); + backendConfig = mock(VeniceProperties.class); + doReturn(1L).when(backendConfig).getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong()); + metricsRepository = new MetricsRepository(); + doReturn(metricsRepository).when(clientConfig).getMetricsRepository(); + doReturn(true).when(clientConfig).isUseRequestBasedMetaRepository(); + } + + @Test + public void testRequestBasedMetaRepositoryFetchStoreConfigFromRemote() { + RequestBasedMetaRepository requestBasedMetaRepository = mock(RequestBasedMetaRepository.class); + + // Mock D2 + mockD2Client = mock(D2Client.class); + d2TransportClient = new D2TransportClient(STORE_NAME, mockD2Client); + when(requestBasedMetaRepository.getD2TransportClient(STORE_NAME)).thenReturn(d2TransportClient); + + // Test fetchStoreConfigFromRemote + when(requestBasedMetaRepository.fetchStoreConfigFromRemote(STORE_NAME)).thenCallRealMethod(); + StoreConfig storeConfig = requestBasedMetaRepository.fetchStoreConfigFromRemote(STORE_NAME); + Assert.assertNotNull(storeConfig); + Assert.assertEquals(storeConfig.getCluster(), STORE_NAME); + Assert.assertEquals(storeConfig.getStoreName(), STORE_NAME); + Assert.assertNull(storeConfig.getMigrationDestCluster()); + Assert.assertNull(storeConfig.getMigrationSrcCluster()); + } + + @Test + public void testRequestBasedMetaRepositoryFetchStoreFromRemote() { + RequestBasedMetaRepository requestBasedMetaRepository = mock(RequestBasedMetaRepository.class); + + // Mock D2 + mockD2Client = mock(D2Client.class); + d2TransportClient = new D2TransportClient(STORE_NAME, mockD2Client); + when(requestBasedMetaRepository.getD2TransportClient(STORE_NAME)).thenReturn(d2TransportClient); + + // Mock request + Store testStore = TestUtils.createTestStore(STORE_NAME, STORE_NAME, 100); + when(requestBasedMetaRepository.fetchAndCacheStorePropertiesResponseRecord(STORE_NAME)).thenReturn(); + + // Test fetchStoreConfigFromRemote + when(requestBasedMetaRepository.fetchStoreConfigFromRemote(STORE_NAME)).thenCallRealMethod(); + StoreConfig storeConfig = requestBasedMetaRepository.fetchStoreConfigFromRemote(STORE_NAME); + Assert.assertNotNull(storeConfig); + Assert.assertEquals(storeConfig.getCluster(), STORE_NAME); + Assert.assertEquals(storeConfig.getStoreName(), STORE_NAME); + Assert.assertEquals(storeConfig.getMigrationDestCluster(), null); + Assert.assertEquals(storeConfig.getMigrationSrcCluster(), null); + } +} diff --git a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java index fe562fbc547..383e53e8fe8 100644 --- a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java +++ b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java @@ -65,6 +65,9 @@ public class ClientConfig { // HttpTransport settings private int maxConnectionsPerRoute; // only for HTTP1 + // NativeMetadataRepository settings + private boolean useRequestBasedMetaRepository = false; + private int maxConnectionsTotal; // only for HTTP1 private boolean httpClient5Http2Enabled; @@ -102,7 +105,7 @@ public static ClientConfig cloneConfig(ClientConfi .setD2ZkTimeout(config.getD2ZkTimeout()) .setD2Client(config.getD2Client()) .setD2Routing(config.isD2Routing()) // This should be the last of the D2 configs since it is an inferred config - // and we want the cloned config to match the source config + // and we want the cloned config to match the source config // Performance-related settings .setMetricsRepository(config.getMetricsRepository()) @@ -279,6 +282,15 @@ public ClientConfig setMaxConnectionsPerRoute(int maxConnectionsPerRoute) { return this; } + public boolean isUseRequestBasedMetaRepository() { + return useRequestBasedMetaRepository; + } + + public ClientConfig setUseRequestBasedMetaRepository(boolean useRequestBasedMetaRepository) { + this.useRequestBasedMetaRepository = useRequestBasedMetaRepository; + return this; + } + public int getMaxConnectionsTotal() { return maxConnectionsTotal; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index 304eeaabae4..7d2fc390685 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -924,7 +924,7 @@ public StoreProperties cloneStoreProperties() { storeProperties.setBootstrapToOnlineTimeoutInHours(getBootstrapToOnlineTimeoutInHours()); // storeProperties.setLeaderFollowerModelEnabled(isLeaderFollowerModelEnabled()); storeProperties.setNativeReplicationEnabled(isNativeReplicationEnabled()); - // storeProperties.setReplicationMetadataVersionID(getReplicationMetadataVersionID()); + storeProperties.setReplicationMetadataVersionID(getRmdVersion()); storeProperties.setPushStreamSourceAddress(getPushStreamSourceAddress()); storeProperties.setBackupStrategy(getBackupStrategy().getValue()); storeProperties.setSchemaAutoRegisteFromPushJobEnabled(isSchemaAutoRegisterFromPushJobEnabled()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java index 57c667329c4..7dc55dc7b34 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java @@ -15,6 +15,7 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.davinci.repository.NativeMetadataRepository; +import com.linkedin.davinci.repository.RequestBasedMetaRepository; import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository; import com.linkedin.venice.D2.D2ClientUtils; import com.linkedin.venice.client.store.AvroSpecificStoreClient; @@ -38,6 +39,7 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.system.store.MetaStoreDataType; @@ -287,6 +289,42 @@ public void testThinClientMetaStoreBasedRepository() throws InterruptedException } } + @Test(timeOut = 120 * Time.MS_PER_SECOND) + public void testRequestBasedMetaStoreBasedRepository() throws InterruptedException { + String regularVeniceStoreName = Utils.getUniqueString("venice_store"); + createStoreAndMaterializeMetaSystemStore(regularVeniceStoreName); + D2Client d2Client = null; + NativeMetadataRepository nativeMetadataRepository = null; + try { + d2Client = D2TestUtils.getAndStartD2Client(veniceLocalCluster.getZk().getAddress()); + ClientConfig clientConfig = + getClientConfig(regularVeniceStoreName, d2Client).setUseRequestBasedMetaRepository(true); + // Not providing a CLIENT_META_SYSTEM_STORE_VERSION_MAP, should use the default value of 1 for system store + // current version. + VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .build(); + nativeMetadataRepository = NativeMetadataRepository.getInstance(clientConfig, backendConfig); + nativeMetadataRepository.start(); + // ThinClientMetaStoreBasedRepository implementation should be used since CLIENT_USE_META_SYSTEM_STORE_REPOSITORY + // is set to true without enabling other feature flags. + Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository); + verifyRepository(nativeMetadataRepository, regularVeniceStoreName); + } finally { + if (d2Client != null) { + D2ClientUtils.shutdownClient(d2Client); + } + if (nativeMetadataRepository != null) { + // Calling clear explicitly here because if the NativeMetadataRepository implementation used happens to + // initialize + // a new DaVinciBackend then calling clear will trigger the cleanup logic to ensure the DaVinciBackend is not + // leaked + // into other tests. + nativeMetadataRepository.clear(); + } + } + } + @Test(timeOut = 60 * Time.MS_PER_SECOND) public void testThinClientMetaStoreBasedRepositoryWithLargeValueSchemas() throws InterruptedException { String regularVeniceStoreName = Utils.getUniqueString("venice_store"); @@ -393,12 +431,14 @@ private void verifyRepository(NativeMetadataRepository nativeMetadataRepository, assertNull(nativeMetadataRepository.getStore("Non-existing-store")); expectThrows(VeniceNoStoreException.class, () -> nativeMetadataRepository.getStoreOrThrow("Non-existing-store")); expectThrows(VeniceNoStoreException.class, () -> nativeMetadataRepository.subscribe("Non-existing-store")); - nativeMetadataRepository.subscribe(regularVeniceStoreName); - Store store = nativeMetadataRepository.getStore(regularVeniceStoreName); - Store controllerStore = new ReadOnlyStore( - veniceLocalCluster.getLeaderVeniceController().getVeniceAdmin().getStore(clusterName, regularVeniceStoreName)); - assertEquals(store, controllerStore); + Store store = normalizeStore(new ReadOnlyStore(nativeMetadataRepository.getStore(regularVeniceStoreName))); + Store controllerStore = normalizeStore( + new ReadOnlyStore( + veniceLocalCluster.getLeaderVeniceController() + .getVeniceAdmin() + .getStore(clusterName, regularVeniceStoreName))); + assertEquals(store.toString(), controllerStore.toString()); SchemaEntry keySchema = nativeMetadataRepository.getKeySchema(regularVeniceStoreName); SchemaEntry controllerKeySchema = veniceLocalCluster.getLeaderVeniceController() .getVeniceAdmin() @@ -423,9 +463,9 @@ private void verifyRepository(NativeMetadataRepository nativeMetadataRepository, assertEquals(nativeRepoStore.getStorageQuotaInByte(), storageQuota); }); assertFalse(controllerClient.addValueSchema(regularVeniceStoreName, VALUE_SCHEMA_2).isError()); - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + TestUtils.waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> { assertEquals( - nativeMetadataRepository.getValueSchemas(regularVeniceStoreName), + nativeMetadataRepository.getValueSchemas(regularVeniceStoreName), // this does not retry, only executed onces veniceLocalCluster.getLeaderVeniceController() .getVeniceAdmin() .getValueSchemas(clusterName, regularVeniceStoreName)); @@ -448,6 +488,10 @@ private void verifyRepository(NativeMetadataRepository nativeMetadataRepository, }); } + private Store normalizeStore(ReadOnlyStore store) { + return new ReadOnlyStore(new ZKStore(store.cloneStoreProperties())); + } + private void createStoreAndMaterializeMetaSystemStore(String storeName) { createStoreAndMaterializeMetaSystemStore(storeName, VALUE_SCHEMA_1); } @@ -455,8 +499,17 @@ private void createStoreAndMaterializeMetaSystemStore(String storeName) { private void createStoreAndMaterializeMetaSystemStore(String storeName, String valueSchema) { // Verify and create Venice regular store if it doesn't exist. if (parentControllerClient.getStore(storeName).getStore() == null) { - assertFalse( - parentControllerClient.createNewStore(storeName, "test_owner", INT_KEY_SCHEMA, valueSchema).isError()); + NewStoreResponse resp = + parentControllerClient.createNewStore(storeName, "test_owner", INT_KEY_SCHEMA, valueSchema); + if (resp.isError()) { + System.out.println("Create new store failed: " + resp.getError()); + } + assertFalse(resp.isError()); + assertFalse(parentControllerClient.emptyPush(storeName, "test-push-job", 100).isError()); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + } } String metaSystemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName); TestUtils.waitForNonDeterministicPushCompletion( diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java index 878e231566e..39fdaacce0d 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java @@ -180,7 +180,7 @@ public StorePropertiesResponse getStoreProperties(String storeName, Optional entry: valueSchemas.entrySet()) { int schemaId = Integer.parseInt(entry.getKey().toString()); if (schemaId > largestKnownSchemaId.get()) { - storeValueSchemas.put(schemaId, entry.getValue()); + storeValueSchemas.valueSchemaMap.put(Integer.toString(schemaId), entry.getValue()); } } } else { diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java index 3cd9de7ea97..4bd0ea0f926 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java @@ -158,38 +158,65 @@ public void testGetStoreProperties() { doReturn(readyToServeInstances).when(partition).getReadyToServeInstances(); partitionAssignment.addPartition(partition); String schema = "\"string\""; + ArrayList valueSchemas = new ArrayList<>(); + final int schemaCount = 3; + for (int i = 1; i <= schemaCount; i++) { + valueSchemas.add(new SchemaEntry(i, schema)); + } doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName); Mockito.when(mockSchemaRepo.getKeySchema(storeName)).thenReturn(new SchemaEntry(0, schema)); - Mockito.when(mockSchemaRepo.getValueSchemas(storeName)) - .thenReturn(Collections.singletonList(new SchemaEntry(0, schema))); + Mockito.when(mockSchemaRepo.getValueSchemas(storeName)).thenReturn(valueSchemas); Mockito.when(mockCustomizedViewRepository.getPartitionAssignments(topicName)).thenReturn(partitionAssignment); Mockito.when(mockHelixInstanceConfigRepository.getInstanceGroupIdMapping()).thenReturn(Collections.emptyMap()); - mockStore.setStorageNodeReadQuotaEnabled(true); + + // Request StorePropertiesResponse storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.empty()); + + // Assert response Assert.assertNotNull(storePropertiesResponse); Assert.assertNotNull(storePropertiesResponse.getResponseRecord()); Assert.assertNotNull(storePropertiesResponse.getResponseRecord().getStoreMetaValue()); Assert.assertEquals( storePropertiesResponse.getResponseRecord().getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().get("0"), "\"string\""); - // Verify the metadata Assert.assertEquals( storePropertiesResponse.getResponseRecord().getStoreMetaValue().getStoreProperties().getVersions().size(), 2); Assert.assertEquals(storePropertiesResponse.getResponseRecord().getRoutingInfo().get("0").size(), 1); + ServerCurrentVersionResponse currentVersionResponse = + serverReadMetadataRepository.getCurrentVersionResponse(storeName); + Assert.assertNotNull(currentVersionResponse); + Assert.assertEquals(currentVersionResponse.getCurrentVersion(), 2); + // Assert metrics repo String metadataInvokeMetricName = ".ServerMetadataStats--request_based_metadata_invoke_count.Rate"; String metadataFailureMetricName = ".ServerMetadataStats--request_based_metadata_failure_count.Rate"; Assert.assertTrue(metricsRepository.getMetric(metadataInvokeMetricName).value() > 0); Assert.assertEquals(metricsRepository.getMetric(metadataFailureMetricName).value(), 0d); - ServerCurrentVersionResponse currentVersionResponse = - serverReadMetadataRepository.getCurrentVersionResponse(storeName); - Assert.assertNotNull(currentVersionResponse); - Assert.assertEquals(currentVersionResponse.getCurrentVersion(), 2); + // Test largestKnownSchemaID param + for (int i = 0; i <= schemaCount; i++) { + storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.of(i)); + Assert.assertEquals( + storePropertiesResponse.getResponseRecord() + .getStoreMetaValue() + .getStoreValueSchemas() + .getValueSchemaMap() + .size(), + schemaCount - i); + } + storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.empty()); + Assert.assertEquals( + storePropertiesResponse.getResponseRecord() + .getStoreMetaValue() + .getStoreValueSchemas() + .getValueSchemaMap() + .size(), + schemaCount); + // Value update test mockStore.setBatchGetLimit(300); storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.empty()); Assert.assertEquals(