Skip to content

Commit

Permalink
Removing checks on default crc and replacing them with segment status…
Browse files Browse the repository at this point in the history
… checks
  • Loading branch information
9aman committed Jan 15, 2025
1 parent ce3c851 commit 2b7e26a
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class SegmentZKMetadata implements ZKMetadata {
private boolean _endTimeMsCached;
private long _endTimeMs;

public static final long DEFAULT_CRC_VALUE = -1;

public SegmentZKMetadata(String segmentName) {
_znRecord = new ZNRecord(segmentName);
Expand Down Expand Up @@ -153,7 +152,7 @@ public long getSizeInBytes() {
}

public long getCrc() {
return _znRecord.getLongField(Segment.CRC, DEFAULT_CRC_VALUE);
return _znRecord.getLongField(Segment.CRC, -1);
}

public void setCrc(long crc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,6 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
LOGGER.info("Updating Idealstate for previous: {} and new segment: {}", committingSegmentName,
newConsumingSegmentName);
long startTimeNs3 = System.nanoTime();
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);

// When multiple segments of the same table complete around the same time it is possible that
// the idealstate update fails due to contention. We serialize the updates to the idealstate
Expand Down Expand Up @@ -1757,8 +1753,7 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
if (uploadedSegmentZKMetadata.getCrc() != segmentZKMetadata.getCrc()) {
LOGGER.info("Updating segment {} crc in ZK to be {} from previous {}", segmentZKMetadata.getSegmentName(),
uploadedSegmentZKMetadata.getCrc(), segmentZKMetadata.getCrc());
updateSegmentMetadata(segmentZKMetadata, uploadedSegmentZKMetadata,
PauselessConsumptionUtils.isPauselessEnabled(tableConfig));
updateSegmentMetadata(segmentZKMetadata, uploadedSegmentZKMetadata);
}
} catch (Exception e) {
// this is a fallback call for backward compatibility to the original API /upload in pinot-server
Expand Down Expand Up @@ -1799,17 +1794,21 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
}

/**
* Updates the segment metadata in ZooKeeper. For pauseless consumption, if the segment's CRC value is -1
* (DEFAULT_CRC_VALUE) which indicates a failed segment commit end metadata call, additional metadata fields
* are also updated along with CRC.
* Updates the segment metadata in ZooKeeper with information from the uploaded segment.
*
* For pauseless consumption scenarios:
* - When segment status is COMMITTING, it indicates a previous segment commit metadata update failed
* - In this case, we perform a full metadata update including time boundaries, index details, and partition info
* - Finally, the segment status is marked as DONE to indicate successful completion
*
* For regular consumption:
* - Only the CRC value is updated
*
* @param segmentZKMetadata Current segment metadata in ZooKeeper that needs to be updated
* @param uploadedSegmentZKMetadata Metadata from the uploaded segment containing new values
* @param isPauselessEnabled Flag indicating if pauseless consumption is enabled for the table
* @param segmentZKMetadata Current segment metadata stored in ZooKeeper that needs to be updated
* @param uploadedSegmentZKMetadata New metadata from the successfully uploaded segment
*/
private void updateSegmentMetadata(SegmentZKMetadata segmentZKMetadata, SegmentZKMetadata uploadedSegmentZKMetadata,
boolean isPauselessEnabled) {
if (isPauselessEnabled && segmentZKMetadata.getCrc() == SegmentZKMetadata.DEFAULT_CRC_VALUE) {
private void updateSegmentMetadata(SegmentZKMetadata segmentZKMetadata, SegmentZKMetadata uploadedSegmentZKMetadata) {
if (segmentZKMetadata.getStatus() == Status.COMMITTING) {
LOGGER.info("Updating additional metadata in ZK for segment {} as pauseless is enabled",
segmentZKMetadata.getSegmentName());
segmentZKMetadata.setStartTime(uploadedSegmentZKMetadata.getStartTimeMs());
Expand Down

0 comments on commit 2b7e26a

Please sign in to comment.