diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index d9fb4ae76d..b7f3e14bb3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -1,5 +1,6 @@ package com.linkedin.davinci; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX; import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL; @@ -216,6 +217,7 @@ public DaVinciBackend( String pid = Utils.getPid(); String instanceSuffix = configLoader.getCombinedProperties().getString(PUSH_STATUS_INSTANCE_NAME_SUFFIX, (pid == null ? "NA" : pid)); + // Current instance name. String instanceName = Utils.getHostName() + "_" + instanceSuffix; // Fetch latest update schema's protocol ID for Push Status Store from Router. 
@@ -401,7 +403,7 @@ private Function functionToCheckWhetherStorageEngineShouldBeKep private synchronized void bootstrap() { List storageEngines = - storageService.getStorageEngineRepository().getAllLocalStorageEngines(); + getStorageService().getStorageEngineRepository().getAllLocalStorageEngines(); LOGGER.info("Starting bootstrap, storageEngines: {}", storageEngines); Map storeNameToBootstrapVersionMap = new HashMap<>(); Map> storeNameToPartitionListMap = new HashMap<>(); @@ -440,7 +442,7 @@ private synchronized void bootstrap() { if (!(storeNameToBootstrapVersionMap.containsKey(storeName) && (storeNameToBootstrapVersionMap.get(storeName).getNumber() < versionNumber))) { storeNameToBootstrapVersionMap.put(storeName, version); - storeNameToPartitionListMap.put(storeName, storageService.getUserPartitions(kafkaTopicName)); + storeNameToPartitionListMap.put(storeName, getStorageService().getUserPartitions(kafkaTopicName)); } } @@ -455,12 +457,12 @@ private synchronized void bootstrap() { * In this case we will only need to close metadata partition, as it is supposed to be opened and managed by * forked ingestion process via following subscribe call. 
*/ - for (AbstractStorageEngine storageEngine: storageService.getStorageEngineRepository() + for (AbstractStorageEngine storageEngine: getStorageService().getStorageEngineRepository() .getAllLocalStorageEngines()) { storageEngine.closeMetadataPartition(); } } else { - storageService.closeAllStorageEngines(); + getStorageService().closeAllStorageEngines(); } } @@ -470,27 +472,29 @@ private synchronized void bootstrap() { metricsRepository, storageMetadataService, ingestionService, - storageService, + getStorageService(), blobTransferManager, this::getVeniceCurrentVersionNumber) : new DefaultIngestionBackend( storageMetadataService, ingestionService, - storageService, + getStorageService(), blobTransferManager, configLoader.getVeniceServerConfig()); ingestionBackend.addIngestionNotifier(ingestionListener); - // Subscribe all bootstrap version partitions. - storeNameToBootstrapVersionMap.forEach((storeName, version) -> { - List partitions = storeNameToPartitionListMap.get(storeName); - String versionTopic = version.kafkaTopicName(); - LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic); - AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); - aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); - StoreBackend storeBackend = getStoreOrThrow(storeName); - storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); - }); + if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) { + // Subscribe all bootstrap version partitions. 
+ storeNameToBootstrapVersionMap.forEach((storeName, version) -> { + List partitions = storeNameToPartitionListMap.get(storeName); + String versionTopic = version.kafkaTopicName(); + LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic); + AbstractStorageEngine storageEngine = getStorageService().getStorageEngine(versionTopic); + aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); + StoreBackend storeBackend = getStoreOrThrow(storeName); + storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); + }); + } } @Override diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index 635bcb1d8d..8287b4ee1c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -108,6 +108,10 @@ public StoreBackendStats getStats() { return stats; } + public ComplementSet getSubscription() { + return subscription; /* the backing subscription set itself, not a defensive copy */ + } + public ReferenceCounted getDaVinciCurrentVersion() { return daVinciCurrentVersionRef.get(); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java index 181e6c3cff..6b4e1d3e15 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java @@ -10,6 +10,7 @@ import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; +import static
com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT; import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT; import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS; @@ -716,6 +717,7 @@ private VeniceConfigLoader buildVeniceConfig() { .put(CLUSTER_NAME, clusterName) .put(ZOOKEEPER_ADDRESS, zkAddress) .put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers) + .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true) .put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK) .put(INGESTION_USE_DA_VINCI_CLIENT, true) .put(RECORD_TRANSFORMER_VALUE_SCHEMA, recordTransformerOutputValueSchema) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index 2bab2878d8..06a9f67139 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -1,23 +1,47 @@ package com.linkedin.davinci; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER; import static com.linkedin.venice.pushmonitor.ExecutionStatus.ERROR; import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN; import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static 
org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.linkedin.davinci.config.VeniceConfigLoader; +import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; +import com.linkedin.davinci.stats.AggVersionedStorageEngineStats; +import com.linkedin.davinci.storage.StorageEngineRepository; +import com.linkedin.davinci.storage.StorageService; +import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.venice.exceptions.DiskLimitExhaustedException; import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository; +import com.linkedin.venice.meta.Version; import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.utils.ComplementSet; +import com.linkedin.venice.utils.VeniceProperties; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -139,4 +163,100 @@ public void testBootstrappingAwareCompletableFuture() verify(backend, times(2)).hasCurrentVersionBootstrapping(); } + @Test + public void testBootstrappingSubscription() + throws IllegalAccessException, NoSuchFieldException, NoSuchMethodException, InvocationTargetException { + DaVinciBackend backend = mock(DaVinciBackend.class); + StorageService mockStorageService = mock(StorageService.class); + + StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class); + AbstractStorageEngine abstractStorageEngine
= mock(AbstractStorageEngine.class); + mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine); + String resourceName = "test_store_v1"; + when(abstractStorageEngine.getStoreVersionName()).thenReturn(resourceName); + + abstractStorageEngine.addStoragePartition(0); + abstractStorageEngine.addStoragePartition(1); + + List localStorageEngines = new ArrayList<>(); + localStorageEngines.add(abstractStorageEngine); + + when(backend.getStorageService()).thenReturn(mockStorageService); + when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository); + when(mockStorageService.getStorageEngine(resourceName)).thenReturn(abstractStorageEngine); + when(mockStorageEngineRepository.getAllLocalStorageEngines()).thenReturn(localStorageEngines); + when(backend.isIsolatedIngestion()).thenReturn(false); + + List userPartitionList = new ArrayList<>(); + userPartitionList.add(0); + userPartitionList.add(1); + userPartitionList.add(2); + when(mockStorageService.getUserPartitions(resourceName)).thenReturn(userPartitionList); + + HashSet backendSubscription = new HashSet<>(); + backendSubscription.add(0); + backendSubscription.add(1); + + StoreBackend mockStoreBackend = mock(StoreBackend.class); + when(backend.getStoreOrThrow(Version.parseStoreFromKafkaTopicName(resourceName))).thenReturn(mockStoreBackend); + ComplementSet backendSubscriptionSet = ComplementSet.wrap(backendSubscription); + when(mockStoreBackend.getSubscription()).thenReturn(backendSubscriptionSet); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + ComplementSet partitions = invocation.getArgument(0); + mockStoreBackend.getSubscription().addAll(partitions); + return null; + } + }).when(mockStoreBackend).subscribe(any(), any()); + + Version mockVersion = mock(Version.class); + Store mockStore = mock(Store.class); + SubscriptionBasedReadOnlyStoreRepository mockStoreRepository =
mock(SubscriptionBasedReadOnlyStoreRepository.class); + Field storeRepositoryField = DaVinciBackend.class.getDeclaredField("storeRepository"); + storeRepositoryField.setAccessible(true); + storeRepositoryField.set(backend, mockStoreRepository); + when(mockStoreRepository.getStoreOrThrow(Version.parseStoreFromKafkaTopicName(resourceName))).thenReturn(mockStore); + when(mockStore.getVersion(Version.parseVersionFromKafkaTopicName(resourceName))).thenReturn(mockVersion); + + VeniceConfigLoader mockConfigLoader = mock(VeniceConfigLoader.class); + Field configLoaderField = DaVinciBackend.class.getDeclaredField("configLoader"); + configLoaderField.setAccessible(true); + configLoaderField.set(backend, mockConfigLoader); + VeniceProperties mockCombinedProperties = mock(VeniceProperties.class); + when(mockConfigLoader.getCombinedProperties()).thenReturn(mockCombinedProperties); + + AggVersionedStorageEngineStats mockAggVersionedStorageEngineStats = mock(AggVersionedStorageEngineStats.class); + Field aggVersionedStorageEngineStatsField = DaVinciBackend.class.getDeclaredField("aggVersionedStorageEngineStats"); + aggVersionedStorageEngineStatsField.setAccessible(true); + aggVersionedStorageEngineStatsField.set(backend, mockAggVersionedStorageEngineStats); + + KafkaStoreIngestionService storeIngestionService = mock(KafkaStoreIngestionService.class); + Field ingestionServiceField = DaVinciBackend.class.getDeclaredField("ingestionService"); + ingestionServiceField.setAccessible(true); + ingestionServiceField.set(backend, storeIngestionService); + + Method bootstrapMethod = DaVinciBackend.class.getDeclaredMethod("bootstrap"); + bootstrapMethod.setAccessible(true); + + // With DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY == false, bootstrap must not subscribe partition 2. + when(mockCombinedProperties.getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) + .thenReturn(false); + bootstrapMethod.invoke(backend); + + ComplementSet subscription = mockStoreBackend.getSubscription(); +
assertTrue(subscription.contains(0)); + assertTrue(subscription.contains(1)); + assertFalse(subscription.contains(2)); + + // With DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY == true, bootstrap also subscribes partition 2. + when(mockCombinedProperties.getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)).thenReturn(true); + bootstrapMethod.invoke(backend); + + subscription = mockStoreBackend.getSubscription(); + assertTrue(subscription.contains(0)); + assertTrue(subscription.contains(1)); + assertTrue(subscription.contains(2)); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 145e73c5c4..46ce6afa9a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -97,6 +97,10 @@ private ConfigKeys() { public static final String DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_BYTES_PER_SECOND = "da.vinci.current.version.bootstrapping.quota.bytes.per.second"; + // Whether a Da Vinci client automatically subscribes, at bootstrap, to partitions found on disk (default true). + public static final String DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY = + "da.vinci.subscribe.on.disk.partitions.automatically"; + // Unordered throttlers aren't compatible with Shared Kafka Consumer and have no effect when Shared Consumer is used.
public static final String KAFKA_FETCH_QUOTA_UNORDERED_BYTES_PER_SECOND = "kafka.fetch.quota.unordered.bytes.per.second"; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index d59b2503d6..6dec9463d8 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -11,6 +11,7 @@ import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED; @@ -45,7 +46,9 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.d2.balancer.D2ClientBuilder; +import com.linkedin.davinci.DaVinciBackend; import com.linkedin.davinci.DaVinciUserApp; +import com.linkedin.davinci.StoreBackend; import com.linkedin.davinci.client.AvroGenericDaVinciClient; import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; @@ -85,6 +88,7 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; +import com.linkedin.venice.utils.ComplementSet; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.ForkedJavaProcess; import 
com.linkedin.venice.utils.IntegrationTestPushUtils; @@ -988,6 +992,76 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { } } + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) + public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Exception { + String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); + String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); + VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .put(DATA_BASE_PATH, baseDataPath) + .put(PERSISTENCE_TYPE, ROCKS_DB) + .put(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED, true) + .put(PUSH_STATUS_STORE_ENABLED, true) + .put(DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS, 1000) + .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, false) + .build(); + + MetricsRepository metricsRepository = new MetricsRepository(); + + // Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + d2Client, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + metricsRepository, + backendConfig)) { + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, daVinciConfig); + + // Test non-existent key access + client1.subscribeAll().get(); + assertNull(client1.get(KEY_COUNT + 1).get()); + + // Test single-get access + Map keyValueMap = new HashMap<>(); + for (int k = 0; k < KEY_COUNT; ++k) { + assertEquals(client1.get(k).get(), 1); + keyValueMap.put(k, 1); + } + + // Test batch-get access + assertEquals(client1.batchGet(keyValueMap.keySet()).get(), keyValueMap); + } + + // Test managed clients + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + d2Client, +
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + metricsRepository, + backendConfig, + Optional.of(Collections.singleton(storeName1)))) { + + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, daVinciConfig); + + Set partitions = new HashSet<>(); + + for (int i = 0; i < 2; i++) { + partitions.add(i); + } + + // Wait for the partial subscription to complete before inspecting the backend state. + client1.subscribe(partitions).get(); + assertEquals(client1.getPartitionCount(), 3); + + DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); + // Fail loudly rather than silently skipping the checks below when no backend is available. + assertTrue(daVinciBackend != null); + StoreBackend storeBackend = daVinciBackend.getStoreOrThrow(storeName1); + ComplementSet subscription = storeBackend.getSubscription(); + assertTrue(subscription.contains(0)); + assertTrue(subscription.contains(1)); + assertFalse(subscription.contains(2)); + } + } + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) public void testPartialSubscription(DaVinciConfig daVinciConfig) throws Exception { String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);