Skip to content

Commit

Permalink
[dvc] Flag enablement for automatic subscription of partitions (#1332)
Browse files Browse the repository at this point in the history
This PR makes sure that the correct partitions are added to the subscription set in Da Vinci Client processes. It
introduces a flag (DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY) that enables or disables automatic
subscription of on-disk partitions. This flag is added to the config map in the VeniceConfigLoader. The flag value is
checked during bootstrapping in DaVinciBackend, so that on-disk partitions are not subscribed when automatic
subscription is not wanted and subscription therefore occurs correctly.
This change applies to Da Vinci Client, which has information about the partitions in storage.

DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY is set to true by default, so by default the subscription of
on-disk partitions still occurs in DaVinciBackend.

This PR is an extension of the PR #1196 that checks which partitions should be kept [in StorageService] and applies the
check in VeniceServer.

Resolves #650
  • Loading branch information
kristyelee authored Feb 11, 2025
1 parent 26312d1 commit 4488e72
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.davinci;

import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX;
import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL;
Expand Down Expand Up @@ -216,6 +217,7 @@ public DaVinciBackend(
String pid = Utils.getPid();
String instanceSuffix =
configLoader.getCombinedProperties().getString(PUSH_STATUS_INSTANCE_NAME_SUFFIX, (pid == null ? "NA" : pid));
// Current instance name.
String instanceName = Utils.getHostName() + "_" + instanceSuffix;

// Fetch latest update schema's protocol ID for Push Status Store from Router.
Expand Down Expand Up @@ -401,7 +403,7 @@ private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKep

private synchronized void bootstrap() {
List<AbstractStorageEngine> storageEngines =
storageService.getStorageEngineRepository().getAllLocalStorageEngines();
getStorageService().getStorageEngineRepository().getAllLocalStorageEngines();
LOGGER.info("Starting bootstrap, storageEngines: {}", storageEngines);
Map<String, Version> storeNameToBootstrapVersionMap = new HashMap<>();
Map<String, List<Integer>> storeNameToPartitionListMap = new HashMap<>();
Expand Down Expand Up @@ -440,7 +442,7 @@ private synchronized void bootstrap() {
if (!(storeNameToBootstrapVersionMap.containsKey(storeName)
&& (storeNameToBootstrapVersionMap.get(storeName).getNumber() < versionNumber))) {
storeNameToBootstrapVersionMap.put(storeName, version);
storeNameToPartitionListMap.put(storeName, storageService.getUserPartitions(kafkaTopicName));
storeNameToPartitionListMap.put(storeName, getStorageService().getUserPartitions(kafkaTopicName));
}
}

Expand All @@ -455,12 +457,12 @@ private synchronized void bootstrap() {
* In this case we will only need to close metadata partition, as it is supposed to be opened and managed by
* forked ingestion process via following subscribe call.
*/
for (AbstractStorageEngine storageEngine: storageService.getStorageEngineRepository()
for (AbstractStorageEngine storageEngine: getStorageService().getStorageEngineRepository()
.getAllLocalStorageEngines()) {
storageEngine.closeMetadataPartition();
}
} else {
storageService.closeAllStorageEngines();
getStorageService().closeAllStorageEngines();
}
}

Expand All @@ -470,27 +472,29 @@ private synchronized void bootstrap() {
metricsRepository,
storageMetadataService,
ingestionService,
storageService,
getStorageService(),
blobTransferManager,
this::getVeniceCurrentVersionNumber)
: new DefaultIngestionBackend(
storageMetadataService,
ingestionService,
storageService,
getStorageService(),
blobTransferManager,
configLoader.getVeniceServerConfig());
ingestionBackend.addIngestionNotifier(ingestionListener);

// Subscribe all bootstrap version partitions.
storeNameToBootstrapVersionMap.forEach((storeName, version) -> {
List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
String versionTopic = version.kafkaTopicName();
LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic);
AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic);
aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
StoreBackend storeBackend = getStoreOrThrow(storeName);
storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));
});
if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) {
// Subscribe all bootstrap version partitions.
storeNameToBootstrapVersionMap.forEach((storeName, version) -> {
List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
String versionTopic = version.kafkaTopicName();
LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic);
AbstractStorageEngine storageEngine = getStorageService().getStorageEngine(versionTopic);
aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
StoreBackend storeBackend = getStoreOrThrow(storeName);
storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public StoreBackendStats getStats() {
return stats;
}

/**
 * Exposes the partition subscription set tracked by this store backend.
 *
 * @return the {@code subscription} field itself (no copy is made)
 */
public ComplementSet<Integer> getSubscription() {
  return this.subscription;
}

/**
 * Reads the value currently held by {@code daVinciCurrentVersionRef}.
 *
 * @return whatever {@code daVinciCurrentVersionRef.get()} yields at the time of the call
 */
public ReferenceCounted<VersionBackend> getDaVinciCurrentVersion() {
  ReferenceCounted<VersionBackend> currentVersion = daVinciCurrentVersionRef.get();
  return currentVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT;
import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT;
import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS;
Expand Down Expand Up @@ -716,6 +717,7 @@ private VeniceConfigLoader buildVeniceConfig() {
.put(CLUSTER_NAME, clusterName)
.put(ZOOKEEPER_ADDRESS, zkAddress)
.put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers)
.put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)
.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK)
.put(INGESTION_USE_DA_VINCI_CLIENT, true)
.put(RECORD_TRANSFORMER_VALUE_SCHEMA, recordTransformerOutputValueSchema)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,47 @@
package com.linkedin.davinci;

import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.ERROR;
import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN;
import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.stats.AggVersionedStorageEngineStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
import com.linkedin.venice.exceptions.MemoryLimitExhaustedException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.VeniceProperties;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -139,4 +163,100 @@ public void testBootstrappingAwareCompletableFuture()
verify(backend, times(2)).hasCurrentVersionBootstrapping();
}

/**
 * Checks that the private {@code DaVinciBackend#bootstrap()} method only subscribes on-disk partitions when
 * {@code DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY} is enabled.
 *
 * Scenario: the mocked storage service reports user partitions 0, 1 and 2 for {@code test_store_v1}, while the mocked
 * store backend starts with a subscription of partitions 0 and 1. The stubbed {@code subscribe} call adds the
 * requested partitions to that subscription set. With the flag off the set must stay at {0, 1}; with the flag on,
 * partition 2 must be added as well.
 *
 * The backend is a Mockito mock, so its collaborators are injected through reflection and {@code bootstrap()} is
 * invoked reflectively.
 */
@Test
public void testBootstrappingSubscription()
    throws IllegalAccessException, NoSuchFieldException, NoSuchMethodException, InvocationTargetException {
  DaVinciBackend backend = mock(DaVinciBackend.class);
  StorageService mockStorageService = mock(StorageService.class);

  StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class);
  AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class);
  // NOTE(review): the next call and the two addStoragePartition calls below target Mockito mocks; presumably they
  // have no observable effect and the stubs further down drive the test - confirm before removing them.
  mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine);
  String resourceName = "test_store_v1";
  when(abstractStorageEngine.getStoreVersionName()).thenReturn(resourceName);

  abstractStorageEngine.addStoragePartition(0);
  abstractStorageEngine.addStoragePartition(1);

  List<AbstractStorageEngine> localStorageEngines = new ArrayList<>();
  localStorageEngines.add(abstractStorageEngine);

  when(backend.getStorageService()).thenReturn(mockStorageService);
  when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository);
  when(mockStorageService.getStorageEngine(resourceName)).thenReturn(abstractStorageEngine);
  when(mockStorageEngineRepository.getAllLocalStorageEngines()).thenReturn(localStorageEngines);
  when(backend.isIsolatedIngestion()).thenReturn(false);

  // Partitions reported as present on disk for the version topic.
  List<Integer> userPartitionList = new ArrayList<>();
  userPartitionList.add(0);
  userPartitionList.add(1);
  userPartitionList.add(2);
  when(mockStorageService.getUserPartitions(resourceName)).thenReturn(userPartitionList);

  // Partitions the store backend is subscribed to before bootstrap runs.
  HashSet<Integer> backendSubscription = new HashSet<>();
  backendSubscription.add(0);
  backendSubscription.add(1);

  StoreBackend mockStoreBackend = mock(StoreBackend.class);
  when(backend.getStoreOrThrow(Version.parseStoreFromKafkaTopicName(resourceName))).thenReturn(mockStoreBackend);
  ComplementSet<Integer> backendSubscriptionSet = ComplementSet.wrap(backendSubscription);
  when(mockStoreBackend.getSubscription()).thenReturn(backendSubscriptionSet);

  // Emulate subscribe() by merging the requested partitions into the subscription set returned above.
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) {
      ComplementSet<Integer> partitions = invocation.getArgument(0);
      mockStoreBackend.getSubscription().addAll(partitions);
      return null;
    }
  }).when(mockStoreBackend).subscribe(any(), any());

  Version mockVersion = mock(Version.class);
  Store mockStore = mock(Store.class);
  SubscriptionBasedReadOnlyStoreRepository mockStoreRepository = mock(SubscriptionBasedReadOnlyStoreRepository.class);
  injectDaVinciBackendField(backend, "storeRepository", mockStoreRepository);
  when(mockStoreRepository.getStoreOrThrow(Version.parseStoreFromKafkaTopicName(resourceName))).thenReturn(mockStore);
  when(mockStore.getVersion(Version.parseVersionFromKafkaTopicName(resourceName))).thenReturn(mockVersion);

  VeniceConfigLoader mockConfigLoader = mock(VeniceConfigLoader.class);
  injectDaVinciBackendField(backend, "configLoader", mockConfigLoader);
  VeniceProperties mockCombinedProperties = mock(VeniceProperties.class);
  when(mockConfigLoader.getCombinedProperties()).thenReturn(mockCombinedProperties);

  AggVersionedStorageEngineStats mockAggVersionedStorageEngineStats = mock(AggVersionedStorageEngineStats.class);
  injectDaVinciBackendField(backend, "aggVersionedStorageEngineStats", mockAggVersionedStorageEngineStats);

  KafkaStoreIngestionService storeIngestionService = mock(KafkaStoreIngestionService.class);
  injectDaVinciBackendField(backend, "ingestionService", storeIngestionService);

  Method bootstrapMethod = DaVinciBackend.class.getDeclaredMethod("bootstrap");
  bootstrapMethod.setAccessible(true);

  // DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY == false
  when(mockCombinedProperties.getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true))
      .thenReturn(false);
  bootstrapMethod.invoke(backend);

  ComplementSet<Integer> subscription = mockStoreBackend.getSubscription();
  assertTrue(subscription.contains(0));
  assertTrue(subscription.contains(1));
  assertFalse(subscription.contains(2));

  // DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY == true
  when(mockCombinedProperties.getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)).thenReturn(true);
  bootstrapMethod.invoke(backend);

  subscription = mockStoreBackend.getSubscription();
  assertTrue(subscription.contains(0));
  assertTrue(subscription.contains(1));
  assertTrue(subscription.contains(2));
}

/**
 * Writes {@code value} into the field named {@code fieldName} declared on {@link DaVinciBackend}, bypassing access
 * checks, so that collaborators can be injected into a mocked backend.
 *
 * @param target the backend instance whose field is overwritten
 * @param fieldName name of a field declared directly on {@code DaVinciBackend}
 * @param value the object to store in the field
 * @throws NoSuchFieldException if {@code DaVinciBackend} declares no field with that name
 * @throws IllegalAccessException if the field cannot be written
 */
private static void injectDaVinciBackendField(DaVinciBackend target, String fieldName, Object value)
    throws NoSuchFieldException, IllegalAccessException {
  Field field = DaVinciBackend.class.getDeclaredField(fieldName);
  field.setAccessible(true);
  field.set(target, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ private ConfigKeys() {
public static final String DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_BYTES_PER_SECOND =
"da.vinci.current.version.bootstrapping.quota.bytes.per.second";

// On Da Vinci Client, controls whether partitions already present on disk are subscribed automatically while the
// backend bootstraps. DaVinciBackend reads this key with a default of true; when it is false, the bootstrap-time
// subscription of on-disk partitions is skipped.
public static final String DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY =
"da.vinci.subscribe.on.disk.partitions.automatically";

// Unordered throttlers aren't compatible with Shared Kafka Consumer and have no effect when Shared Consumer is used.
public static final String KAFKA_FETCH_QUOTA_UNORDERED_BYTES_PER_SECOND =
"kafka.fetch.quota.unordered.bytes.per.second";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.ConfigKeys.KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED;
Expand Down Expand Up @@ -45,7 +46,9 @@

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.davinci.DaVinciBackend;
import com.linkedin.davinci.DaVinciUserApp;
import com.linkedin.davinci.StoreBackend;
import com.linkedin.davinci.client.AvroGenericDaVinciClient;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
Expand Down Expand Up @@ -85,6 +88,7 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.ForkedJavaProcess;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
Expand Down Expand Up @@ -988,6 +992,75 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception {
}
}

/**
 * Integration test for bootstrapping with DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY set to false.
 *
 * Phase 1 starts a client, subscribes to all partitions and reads every key. Phase 2 starts a second factory over the
 * same data path, explicitly subscribes to partitions 0 and 1 only, and then checks that the store backend's
 * subscription contains partitions 0 and 1 but not partition 2.
 */
@Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Exception {
  String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
  String dataDirectory = Utils.getTempDataDirectory().getAbsolutePath();
  // Both factories below share this configuration, including the data path and the disabled automatic subscription.
  VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
      .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
      .put(DATA_BASE_PATH, dataDirectory)
      .put(PERSISTENCE_TYPE, ROCKS_DB)
      .put(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED, true)
      .put(PUSH_STATUS_STORE_ENABLED, true)
      .put(DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS, 1000)
      .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, false)
      .build();

  MetricsRepository metricsRepository = new MetricsRepository();

  // Phase 1: subscribe to every partition and verify reads.
  try (CachingDaVinciClientFactory firstFactory = new CachingDaVinciClientFactory(
      d2Client,
      VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
      metricsRepository,
      backendConfig)) {
    DaVinciClient<Integer, Object> fullClient = firstFactory.getAndStartGenericAvroClient(storeName, daVinciConfig);

    // A key outside the pushed range must come back as null.
    fullClient.subscribeAll().get();
    assertNull(fullClient.get(KEY_COUNT + 1).get());

    // Single-get every key while collecting the expected batch-get result.
    Map<Integer, Integer> expectedValues = new HashMap<>();
    int key = 0;
    while (key < KEY_COUNT) {
      assertEquals(fullClient.get(key).get(), 1);
      expectedValues.put(key, 1);
      key++;
    }

    // Batch-get must return exactly the collected key/value pairs.
    assertEquals(fullClient.batchGet(expectedValues.keySet()).get(), expectedValues);
  }

  // Phase 2: managed clients; subscribe to partitions 0 and 1 only.
  try (CachingDaVinciClientFactory secondFactory = new CachingDaVinciClientFactory(
      d2Client,
      VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
      metricsRepository,
      backendConfig,
      Optional.of(Collections.singleton(storeName)))) {

    DaVinciClient<Integer, Object> partialClient = secondFactory.getAndStartGenericAvroClient(storeName, daVinciConfig);

    Set<Integer> requestedPartitions = new HashSet<>();
    requestedPartitions.add(0);
    requestedPartitions.add(1);

    partialClient.subscribe(requestedPartitions);
    assertEquals(partialClient.getPartitionCount(), 3);

    DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend();
    if (daVinciBackend == null) {
      // Nothing further to check when no backend is available.
      return;
    }
    StoreBackend storeBackend = daVinciBackend.getStoreOrThrow(storeName);
    ComplementSet<Integer> subscribedPartitions = storeBackend.getSubscription();
    assertTrue(subscribedPartitions.contains(0));
    assertTrue(subscribedPartitions.contains(1));
    assertFalse(subscribedPartitions.contains(2));
  }
}

@Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
public void testPartialSubscription(DaVinciConfig daVinciConfig) throws Exception {
String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
Expand Down

0 comments on commit 4488e72

Please sign in to comment.