[SPARK-34502][SQL] Remove unused parameters in join methods

### What changes were proposed in this pull request?

Remove unused parameters in `CoalesceBucketsInJoin`, `UnsafeCartesianRDD` and `ShuffledHashJoinExec`.
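
The pattern is mechanical: drop the dead parameter from the definition and from every call site. A minimal, hypothetical Scala sketch of the shape of the edit (names here are illustrative, not Spark's):

```scala
// Hypothetical illustration of the pattern, not Spark code: `unusedRight`
// is accepted but never read, so it can be dropped from the definition
// and from every call site without changing behavior.
def numOutputRows(leftRows: Long, unusedRight: Long, factor: Long): Long =
  leftRows * factor

// After the cleanup, the dead parameter is gone:
def numOutputRowsCleaned(leftRows: Long, factor: Long): Long =
  leftRows * factor

println(numOutputRowsCleaned(100L, 3L))  // 300
```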

### Why are the changes needed?
Code cleanup: these parameters are never read inside the methods, so removing them simplifies the signatures and their call sites.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

Closes #31617 from huaxingao/join-minor.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
Commit 443139b601 (parent 429f8af9b6)
Authored by Huaxin Gao on 2021-02-23 12:18:43 -08:00, committed by Liang-Chi Hsieh
3 changed files with 7 additions and 13 deletions

Changes in `CoalesceBucketsInJoin`:

```diff
@@ -50,7 +50,6 @@ object CoalesceBucketsInJoin extends Rule[SparkPlan] {
   private def updateNumCoalescedBuckets(
       join: BaseJoinExec,
       numLeftBuckets: Int,
-      numRightBucket: Int,
       numCoalescedBuckets: Int): BaseJoinExec = {
     if (numCoalescedBuckets != numLeftBuckets) {
       val leftCoalescedChild =
@@ -72,7 +71,6 @@ object CoalesceBucketsInJoin extends Rule[SparkPlan] {
   private def isCoalesceSHJStreamSide(
       join: ShuffledHashJoinExec,
       numLeftBuckets: Int,
-      numRightBucket: Int,
       numCoalescedBuckets: Int): Boolean = {
     if (numCoalescedBuckets == numLeftBuckets) {
       join.buildSide != BuildRight
@@ -93,12 +91,12 @@ object CoalesceBucketsInJoin extends Rule[SparkPlan] {
         val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
         join match {
           case j: SortMergeJoinExec =>
-            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+            updateNumCoalescedBuckets(j, numLeftBuckets, numCoalescedBuckets)
           case j: ShuffledHashJoinExec
             // Only coalesce the buckets for shuffled hash join stream side,
             // to avoid OOM for build side.
-            if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
-            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+            if isCoalesceSHJStreamSide(j, numLeftBuckets, numCoalescedBuckets) =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numCoalescedBuckets)
           case other => other
         }
       case other => other
```
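
One way to see why the right-side bucket count is dispensable in these helpers: `numCoalescedBuckets` is always `math.min(numLeftBuckets, numRightBuckets)`, so comparing it with the left count alone already determines which side is coalesced. A standalone sketch of that reasoning (plain `Int`s standing in for the plan's bucket counts):

```scala
// Standalone sketch: `left` and `right` stand in for the two sides'
// bucket counts; no Spark plan nodes involved.
def coalescedSide(left: Int, right: Int): String = {
  val numCoalescedBuckets = math.min(left, right)
  // If the coalesced count equals the left count, the left side already
  // has the smaller bucket count, so it is the right side that shrinks.
  // `right` is never consulted past the min(), which is why the removed
  // parameter was dead weight in the helpers above.
  if (numCoalescedBuckets == left) "right side coalesced down to left"
  else "left side coalesced down to right"
}

println(coalescedSide(4, 8))  // right side coalesced down to left
```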

Changes in `UnsafeCartesianRDD` / `CartesianProductExec`:

```diff
@@ -35,7 +35,6 @@ import org.apache.spark.util.CompletionIterator
 class UnsafeCartesianRDD(
     left : RDD[UnsafeRow],
     right : RDD[UnsafeRow],
-    numFieldsOfRight: Int,
     inMemoryBufferThreshold: Int,
     spillThreshold: Int)
   extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
@@ -81,7 +80,6 @@ case class CartesianProductExec(
     val pair = new UnsafeCartesianRDD(
       leftResults,
       rightResults,
-      right.output.size,
       sqlContext.conf.cartesianProductExecBufferInMemoryThreshold,
       sqlContext.conf.cartesianProductExecBufferSpillThreshold)
     pair.mapPartitionsWithIndexInternal { (index, iter) =>
```
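
For context on why a field count is not needed for the pairing itself: a Cartesian product just pairs each left row with every buffered right row and never inspects row schemas. A toy sketch with plain collections (not Spark's RDD or spilling machinery):

```scala
// Toy cartesian product over plain collections; Spark's UnsafeCartesianRDD
// additionally buffers and spills the right side, but the pairing loop
// itself never looks at a row's field count.
def cartesian[L, R](left: Iterator[L], right: Seq[R]): Iterator[(L, R)] =
  left.flatMap(l => right.iterator.map(r => (l, r)))

val pairs = cartesian(Iterator("a", "b"), Seq(1, 2, 3)).toList
println(pairs)  // List((a,1), (a,2), (a,3), (b,1), (b,2), (b,3))
```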

Changes in `ShuffledHashJoinExec`:

```diff
@@ -117,10 +117,10 @@ case class ShuffledHashJoinExec(
     val iter = if (hashedRelation.keyIsUnique) {
       fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
-        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
     } else {
       fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
-        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
     }

     val resultProj = UnsafeProjection.create(output, output)
@@ -146,8 +146,7 @@ case class ShuffledHashJoinExec(
       joinRowWithStream: InternalRow => JoinedRow,
       joinRowWithBuild: InternalRow => JoinedRow,
       streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
-      buildNullRow: GenericInternalRow,
-      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+      buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
     val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
     longMetric("buildDataSize") += matchedKeys.capacity / 8
@@ -213,8 +212,7 @@ case class ShuffledHashJoinExec(
       joinRowWithStream: InternalRow => JoinedRow,
       joinRowWithBuild: InternalRow => JoinedRow,
       streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
-      buildNullRow: GenericInternalRow,
-      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+      buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
     val matchedRows = new OpenHashSet[Long]
     TaskContext.get().addTaskCompletionListener[Unit](_ => {
       // At the end of the task, update the task's memory usage for this
```
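
A plausible reading of why the helpers never touch `streamNullRow` directly: the `streamNullJoinRowWithBuild` function passed alongside it already closes over the stream-side null row, so the callee only ever needs that function. A minimal sketch of the closure pattern with simplified stand-in types (an illustration, not the actual Spark implementation):

```scala
// Simplified stand-in types: strings instead of InternalRow/JoinedRow.
case class JoinedRow(left: String, right: String)

val streamNullRow = "stream-NULL"

// The joining function bakes the null-padded stream side in once...
val streamNullJoinRowWithBuild: String => JoinedRow =
  buildRow => JoinedRow(streamNullRow, buildRow)

// ...so a callee can emit null-padded output for unmatched build rows
// without ever receiving streamNullRow as a separate parameter.
println(streamNullJoinRowWithBuild("buildRow1"))  // JoinedRow(stream-NULL,buildRow1)
```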