Skip to content

Commit

Permalink
Pauseless Ingestion #2: Handle Failure scenarios without DR (#14798)
Browse files Browse the repository at this point in the history
  • Loading branch information
9aman authored Jan 29, 2025
1 parent 1585490 commit 3525116
Show file tree
Hide file tree
Showing 21 changed files with 1,391 additions and 396 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pinot.common.metadata.segment;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -33,6 +37,8 @@

public class SegmentZKMetadata implements ZKMetadata {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentZKMetadata.class);
private static final String SEGMENT_NAME_KEY = "segmentName";
private static final String SIMPLE_FIELDS_KEY = "simpleFields";
private static final String NULL = "null";

private final ZNRecord _znRecord;
Expand All @@ -58,6 +64,16 @@ public String getSegmentName() {
return _znRecord.getId();
}

/**
 * Returns the simple fields (key-value metadata pairs) backing this segment ZK metadata.
 * NOTE: returns the internal map without copying, so mutations by the caller are reflected in this metadata.
 */
public Map<String, String> getSimpleFields() {
  return _simpleFields;
}

/**
 * Replaces the simple fields backing this segment ZK metadata and invalidates the cached start/end time,
 * which are derived from the simple fields and must be recomputed on next access.
 */
public void setSimpleFields(Map<String, String> simpleFields) {
  _simpleFields = simpleFields;
  // Start/end time are lazily parsed out of the simple fields; drop the cached values so they are re-derived.
  _startTimeMsCached = false;
  _endTimeMsCached = false;
}

public long getStartTimeMs() {
if (!_startTimeMsCached) {
long startTimeMs = -1;
Expand Down Expand Up @@ -370,6 +386,25 @@ public Map<String, String> toMap() {
return metadataMap;
}

/**
 * Serializes this segment ZK metadata into a JSON string carrying the segment name and the simple fields.
 * The output is consumable by {@code fromJsonString}.
 */
public String toJsonString() {
  ObjectNode root = JsonUtils.newObjectNode();
  root.put(SEGMENT_NAME_KEY, getSegmentName());
  root.set(SIMPLE_FIELDS_KEY, JsonUtils.objectToJsonNode(_simpleFields));
  return root.toString();
}

/**
 * Deserializes a {@link SegmentZKMetadata} from a JSON string produced by {@code toJsonString()}.
 *
 * @param jsonString JSON representation containing the "segmentName" and "simpleFields" entries
 * @return the reconstructed segment ZK metadata
 * @throws IOException if the string is not valid JSON or the simple fields cannot be deserialized
 * @throws IllegalArgumentException if the required "segmentName" or "simpleFields" entry is missing
 */
public static SegmentZKMetadata fromJsonString(String jsonString)
    throws IOException {
  JsonNode jsonNode = JsonUtils.stringToJsonNode(jsonString);
  // Validate required keys up front so a malformed payload fails with a descriptive message instead of an NPE.
  JsonNode segmentNameJsonNode = jsonNode.get(SEGMENT_NAME_KEY);
  if (segmentNameJsonNode == null) {
    throw new IllegalArgumentException("Missing required key: " + SEGMENT_NAME_KEY + " in JSON: " + jsonString);
  }
  JsonNode simpleFieldsJsonNode = jsonNode.get(SIMPLE_FIELDS_KEY);
  if (simpleFieldsJsonNode == null) {
    throw new IllegalArgumentException("Missing required key: " + SIMPLE_FIELDS_KEY + " in JSON: " + jsonString);
  }
  String segmentName = segmentNameJsonNode.asText();
  Map<String, String> simpleFields = JsonUtils.jsonNodeToObject(simpleFieldsJsonNode, new TypeReference<>() {
  });
  ZNRecord znRecord = new ZNRecord(segmentName);
  znRecord.setSimpleFields(simpleFields);
  return new SegmentZKMetadata(znRecord);
}

@Override
public ZNRecord toZNRecord() {
// Convert to TreeMap to keep the keys sorted. The de-serialized ZNRecord has simple fields stored as LinkedHashMap.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.util;
package org.apache.pinot.common.metadata.segment;

import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.joda.time.Interval;


public class ZKMetadataUtils {
private ZKMetadataUtils() {
public class SegmentZKMetadataUtils {
private SegmentZKMetadataUtils() {
}

/**
Expand All @@ -47,7 +47,7 @@ public static SegmentZKMetadata createSegmentZKMetadata(String tableNameWithType
String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes) {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentMetadata.getName());
updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, crypterName,
segmentSizeInBytes, true);
segmentSizeInBytes, null, true);
return segmentZKMetadata;
}

Expand All @@ -57,7 +57,16 @@ public static SegmentZKMetadata createSegmentZKMetadata(String tableNameWithType
public static void refreshSegmentZKMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata,
SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes) {
updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, crypterName,
segmentSizeInBytes, false);
segmentSizeInBytes, null, false);
}

/**
 * Updates the segment ZK metadata for a committing segment. The non-null end offset marks the segment commit
 * (status DONE with the end offset recorded); no crypter is used for committing segments.
 *
 * @param realtimeTableName real-time table name with type suffix
 * @param segmentZKMetadata segment ZK metadata to update in place
 * @param segmentMetadata local metadata of the committed segment
 * @param downloadUrl download URL of the uploaded segment
 * @param segmentSizeInBytes size of the segment in bytes
 * @param endOffset stream end offset of the committing segment
 */
public static void updateCommittingSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmentZKMetadata,
    SegmentMetadata segmentMetadata, String downloadUrl, long segmentSizeInBytes, String endOffset) {
  updateSegmentZKMetadata(realtimeTableName, segmentZKMetadata, segmentMetadata, downloadUrl, null,
      segmentSizeInBytes, endOffset, false);
}

public static void updateSegmentZKTimeInterval(SegmentZKMetadata segmentZKMetadata,
Expand All @@ -77,47 +86,96 @@ public static void updateSegmentZKTimeInterval(SegmentZKMetadata segmentZKMetada

private static void updateSegmentZKMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata,
SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes,
boolean newSegment) {
if (newSegment) {
segmentZKMetadata.setPushTime(System.currentTimeMillis());
} else {
segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
}
@Nullable String endOffset, boolean newSegment) {
String segmentName = segmentZKMetadata.getSegmentName();

if (segmentMetadata.getTimeInterval() != null) {
segmentZKMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
segmentZKMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
segmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
ColumnMetadata timeColumnMetadata = segmentMetadata.getColumnMetadataFor(segmentMetadata.getTimeColumn());
if (isValidTimeMetadata(timeColumnMetadata)) {
segmentZKMetadata.setRawStartTime(timeColumnMetadata.getMinValue().toString());
segmentZKMetadata.setRawEndTime(timeColumnMetadata.getMaxValue().toString());
if (endOffset != null) {
// For committing segment

segmentZKMetadata.setEndOffset(endOffset);
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);

// For committing segment, use current time as start/end time if total docs is 0
if (segmentMetadata.getTotalDocs() > 0) {
Interval timeInterval = segmentMetadata.getTimeInterval();
Preconditions.checkState(timeInterval != null, "Failed to find time info for table: %s, segment: %s",
tableNameWithType, segmentName);
segmentZKMetadata.setStartTime(timeInterval.getStartMillis());
segmentZKMetadata.setEndTime(timeInterval.getEndMillis());
} else {
long now = System.currentTimeMillis();
segmentZKMetadata.setStartTime(now);
segmentZKMetadata.setEndTime(now);
}
segmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
} else {
segmentZKMetadata.setStartTime(-1);
segmentZKMetadata.setEndTime(-1);
segmentZKMetadata.setTimeUnit(null);
// For uploaded segment

// Set segment status, start/end offset info for real-time table
if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);

// For new segment, start/end offset must exist if the segment name follows LLC segment name convention
if (newSegment && LLCSegmentName.isLLCSegment(segmentMetadata.getName())) {
Preconditions.checkArgument(
segmentMetadata.getStartOffset() != null && segmentMetadata.getEndOffset() != null,
"New uploaded LLC segment must have start/end offset in the segment metadata");
}

// NOTE:
// - If start/end offset is available in the uploaded segment, update them in the segment ZK metadata
// - If not, keep the existing start/end offset in the segment ZK metadata unchanged
if (segmentMetadata.getStartOffset() != null) {
segmentZKMetadata.setStartOffset(segmentMetadata.getStartOffset());
}
if (segmentMetadata.getEndOffset() != null) {
segmentZKMetadata.setEndOffset(segmentMetadata.getEndOffset());
}
}

segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
if (newSegment) {
segmentZKMetadata.setPushTime(System.currentTimeMillis());
} else {
segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
}
Interval timeInterval = segmentMetadata.getTimeInterval();
if (timeInterval != null) {
segmentZKMetadata.setStartTime(timeInterval.getStartMillis());
segmentZKMetadata.setEndTime(timeInterval.getEndMillis());
segmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
ColumnMetadata timeColumnMetadata = segmentMetadata.getColumnMetadataFor(segmentMetadata.getTimeColumn());
if (isValidTimeMetadata(timeColumnMetadata)) {
segmentZKMetadata.setRawStartTime(timeColumnMetadata.getMinValue().toString());
segmentZKMetadata.setRawEndTime(timeColumnMetadata.getMaxValue().toString());
}
} else {
segmentZKMetadata.setStartTime(-1);
segmentZKMetadata.setEndTime(-1);
segmentZKMetadata.setTimeUnit(null);
}
}
segmentZKMetadata.setIndexVersion(
segmentMetadata.getVersion() != null ? segmentMetadata.getVersion().name() : null);

SegmentVersion segmentVersion = segmentMetadata.getVersion();
segmentZKMetadata.setIndexVersion(segmentVersion != null ? segmentVersion.toString() : null);
segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
segmentZKMetadata.setDownloadUrl(downloadUrl);
segmentZKMetadata.setCrypterName(crypterName);
segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);

// Set partition metadata
Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
segmentMetadata.getColumnMetadataMap().forEach((column, columnMetadata) -> {
for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
ColumnMetadata columnMetadata = entry.getValue();
PartitionFunction partitionFunction = columnMetadata.getPartitionFunction();
if (partitionFunction != null) {
ColumnPartitionMetadata columnPartitionMetadata =
new ColumnPartitionMetadata(partitionFunction.getName(), partitionFunction.getNumPartitions(),
columnMetadata.getPartitions(), partitionFunction.getFunctionConfig());
columnPartitionMap.put(column, columnPartitionMetadata);
columnPartitionMap.put(entry.getKey(), columnPartitionMetadata);
}
});
}
segmentZKMetadata.setPartitionMetadata(
!columnPartitionMap.isEmpty() ? new SegmentPartitionMetadata(columnPartitionMap) : null);

Expand All @@ -130,27 +188,6 @@ private static void updateSegmentZKMetadata(String tableNameWithType, SegmentZKM
customMap.putAll(segmentMetadata.getCustomMap());
}
segmentZKMetadata.setCustomMap(customMap);

// Set fields specific to realtime table
if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);

// For new segment, start/end offset must exist if the segment name follows LLC segment name convention
if (newSegment && LLCSegmentName.isLLCSegment(segmentMetadata.getName())) {
Preconditions.checkArgument(segmentMetadata.getStartOffset() != null && segmentMetadata.getEndOffset() != null,
"New uploaded LLC segment must have start/end offset in the segment metadata");
}

// NOTE:
// - If start/end offset is available in the uploaded segment, update them in the segment ZK metadata
// - If not, keep the existing start/end offset in the segment ZK metadata unchanged
if (segmentMetadata.getStartOffset() != null) {
segmentZKMetadata.setStartOffset(segmentMetadata.getStartOffset());
}
if (segmentMetadata.getEndOffset() != null) {
segmentZKMetadata.setEndOffset(segmentMetadata.getEndOffset());
}
}
}

private static boolean isValidTimeMetadata(ColumnMetadata timeColumnMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public class TableLLCSegmentUploadResponse {
private final long _crc;
private final String _downloadUrl;

public TableLLCSegmentUploadResponse(@JsonProperty("segmentName") String segmentName,
@JsonProperty("crc") Long crc, @JsonProperty("downloadUrl") String downloadUrl) {
public TableLLCSegmentUploadResponse(@JsonProperty("segmentName") String segmentName, @JsonProperty("crc") long crc,
@JsonProperty("downloadUrl") String downloadUrl) {
_segmentName = segmentName;
_crc = crc;
_downloadUrl = downloadUrl;
Expand All @@ -37,7 +37,7 @@ public String getSegmentName() {
return _segmentName;
}

public Long getCrc() {
public long getCrc() {
return _crc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse;
Expand Down Expand Up @@ -991,6 +992,30 @@ public TableLLCSegmentUploadResponse uploadLLCToSegmentStore(String uri)
return tableLLCSegmentUploadResponse;
}

/**
 * Used by controllers to ask a server to upload a committed LLC segment to the segment store when it is missing
 * there. The server response is the segment ZK metadata (download URL, CRC and other fields) of the segment.
 *
 * @param uri server endpoint to send the upload request to
 * @return the {@link SegmentZKMetadata} parsed from the server response
 * @throws URISyntaxException if the given uri is malformed
 * @throws IOException on I/O failures while sending the request or parsing the response
 * @throws HttpErrorStatusException if the server returns an error status or the returned download URL is empty
 */
public SegmentZKMetadata uploadLLCToSegmentStoreWithZKMetadata(String uri)
    throws URISyntaxException, IOException, HttpErrorStatusException {
  // sendRequest checks the response status code
  SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(_httpClient.sendRequest(
      ClassicRequestBuilder.post(new URI(uri)).setVersion(HttpVersion.HTTP_1_1).build(),
      HttpClient.DEFAULT_SOCKET_TIMEOUT_MS));
  SegmentZKMetadata segmentZKMetadata = SegmentZKMetadata.fromJsonString(response.getResponse());
  if (StringUtils.isEmpty(segmentZKMetadata.getDownloadUrl())) {
    throw new HttpErrorStatusException(
        String.format("Returned segment download url is empty after requesting servers to upload by the path: %s",
            uri), response.getStatusCode());
  }
  return segmentZKMetadata;
}

/**
* Send segment uri.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pinot.common.utils;

import java.util.Optional;
import javax.validation.constraints.NotNull;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
Expand All @@ -39,8 +39,11 @@ private PauselessConsumptionUtils() {
* @return true if pauseless consumption is explicitly enabled, false otherwise
* Returns {@code false} if the table config is null.
*/
public static boolean isPauselessEnabled(@NotNull TableConfig tableConfig) {
return Optional.ofNullable(tableConfig.getIngestionConfig()).map(IngestionConfig::getStreamIngestionConfig)
.map(StreamIngestionConfig::isPauselessConsumptionEnabled).orElse(false);
/**
 * Checks whether pauseless consumption is explicitly enabled in the table config.
 * Walks table config -> ingestion config -> stream ingestion config, treating any missing link as disabled.
 */
public static boolean isPauselessEnabled(@Nullable TableConfig tableConfig) {
  if (tableConfig == null) {
    return false;
  }
  IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
  if (ingestionConfig == null) {
    return false;
  }
  StreamIngestionConfig streamIngestionConfig = ingestionConfig.getStreamIngestionConfig();
  return streamIngestionConfig != null && streamIngestionConfig.isPauselessConsumptionEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,7 @@ private void setUpPinotController() {

// Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
LOGGER.info("Starting realtime segment manager");
_pinotLLCRealtimeSegmentManager =
new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics);
_pinotLLCRealtimeSegmentManager = createPinotLLCRealtimeSegmentManager();
// TODO: Need to put this inside HelixResourceManager when HelixControllerLeadershipManager is removed.
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);

Expand Down Expand Up @@ -623,6 +622,10 @@ protected void configure() {
_serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
}

/**
 * Creates the {@link PinotLLCRealtimeSegmentManager} from the controller's resource manager, config and metrics.
 * NOTE(review): extracted as a protected factory hook — presumably so subclasses (e.g. test harnesses) can supply
 * a customized manager; confirm against overriding callers.
 */
protected PinotLLCRealtimeSegmentManager createPinotLLCRealtimeSegmentManager() {
  return new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics);
}

/**
* This method is used to fix table/schema names.
* TODO: in the next release, maybe 2.0.0, we can remove this method. Meanwhile we can delete the orphan schemas
Expand Down
Loading

0 comments on commit 3525116

Please sign in to comment.