diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java
index dd3663b48a6..51c59eadd6a 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java
@@ -1127,10 +1127,6 @@ protected void leaderExecuteTopicSwitch(
     partitionConsumptionState.getOffsetRecord().setLeaderTopic(newSourceTopic);
     // Calculate leader offset and start consumption
     prepareLeaderOffsetCheckpointAndStartConsumptionAsLeader(newSourceTopic, partitionConsumptionState, true);
-
-    // In case new topic is empty and leader can never become online
-    // TODO: Remove this check after AGG mode is deprecated.
-    defaultReadyToServeChecker.apply(partitionConsumptionState);
   }
 
   /**
@@ -1155,7 +1151,6 @@ protected boolean processTopicSwitch(
     syncTopicSwitchToIngestionMetadataService(topicSwitch, partitionConsumptionState);
     if (!isLeader(partitionConsumptionState)) {
       partitionConsumptionState.getOffsetRecord().setLeaderTopic(newSourceTopic);
-      return true;
     }
     return false;
   }
@@ -1538,7 +1533,6 @@ void prepareLeaderOffsetCheckpointAndStartConsumptionAsLeader(
     Set<String> leaderSourceKafkaURLs = getConsumptionSourceKafkaAddress(partitionConsumptionState);
     Map<String, Long> leaderOffsetByKafkaURL = new HashMap<>(leaderSourceKafkaURLs.size());
     List<CharSequence> unreachableBrokerList = new ArrayList<>();
-    // TODO: Potentially this logic can be merged into below branch.
     if (calculateUpstreamOffsetFromTopicSwitch) {
       leaderOffsetByKafkaURL =
@@ -1560,7 +1554,6 @@
           "Failed to reach broker urls {}, will schedule retry to compute upstream offset and resubscribe!",
           unreachableBrokerList.toString());
     }
-    // subscribe to the new upstream
     leaderOffsetByKafkaURL.forEach((kafkaURL, leaderStartOffset) -> {
       consumerSubscribe(leaderTopic, partitionConsumptionState, leaderStartOffset, kafkaURL);
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
index 311156d00a7..d327bb1fa4f 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
@@ -669,7 +669,7 @@ protected void checkLongRunningTaskState() throws InterruptedException {
        * In extreme case, if there is no message in real-time topic, there will be no new message after leader switch
        * to the real-time topic, so `isReadyToServe()` check will never be invoked.
        */
-      defaultReadyToServeChecker.apply(partitionConsumptionState);
+      maybeApplyReadyToServeCheck(partitionConsumptionState);
     }
     break;
@@ -1064,7 +1064,7 @@ protected void leaderExecuteTopicSwitch(
         upstreamStartOffset);
 
     // In case new topic is empty and leader can never become online
-    defaultReadyToServeChecker.apply(partitionConsumptionState);
+    maybeApplyReadyToServeCheck(partitionConsumptionState);
   }
 
   protected void syncConsumedUpstreamRTOffsetMapIfNeeded(
@@ -1326,7 +1326,7 @@ protected boolean processTopicSwitch(
        * Real time topic for that partition is empty or the rewind start offset is very closed to the end, followers
        * calculate the lag of the leader and decides the lag is small enough.
        */
-      return true;
+      return isHybridAggregateMode();
     }
     return false;
   }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
index 2483370b19f..da5a4d68709 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
@@ -77,6 +77,7 @@
 import com.linkedin.venice.kafka.protocol.state.PartitionState;
 import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
 import com.linkedin.venice.message.KafkaKey;
+import com.linkedin.venice.meta.DataReplicationPolicy;
 import com.linkedin.venice.meta.HybridStoreConfig;
 import com.linkedin.venice.meta.ReadOnlySchemaRepository;
 import com.linkedin.venice.meta.ReadOnlyStoreRepository;
@@ -2094,6 +2095,7 @@ private void checkConsumptionStateWhenStart(
         }
       }
     }
+    // This ready-to-serve check is acceptable in the SIT thread, as it happens before subscription.
     if (!isCompletedReport) {
       defaultReadyToServeChecker.apply(newPartitionConsumptionState);
     }
@@ -4581,7 +4583,22 @@ void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) {
     this.versionRole = versionRole;
   }
 
-  protected boolean isDaVinciClient() {
+  boolean isDaVinciClient() {
     return isDaVinciClient;
   }
+
+  boolean isHybridAggregateMode() {
+    return hybridStoreConfig.isPresent()
+        && hybridStoreConfig.get().getDataReplicationPolicy().equals(DataReplicationPolicy.AGGREGATE);
+  }
+
+  ReadyToServeCheck getReadyToServeChecker() {
+    return defaultReadyToServeChecker;
+  }
+
+  void maybeApplyReadyToServeCheck(PartitionConsumptionState partitionConsumptionState) {
+    if (isHybridAggregateMode()) {
+      getReadyToServeChecker().apply(partitionConsumptionState);
+    }
+  }
 }
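To make the new gate concrete: a minimal TestNG/Mockito sketch (not part of this patch) of how maybeApplyReadyToServeCheck could be exercised, assuming the same partial-mocking style StoreIngestionTaskTest already uses; all class and method names below come from the hunks above, and Venice imports are omitted for brevity.

    import static org.mockito.Mockito.*;

    import org.testng.annotations.Test;

    public class MaybeApplyReadyToServeCheckSketch {
      @Test
      public void onlyFiresForAggregateHybridStores() {
        StoreIngestionTask task = mock(StoreIngestionTask.class);
        ReadyToServeCheck checker = mock(ReadyToServeCheck.class);
        PartitionConsumptionState pcs = mock(PartitionConsumptionState.class);
        doCallRealMethod().when(task).maybeApplyReadyToServeCheck(pcs);
        doReturn(checker).when(task).getReadyToServeChecker();

        // AGGREGATE mode: the checker is applied so a leader on an empty RT topic can still go online.
        doReturn(true).when(task).isHybridAggregateMode();
        task.maybeApplyReadyToServeCheck(pcs);
        verify(checker).apply(pcs);

        // NON_AGGREGATE (or batch-only) mode: the eager check is skipped entirely.
        doReturn(false).when(task).isHybridAggregateMode();
        task.maybeApplyReadyToServeCheck(pcs);
        verifyNoMoreInteractions(checker);
      }
    }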
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java
index e11dc144120..bb4b544e144 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java
@@ -403,6 +403,8 @@ void testRollbackAndRollForward() {
       try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
         assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
       }
+      // The bootstrap checker thread can also modify the versionMap; wait-assert here to avoid an NPE.
+      assertNotNull(versionMap.get(version2.kafkaTopicName()));
     });
 
     versionMap.get(version2.kafkaTopicName()).completePartition(partition);
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
index d894c8b33ba..f8285f64eab 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
@@ -3691,16 +3691,20 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node
 
   @DataProvider
   public static Object[][] testProcessTopicSwitchProvider() {
-    return new Object[][] { { LEADER }, { DA_VINCI } };
+    return DataProviderUtils.allPermutationGenerator(new NodeType[] { DA_VINCI, LEADER }, DataProviderUtils.BOOLEAN);
   }
 
   @Test(dataProvider = "testProcessTopicSwitchProvider")
-  public void testProcessTopicSwitch(NodeType nodeType) {
+  public void testProcessTopicSwitch(NodeType nodeType, boolean isAggregateMode) {
     VenicePartitioner partitioner = getVenicePartitioner();
     PartitionerConfig partitionerConfig = new PartitionerConfigImpl();
     partitionerConfig.setPartitionerClass(partitioner.getClass().getName());
-    HybridStoreConfig hybridStoreConfig =
-        new HybridStoreConfigImpl(100, 100, 100, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP);
+    HybridStoreConfig hybridStoreConfig = new HybridStoreConfigImpl(
+        100,
+        100,
+        100,
+        isAggregateMode ? DataReplicationPolicy.AGGREGATE : DataReplicationPolicy.NON_AGGREGATE,
+        BufferReplayPolicy.REWIND_FROM_EOP);
     MockStoreVersionConfigs storeAndVersionConfigs =
         setupStoreAndVersionMocks(2, partitionerConfig, Optional.of(hybridStoreConfig), false, true, AA_OFF);
     StorageService storageService = mock(StorageService.class);
@@ -3748,7 +3752,8 @@ public void testProcessTopicSwitch(NodeType nodeType) {
     doReturn(mockOffsetRecord).when(mockPcs).getOffsetRecord();
     doReturn(PARTITION_FOO).when(mockPcs).getPartition();
     doReturn(PARTITION_FOO).when(mockPcs).getPartition();
-    storeIngestionTaskUnderTest.processTopicSwitch(controlMessage, PARTITION_FOO, 10, mockPcs);
+    boolean result = storeIngestionTaskUnderTest.processTopicSwitch(controlMessage, PARTITION_FOO, 10, mockPcs);
+    Assert.assertEquals(result, isAggregateMode);
     verify(mockTopicManagerRemoteKafka, never()).getOffsetByTime(any(), anyLong());
     verify(mockOffsetRecord, never()).setLeaderUpstreamOffset(anyString(), anyLong());
   }
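For readers unfamiliar with the provider utility: allPermutationGenerator crosses every value of each argument, so the provider above now yields four cases instead of two. A hedged sketch of the expansion (DataProviderUtils.BOOLEAN is Venice's {false, true} constant; the exact row ordering is an assumption):

    // { DA_VINCI, false }, { DA_VINCI, true }, { LEADER, false }, { LEADER, true }
    Object[][] cases =
        DataProviderUtils.allPermutationGenerator(new NodeType[] { DA_VINCI, LEADER }, DataProviderUtils.BOOLEAN);
    // Each row feeds testProcessTopicSwitch(NodeType nodeType, boolean isAggregateMode), so the
    // return value of processTopicSwitch is asserted under both replication policies.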
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestClusterLevelConfigForNativeReplication.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestClusterLevelConfigForNativeReplication.java
index 7ac63c31d30..2668a8a97b2 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestClusterLevelConfigForNativeReplication.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestClusterLevelConfigForNativeReplication.java
@@ -97,6 +97,7 @@ public void testClusterLevelNativeReplicationConfigForNewStores() {
         parentControllerClient.updateStore(
             storeName,
             new UpdateStoreQueryParams().setIncrementalPushEnabled(true)
+                .setActiveActiveReplicationEnabled(true)
                 .setHybridRewindSeconds(1L)
                 .setHybridOffsetLagThreshold(10)));
     TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java
index fb037b32bfe..7db0ecbe789 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java
@@ -10,6 +10,7 @@
 import static org.testng.Assert.assertFalse;
 import static org.testng.AssertJUnit.fail;
 
+import com.linkedin.venice.common.VeniceSystemStoreType;
 import com.linkedin.venice.common.VeniceSystemStoreUtils;
 import com.linkedin.venice.controllerapi.ControllerClient;
 import com.linkedin.venice.controllerapi.ControllerResponse;
@@ -78,17 +79,19 @@ public void setUp() {
     controllerProps.put(DEFAULT_NUMBER_OF_PARTITION_FOR_HYBRID, 2);
     controllerProps.put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, 3);
     controllerProps.put(DEFAULT_PARTITION_SIZE, 1024);
+    Properties serverProps = new Properties();
     multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
         NUMBER_OF_CHILD_DATACENTERS,
         NUMBER_OF_CLUSTERS,
         1,
         1,
+        2,
         1,
-        1,
-        1,
+        2,
         Optional.of(controllerProps),
         Optional.of(controllerProps),
-        Optional.empty());
+        Optional.of(serverProps),
+        false);
     childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
   }
 
@@ -111,12 +114,6 @@ public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
           new ControllerClient(clusterName, childDatacenters.get(i).getControllerConnectString());
     }
 
-    List<TopicManager> topicManagers = new ArrayList<>(2);
-    topicManagers
-        .add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
-    topicManagers
-        .add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
-
     NewStoreResponse newStoreResponse =
         parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\""));
     Assert.assertFalse(
@@ -124,12 +121,15 @@ public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
         "The NewStoreResponse returned an error: " + newStoreResponse.getError());
 
     StoreInfo store = TestUtils.assertCommand(parentControllerClient.getStore(storeName)).getStore();
-    String rtTopicName = Utils.getRealTimeTopicName(store);
-    PubSubTopic rtPubSubTopic = pubSubTopicRepository.getTopic(rtTopicName);
+
+    String metaSystemStoreTopic =
+        Version.composeKafkaTopic(VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName), 1);
+    TestUtils.waitForNonDeterministicPushCompletion(metaSystemStoreTopic, parentControllerClient, 30, TimeUnit.SECONDS);
 
     UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
-    updateStoreParams.setIncrementalPushEnabled(true)
-        .setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
+    updateStoreParams.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
+        .setActiveActiveReplicationEnabled(true)
+        .setIncrementalPushEnabled(true)
         .setNumVersionsToPreserve(2)
         .setHybridRewindSeconds(1000)
         .setHybridOffsetLagThreshold(1000);
@@ -140,6 +140,14 @@ public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
         .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
     PubSubTopic versionPubsubTopic = getVersionPubsubTopic(storeName, response);
+    List<TopicManager> topicManagers = new ArrayList<>(2);
+    topicManagers
+        .add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
+    topicManagers
+        .add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
+
+    String rtTopicName = Utils.getRealTimeTopicName(store);
+    PubSubTopic rtPubSubTopic = pubSubTopicRepository.getTopic(rtTopicName);
     for (TopicManager topicManager: topicManagers) {
       Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
       Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
@@ -318,6 +326,10 @@ public void testUpdateStore() {
         incPushStoreName,
         parentControllerClient,
         new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
+            .setHybridRewindSeconds(expectedHybridRewindSeconds)
+            .setHybridOffsetLagThreshold(expectedHybridOffsetLagThreshold)
+            .setHybridBufferReplayPolicy(expectedHybridBufferReplayPolicy)
+            .setActiveActiveReplicationEnabled(true)
             .setIncrementalPushEnabled(true));
     TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, true, () -> {
       for (ControllerClient controllerClient: controllerClients) {
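The same motif repeats through the rest of the test changes in this patch: anywhere a test used to enable incremental push on its own, it now supplies the full hybrid-plus-A/A combination up front. A hedged minimal example of the combination the controller now expects (the values are illustrative, not prescribed):

    UpdateStoreQueryParams params = new UpdateStoreQueryParams()
        .setHybridRewindSeconds(10L)             // hybrid store config is required for incremental push
        .setHybridOffsetLagThreshold(10L)
        .setActiveActiveReplicationEnabled(true) // now mandatory alongside incremental push
        .setIncrementalPushEnabled(true);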
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java
index 9256626b392..cf1165bc236 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java
@@ -545,9 +545,17 @@ public void controllerClientCanSetStoreMetadata() {
       OwnerResponse ownerRes = controllerClient.setStoreOwner(storeName, owner);
       Assert.assertFalse(ownerRes.isError(), ownerRes.getError());
       Assert.assertEquals(ownerRes.getOwner(), owner);
-
+      // Need to finish the push before converting to hybrid.
+      TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, true, () -> {
+        StoreResponse storeResponse = controllerClient.getStore(storeName);
+        Assert.assertEquals(storeResponse.getStore().getCurrentVersion(), 1);
+      });
       UpdateStoreQueryParams updateStoreQueryParams =
-          new UpdateStoreQueryParams().setPartitionCount(partitionCount).setIncrementalPushEnabled(true);
+          new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true)
+              .setHybridRewindSeconds(10)
+              .setHybridOffsetLagThreshold(10)
+              .setPartitionCount(partitionCount)
+              .setIncrementalPushEnabled(true);
       ControllerResponse partitionRes = parentControllerClient.updateStore(storeName, updateStoreQueryParams);
       Assert.assertFalse(partitionRes.isError(), partitionRes.getError());
 
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/ActiveActiveReplicationForHybridTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/ActiveActiveReplicationForHybridTest.java
index 3073d79aaff..ae4f6685f4c 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/ActiveActiveReplicationForHybridTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/ActiveActiveReplicationForHybridTest.java
@@ -188,12 +188,10 @@ public void testEnableActiveActiveReplicationForCluster() {
     String storeName1 = Utils.getUniqueString("test-batch-store");
     String storeName2 = Utils.getUniqueString("test-hybrid-agg-store");
     String storeName3 = Utils.getUniqueString("test-hybrid-non-agg-store");
-    String storeName4 = Utils.getUniqueString("test-incremental-push-store");
     try {
       createAndVerifyStoreInAllRegions(storeName1, parentControllerClient, dcControllerClientList);
       createAndVerifyStoreInAllRegions(storeName2, parentControllerClient, dcControllerClientList);
       createAndVerifyStoreInAllRegions(storeName3, parentControllerClient, dcControllerClientList);
-      createAndVerifyStoreInAllRegions(storeName4, parentControllerClient, dcControllerClientList);
 
       assertCommand(
           parentControllerClient.updateStore(
@@ -207,9 +205,6 @@
               storeName3,
               new UpdateStoreQueryParams().setHybridRewindSeconds(10).setHybridOffsetLagThreshold(2)));
 
-      assertCommand(
-          parentControllerClient.updateStore(storeName4, new UpdateStoreQueryParams().setIncrementalPushEnabled(true)));
-
       // Test batch
       assertCommand(
           parentControllerClient.configureActiveActiveReplicationForCluster(
@@ -249,18 +244,8 @@
       verifyDCConfigAARepl(parentControllerClient, storeName3, true, true, false);
       verifyDCConfigAARepl(dc0Client, storeName3, true, true, false);
       verifyDCConfigAARepl(dc1Client, storeName3, true, true, false);
-
-      // Test incremental
-      assertCommand(
-          parentControllerClient.configureActiveActiveReplicationForCluster(
-              true,
-              VeniceUserStoreType.INCREMENTAL_PUSH.toString(),
-              Optional.empty()));
-      verifyDCConfigAARepl(parentControllerClient, storeName4, false, false, true);
-      verifyDCConfigAARepl(dc0Client, storeName4, false, false, true);
-      verifyDCConfigAARepl(dc1Client, storeName4, false, false, true);
     } finally {
-      deleteStores(storeName1, storeName2, storeName3, storeName4);
+      deleteStores(storeName1, storeName2, storeName3);
     }
   }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
index 8feecea67cd..901245c4a73 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
@@ -1,6 +1,7 @@
 package com.linkedin.venice.endToEnd;
 
 import static com.linkedin.davinci.stats.HostLevelIngestionStats.ASSEMBLED_RMD_SIZE_IN_BYTES;
+import static com.linkedin.venice.endToEnd.TestBatch.TEST_TIMEOUT;
 import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.PARENT_D2_SERVICE_NAME;
 import static com.linkedin.venice.samza.VeniceSystemFactory.DEPLOYMENT_ID;
 import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_AGGREGATE;
@@ -63,6 +64,7 @@
 import com.linkedin.venice.compression.CompressionStrategy;
 import com.linkedin.venice.controllerapi.ControllerClient;
 import com.linkedin.venice.controllerapi.ControllerResponse;
+import com.linkedin.venice.controllerapi.NewStoreResponse;
 import com.linkedin.venice.controllerapi.SchemaResponse;
 import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
 import com.linkedin.venice.controllerapi.VersionCreationResponse;
@@ -74,6 +76,7 @@
 import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
 import com.linkedin.venice.integration.utils.VeniceServerWrapper;
 import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
+import com.linkedin.venice.meta.BackupStrategy;
 import com.linkedin.venice.meta.ReadOnlySchemaRepository;
 import com.linkedin.venice.meta.Store;
 import com.linkedin.venice.meta.StoreInfo;
@@ -406,6 +409,7 @@ public void testIncrementalPushPartialUpdateNewFormat(boolean useSparkCompute) t
     UpdateStoreQueryParams updateStoreParams =
         new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
             .setCompressionStrategy(CompressionStrategy.NO_OP)
+            .setActiveActiveReplicationEnabled(true)
            .setWriteComputationEnabled(true)
            .setChunkingEnabled(true)
            .setIncrementalPushEnabled(true)
@@ -1467,6 +1471,38 @@ public void testEnablePartialUpdateOnActiveActiveStore() {
     }
   }
 
+  @Test(timeOut = TEST_TIMEOUT)
+  public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
+    String storeName = Utils.getUniqueString("testRTTopicDeletion");
+    String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
+    ControllerClient parentControllerClient =
+        ControllerClient.constructClusterControllerClient(CLUSTER_NAME, parentControllerURLs);
+
+    NewStoreResponse newStoreResponse =
+        parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\""));
+    Assert.assertFalse(
+        newStoreResponse.isError(),
+        "The NewStoreResponse returned an error: " + newStoreResponse.getError());
+
+    UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
+    updateStoreParams.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
+        .setActiveActiveReplicationEnabled(true)
+        .setIncrementalPushEnabled(true)
+        .setNumVersionsToPreserve(2)
+        .setHybridRewindSeconds(1000)
+        .setHybridOffsetLagThreshold(1000);
+    TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams);
+
+    VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test_push_id", 1000);
+    assertEquals(response.getVersion(), 1);
+    assertFalse(response.isError(), "Empty push to parent colo should succeed");
+    TestUtils.waitForNonDeterministicPushCompletion(
+        Version.composeKafkaTopic(storeName, 1),
+        parentControllerClient,
+        60,
+        TimeUnit.SECONDS);
+  }
+
   @Test(timeOut = TEST_TIMEOUT_MS)
   public void testConvertRmdType() {
     final String storeName = Utils.getUniqueString("testConvertRmdType");
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushJobDetailsTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushJobDetailsTest.java
index 33a0d183a1e..5d1731f7670 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushJobDetailsTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushJobDetailsTest.java
@@ -338,7 +338,12 @@ public void testPushJobDetails(boolean useCustomCheckpoints) throws IOException
     // because hadoop job client cannot fetch counters properly.
     parentControllerClient.updateStore(
         testStoreName,
-        new UpdateStoreQueryParams().setStorageQuotaInByte(-1).setPartitionCount(2).setIncrementalPushEnabled(true));
+        new UpdateStoreQueryParams().setStorageQuotaInByte(-1)
+            .setPartitionCount(2)
+            .setHybridOffsetLagThreshold(10)
+            .setHybridRewindSeconds(10)
+            .setActiveActiveReplicationEnabled(true)
+            .setIncrementalPushEnabled(true));
     Properties pushJobProps = defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPathForFullPush, testStoreName);
     pushJobProps.setProperty(PUSH_JOB_STATUS_UPLOAD_ENABLE, String.valueOf(true));
     try (VenicePushJob testPushJob = new VenicePushJob("test-push-job-details-job", pushJobProps)) {
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java
index 22b2d801b83..1ebca163050 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java
@@ -121,6 +121,7 @@ public void setUpStore() {
             storeName,
             new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
                 .setPartitionCount(PARTITION_COUNT)
+                .setActiveActiveReplicationEnabled(true)
                 .setIncrementalPushEnabled(true)));
     String daVinciPushStatusSystemStoreName =
         VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(storeName);
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchReportIncrementalPush.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchReportIncrementalPush.java
index fc1e92ed25b..cef9e24a1e5 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchReportIncrementalPush.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchReportIncrementalPush.java
@@ -121,6 +121,7 @@ public void testBatchReportIncrementalPush() throws IOException {
         .setCompressionStrategy(CompressionStrategy.NO_OP)
         .setWriteComputationEnabled(true)
         .setChunkingEnabled(true)
+        .setActiveActiveReplicationEnabled(true)
         .setIncrementalPushEnabled(true)
         .setHybridRewindSeconds(1000L)
         .setHybridOffsetLagThreshold(2L);
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java
index a8c282b2a5e..6a4fcc4d962 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java
@@ -29,19 +29,13 @@
 import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord;
 import static com.linkedin.venice.utils.TestUtils.assertCommand;
 import static com.linkedin.venice.utils.TestUtils.waitForNonDeterministicAssertion;
-import static com.linkedin.venice.utils.TestWriteUtils.STRING_SCHEMA;
 import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP;
-import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.INCREMENTAL_PUSH;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.INPUT_PATH_PROP;
-import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
-import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_MAX_RECORDS_PER_MAPPER;
-import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_STATUS_UPLOAD_ENABLE;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.SEND_CONTROL_MESSAGES_DIRECTLY;
-import static com.linkedin.venice.vpj.VenicePushJobConstants.SOURCE_KAFKA;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
 import static org.testng.Assert.assertFalse;
@@ -90,7 +84,6 @@
 import com.linkedin.venice.samza.VeniceSystemFactory;
 import com.linkedin.venice.samza.VeniceSystemProducer;
 import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
-import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
 import com.linkedin.venice.server.VeniceServer;
 import com.linkedin.venice.status.BatchJobHeartbeatConfigs;
 import com.linkedin.venice.status.PushJobDetailsStatus;
@@ -106,9 +99,6 @@
 import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.VeniceProperties;
-import com.linkedin.venice.writer.VeniceWriter;
-import com.linkedin.venice.writer.VeniceWriterFactory;
-import com.linkedin.venice.writer.VeniceWriterOptions;
 import io.tehuti.metrics.MetricsRepository;
 import java.io.File;
 import java.io.IOException;
@@ -119,7 +109,6 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -482,6 +471,7 @@ public void testNativeReplicationForIncrementalPush() throws Exception {
         updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(1)
             .setHybridOffsetLagThreshold(TEST_TIMEOUT)
             .setHybridRewindSeconds(2L)
+            .setActiveActiveReplicationEnabled(true)
             .setIncrementalPushEnabled(true),
         100,
         (parentControllerClient, clusterName, storeName, props, inputDir) -> {
@@ -509,8 +499,7 @@ public void testActiveActiveForHeartbeatSystemStores() throws Exception {
     int partitionCount = 2;
     motherOfAllTests(
         "testActiveActiveForHeartbeatSystemStores",
-        updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(partitionCount)
-            .setIncrementalPushEnabled(true),
+        updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(partitionCount),
         recordCount,
         (parentControllerClient, clusterName, storeName, props, inputDir) -> {
           try (
@@ -576,93 +565,6 @@ public void testActiveActiveForHeartbeatSystemStores() throws Exception {
         });
   }
 
-  @Test(timeOut = TEST_TIMEOUT)
-  public void testMultiDataCenterRePushWithIncrementalPush() throws Exception {
-    motherOfAllTests(
-        "testMultiDataCenterRePushWithIncrementalPush",
-        updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(1),
-        100,
-        (parentControllerClient, clusterName, storeName, props, inputDir) -> {
-          try (VenicePushJob job = new VenicePushJob("Test push job", props)) {
-            job.run();
-            // Verify the kafka URL being returned to the push job is the same as dc-0 kafka url.
-            Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
-          }
-          VeniceWriter<String, String, byte[]> incPushToRTWriter = null;
-          try {
-            assertFalse(
-                parentControllerClient
-                    .updateStore(
-                        storeName,
-                        new UpdateStoreQueryParams().setIncrementalPushEnabled(true)
-                            .setHybridOffsetLagThreshold(1)
-                            .setHybridRewindSeconds(Time.SECONDS_PER_DAY))
-                    .isError());
-
-            // Update the store to L/F hybrid and enable INCREMENTAL_PUSH_SAME_AS_REAL_TIME.
-            props.setProperty(SOURCE_KAFKA, "true");
-            props.setProperty(
-                KAFKA_INPUT_BROKER_URL,
-                multiRegionMultiClusterWrapper.getParentKafkaBrokerWrapper().getAddress());
-            props.setProperty(KAFKA_INPUT_MAX_RECORDS_PER_MAPPER, "5");
-            props.setProperty(VeniceWriter.ENABLE_CHUNKING, "false");
-            props.setProperty(KAFKA_INPUT_TOPIC, Version.composeKafkaTopic(storeName, 1));
-
-            try (VenicePushJob rePushJob = new VenicePushJob("Test re-push job re-push", props)) {
-              rePushJob.run();
-            }
-            String incPushToRTVersion = System.currentTimeMillis() + "_test_inc_push_to_rt";
-            VeniceControllerWrapper parentController =
-                parentControllers.stream().filter(c -> c.isLeaderController(clusterName)).findAny().get();
-            incPushToRTWriter = startIncrementalPush(
-                parentControllerClient,
-                storeName,
-                parentController.getVeniceAdmin().getVeniceWriterFactory(),
-                incPushToRTVersion);
-            final String newVersionTopic = Version.composeKafkaTopic(
-                storeName,
-                parentControllerClient.getStore(storeName).getStore().getLargestUsedVersionNumber());
-            // Incremental push shouldn't be blocked and we will complete it once the new re-push is started.
-            String incValuePrefix = "inc_test_";
-            int newRePushVersion = Version.parseVersionFromKafkaTopicName(newVersionTopic) + 1;
-            VeniceWriter<String, String, byte[]> finalIncPushToRTWriter = incPushToRTWriter;
-            CompletableFuture.runAsync(() -> {
-              TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
-                Assert.assertEquals(
-                    parentControllerClient.getStore(storeName).getStore().getLargestUsedVersionNumber(),
-                    newRePushVersion);
-              });
-              for (int i = 1; i <= 10; i++) {
-                finalIncPushToRTWriter.put(Integer.toString(i), incValuePrefix + i, 1);
-              }
-              finalIncPushToRTWriter.broadcastEndOfIncrementalPush(incPushToRTVersion, new HashMap<>());
-            });
-            // The re-push should complete and contain all the incremental push to RT data.
-            props.setProperty(KAFKA_INPUT_TOPIC, newVersionTopic);
-            try (VenicePushJob rePushJob = new VenicePushJob("Test re-push job re-push", props)) {
-              rePushJob.run();
-            }
-            // Rewind should be overwritten.
-            Optional<Version> latestVersion =
-                parentControllerClient.getStore(storeName).getStore().getVersion(newRePushVersion);
-            Assert.assertTrue(latestVersion.isPresent());
-            Assert.assertEquals(
-                latestVersion.get().getHybridStoreConfig().getRewindTimeInSeconds(),
-                DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE);
-            for (int dataCenterIndex = 0; dataCenterIndex < NUMBER_OF_CHILD_DATACENTERS; dataCenterIndex++) {
-              String routerUrl =
-                  childDatacenters.get(dataCenterIndex).getClusters().get(clusterName).getRandomRouterURL();
-              verifyVeniceStoreData(storeName, routerUrl, incValuePrefix, 10);
-            }
-          } finally {
-            if (incPushToRTWriter != null) {
-              incPushToRTWriter.close();
-            }
-          }
-        });
-  }
-
   @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT)
   public void testEmptyPush(boolean toParent) {
     String clusterName = CLUSTER_NAMES[0];
@@ -1050,47 +952,4 @@ private void assertStoreHealth(ControllerClient controllerClient, String systemS
         storeResponse.getStore().getCurrentVersion() > 0,
         systemStoreName + " is not ready for DC-" + dcNumber);
   }
-
-  private VeniceWriter<String, String, byte[]> startIncrementalPush(
-      ControllerClient controllerClient,
-      String storeName,
-      VeniceWriterFactory veniceWriterFactory,
-      String incrementalPushVersion) {
-    VersionCreationResponse response = controllerClient.requestTopicForWrites(
-        storeName,
-        1024,
-        Version.PushType.INCREMENTAL,
-        "test-incremental-push",
-        true,
-        true,
-        false,
-        Optional.empty(),
-        Optional.empty(),
-        Optional.empty(),
-        false,
-        -1);
-    assertFalse(response.isError());
-    Assert.assertNotNull(response.getKafkaTopic());
-    VeniceWriter<String, String, byte[]> veniceWriter = veniceWriterFactory.createVeniceWriter(
-        new VeniceWriterOptions.Builder(response.getKafkaTopic())
-            .setKeySerializer(new VeniceAvroKafkaSerializer(STRING_SCHEMA.toString()))
-            .setValueSerializer(new VeniceAvroKafkaSerializer(STRING_SCHEMA.toString()))
-            .build());
-    veniceWriter.broadcastStartOfIncrementalPush(incrementalPushVersion, new HashMap<>());
-    return veniceWriter;
-  }
-
-  private void verifyVeniceStoreData(String storeName, String routerUrl, String valuePrefix, int keyCount)
-      throws ExecutionException, InterruptedException {
-    try (AvroGenericStoreClient<String, Object> client = ClientFactory
-        .getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(routerUrl))) {
-      for (int i = 1; i <= keyCount; ++i) {
-        String expected = valuePrefix + i;
-        Object actual = client.get(Integer.toString(i)).get(); /* client.get().get() returns a Utf8 object */
-        Assert.assertNotNull(actual, "Unexpected null value for key: " + i);
-        Assert.assertEquals(actual.toString(), expected);
-      }
-    }
-  }
 }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/ingestionHeartbeat/IngestionHeartBeatTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/ingestionHeartbeat/IngestionHeartBeatTest.java
index 4d110669962..bc7cee1128b 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/ingestionHeartbeat/IngestionHeartBeatTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/ingestionHeartbeat/IngestionHeartBeatTest.java
@@ -164,6 +164,11 @@ public void testIngestionHeartBeat(
     ControllerResponse updateStoreResponse =
         parentControllerClient.retryableRequest(5, c -> c.updateStore(storeName, updateStoreParams));
+    // If the config combination for incremental push is invalid, the store update should fail loudly.
+    if (!isActiveActiveEnabled && isIncrementalPushEnabled) {
+      assertTrue(updateStoreResponse.isError(), "Update store does not error on invalid config combination.");
+      return;
+    }
     assertFalse(updateStoreResponse.isError(), "Update store got error: " + updateStoreResponse.getError());
 
     VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test_push_id", 1000);
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java
index 934e30be91f..547c3db34a2 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java
@@ -24,7 +24,6 @@
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -256,44 +255,43 @@ public static Map<String, Map<String, String>> addKafkaClusterIDMappingToServerC
       Optional<Properties> serverProperties,
       List<String> regionNames,
       List<PubSubBrokerWrapper> kafkaBrokers) {
-    if (serverProperties.isPresent()) {
-      PubSubSecurityProtocol baseSecurityProtocol = PubSubSecurityProtocol.valueOf(
-          serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name()));
-      Map<String, Map<String, String>> kafkaClusterMap = new HashMap<>();
-      Map<String, String> mapping;
-      for (int i = 1; i <= regionNames.size(); i++) {
-        int clusterId = i - 1;
-        String regionName = regionNames.get(clusterId);
-        PubSubSecurityProtocol securityProtocol = baseSecurityProtocol;
-        if (clusterId > 0) {
-          // Testing mixed security on any 2-layer setup with 2 or more DCs.
-          securityProtocol = PubSubSecurityProtocol.SSL;
-        }
-        PubSubBrokerWrapper pubSubBrokerWrapper = kafkaBrokers.get(i);
-        mapping = prepareKafkaClusterMappingInfo(regionName, pubSubBrokerWrapper, securityProtocol, "");
-        kafkaClusterMap.put(String.valueOf(clusterId), mapping);
-      }
-
-      for (int i = 1 + regionNames.size(); i <= 2 * regionNames.size(); i++) {
-        int clusterId = i - 1;
-        String regionName = regionNames.get(clusterId - regionNames.size());
-        PubSubBrokerWrapper pubSubBrokerWrapper = kafkaBrokers.get(i - regionNames.size());
-        mapping = prepareKafkaClusterMappingInfo(
-            regionName,
-            pubSubBrokerWrapper,
-            baseSecurityProtocol,
-            Utils.SEPARATE_TOPIC_SUFFIX);
-        kafkaClusterMap.put(String.valueOf(clusterId), mapping);
+    PubSubSecurityProtocol baseSecurityProtocol = serverProperties
+        .map(
+            properties -> PubSubSecurityProtocol
+                .valueOf(properties.getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name())))
+        .orElse(PubSubSecurityProtocol.PLAINTEXT);
+    Map<String, Map<String, String>> kafkaClusterMap = new HashMap<>();
+    Map<String, String> mapping;
+    for (int i = 1; i <= regionNames.size(); i++) {
+      int clusterId = i - 1;
+      String regionName = regionNames.get(clusterId);
+      PubSubSecurityProtocol securityProtocol = baseSecurityProtocol;
+      if (clusterId > 0) {
+        // Testing mixed security on any 2-layer setup with 2 or more DCs.
+        securityProtocol = PubSubSecurityProtocol.SSL;
       }
+      PubSubBrokerWrapper pubSubBrokerWrapper = kafkaBrokers.get(i);
+      mapping = prepareKafkaClusterMappingInfo(regionName, pubSubBrokerWrapper, securityProtocol, "");
+      kafkaClusterMap.put(String.valueOf(clusterId), mapping);
+    }
 
-      LOGGER.info(
-          "addKafkaClusterIDMappingToServerConfigs \n\treceived broker list: \n\t\t{} \n\tand generated cluster map: \n\t\t{}",
-          kafkaBrokers.stream().map(PubSubBrokerWrapper::toString).collect(Collectors.joining("\n\t\t")),
-          kafkaClusterMap.entrySet().stream().map(Objects::toString).collect(Collectors.joining("\n\t\t")));
-      return kafkaClusterMap;
-    } else {
-      return Collections.emptyMap();
+    for (int i = 1 + regionNames.size(); i <= 2 * regionNames.size(); i++) {
+      int clusterId = i - 1;
+      String regionName = regionNames.get(clusterId - regionNames.size());
+      PubSubBrokerWrapper pubSubBrokerWrapper = kafkaBrokers.get(i - regionNames.size());
+      mapping = prepareKafkaClusterMappingInfo(
+          regionName,
+          pubSubBrokerWrapper,
+          baseSecurityProtocol,
+          Utils.SEPARATE_TOPIC_SUFFIX);
+      kafkaClusterMap.put(String.valueOf(clusterId), mapping);
     }
+
+    LOGGER.info(
+        "addKafkaClusterIDMappingToServerConfigs \n\treceived broker list: \n\t\t{} \n\tand generated cluster map: \n\t\t{}",
+        kafkaBrokers.stream().map(PubSubBrokerWrapper::toString).collect(Collectors.joining("\n\t\t")),
+        kafkaClusterMap.entrySet().stream().map(Objects::toString).collect(Collectors.joining("\n\t\t")));
+    return kafkaClusterMap;
   }
 
   static Map<String, String> prepareKafkaClusterMappingInfo(
diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java
index 35d1a047279..ffd2e8236b0 100644
--- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java
+++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java
@@ -465,7 +465,7 @@ public static void waitForNonDeterministicPushCompletion(
       ControllerClient controllerClient,
       long timeout,
       TimeUnit timeoutUnit) {
-    waitForNonDeterministicAssertion(timeout, timeoutUnit, () -> {
+    waitForNonDeterministicAssertion(timeout, timeoutUnit, true, () -> {
       JobStatusQueryResponse jobStatusQueryResponse =
           assertCommand(controllerClient.queryJobStatus(topicName, Optional.empty()));
       ExecutionStatus executionStatus = ExecutionStatus.valueOf(jobStatusQueryResponse.getStatus());
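One reading of the TestUtils change above, hedged since only the call site is visible in this diff: the extra boolean selects an overload of waitForNonDeterministicAssertion whose third parameter (assumed here to be exponentialBackOff, matching the four-argument calls elsewhere in this patch) spaces out the job-status polls instead of retrying at a fixed cadence:

    // Hypothetical call shape; "true" is assumed to enable back-off between retries.
    TestUtils.waitForNonDeterministicAssertion(timeout, timeoutUnit, true, () -> {
      // query job status and assert COMPLETED, as waitForNonDeterministicPushCompletion does above
    });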
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
index 74365eb9b50..9bd42282188 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
@@ -2518,7 +2518,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
         && !veniceHelixAdmin.isHybrid(currStore.getHybridStoreConfig())
         && !veniceHelixAdmin.isHybrid(updatedHybridStoreConfig)) {
       LOGGER.info(
-          "Enabling incremental push for a batch store:{}. Converting it to a hybrid store with default configs.",
+          "Enabling incremental push for a batch store:{}. Converting it to an Active/Active hybrid store with default configs.",
           storeName);
       HybridStoreConfigRecord hybridStoreConfigRecord = new HybridStoreConfigRecord();
       hybridStoreConfigRecord.rewindTimeInSeconds = DEFAULT_REWIND_TIME_IN_SECONDS;
@@ -2533,6 +2533,10 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
       updatedConfigsList.add(BUFFER_REPLAY_POLICY);
       hybridStoreConfigRecord.realTimeTopicName = DEFAULT_REAL_TIME_TOPIC_NAME;
       setStore.hybridStoreConfig = hybridStoreConfigRecord;
+      if (!currStore.isSystemStore() && controllerConfig.isActiveActiveReplicationEnabledAsDefaultForHybrid()) {
+        setStore.activeActiveReplicationEnabled = true;
+        updatedConfigsList.add(ACTIVE_ACTIVE_REPLICATION_ENABLED);
+      }
     }
 
     /**
@@ -2768,6 +2772,18 @@ && getVeniceHelixAdmin().isHybrid(setStore.getHybridStoreConfig()) && setStore.g
       updatedConfigsList.add(PARTITION_COUNT);
     }
 
+    /**
+     * Pre-flight check for incremental push config updates: incremental push may only be turned on
+     * when the store is A/A; otherwise the store update should fail.
+     */
+    if (setStore.hybridStoreConfig != null && setStore.incrementalPushEnabled
+        && !setStore.activeActiveReplicationEnabled) {
+      throw new VeniceHttpException(
+          HttpStatus.SC_BAD_REQUEST,
+          "Hybrid store config invalid. Cannot have incremental push enabled while A/A is not enabled",
+          ErrorType.BAD_REQUEST);
+    }
+
     /**
      * By default, parent controllers will not try to replicate the unchanged store configs to child controllers;
      * an updatedConfigsList will be used to represent which configs are updated by users.
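For callers, the pre-flight check above turns a previously accepted request into an HTTP 400 at the parent controller. A hedged client-side sketch (ControllerClient usage mirrors the tests in this patch; the store name is illustrative):

    UpdateStoreQueryParams params = new UpdateStoreQueryParams()
        .setHybridRewindSeconds(10L)
        .setHybridOffsetLagThreshold(10L)
        .setIncrementalPushEnabled(true); // note: no setActiveActiveReplicationEnabled(true)
    ControllerResponse resp = parentControllerClient.updateStore("my-store", params);
    // resp.isError() is now expected to be true, carrying the
    // "Cannot have incremental push enabled while A/A is not enabled" message.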
diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java
index e683b7b86e3..75b0d884d5f 100644
--- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java
+++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java
@@ -1750,8 +1750,7 @@ public void testUpdateStore() {
     when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null)
         .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1));
 
-    UpdateStoreQueryParams storeQueryParams1 =
-        new UpdateStoreQueryParams().setIncrementalPushEnabled(true).setBlobTransferEnabled(true);
+    UpdateStoreQueryParams storeQueryParams1 = new UpdateStoreQueryParams().setBlobTransferEnabled(true);
     parentAdmin.initStorageCluster(clusterName);
     parentAdmin.updateStore(clusterName, storeName, storeQueryParams1);
 
@@ -1771,7 +1770,6 @@ public void testUpdateStore() {
     assertEquals(adminMessage.operationType, AdminMessageType.UPDATE_STORE.getValue());
 
     UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion;
-    assertEquals(updateStore.incrementalPushEnabled, true);
     Assert.assertTrue(updateStore.blobTransferEnabled);
 
     long readQuota = 100L;
@@ -2663,8 +2661,8 @@ public void testAclException() {
         () -> parentAdmin.deleteAclForStore(clusterName, storeName));
   }
 
-  @Test
-  public void testHybridAndIncrementalUpdateStoreCommands() {
+  @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
+  public void testHybridAndIncrementalUpdateStoreCommands(boolean aaEnabled) {
     String storeName = Utils.getUniqueString("testUpdateStore");
     Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis());
     doReturn(store).when(internalAdmin).getStore(clusterName, storeName);
@@ -2701,6 +2699,7 @@ public void testHybridAndIncrementalUpdateStoreCommands() {
     assertEquals(updateStore.hybridStoreConfig.offsetLagThresholdToGoOnline, 20000);
     assertEquals(updateStore.hybridStoreConfig.rewindTimeInSeconds, 60);
 
+    store.setActiveActiveReplicationEnabled(aaEnabled);
     store.setHybridStoreConfig(
         new HybridStoreConfigImpl(
             60,
@@ -2709,10 +2708,15 @@ public void testHybridAndIncrementalUpdateStoreCommands() {
             DataReplicationPolicy.NON_AGGREGATE,
             BufferReplayPolicy.REWIND_FROM_EOP));
     // Incremental push can be enabled on a hybrid store, default inc push policy is inc push to RT now
-    parentAdmin.updateStore(clusterName, storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(true));
-
-    // veniceWriter.put will be called again for the second update store command
-    verify(veniceWriter, times(2)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture());
+    if (aaEnabled) {
+      parentAdmin.updateStore(clusterName, storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(true));
+      // veniceWriter.put will be called again for the second update store command
+      verify(veniceWriter, times(2)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture());
+    } else {
+      assertThrows(
+          () -> parentAdmin
+              .updateStore(clusterName, storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(true)));
+    }
   }
 
   @Test