From ffd5227543c899ab44d2f258adb75f4b0eb57b03 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 3 Sep 2020 14:17:34 +0000 Subject: [PATCH] [SPARK-32730][SQL] Improve LeftSemi and Existence SortMergeJoin right side buffering ### What changes were proposed in this pull request? LeftSemi and Existence SortMergeJoin should not buffer all matching right side rows when the bound condition is empty; this is unnecessary and can lead to performance degradation, especially when spilling happens. ### Why are the changes needed? Performance improvement. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New UT and TPCDS benchmarks. Closes #29572 from peter-toth/SPARK-32730-improve-leftsemi-sortmergejoin. Authored-by: Peter Toth Signed-off-by: Wenchen Fan --- .../execution/joins/SortMergeJoinExec.scala | 19 +++++++++++++------ .../org/apache/spark/sql/JoinSuite.scala | 8 ++++++++ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 6e7bcb8825..097ea61f13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -251,7 +251,8 @@ case class SortMergeJoinExec( RowIterator.fromScala(rightIter), inMemoryThreshold, spillThreshold, - cleanupResources + cleanupResources, + condition.isEmpty ) private[this] val joinRow = new JoinedRow @@ -330,7 +331,8 @@ case class SortMergeJoinExec( RowIterator.fromScala(rightIter), inMemoryThreshold, spillThreshold, - cleanupResources + cleanupResources, + condition.isEmpty ) private[this] val joinRow = new JoinedRow @@ -653,6 +655,7 @@ case class SortMergeJoinExec( * internal buffer * @param spillThreshold Threshold for number of rows to be spilled by internal buffer * @param eagerCleanupResources 
the eager cleanup function to be invoked when no join row found + * @param onlyBufferFirstMatch [[bufferMatchingRows]] should buffer only the first matching row */ private[joins] class SortMergeJoinScanner( streamedKeyGenerator: Projection, @@ -662,7 +665,8 @@ private[joins] class SortMergeJoinScanner( bufferedIter: RowIterator, inMemoryThreshold: Int, spillThreshold: Int, - eagerCleanupResources: () => Unit) { + eagerCleanupResources: () => Unit, + onlyBufferFirstMatch: Boolean = false) { private[this] var streamedRow: InternalRow = _ private[this] var streamedRowKey: InternalRow = _ private[this] var bufferedRow: InternalRow = _ @@ -673,8 +677,9 @@ private[joins] class SortMergeJoinScanner( */ private[this] var matchJoinKey: InternalRow = _ /** Buffered rows from the buffered side of the join. This is empty if there are no matches. */ - private[this] val bufferedMatches = - new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) + private[this] val bufferedMatches: ExternalAppendOnlyUnsafeRowArray = + new ExternalAppendOnlyUnsafeRowArray(if (onlyBufferFirstMatch) 1 else inMemoryThreshold, + spillThreshold) // Initialization (note: do _not_ want to advance streamed here). 
advancedBufferedToRowWithNullFreeJoinKey() @@ -834,7 +839,9 @@ private[joins] class SortMergeJoinScanner( matchJoinKey = streamedRowKey.copy() bufferedMatches.clear() do { - bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow]) + if (!onlyBufferFirstMatch || bufferedMatches.isEmpty) { + bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow]) + } advancedBufferedToRowWithNullFreeJoinKey() } while (bufferedRow != null && keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 93cd847132..942cf24a3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -749,6 +749,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan ) } + // LEFT SEMI JOIN without bound condition does not spill + assertNotSpilled(sparkContext, "left semi join") { + checkAnswer( + sql("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a WHERE key = 2"), + Row(2, "2") :: Nil + ) + } + val expected = new ListBuffer[Row]() expected.append( Row(1, "1", 1, 1), Row(1, "1", 1, 2),