[controller][server] Remove SIT ready-to-serve check for A/A and non-AGG store during L/F transition (linkedin#1409)

Ready-to-serve check happens in two threads today:
(1) Drainer thread: this is reasonable.
(2) SIT thread: there are two sub-cases:

During L/F transition: the check is only needed for an empty RT topic, and since we already have heartbeats for non-AGG and A/A stores, it only applies to the lingering AGG system stores. In this PR the check is gated on the store's data replication policy: if the policy is AGG, the check is kept (a condensed sketch is included after this list). It will be removed completely once AGG mode is removed.
Before subscribe: there is a pre-check that short-circuits to completion, which is fine because it happens before the subscription.
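
For readers skimming the diff, the gist of the server-side change is small. Below is a condensed, self-contained sketch of the new gating; the enum, checker interface, and field names are simplified stand-ins for the real HybridStoreConfig, ReadyToServeCheck, and StoreIngestionTask members that appear in the hunks further down.

```java
import java.util.Optional;

// Condensed sketch of the SIT-side gating added in this PR (simplified types).
class ReadyToServeGatingSketch {
  enum DataReplicationPolicy { AGGREGATE, NON_AGGREGATE }

  interface ReadyToServeCheck {
    void apply(Object partitionConsumptionState);
  }

  private final Optional<DataReplicationPolicy> hybridDataReplicationPolicy;
  private final ReadyToServeCheck defaultReadyToServeChecker;

  ReadyToServeGatingSketch(Optional<DataReplicationPolicy> policy, ReadyToServeCheck checker) {
    this.hybridDataReplicationPolicy = policy;
    this.defaultReadyToServeChecker = checker;
  }

  // Only AGG hybrid stores still need the SIT-side check; heartbeats already
  // cover A/A and non-AGG stores.
  boolean isHybridAggregateMode() {
    return hybridDataReplicationPolicy.isPresent()
        && hybridDataReplicationPolicy.get() == DataReplicationPolicy.AGGREGATE;
  }

  void maybeApplyReadyToServeCheck(Object partitionConsumptionState) {
    if (isHybridAggregateMode()) {
      defaultReadyToServeChecker.apply(partitionConsumptionState);
    }
  }
}
```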
Also, I noticed that a bunch of integration tests enable incremental push on non-A/A stores. This is an invalid setup, and after removing the additional ready-to-serve check these tests started to fail or become very flaky.
To make sure we don't do this in tests or in production, this PR adds a new check to the update-store command in the parent controller, which fails loudly if an update request tries to set incremental push to true on a non-A/A store (a rough sketch of the idea follows below).
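
The controller-side validation itself is not visible in the hunks below, so the following is only a rough, hypothetical sketch of the behavior described above; the class and method names are invented for illustration, and the real check sits in the parent controller's update-store path and likely throws a Venice-specific exception rather than IllegalArgumentException.

```java
// Hypothetical illustration only; not the actual parent-controller code.
final class IncrementalPushValidationSketch {
  static void validateIncrementalPushSettings(
      String storeName,
      boolean incrementalPushRequested,
      boolean activeActiveReplicationEnabled) {
    if (incrementalPushRequested && !activeActiveReplicationEnabled) {
      // Fail loudly instead of silently accepting an invalid combination.
      throw new IllegalArgumentException(
          "Cannot enable incremental push on store " + storeName
              + " because active-active replication is not enabled.");
    }
  }

  public static void main(String[] args) {
    // Valid: incremental push on an A/A store.
    validateIncrementalPushSettings("test-store", true, true);
    // Invalid: throws, mirroring the loud failure added in the parent controller.
    validateIncrementalPushSettings("test-store", true, false);
  }
}
```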
I am also considering enabling in the test suite all the configs that are enabled in production, but I'd like to do that in a separate PR so it is easier for reviewers.

Beyond that, this change fixes a flaky unit test that could throw an NPE.
sixpluszero authored Jan 3, 2025
1 parent c320f15 commit c7e3884
Showing 19 changed files with 186 additions and 238 deletions.
@@ -1127,10 +1127,6 @@ protected void leaderExecuteTopicSwitch(
partitionConsumptionState.getOffsetRecord().setLeaderTopic(newSourceTopic);
// Calculate leader offset and start consumption
prepareLeaderOffsetCheckpointAndStartConsumptionAsLeader(newSourceTopic, partitionConsumptionState, true);

// In case new topic is empty and leader can never become online
// TODO: Remove this check after AGG mode is deprecated.
defaultReadyToServeChecker.apply(partitionConsumptionState);
}

/**
@@ -1155,7 +1151,6 @@ protected boolean processTopicSwitch(
syncTopicSwitchToIngestionMetadataService(topicSwitch, partitionConsumptionState);
if (!isLeader(partitionConsumptionState)) {
partitionConsumptionState.getOffsetRecord().setLeaderTopic(newSourceTopic);
return true;
}
return false;
}
@@ -1538,7 +1533,6 @@ void prepareLeaderOffsetCheckpointAndStartConsumptionAsLeader(
Set<String> leaderSourceKafkaURLs = getConsumptionSourceKafkaAddress(partitionConsumptionState);
Map<String, Long> leaderOffsetByKafkaURL = new HashMap<>(leaderSourceKafkaURLs.size());
List<CharSequence> unreachableBrokerList = new ArrayList<>();

// TODO: Potentially this logic can be merged into below branch.
if (calculateUpstreamOffsetFromTopicSwitch) {
leaderOffsetByKafkaURL =
@@ -1560,7 +1554,6 @@ void prepareLeaderOffsetCheckpointAndStartConsumptionAsLeader(
"Failed to reach broker urls {}, will schedule retry to compute upstream offset and resubscribe!",
unreachableBrokerList.toString());
}

// subscribe to the new upstream
leaderOffsetByKafkaURL.forEach((kafkaURL, leaderStartOffset) -> {
consumerSubscribe(leaderTopic, partitionConsumptionState, leaderStartOffset, kafkaURL);
@@ -669,7 +669,7 @@ protected void checkLongRunningTaskState() throws InterruptedException {
* In extreme case, if there is no message in real-time topic, there will be no new message after leader switch
* to the real-time topic, so `isReadyToServe()` check will never be invoked.
*/
defaultReadyToServeChecker.apply(partitionConsumptionState);
maybeApplyReadyToServeCheck(partitionConsumptionState);
}
break;

@@ -1064,7 +1064,7 @@ protected void leaderExecuteTopicSwitch(
upstreamStartOffset);

// In case new topic is empty and leader can never become online
defaultReadyToServeChecker.apply(partitionConsumptionState);
maybeApplyReadyToServeCheck(partitionConsumptionState);
}

protected void syncConsumedUpstreamRTOffsetMapIfNeeded(
@@ -1326,7 +1326,7 @@ protected boolean processTopicSwitch(
* Real time topic for that partition is empty or the rewind start offset is very closed to the end, followers
* calculate the lag of the leader and decides the lag is small enough.
*/
return true;
return isHybridAggregateMode();
}
return false;
}
@@ -77,6 +77,7 @@
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
@@ -2094,6 +2095,7 @@ private void checkConsumptionStateWhenStart(
}
}
}
// This ready-to-serve check is acceptable in SIT thread as it happens before subscription.
if (!isCompletedReport) {
defaultReadyToServeChecker.apply(newPartitionConsumptionState);
}
@@ -4581,7 +4583,22 @@ void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) {
this.versionRole = versionRole;
}

protected boolean isDaVinciClient() {
boolean isDaVinciClient() {
return isDaVinciClient;
}

boolean isHybridAggregateMode() {
return hybridStoreConfig.isPresent()
&& hybridStoreConfig.get().getDataReplicationPolicy().equals(DataReplicationPolicy.AGGREGATE);
}

ReadyToServeCheck getReadyToServeChecker() {
return defaultReadyToServeChecker;
}

void maybeApplyReadyToServeCheck(PartitionConsumptionState partitionConsumptionState) {
if (isHybridAggregateMode()) {
getReadyToServeChecker().apply(partitionConsumptionState);
}
}
}
@@ -403,6 +403,8 @@ void testRollbackAndRollForward() {
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
}
// Bootstrap checker thread can also modify the versionMap, adding wait-assert here to avoid NPE.
assertNotNull(versionMap.get(version2.kafkaTopicName()));
});
versionMap.get(version2.kafkaTopicName()).completePartition(partition);

@@ -3691,16 +3691,20 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node

@DataProvider
public static Object[][] testProcessTopicSwitchProvider() {
return new Object[][] { { LEADER }, { DA_VINCI } };
return DataProviderUtils.allPermutationGenerator(new NodeType[] { DA_VINCI, LEADER }, DataProviderUtils.BOOLEAN);
}

@Test(dataProvider = "testProcessTopicSwitchProvider")
public void testProcessTopicSwitch(NodeType nodeType) {
public void testProcessTopicSwitch(NodeType nodeType, boolean isAggregateMode) {
VenicePartitioner partitioner = getVenicePartitioner();
PartitionerConfig partitionerConfig = new PartitionerConfigImpl();
partitionerConfig.setPartitionerClass(partitioner.getClass().getName());
HybridStoreConfig hybridStoreConfig =
new HybridStoreConfigImpl(100, 100, 100, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP);
HybridStoreConfig hybridStoreConfig = new HybridStoreConfigImpl(
100,
100,
100,
isAggregateMode ? DataReplicationPolicy.AGGREGATE : DataReplicationPolicy.NON_AGGREGATE,
BufferReplayPolicy.REWIND_FROM_EOP);
MockStoreVersionConfigs storeAndVersionConfigs =
setupStoreAndVersionMocks(2, partitionerConfig, Optional.of(hybridStoreConfig), false, true, AA_OFF);
StorageService storageService = mock(StorageService.class);
@@ -3748,7 +3752,8 @@ public void testProcessTopicSwitch(NodeType nodeType) {
doReturn(mockOffsetRecord).when(mockPcs).getOffsetRecord();
doReturn(PARTITION_FOO).when(mockPcs).getPartition();
doReturn(PARTITION_FOO).when(mockPcs).getPartition();
storeIngestionTaskUnderTest.processTopicSwitch(controlMessage, PARTITION_FOO, 10, mockPcs);
boolean result = storeIngestionTaskUnderTest.processTopicSwitch(controlMessage, PARTITION_FOO, 10, mockPcs);
Assert.assertEquals(isAggregateMode, result);
verify(mockTopicManagerRemoteKafka, never()).getOffsetByTime(any(), anyLong());
verify(mockOffsetRecord, never()).setLeaderUpstreamOffset(anyString(), anyLong());
}
@@ -97,6 +97,7 @@ public void testClusterLevelNativeReplicationConfigForNewStores() {
parentControllerClient.updateStore(
storeName,
new UpdateStoreQueryParams().setIncrementalPushEnabled(true)
.setActiveActiveReplicationEnabled(true)
.setHybridRewindSeconds(1L)
.setHybridOffsetLagThreshold(10)));
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
@@ -10,6 +10,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.AssertJUnit.fail;

import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
@@ -78,17 +79,19 @@ public void setUp() {
controllerProps.put(DEFAULT_NUMBER_OF_PARTITION_FOR_HYBRID, 2);
controllerProps.put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, 3);
controllerProps.put(DEFAULT_PARTITION_SIZE, 1024);
Properties serverProps = new Properties();
multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
NUMBER_OF_CHILD_DATACENTERS,
NUMBER_OF_CLUSTERS,
1,
1,
2,
1,
1,
1,
2,
Optional.of(controllerProps),
Optional.of(controllerProps),
Optional.empty());
Optional.of(serverProps),
false);

childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
}
@@ -111,25 +114,22 @@ public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
new ControllerClient(clusterName, childDatacenters.get(i).getControllerConnectString());
}

List<TopicManager> topicManagers = new ArrayList<>(2);
topicManagers
.add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
topicManagers
.add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());

NewStoreResponse newStoreResponse =
parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\""));
Assert.assertFalse(
newStoreResponse.isError(),
"The NewStoreResponse returned an error: " + newStoreResponse.getError());

StoreInfo store = TestUtils.assertCommand(parentControllerClient.getStore(storeName)).getStore();
String rtTopicName = Utils.getRealTimeTopicName(store);
PubSubTopic rtPubSubTopic = pubSubTopicRepository.getTopic(rtTopicName);

String metaSystemStoreTopic =
Version.composeKafkaTopic(VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName), 1);
TestUtils.waitForNonDeterministicPushCompletion(metaSystemStoreTopic, parentControllerClient, 30, TimeUnit.SECONDS);

UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
updateStoreParams.setIncrementalPushEnabled(true)
.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
updateStoreParams.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
.setActiveActiveReplicationEnabled(true)
.setIncrementalPushEnabled(true)
.setNumVersionsToPreserve(2)
.setHybridRewindSeconds(1000)
.setHybridOffsetLagThreshold(1000);
@@ -140,6 +140,14 @@ public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
PubSubTopic versionPubsubTopic = getVersionPubsubTopic(storeName, response);

List<TopicManager> topicManagers = new ArrayList<>(2);
topicManagers
.add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
topicManagers
.add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());

String rtTopicName = Utils.getRealTimeTopicName(store);
PubSubTopic rtPubSubTopic = pubSubTopicRepository.getTopic(rtTopicName);
for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
@@ -318,6 +326,10 @@ public void testUpdateStore() {
incPushStoreName,
parentControllerClient,
new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
.setHybridRewindSeconds(expectedHybridRewindSeconds)
.setHybridOffsetLagThreshold(expectedHybridOffsetLagThreshold)
.setHybridBufferReplayPolicy(expectedHybridBufferReplayPolicy)
.setActiveActiveReplicationEnabled(true)
.setIncrementalPushEnabled(true));
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, true, () -> {
for (ControllerClient controllerClient: controllerClients) {
@@ -545,9 +545,17 @@ public void controllerClientCanSetStoreMetadata() {
OwnerResponse ownerRes = controllerClient.setStoreOwner(storeName, owner);
Assert.assertFalse(ownerRes.isError(), ownerRes.getError());
Assert.assertEquals(ownerRes.getOwner(), owner);

// Need to finish the push before converting to hybrid.
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, true, () -> {
StoreResponse storeResponse = controllerClient.getStore(storeName);
Assert.assertEquals(storeResponse.getStore().getCurrentVersion(), 1);
});
UpdateStoreQueryParams updateStoreQueryParams =
new UpdateStoreQueryParams().setPartitionCount(partitionCount).setIncrementalPushEnabled(true);
new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true)
.setHybridRewindSeconds(10)
.setHybridOffsetLagThreshold(10)
.setPartitionCount(partitionCount)
.setIncrementalPushEnabled(true);
ControllerResponse partitionRes = parentControllerClient.updateStore(storeName, updateStoreQueryParams);
Assert.assertFalse(partitionRes.isError(), partitionRes.getError());

@@ -188,12 +188,10 @@ public void testEnableActiveActiveReplicationForCluster() {
String storeName1 = Utils.getUniqueString("test-batch-store");
String storeName2 = Utils.getUniqueString("test-hybrid-agg-store");
String storeName3 = Utils.getUniqueString("test-hybrid-non-agg-store");
String storeName4 = Utils.getUniqueString("test-incremental-push-store");
try {
createAndVerifyStoreInAllRegions(storeName1, parentControllerClient, dcControllerClientList);
createAndVerifyStoreInAllRegions(storeName2, parentControllerClient, dcControllerClientList);
createAndVerifyStoreInAllRegions(storeName3, parentControllerClient, dcControllerClientList);
createAndVerifyStoreInAllRegions(storeName4, parentControllerClient, dcControllerClientList);

assertCommand(
parentControllerClient.updateStore(
@@ -207,9 +205,6 @@
storeName3,
new UpdateStoreQueryParams().setHybridRewindSeconds(10).setHybridOffsetLagThreshold(2)));

assertCommand(
parentControllerClient.updateStore(storeName4, new UpdateStoreQueryParams().setIncrementalPushEnabled(true)));

// Test batch
assertCommand(
parentControllerClient.configureActiveActiveReplicationForCluster(
@@ -249,18 +244,8 @@
verifyDCConfigAARepl(parentControllerClient, storeName3, true, true, false);
verifyDCConfigAARepl(dc0Client, storeName3, true, true, false);
verifyDCConfigAARepl(dc1Client, storeName3, true, true, false);

// Test incremental
assertCommand(
parentControllerClient.configureActiveActiveReplicationForCluster(
true,
VeniceUserStoreType.INCREMENTAL_PUSH.toString(),
Optional.empty()));
verifyDCConfigAARepl(parentControllerClient, storeName4, false, false, true);
verifyDCConfigAARepl(dc0Client, storeName4, false, false, true);
verifyDCConfigAARepl(dc1Client, storeName4, false, false, true);
} finally {
deleteStores(storeName1, storeName2, storeName3, storeName4);
deleteStores(storeName1, storeName2, storeName3);
}
}

@@ -1,6 +1,7 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.davinci.stats.HostLevelIngestionStats.ASSEMBLED_RMD_SIZE_IN_BYTES;
import static com.linkedin.venice.endToEnd.TestBatch.TEST_TIMEOUT;
import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.PARENT_D2_SERVICE_NAME;
import static com.linkedin.venice.samza.VeniceSystemFactory.DEPLOYMENT_ID;
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_AGGREGATE;
@@ -63,6 +64,7 @@
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
@@ -74,6 +76,7 @@
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
@@ -406,6 +409,7 @@ public void testIncrementalPushPartialUpdateNewFormat(boolean useSparkCompute) t
UpdateStoreQueryParams updateStoreParams =
new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
.setCompressionStrategy(CompressionStrategy.NO_OP)
.setActiveActiveReplicationEnabled(true)
.setWriteComputationEnabled(true)
.setChunkingEnabled(true)
.setIncrementalPushEnabled(true)
@@ -1467,6 +1471,38 @@ public void testEnablePartialUpdateOnActiveActiveStore() {
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
String storeName = Utils.getUniqueString("testRTTopicDeletion");
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
ControllerClient parentControllerClient =
ControllerClient.constructClusterControllerClient(CLUSTER_NAME, parentControllerURLs);

NewStoreResponse newStoreResponse =
parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\""));
Assert.assertFalse(
newStoreResponse.isError(),
"The NewStoreResponse returned an error: " + newStoreResponse.getError());

UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
updateStoreParams.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
.setActiveActiveReplicationEnabled(true)
.setIncrementalPushEnabled(true)
.setNumVersionsToPreserve(2)
.setHybridRewindSeconds(1000)
.setHybridOffsetLagThreshold(1000);
TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams);

VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test_push_id", 1000);
assertEquals(response.getVersion(), 1);
assertFalse(response.isError(), "Empty push to parent colo should succeed");
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 1),
parentControllerClient,
60,
TimeUnit.SECONDS);
}

@Test(timeOut = TEST_TIMEOUT_MS)
public void testConvertRmdType() {
final String storeName = Utils.getUniqueString("testConvertRmdType");
@@ -338,7 +338,12 @@ public void testPushJobDetails(boolean useCustomCheckpoints) throws IOException
// because hadoop job client cannot fetch counters properly.
parentControllerClient.updateStore(
testStoreName,
new UpdateStoreQueryParams().setStorageQuotaInByte(-1).setPartitionCount(2).setIncrementalPushEnabled(true));
new UpdateStoreQueryParams().setStorageQuotaInByte(-1)
.setPartitionCount(2)
.setHybridOffsetLagThreshold(10)
.setHybridRewindSeconds(10)
.setActiveActiveReplicationEnabled(true)
.setIncrementalPushEnabled(true));
Properties pushJobProps = defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPathForFullPush, testStoreName);
pushJobProps.setProperty(PUSH_JOB_STATUS_UPLOAD_ENABLE, String.valueOf(true));
try (VenicePushJob testPushJob = new VenicePushJob("test-push-job-details-job", pushJobProps)) {