From 09522d6d725b6e89c6bd4ebf398b77d482ce09d1 Mon Sep 17 00:00:00 2001 From: wangshengjie3 Date: Wed, 27 Nov 2024 16:51:47 +0800 Subject: [PATCH] update spark patch to abort stage when rerun skew join stage --- ...rn-Optimize-Skew-Partitions-spark3_2.patch | 33 ++++++----- ...rn-Optimize-Skew-Partitions-spark3_3.patch | 46 ++++++++++----- ...rn-Optimize-Skew-Partitions-spark3_4.patch | 58 ++++++++++--------- ...rn-Optimize-Skew-Partitions-spark3_5.patch | 34 ++++++----- 4 files changed, 101 insertions(+), 70 deletions(-) diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch index 536422af2ac..dcaec1178be 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch @@ -35,7 +35,7 @@ index e469c9989f2..245d9b3b9de 100644 /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -index b950c07f3d8..e9e10bb647f 100644 +index b950c07f3d8..d081b4642c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1369,7 +1369,10 @@ private[spark] class DAGScheduler( @@ -50,21 +50,20 @@ index b950c07f3d8..e9e10bb647f 100644 mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId) sms.shuffleDep.newShuffleMergeState() case _ => -@@ -1780,7 +1783,8 @@ private[spark] class DAGScheduler( +@@ -1780,7 +1783,7 @@ private[spark] class DAGScheduler( failedStage.failedAttemptIds.add(task.stageAttemptId) val shouldAbortStage = failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest -+ disallowStageRetryForTest || (failedStage.isInstanceOf[ResultStage] -+ && mapOutputTracker.skewShuffleIds.contains(shuffleId)) ++ disallowStageRetryForTest || mapOutputTracker.skewShuffleIds.contains(shuffleId) // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -index 6bc8ba4eebb..2e7d87c96eb 100644 +index 6bc8ba4eebb..44db30dbaec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -@@ -3431,6 +3431,12 @@ object SQLConf { +@@ -3431,6 +3431,19 @@ object SQLConf { .booleanConf .createWithDefault(false) @@ -73,16 +72,25 @@ index 6bc8ba4eebb..2e7d87c96eb 100644 + .version("3.0.0") + .booleanConf + .createWithDefault(false) ++ ++ val CELEBORN_STAGE_RERUN_ENABLED = ++ buildConf("spark.celeborn.client.spark.stageRerun.enabled") ++ .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure") ++ .version("3.0.0") ++ .booleanConf ++ .createWithDefault(false) + /** * Holds information about keys that have been deprecated. * -@@ -4154,6 +4160,9 @@ class SQLConf extends Serializable with Logging { +@@ -4154,6 +4167,11 @@ class SQLConf extends Serializable with Logging { def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG) + def celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean = + getConf(SQLConf.CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ) ++ ++ def celebornStageRerunEnabled: Boolean = getConf(SQLConf.CELEBORN_STAGE_RERUN_ENABLED) + /** ********************** SQLConf functionality methods ************ */ @@ -189,7 +197,7 @@ index 88abe68197b..150699a84a3 100644 logDebug(s"Right side partition $partitionIndex " + s"(${FileUtils.byteCountToDisplaySize(rightSize)}) is skewed, " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala -index 3609548f374..f7c6d5dda90 100644 +index 3609548f374..59c80198f19 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer @@ -200,7 +208,7 @@ index 3609548f374..f7c6d5dda90 100644 object ShufflePartitionsUtil extends Logging { final val SMALL_PARTITION_FACTOR = 0.2 -@@ -376,11 +377,25 @@ object ShufflePartitionsUtil extends Logging { +@@ -376,11 +377,22 @@ object ShufflePartitionsUtil extends Logging { def createSkewPartitionSpecs( shuffleId: Int, reducerId: Int, @@ -215,10 +223,7 @@ index 3609548f374..f7c6d5dda90 100644 + SparkEnv.get.conf.get("spark.shuffle.manager", "sort").contains("celeborn") && + SQLConf.get.celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle + -+ val throwsFetchFailure = SparkEnv.get -+ .conf -+ .get("spark.celeborn.client.spark.fetch.throwsFetchFailure", "false") -+ .toBoolean ++ val throwsFetchFailure = SQLConf.get.celebornStageRerunEnabled + if (throwsFetchFailure && isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed") + val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] @@ -227,7 +232,7 @@ index 3609548f374..f7c6d5dda90 100644 Some(mapStartIndices.indices.map { i => val startMapIndex = mapStartIndices(i) val endMapIndex = if (i == mapStartIndices.length - 1) { -@@ -388,8 +403,20 @@ object ShufflePartitionsUtil extends Logging { +@@ -388,8 +400,20 @@ object ShufflePartitionsUtil extends Logging { } else { mapStartIndices(i + 1) } diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch index 8ba4978b382..3c194f8ce8d 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch @@ -35,7 +35,7 @@ index b1974948430..0dc92ec44a8 100644 /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -index bd2823bcac1..5d81b9de5b6 100644 +index bd2823bcac1..4f40becadc7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1404,7 +1404,10 @@ private[spark] class DAGScheduler( @@ -50,21 +50,20 @@ index bd2823bcac1..5d81b9de5b6 100644 mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId) sms.shuffleDep.newShuffleMergeState() case _ => -@@ -1851,7 +1854,8 @@ private[spark] class DAGScheduler( +@@ -1851,7 +1854,7 @@ private[spark] class DAGScheduler( failedStage.failedAttemptIds.add(task.stageAttemptId) val shouldAbortStage = failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest -+ disallowStageRetryForTest || (failedStage.isInstanceOf[ResultStage] -+ && mapOutputTracker.skewShuffleIds.contains(shuffleId)) ++ disallowStageRetryForTest || mapOutputTracker.skewShuffleIds.contains(shuffleId) // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -index af03ad9a4cb..7a3ee9ebfaf 100644 +index af03ad9a4cb..6c36fb96d58 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -@@ -3784,6 +3784,12 @@ object SQLConf { +@@ -3784,6 +3784,19 @@ object SQLConf { .booleanConf .createWithDefault(false) @@ -73,16 +72,25 @@ index af03ad9a4cb..7a3ee9ebfaf 100644 + .version("3.0.0") + .booleanConf + .createWithDefault(false) ++ ++ val CELEBORN_STAGE_RERUN_ENABLED = ++ buildConf("spark.celeborn.client.spark.stageRerun.enabled") ++ .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure") ++ .version("3.0.0") ++ .booleanConf ++ .createWithDefault(false) + /** * Holds information about keys that have been deprecated. * -@@ -4549,6 +4555,9 @@ class SQLConf extends Serializable with Logging { +@@ -4549,6 +4562,11 @@ class SQLConf extends Serializable with Logging { def histogramNumericPropagateInputType: Boolean = getConf(SQLConf.HISTOGRAM_NUMERIC_PROPAGATE_INPUT_TYPE) + def celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean = + getConf(SQLConf.CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ) ++ ++ def celebornStageRerunEnabled: Boolean = getConf(SQLConf.CELEBORN_STAGE_RERUN_ENABLED) + /** ********************** SQLConf functionality methods ************ */ @@ -190,17 +198,25 @@ index d4a173bb9cc..21ef335e064 100644 logDebug(s"Right side partition $partitionIndex " + s"(${FileUtils.byteCountToDisplaySize(rightSize)}) is skewed, " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala -index af689db3379..9d9f9c994b9 100644 +index af689db3379..529097549ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala -@@ -380,13 +380,27 @@ object ShufflePartitionsUtil extends Logging { +@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer + import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv} + import org.apache.spark.internal.Logging + import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec} ++import org.apache.spark.sql.internal.SQLConf + + object ShufflePartitionsUtil extends Logging { + final val SMALL_PARTITION_FACTOR = 0.2 +@@ -380,13 +381,23 @@ object ShufflePartitionsUtil extends Logging { shuffleId: Int, reducerId: Int, targetSize: Long, - smallPartitionFactor: Double = SMALL_PARTITION_FACTOR) +- : Option[Seq[PartialReducerPartitionSpec]] = { + smallPartitionFactor: Double = SMALL_PARTITION_FACTOR, -+ isCelebornShuffle: Boolean = false) - : Option[Seq[PartialReducerPartitionSpec]] = { ++ isCelebornShuffle: Boolean = false): Option[Seq[PartialReducerPartitionSpec]] = { val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId) if (mapPartitionSizes.exists(_ < 0)) return None val mapStartIndices = splitSizeListByTargetSize( @@ -210,10 +226,7 @@ index af689db3379..9d9f9c994b9 100644 + SparkEnv.get.conf.get("spark.shuffle.manager", "sort").contains("celeborn") && + SQLConf.get.celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle + -+ val throwsFetchFailure = SparkEnv.get -+ .conf -+ .get("spark.celeborn.client.spark.fetch.throwsFetchFailure", "false") -+ .toBoolean ++ val throwsFetchFailure = SQLConf.get.celebornStageRerunEnabled + if (throwsFetchFailure && isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed") + val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] @@ -222,11 +235,12 @@ index af689db3379..9d9f9c994b9 100644 Some(mapStartIndices.indices.map { i => val startMapIndex = mapStartIndices(i) val endMapIndex = if (i == mapStartIndices.length - 1) { -@@ -400,7 +414,14 @@ object ShufflePartitionsUtil extends Logging { +@@ -400,7 +411,15 @@ object ShufflePartitionsUtil extends Logging { dataSize += mapPartitionSizes(mapIndex) mapIndex += 1 } - PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize) ++ + if (isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + // These `dataSize` variables may not be accurate as they only represent the sum of + // `dataSize` when the Celeborn optimize skewed partition read feature is enabled. diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch index b906e7f37ea..3c194f8ce8d 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch @@ -14,10 +14,10 @@ # limitations under the License. diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala -index fade0b86dd8..3290d9fdf23 100644 +index b1974948430..0dc92ec44a8 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala -@@ -697,6 +697,8 @@ private[spark] class MapOutputTrackerMaster( +@@ -696,6 +696,8 @@ private[spark] class MapOutputTrackerMaster( pool } @@ -26,7 +26,7 @@ index fade0b86dd8..3290d9fdf23 100644 // Make sure that we aren't going to exceed the max RPC message size by making sure // we use broadcast to send large map output statuses. if (minSizeForBroadcast > maxRpcMessageSize) { -@@ -887,6 +889,7 @@ private[spark] class MapOutputTrackerMaster( +@@ -886,6 +888,7 @@ private[spark] class MapOutputTrackerMaster( shuffleStatus.invalidateSerializedMergeOutputStatusCache() } } @@ -35,10 +35,10 @@ index fade0b86dd8..3290d9fdf23 100644 /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -index 26be8c72bbc..c3e1d98c06f 100644 +index bd2823bcac1..4f40becadc7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -@@ -1435,7 +1435,10 @@ private[spark] class DAGScheduler( +@@ -1404,7 +1404,10 @@ private[spark] class DAGScheduler( // The operation here can make sure for the partially completed intermediate stage, // `findMissingPartitions()` returns all partitions every time. stage match { @@ -50,39 +50,47 @@ index 26be8c72bbc..c3e1d98c06f 100644 mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId) sms.shuffleDep.newShuffleMergeState() case _ => -@@ -1897,7 +1900,8 @@ private[spark] class DAGScheduler( - +@@ -1851,7 +1854,7 @@ private[spark] class DAGScheduler( + failedStage.failedAttemptIds.add(task.stageAttemptId) val shouldAbortStage = failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest -+ disallowStageRetryForTest || (failedStage.isInstanceOf[ResultStage] -+ && mapOutputTracker.skewShuffleIds.contains(shuffleId)) ++ disallowStageRetryForTest || mapOutputTracker.skewShuffleIds.contains(shuffleId) // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -index be9a7c82828..195fdffd501 100644 +index af03ad9a4cb..6c36fb96d58 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -@@ -4208,6 +4208,12 @@ object SQLConf { - .booleanConf - .createWithDefault(false) +@@ -3784,6 +3784,19 @@ object SQLConf { + .booleanConf + .createWithDefault(false) + val CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ = + buildConf("spark.celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled") + .version("3.0.0") + .booleanConf + .createWithDefault(false) ++ ++ val CELEBORN_STAGE_RERUN_ENABLED = ++ buildConf("spark.celeborn.client.spark.stageRerun.enabled") ++ .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure") ++ .version("3.0.0") ++ .booleanConf ++ .createWithDefault(false) + /** * Holds information about keys that have been deprecated. * -@@ -5040,6 +5046,9 @@ class SQLConf extends Serializable with Logging { - getConf(SQLConf.LEGACY_NEGATIVE_INDEX_IN_ARRAY_INSERT) - } +@@ -4549,6 +4562,11 @@ class SQLConf extends Serializable with Logging { + def histogramNumericPropagateInputType: Boolean = + getConf(SQLConf.HISTOGRAM_NUMERIC_PROPAGATE_INPUT_TYPE) + def celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean = + getConf(SQLConf.CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ) ++ ++ def celebornStageRerunEnabled: Boolean = getConf(SQLConf.CELEBORN_STAGE_RERUN_ENABLED) + /** ********************** SQLConf functionality methods ************ */ @@ -162,7 +170,7 @@ index b34ab3e380b..cb0ed9d05a4 100644 if (newPartitionsSpec.length == mapStats.get.bytesByPartitionId.length) { shuffle diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala -index 37cdea084d8..4694a06919e 100644 +index d4a173bb9cc..21ef335e064 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -152,8 +152,10 @@ case class OptimizeSkewedJoin(ensureRequirements: EnsureRequirements) @@ -190,7 +198,7 @@ index 37cdea084d8..4694a06919e 100644 logDebug(s"Right side partition $partitionIndex " + s"(${FileUtils.byteCountToDisplaySize(rightSize)}) is skewed, " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala -index dbed66683b0..c017c6d1229 100644 +index af689db3379..529097549ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer @@ -201,14 +209,14 @@ index dbed66683b0..c017c6d1229 100644 object ShufflePartitionsUtil extends Logging { final val SMALL_PARTITION_FACTOR = 0.2 -@@ -380,13 +381,27 @@ object ShufflePartitionsUtil extends Logging { +@@ -380,13 +381,23 @@ object ShufflePartitionsUtil extends Logging { shuffleId: Int, reducerId: Int, targetSize: Long, - smallPartitionFactor: Double = SMALL_PARTITION_FACTOR) +- : Option[Seq[PartialReducerPartitionSpec]] = { + smallPartitionFactor: Double = SMALL_PARTITION_FACTOR, -+ isCelebornShuffle: Boolean = false) - : Option[Seq[PartialReducerPartitionSpec]] = { ++ isCelebornShuffle: Boolean = false): Option[Seq[PartialReducerPartitionSpec]] = { val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId) if (mapPartitionSizes.exists(_ < 0)) return None val mapStartIndices = splitSizeListByTargetSize( @@ -218,10 +226,7 @@ index dbed66683b0..c017c6d1229 100644 + SparkEnv.get.conf.get("spark.shuffle.manager", "sort").contains("celeborn") && + SQLConf.get.celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle + -+ val throwsFetchFailure = SparkEnv.get -+ .conf -+ .get("spark.celeborn.client.spark.fetch.throwsFetchFailure", "false") -+ .toBoolean ++ val throwsFetchFailure = SQLConf.get.celebornStageRerunEnabled + if (throwsFetchFailure && isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed") + val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] @@ -230,11 +235,12 @@ index dbed66683b0..c017c6d1229 100644 Some(mapStartIndices.indices.map { i => val startMapIndex = mapStartIndices(i) val endMapIndex = if (i == mapStartIndices.length - 1) { -@@ -400,7 +415,14 @@ object ShufflePartitionsUtil extends Logging { +@@ -400,7 +411,15 @@ object ShufflePartitionsUtil extends Logging { dataSize += mapPartitionSizes(mapIndex) mapIndex += 1 } - PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize) ++ + if (isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + // These `dataSize` variables may not be accurate as they only represent the sum of + // `dataSize` when the Celeborn optimize skewed partition read feature is enabled. diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch index 65efae79589..66544751433 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch @@ -35,7 +35,7 @@ index 9a7a3b0c0e7..c886263b3eb 100644 /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -index 89d16e57934..dab1eca457e 100644 +index 89d16e57934..24aad10c2a8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1480,7 +1480,10 @@ private[spark] class DAGScheduler( @@ -50,21 +50,20 @@ index 89d16e57934..dab1eca457e 100644 mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId) sms.shuffleDep.newShuffleMergeState() case _ => -@@ -1962,7 +1965,8 @@ private[spark] class DAGScheduler( +@@ -1962,7 +1965,7 @@ private[spark] class DAGScheduler( val shouldAbortStage = failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest -+ disallowStageRetryForTest || (failedStage.isInstanceOf[ResultStage] -+ && mapOutputTracker.skewShuffleIds.contains(shuffleId)) ++ disallowStageRetryForTest || mapOutputTracker.skewShuffleIds.contains(shuffleId) // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -index 6f2f0088fcc..706c3ce70db 100644 +index 6f2f0088fcc..3a7b1aabbbc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -@@ -4423,6 +4423,12 @@ object SQLConf { +@@ -4423,6 +4423,19 @@ object SQLConf { .booleanConf .createWithDefault(false) @@ -73,16 +72,25 @@ index 6f2f0088fcc..706c3ce70db 100644 + .version("3.0.0") + .booleanConf + .createWithDefault(false) ++ ++ val CELEBORN_STAGE_RERUN_ENABLED = ++ buildConf("spark.celeborn.client.spark.stageRerun.enabled") ++ .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure") ++ .version("3.0.0") ++ .booleanConf ++ .createWithDefault(false) + /** * Holds information about keys that have been deprecated. * -@@ -5278,6 +5284,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { +@@ -5278,6 +5291,11 @@ class SQLConf extends Serializable with Logging with SqlApiConf { getConf(SQLConf.LEGACY_NEGATIVE_INDEX_IN_ARRAY_INSERT) } + def celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean = + getConf(SQLConf.CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ) ++ ++ def celebornStageRerunEnabled: Boolean = getConf(SQLConf.CELEBORN_STAGE_RERUN_ENABLED) + /** ********************** SQLConf functionality methods ************ */ @@ -190,7 +198,7 @@ index 37cdea084d8..4694a06919e 100644 logDebug(s"Right side partition $partitionIndex " + s"(${FileUtils.byteCountToDisplaySize(rightSize)}) is skewed, " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala -index 9370b3d8d1d..a00383e9b83 100644 +index 9370b3d8d1d..28d4b5b8d8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer @@ -201,7 +209,7 @@ index 9370b3d8d1d..a00383e9b83 100644 object ShufflePartitionsUtil extends Logging { final val SMALL_PARTITION_FACTOR = 0.2 -@@ -382,13 +383,26 @@ object ShufflePartitionsUtil extends Logging { +@@ -382,13 +383,23 @@ object ShufflePartitionsUtil extends Logging { shuffleId: Int, reducerId: Int, targetSize: Long, @@ -218,10 +226,7 @@ index 9370b3d8d1d..a00383e9b83 100644 + SparkEnv.get.conf.get("spark.shuffle.manager", "sort").contains("celeborn") && + SQLConf.get.celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle + -+ val throwsFetchFailure = SparkEnv.get -+ .conf -+ .get("spark.celeborn.client.spark.fetch.throwsFetchFailure", "false") -+ .toBoolean ++ val throwsFetchFailure = SQLConf.get.celebornStageRerunEnabled + if (throwsFetchFailure && isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed") + val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] @@ -230,11 +235,12 @@ index 9370b3d8d1d..a00383e9b83 100644 Some(mapStartIndices.indices.map { i => val startMapIndex = mapStartIndices(i) val endMapIndex = if (i == mapStartIndices.length - 1) { -@@ -402,7 +416,14 @@ object ShufflePartitionsUtil extends Logging { +@@ -402,7 +413,15 @@ object ShufflePartitionsUtil extends Logging { dataSize += mapPartitionSizes(mapIndex) mapIndex += 1 } - PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize) ++ + if (isCelebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + // These `dataSize` variables may not be accurate as they only represent the sum of + // `dataSize` when the Celeborn optimize skewed partition read feature is enabled.