From 61456214957cd446c2338fc70a0872c4bc22f77d Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 22 Sep 2020 08:49:58 -0700 Subject: [PATCH] [SPARK-32659][SQL][FOLLOWUP] Broadcast Array instead of Set in InSubqueryExec ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/29475. This PR updates the code to broadcast the Array instead of Set, which was the behavior before #29475 ### Why are the changes needed? The size of Set can be much bigger than Array. It's safer to keep the behavior the same as before and build the set at the executor side. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #29838 from cloud-fan/followup. Authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/sql/execution/subquery.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index b56c0792f1..1a6b99a455 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} -import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf @@ -114,10 +114,10 @@ case class InSubqueryExec( child: Expression, plan: BaseSubqueryExec, exprId: ExprId, - private var resultBroadcast: Broadcast[Set[Any]] = null) extends ExecSubqueryExpression { + private var resultBroadcast: Broadcast[Array[Any]] = null) extends ExecSubqueryExpression { - @transient private var result: Set[Any] = _ - @transient private lazy val inSet = InSet(child, result) + @transient private var result: Array[Any] = _ + @transient private lazy val inSet = InSet(child, result.toSet) override def dataType: DataType = BooleanType override def children: Seq[Expression] = child :: Nil @@ -133,14 +133,14 @@ case class InSubqueryExec( def updateResult(): Unit = { val rows = plan.executeCollect() result = if (plan.output.length > 1) { - rows.toSet + rows.asInstanceOf[Array[Any]] } else { - rows.map(_.get(0, child.dataType)).toSet + rows.map(_.get(0, child.dataType)) } resultBroadcast = plan.sqlContext.sparkContext.broadcast(result) } - def values(): Option[Set[Any]] = Option(resultBroadcast).map(_.value) + def values(): Option[Array[Any]] = Option(resultBroadcast).map(_.value) private def prepareResult(): Unit = { require(resultBroadcast != null, s"$this has not finished")