[SPARK-32659][SQL][FOLLOWUP] Broadcast Array instead of Set in InSubqueryExec

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/29475.

This PR updates the code to broadcast the Array instead of the Set, which was the behavior before #29475.

### Why are the changes needed?

The size of a Set can be much bigger than that of an Array. It's safer to keep the behavior the same as before and build the Set at the executor side.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes #29838 from cloud-fan/followup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Wenchen Fan 2020-09-22 08:49:58 -07:00 committed by Dongjoon Hyun
parent 790d9ef2d3
commit 6145621495

View file

@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.broadcast.Broadcast import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression} import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf
@@ -114,10 +114,10 @@ case class InSubqueryExec(
child: Expression, child: Expression,
plan: BaseSubqueryExec, plan: BaseSubqueryExec,
exprId: ExprId, exprId: ExprId,
private var resultBroadcast: Broadcast[Set[Any]] = null) extends ExecSubqueryExpression { private var resultBroadcast: Broadcast[Array[Any]] = null) extends ExecSubqueryExpression {
@transient private var result: Set[Any] = _ @transient private var result: Array[Any] = _
@transient private lazy val inSet = InSet(child, result) @transient private lazy val inSet = InSet(child, result.toSet)
override def dataType: DataType = BooleanType override def dataType: DataType = BooleanType
override def children: Seq[Expression] = child :: Nil override def children: Seq[Expression] = child :: Nil
@@ -133,14 +133,14 @@ case class InSubqueryExec(
def updateResult(): Unit = { def updateResult(): Unit = {
val rows = plan.executeCollect() val rows = plan.executeCollect()
result = if (plan.output.length > 1) { result = if (plan.output.length > 1) {
rows.toSet rows.asInstanceOf[Array[Any]]
} else { } else {
rows.map(_.get(0, child.dataType)).toSet rows.map(_.get(0, child.dataType))
} }
resultBroadcast = plan.sqlContext.sparkContext.broadcast(result) resultBroadcast = plan.sqlContext.sparkContext.broadcast(result)
} }
def values(): Option[Set[Any]] = Option(resultBroadcast).map(_.value) def values(): Option[Array[Any]] = Option(resultBroadcast).map(_.value)
private def prepareResult(): Unit = { private def prepareResult(): Unit = {
require(resultBroadcast != null, s"$this has not finished") require(resultBroadcast != null, s"$this has not finished")