Skip to content

Commit

Permalink
Fix reingestion test
Browse files Browse the repository at this point in the history
  • Loading branch information
KKCorps committed Jan 17, 2025
1 parent c2fda4a commit c836009
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2237,6 +2237,11 @@ public void reIngestSegmentsWithErrorState(String tableNameWithType) {
}
}

if (segmentsInErrorState.isEmpty()) {
LOGGER.info("No segments found in ERROR state for table {}", tableNameWithType);
return;
}

// filter out segments that are not ONLINE in IdealState
for (String segmentName : segmentsInErrorState) {
Map<String, String> instanceIdealStateMap = segmentToInstanceIdealStateMap.get(segmentName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
Expand Down Expand Up @@ -115,6 +116,8 @@ protected void processTable(String tableNameWithType, Context context) {

if (context._runSegmentLevelValidation) {
runSegmentLevelValidation(tableConfig);
} else {
LOGGER.info("Skipping segment-level validation for table: {}", tableConfig.getTableName());
}
}

Expand Down Expand Up @@ -172,7 +175,11 @@ private void runSegmentLevelValidation(TableConfig tableConfig) {
// Update the total document count gauge
_validationMetrics.updateTotalDocumentCountGauge(realtimeTableName, computeTotalDocumentCount(segmentsZKMetadata));

_llcRealtimeSegmentManager.reIngestSegmentsWithErrorState(tableConfig.getTableName());
boolean isPauselessConsumptionEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);

if (isPauselessConsumptionEnabled) {
_llcRealtimeSegmentManager.reIngestSegmentsWithErrorState(tableConfig.getTableName());
}

// Check missing segments and upload them to the deep store
if (_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -136,8 +138,19 @@ public void setUp()
TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS");
tableConfig.getValidationConfig().setRetentionTimeValue("100000");
tableConfig.getIndexingConfig().setPauselessConsumptionEnabled(true);
tableConfig.getIndexingConfig().getStreamConfigs().put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");

IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(
new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true);
Map<String, String> streamConfigMap = ingestionConfig.getStreamIngestionConfig()
.getStreamConfigMaps()
.get(0);
streamConfigMap.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");
streamConfigMap.put("segmentDownloadTimeoutMinutes", "1");
tableConfig.getIndexingConfig().setStreamConfigs(null);
tableConfig.setIngestionConfig(ingestionConfig);

addTableConfig(tableConfig);
Thread.sleep(60000L);
TestUtils.waitForCondition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMet

protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
if (_failCommit) {
throw new RuntimeException("Forced failure in buildSegmentInternal");
throw new RuntimeException("Forced failure in buildSegmentInternal");
}
return super.buildSegmentInternal(forCommit);
return super.buildSegmentInternal(forCommit);
}
}

0 comments on commit c836009

Please sign in to comment.