[SPARK-32730][SQL] Improve LeftSemi and Existence SortMergeJoin right side buffering

### What changes were proposed in this pull request?

LeftSemi and Existence SortMergeJoin should not buffer all matching right side rows when bound condition is empty, this is unnecessary and can lead to performance degradation especially when spilling happens.

### Why are the changes needed?

Performance improvement.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT and TPCDS benchmarks.

Closes #29572 from peter-toth/SPARK-32730-improve-leftsemi-sortmergejoin.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Peter Toth 2020-09-03 14:17:34 +00:00 committed by Wenchen Fan
parent 0a6043f683
commit ffd5227543
2 changed files with 21 additions and 6 deletions

View file

@ -251,7 +251,8 @@ case class SortMergeJoinExec(
RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold,
cleanupResources
cleanupResources,
condition.isEmpty
)
private[this] val joinRow = new JoinedRow
@ -330,7 +331,8 @@ case class SortMergeJoinExec(
RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold,
cleanupResources
cleanupResources,
condition.isEmpty
)
private[this] val joinRow = new JoinedRow
@ -653,6 +655,7 @@ case class SortMergeJoinExec(
* internal buffer
* @param spillThreshold Threshold for number of rows to be spilled by internal buffer
* @param eagerCleanupResources the eager cleanup function to be invoked when no join row found
* @param onlyBufferFirstMatch [[bufferMatchingRows]] should buffer only the first matching row
*/
private[joins] class SortMergeJoinScanner(
streamedKeyGenerator: Projection,
@ -662,7 +665,8 @@ private[joins] class SortMergeJoinScanner(
bufferedIter: RowIterator,
inMemoryThreshold: Int,
spillThreshold: Int,
eagerCleanupResources: () => Unit) {
eagerCleanupResources: () => Unit,
onlyBufferFirstMatch: Boolean = false) {
private[this] var streamedRow: InternalRow = _
private[this] var streamedRowKey: InternalRow = _
private[this] var bufferedRow: InternalRow = _
@ -673,8 +677,9 @@ private[joins] class SortMergeJoinScanner(
*/
private[this] var matchJoinKey: InternalRow = _
/** Buffered rows from the buffered side of the join. This is empty if there are no matches. */
private[this] val bufferedMatches =
new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
private[this] val bufferedMatches: ExternalAppendOnlyUnsafeRowArray =
new ExternalAppendOnlyUnsafeRowArray(if (onlyBufferFirstMatch) 1 else inMemoryThreshold,
spillThreshold)
// Initialization (note: do _not_ want to advance streamed here).
advancedBufferedToRowWithNullFreeJoinKey()
@ -834,7 +839,9 @@ private[joins] class SortMergeJoinScanner(
matchJoinKey = streamedRowKey.copy()
bufferedMatches.clear()
do {
bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
if (!onlyBufferFirstMatch || bufferedMatches.isEmpty) {
bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
}
advancedBufferedToRowWithNullFreeJoinKey()
} while (bufferedRow != null && keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
}

View file

@ -749,6 +749,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
)
}
// LEFT SEMI JOIN without bound condition does not spill
assertNotSpilled(sparkContext, "left semi join") {
checkAnswer(
sql("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a WHERE key = 2"),
Row(2, "2") :: Nil
)
}
val expected = new ListBuffer[Row]()
expected.append(
Row(1, "1", 1, 1), Row(1, "1", 1, 2),