diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
index a4e5be01b4..d50c7fd283 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
@@ -50,7 +50,6 @@ object CoalesceBucketsInJoin extends Rule[SparkPlan] {
   private def updateNumCoalescedBuckets(
       join: BaseJoinExec,
       numLeftBuckets: Int,
-      numRightBucket: Int,
       numCoalescedBuckets: Int): BaseJoinExec = {
     if (numCoalescedBuckets != numLeftBuckets) {
       val leftCoalescedChild =
@@ -72,7 +71,6 @@ object CoalesceBucketsInJoin extends Rule[SparkPlan] {
   private def isCoalesceSHJStreamSide(
       join: ShuffledHashJoinExec,
       numLeftBuckets: Int,
-      numRightBucket: Int,
       numCoalescedBuckets: Int): Boolean = {
     if (numCoalescedBuckets == numLeftBuckets) {
       join.buildSide != BuildRight
@@ -93,12 +91,12 @@ object CoalesceBucketsInJoin extends Rule[SparkPlan] {
         val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
         join match {
           case j: SortMergeJoinExec =>
-            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+            updateNumCoalescedBuckets(j, numLeftBuckets, numCoalescedBuckets)
           case j: ShuffledHashJoinExec
             // Only coalesce the buckets for shuffled hash join stream side,
             // to avoid OOM for build side.
-            if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
-            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+            if isCoalesceSHJStreamSide(j, numLeftBuckets, numCoalescedBuckets) =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numCoalescedBuckets)
           case other => other
         }
       case other => other
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index a71bf94c45..b6386d0d11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -35,7 +35,6 @@ import org.apache.spark.util.CompletionIterator
 class UnsafeCartesianRDD(
     left : RDD[UnsafeRow],
     right : RDD[UnsafeRow],
-    numFieldsOfRight: Int,
     inMemoryBufferThreshold: Int,
     spillThreshold: Int)
   extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
@@ -81,7 +80,6 @@ case class CartesianProductExec(
     val pair = new UnsafeCartesianRDD(
       leftResults,
       rightResults,
-      right.output.size,
       sqlContext.conf.cartesianProductExecBufferInMemoryThreshold,
       sqlContext.conf.cartesianProductExecBufferSpillThreshold)
     pair.mapPartitionsWithIndexInternal { (index, iter) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 1dc7a3b7ee..cd57408e79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -117,10 +117,10 @@ case class ShuffledHashJoinExec(
 
     val iter = if (hashedRelation.keyIsUnique) {
       fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
-        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
     } else {
       fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
-        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
     }
 
     val resultProj = UnsafeProjection.create(output, output)
@@ -146,8 +146,7 @@ case class ShuffledHashJoinExec(
       joinRowWithStream: InternalRow => JoinedRow,
       joinRowWithBuild: InternalRow => JoinedRow,
       streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
-      buildNullRow: GenericInternalRow,
-      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+      buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
     val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
     longMetric("buildDataSize") += matchedKeys.capacity / 8
 
@@ -213,8 +212,7 @@ case class ShuffledHashJoinExec(
       joinRowWithStream: InternalRow => JoinedRow,
       joinRowWithBuild: InternalRow => JoinedRow,
       streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
-      buildNullRow: GenericInternalRow,
-      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+      buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
     val matchedRows = new OpenHashSet[Long]
     TaskContext.get().addTaskCompletionListener[Unit](_ => {
       // At the end of the task, update the task's memory usage for this