[SPARK-35760][SQL] Fix the max rows check for broadcast exchange

### What changes were proposed in this pull request?

This fixes the check on the maximal allowed number of rows in `BroadcastExchangeExec`. Since https://github.com/apache/spark/pull/27828, the max number of rows has been calculated from the max capacity of `BytesToBytesMap` (before that PR the value was 512000000). This calculation is not accurate, because only `UnsafeHashedRelation` uses `BytesToBytesMap`. `LongHashedRelation` (used for broadcast joins on a single key with long data type) has a limit of [512000000](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L584), and `BroadcastNestedLoopJoinExec` does not depend on `HashedRelation` at all.

The change is to specialize the max rows limit only where it applies (broadcasts built as `UnsafeHashedRelation` on top of `BytesToBytesMap`), and to keep the other broadcast cases at the previous limit of 512000000.
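
For illustration, a minimal sketch of the limit selection described above, mirroring the `maxBroadcastRows` definition added in the diff below. The wrapper object and the `maxRowsFor` helper are hypothetical names, and the snippet assumes Spark's `sql/core`, `catalyst`, and `unsafe` modules are on the classpath:

```scala
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
import org.apache.spark.sql.types.LongType
import org.apache.spark.unsafe.map.BytesToBytesMap

object BroadcastRowLimit {
  // Pick the row limit based on which relation type will back the broadcast.
  def maxRowsFor(mode: BroadcastMode): Long = mode match {
    // UnsafeHashedRelation is backed by BytesToBytesMap, so its capacity bounds the row count.
    case HashedRelationBroadcastMode(key, _)
        if !(key.length == 1 && key.head.dataType == LongType) =>
      (BytesToBytesMap.MAX_CAPACITY / 1.5).toLong
    // A single LongType key (LongHashedRelation) and non-hashed broadcasts keep the 512000000 limit.
    case _ => 512000000L
  }
}
```

A `HashedRelationBroadcastMode` with a single `LongType` key builds a `LongHashedRelation`, so it falls through to the default case, as does the `IdentityBroadcastMode` used by `BroadcastNestedLoopJoinExec`.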

### Why are the changes needed?

Fix the check so that broadcast cases that do not use `BytesToBytesMap` (`LongHashedRelation` builds and `BroadcastNestedLoopJoinExec`) are not capped by its capacity-derived limit, avoiding unexpected broadcast failures.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #32911 from c21/broadcast.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>

```diff
@@ -32,9 +32,10 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
-import org.apache.spark.sql.execution.joins.HashedRelation
+import org.apache.spark.sql.execution.joins.{HashedRelation, HashedRelationBroadcastMode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.LongType
 import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.util.{SparkFatalException, ThreadUtils}
@@ -105,6 +106,19 @@ case class BroadcastExchangeExec(
   @transient
   private val timeout: Long = conf.broadcastTimeout
 
+  @transient
+  private lazy val maxBroadcastRows = mode match {
+    case HashedRelationBroadcastMode(key, _)
+      // NOTE: LongHashedRelation is used for single key with LongType. This should be kept
+      // consistent with HashedRelation.apply.
+      if !(key.length == 1 && key.head.dataType == LongType) =>
+      // Since the maximum number of keys that BytesToBytesMap supports is 1 << 29,
+      // and only 70% of the slots can be used before growing in UnsafeHashedRelation,
+      // here the limitation should not be over 341 million.
+      (BytesToBytesMap.MAX_CAPACITY / 1.5).toLong
+    case _ => 512000000
+  }
+
   @transient
   override lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
     SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
@@ -117,9 +131,9 @@ case class BroadcastExchangeExec(
         // Use executeCollect/executeCollectIterator to avoid conversion to Scala types
         val (numRows, input) = child.executeCollectIterator()
         longMetric("numOutputRows") += numRows
-        if (numRows >= MAX_BROADCAST_TABLE_ROWS) {
+        if (numRows >= maxBroadcastRows) {
           throw new SparkException(
-            s"Cannot broadcast the table over $MAX_BROADCAST_TABLE_ROWS rows: $numRows rows")
+            s"Cannot broadcast the table over $maxBroadcastRows rows: $numRows rows")
         }
 
         val beforeBuild = System.nanoTime()
@@ -212,11 +226,6 @@ case class BroadcastExchangeExec(
 }
 
 object BroadcastExchangeExec {
-  // Since the maximum number of keys that BytesToBytesMap supports is 1 << 29,
-  // and only 70% of the slots can be used before growing in HashedRelation,
-  // here the limitation should not be over 341 million.
-  val MAX_BROADCAST_TABLE_ROWS = (BytesToBytesMap.MAX_CAPACITY / 1.5).toLong
-
   val MAX_BROADCAST_TABLE_BYTES = 8L << 30
 
   private[execution] val executionContext = ExecutionContext.fromExecutorService(
```