From c836009e15a5c61b9ec8594e0b478da9c979ffa5 Mon Sep 17 00:00:00 2001 From: KKCorps Date: Fri, 17 Jan 2025 10:49:32 +0530 Subject: [PATCH] Fix reingestion test --- .../PinotLLCRealtimeSegmentManager.java | 5 +++++ .../RealtimeSegmentValidationManager.java | 9 ++++++++- ...altimeIngestionSegmentCommitFailureTest.java | 17 +++++++++++++++-- ...lureInjectingRealtimeSegmentDataManager.java | 4 ++-- 4 files changed, 30 insertions(+), 5 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 51b2f0c17e32..9264eed06a7f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -2237,6 +2237,11 @@ public void reIngestSegmentsWithErrorState(String tableNameWithType) { } } + if (segmentsInErrorState.isEmpty()) { + LOGGER.info("No segments found in ERROR state for table {}", tableNameWithType); + return; + } + // filter out segments that are not ONLINE in IdealState for (String segmentName : segmentsInErrorState) { Map instanceIdealStateMap = segmentToInstanceIdealStateMap.get(segmentName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 95d348cce504..e4411f7ae441 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -28,6 +28,7 @@ import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.ValidationMetrics; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.api.resources.PauseStatusDetails; @@ -115,6 +116,8 @@ protected void processTable(String tableNameWithType, Context context) { if (context._runSegmentLevelValidation) { runSegmentLevelValidation(tableConfig); + } else { + LOGGER.info("Skipping segment-level validation for table: {}", tableConfig.getTableName()); } } @@ -172,7 +175,11 @@ private void runSegmentLevelValidation(TableConfig tableConfig) { // Update the total document count gauge _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName, computeTotalDocumentCount(segmentsZKMetadata)); - _llcRealtimeSegmentManager.reIngestSegmentsWithErrorState(tableConfig.getTableName()); + boolean isPauselessConsumptionEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig); + + if (isPauselessConsumptionEnabled) { + _llcRealtimeSegmentManager.reIngestSegmentsWithErrorState(tableConfig.getTableName()); + } // Check missing segments and upload them to the deep store if (_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java index fb23f22a50e4..e37714d9976c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java @@ -38,6 +38,8 @@ import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig; import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; @@ -136,8 +138,19 @@ public void setUp() TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0)); tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS"); tableConfig.getValidationConfig().setRetentionTimeValue("100000"); - tableConfig.getIndexingConfig().setPauselessConsumptionEnabled(true); - tableConfig.getIndexingConfig().getStreamConfigs().put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless"); + + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig( + new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs()))); + ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true); + Map streamConfigMap = ingestionConfig.getStreamIngestionConfig() + .getStreamConfigMaps() + .get(0); + streamConfigMap.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless"); + streamConfigMap.put("segmentDownloadTimeoutMinutes", "1"); + tableConfig.getIndexingConfig().setStreamConfigs(null); + tableConfig.setIngestionConfig(ingestionConfig); + addTableConfig(tableConfig); Thread.sleep(60000L); TestUtils.waitForCondition( diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/FailureInjectingRealtimeSegmentDataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/FailureInjectingRealtimeSegmentDataManager.java index dc6fce1915cf..072cc92e4923 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/FailureInjectingRealtimeSegmentDataManager.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/FailureInjectingRealtimeSegmentDataManager.java @@ -59,8 +59,8 @@ public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMet protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { if (_failCommit) { - throw new RuntimeException("Forced failure in buildSegmentInternal"); + throw new RuntimeException("Forced failure in buildSegmentInternal"); } - return super.buildSegmentInternal(forCommit); + return super.buildSegmentInternal(forCommit); } }