From 3c929c0759eeefb3a3d9c01492dbe6b594b38216 Mon Sep 17 00:00:00 2001 From: Liangcai Li Date: Thu, 13 Feb 2025 15:19:26 +0800 Subject: [PATCH] Fix an assertion error in the sized hash join (#12092) close https://github.com/NVIDIA/spark-rapids/issues/12091 This PR tries to fix the assertion error in the sized hash join by explicitly concatenating multiple batches into a single one. The original code expects only one batch in the input iterator for the build side of a small join, but it is not true when split-retry happens in the previous operator (e.g. GpuHashAggregate), and the GPU path in this sized join does not concatenate these small batches, instead passes them directly down to this `getSingleBuildBatch` function. Then the assertion fails. One more thing, in the failing case, the join has an exchange and an aggregate as the children. --------- Signed-off-by: Firestarman --- .../rapids/GpuShuffledSizedHashJoinExec.scala | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala index 177710fea81..50a463490bc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -886,7 +886,7 @@ object GpuShuffledAsymmetricHashJoinExec { exprs.buildSideNeedsNullFilter, metrics) JoinInfo(joinType, buildSide, buildIter, buildSize, None, streamIter, exprs) } else { - val buildBatch = getSingleBuildBatch(baseBuildIter, exprs, metrics) + val buildBatch = getAsSingleBuildBatch(baseBuildIter, exprs, metrics) val buildIter = new SingleGpuColumnarBatchIterator(buildBatch) val buildStats = JoinBuildSideStats.fromBatch(buildBatch, exprs.boundBuildKeys) if (buildStats.streamMagnificationFactor < magnificationThreshold) { @@ -1023,15 +1023,26 @@ object GpuShuffledAsymmetricHashJoinExec { } } - private def getSingleBuildBatch( + private def getAsSingleBuildBatch( baseIter: Iterator[ColumnarBatch], exprs: BoundJoinExprs, metrics: Map[String, GpuMetric]): ColumnarBatch = { val iter = addNullFilterIfNecessary(baseIter, exprs.boundBuildKeys, exprs.buildSideNeedsNullFilter, metrics) - closeOnExcept(iter.next()) { batch => - assert(!iter.hasNext) - batch + // Multiple small batches may exist when split-retry happens in the previous op. + // So need to concat them into a single one + val spBatches = mutable.Queue.empty[SpillableColumnarBatch] + closeOnExcept(spBatches) { _ => + while(iter.hasNext) { + spBatches.enqueue( + SpillableColumnarBatch(iter.next(), SpillPriorities.ACTIVE_BATCHING_PRIORITY)) + } + } + assert(spBatches.nonEmpty, "At least one batch is expected") + val cbTypes = spBatches.head.dataTypes + withRetryNoSplit(spBatches.toSeq) { _ => + ConcatAndConsumeAll.buildNonEmptyBatchFromTypes( + spBatches.toArray.safeMap(_.getColumnarBatch()), cbTypes) } } }