Skip to content

Commit

Permalink
address comment, avoid reading replicate peer when read skew partitio…
Browse files Browse the repository at this point in the history
…n without mapRange
  • Loading branch information
wangshengjie123 committed Dec 22, 2024
1 parent 583cb67 commit 57813ab
Showing 1 changed file with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public static CelebornInputStream create(
} else {
// If startMapIndex > endMapIndex, the partition is a skew partition.
// Its locations will be split into sub-partitions of startMapIndex size.
boolean splitSkewPartitionWithoutMapRange =
conf.clientAdaptiveOptimizeSkewedPartitionReadEnabled() && startMapIndex > endMapIndex;
if (splitSkewPartitionWithoutMapRange) {
boolean readSkewPartitionWithoutMapRange =
readSkewPartitionWithoutMapRange(conf, startMapIndex, endMapIndex);
if (readSkewPartitionWithoutMapRange) {
return new CelebornInputStreamImpl(
conf,
clientFactory,
Expand All @@ -93,7 +93,7 @@ public static CelebornInputStream create(
shuffleId,
partitionId,
exceptionMaker,
splitSkewPartitionWithoutMapRange,
true,
metricsCallback);
} else {
return new CelebornInputStreamImpl(
Expand All @@ -114,7 +114,7 @@ public static CelebornInputStream create(
shuffleId,
partitionId,
exceptionMaker,
splitSkewPartitionWithoutMapRange,
false,
metricsCallback);
}
}
Expand All @@ -124,6 +124,11 @@ public static CelebornInputStream empty() {
return emptyInputStream;
}

/**
 * Tells whether a read should be handled as a skew-partition read without a map range.
 *
 * <p>The answer is {@code true} only when the adaptive skewed-partition read optimization is
 * switched on in {@code conf} and {@code startMapIndex} is strictly greater than
 * {@code endMapIndex}.
 *
 * @param conf client configuration queried for the skewed-partition read switch
 * @param startMapIndex start map index of the requested read
 * @param endMapIndex end map index of the requested read
 * @return {@code true} if the optimization is enabled and {@code startMapIndex > endMapIndex};
 *     {@code false} otherwise
 */
public static boolean readSkewPartitionWithoutMapRange(
    CelebornConf conf, int startMapIndex, int endMapIndex) {
  // The configuration switch is consulted first; the index comparison only matters when it is on.
  if (!conf.clientAdaptiveOptimizeSkewedPartitionReadEnabled()) {
    return false;
  }
  return startMapIndex > endMapIndex;
}

private static final CelebornInputStream emptyInputStream =
new CelebornInputStream() {
@Override
Expand Down Expand Up @@ -205,7 +210,7 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
private ExceptionMaker exceptionMaker;
private boolean closed = false;

private final boolean splitSkewPartitionWithoutMapRange;
private final boolean readSkewPartitionWithoutMapRange;

CelebornInputStreamImpl(
CelebornConf conf,
Expand Down Expand Up @@ -266,7 +271,7 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
int shuffleId,
int partitionId,
ExceptionMaker exceptionMaker,
boolean splitSkewPartitionWithoutMapRange,
boolean readSkewPartitionWithoutMapRange,
MetricsCallback metricsCallback)
throws IOException {
this.conf = conf;
Expand All @@ -288,7 +293,7 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
!conf.shuffleCompressionCodec().equals(CompressionCodec.NONE);
this.fetchExcludedWorkerExpireTimeout = conf.clientFetchExcludedWorkerExpireTimeout();
this.failedBatches = failedBatchSet;
this.splitSkewPartitionWithoutMapRange = splitSkewPartitionWithoutMapRange;
this.readSkewPartitionWithoutMapRange = readSkewPartitionWithoutMapRange;
this.fetchExcludedWorkers = fetchExcludedWorkers;

if (conf.clientPushReplicateEnabled()) {
Expand Down Expand Up @@ -338,9 +343,9 @@ private Tuple2<PartitionLocation, PbStreamHandler> nextReadableLocation() {
}
PartitionLocation currentLocation = locations.get(fileIndex);
// if pushShuffleFailureTrackingEnabled is true, should not skip location
while ((splitSkewPartitionWithoutMapRange
while ((readSkewPartitionWithoutMapRange
&& !partitionLocationToChunkRange.containsKey(currentLocation.getUniqueId()))
|| (!splitSkewPartitionWithoutMapRange
|| (!readSkewPartitionWithoutMapRange
&& skipLocation(startMapIndex, endMapIndex, currentLocation))) {
skipCount.increment();
fileIndex++;
Expand Down Expand Up @@ -406,7 +411,10 @@ private boolean isExcluded(PartitionLocation location) {
private PartitionReader createReaderWithRetry(
PartitionLocation location, PbStreamHandler pbStreamHandler) throws IOException {
// For the first time, the location will be selected according to attemptNumber
if (fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1 && location.hasPeer()) {
if (fetchChunkRetryCnt == 0
&& attemptNumber % 2 == 1
&& location.hasPeer()
&& !readSkewPartitionWithoutMapRange) {
location = location.getPeer();
logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
}
Expand All @@ -421,7 +429,7 @@ private PartitionReader createReaderWithRetry(
lastException = e;
shuffleClient.excludeFailedFetchLocation(location.hostAndFetchPort(), e);
fetchChunkRetryCnt++;
if (location.hasPeer()) {
if (location.hasPeer() && !readSkewPartitionWithoutMapRange) {
// fetchChunkRetryCnt % 2 == 0 means both replicas have been tried,
// so sleep before next try.
if (fetchChunkRetryCnt % 2 == 0) {
Expand Down Expand Up @@ -470,7 +478,7 @@ private ByteBuf getNextChunk() throws IOException {
+ currentReader.getLocation(),
e);
} else {
if (currentReader.getLocation().hasPeer()) {
if (currentReader.getLocation().hasPeer() && !readSkewPartitionWithoutMapRange) {
logger.warn(
"Fetch chunk failed {}/{} times for location {}, change to peer",
fetchChunkRetryCnt,
Expand Down Expand Up @@ -707,7 +715,7 @@ private boolean fillBuffer() throws IOException {

// de-duplicate
if (attemptId == attempts[mapId]) {
if (splitSkewPartitionWithoutMapRange) {
if (readSkewPartitionWithoutMapRange) {
Set<PushFailedBatch> failedBatchSet =
this.failedBatches.get(currentReader.getLocation().getUniqueId());
if (null != failedBatchSet) {
Expand Down

0 comments on commit 57813ab

Please sign in to comment.