diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index ca15b36699..b8833a3907 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -257,7 +257,7 @@ class DataFrame(object): >>> df.explain() == Physical Plan == - Scan ExistingRDD[age#0,name#1] + *(1) Scan ExistingRDD[age#0,name#1] >>> df.explain(True) == Parsed Logical Plan == diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 77e381ef6e..4faa27c2c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -84,7 +84,7 @@ case class RowDataSourceScanExec( rdd: RDD[InternalRow], @transient relation: BaseRelation, override val tableIdentifier: Option[TableIdentifier]) - extends DataSourceScanExec { + extends DataSourceScanExec with InputRDDCodegen { def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput) @@ -104,30 +104,10 @@ case class RowDataSourceScanExec( } } - override def inputRDDs(): Seq[RDD[InternalRow]] = { - rdd :: Nil - } + // Input can be InternalRow, has to be turned into UnsafeRows. + override protected val createUnsafeProjection: Boolean = true - override protected def doProduce(ctx: CodegenContext): String = { - val numOutputRows = metricTerm(ctx, "numOutputRows") - // PhysicalRDD always just has one input - val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];") - val exprRows = output.zipWithIndex.map{ case (a, i) => - BoundReference(i, a.dataType, a.nullable) - } - val row = ctx.freshName("row") - ctx.INPUT_ROW = row - ctx.currentVars = null - val columnsRowInput = exprRows.map(_.genCode(ctx)) - s""" - |while ($input.hasNext()) { - | InternalRow $row = (InternalRow) $input.next(); - | $numOutputRows.add(1); - | ${consume(ctx, columnsRowInput).trim} - | if (shouldStop()) return; - |} - """.stripMargin - } + override def inputRDD: RDD[InternalRow] = rdd override val metadata: Map[String, String] = { val markedFilters = for (filter <- filters) yield { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 9f67d556af..e214bfd050 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -175,7 +175,7 @@ case class RDDScanExec( rdd: RDD[InternalRow], name: String, override val outputPartitioning: Partitioning = UnknownPartitioning(0), - override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode { + override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode with InputRDDCodegen { private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("") @@ -199,4 +199,9 @@ case class RDDScanExec( override def simpleString: String = { s"$nodeName${truncatedString(output, "[", ",", "]")}" } + + // Input can be InternalRow, has to be turned into UnsafeRows. + override protected val createUnsafeProjection: Boolean = true + + override def inputRDD: RDD[InternalRow] = rdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala index 448eb703ea..31640db372 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics */ case class LocalTableScanExec( output: Seq[Attribute], - @transient rows: Seq[InternalRow]) extends LeafExecNode { + @transient rows: Seq[InternalRow]) extends LeafExecNode with InputRDDCodegen { override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -76,4 +76,12 @@ case class LocalTableScanExec( longMetric("numOutputRows").add(taken.size) taken } + + // Input is already UnsafeRows. + override protected val createUnsafeProjection: Boolean = false + + // Do not codegen when there is no parent - to support the fast driver-local collect/take paths. + override def supportCodegen: Boolean = (parent != null) + + override def inputRDD: RDD[InternalRow] = rdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 29bcbcae36..fbda0d87a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -350,6 +350,15 @@ trait CodegenSupport extends SparkPlan { */ def needStopCheck: Boolean = parent.needStopCheck + /** + * Helper default should stop check code. + */ + def shouldStopCheckCode: String = if (needStopCheck) { + "if (shouldStop()) return;" + } else { + "// shouldStop check is eliminated" + } + /** * A sequence of checks which evaluate to true if the downstream Limit operators have not received * enough records and reached the limit. If current node is a data producing node, it can leverage @@ -406,6 +415,53 @@ trait BlockingOperatorWithCodegen extends CodegenSupport { override def limitNotReachedChecks: Seq[String] = Nil } +/** + * Leaf codegen node reading from a single RDD. + */ +trait InputRDDCodegen extends CodegenSupport { + + def inputRDD: RDD[InternalRow] + + // If the input can be InternalRows, an UnsafeProjection needs to be created. + protected val createUnsafeProjection: Boolean + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + inputRDD :: Nil + } + + override def doProduce(ctx: CodegenContext): String = { + // Inline mutable state since an InputRDDCodegen is used once in a task for WholeStageCodegen + val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];", + forceInline = true) + val row = ctx.freshName("row") + + val outputVars = if (createUnsafeProjection) { + // creating the vars will make the parent consume add an unsafe projection. + ctx.INPUT_ROW = row + ctx.currentVars = null + output.zipWithIndex.map { case (a, i) => + BoundReference(i, a.dataType, a.nullable).genCode(ctx) + } + } else { + null + } + + val updateNumOutputRowsMetrics = if (metrics.contains("numOutputRows")) { + val numOutputRows = metricTerm(ctx, "numOutputRows") + s"$numOutputRows.add(1);" + } else { + "" + } + s""" + | while ($limitNotReachedCond $input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); + | ${updateNumOutputRowsMetrics} + | ${consume(ctx, outputVars, if (createUnsafeProjection) null else row).trim} + | ${shouldStopCheckCode} + | } + """.stripMargin + } +} /** * InputAdapter is used to hide a SparkPlan from a subtree that supports codegen. @@ -413,7 +469,7 @@ trait BlockingOperatorWithCodegen extends CodegenSupport { * This is the leaf node of a tree with WholeStageCodegen that is used to generate code * that consumes an RDD iterator of InternalRow. */ -case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport { +case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCodegen { override def output: Seq[Attribute] = child.output @@ -429,24 +485,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp child.doExecuteBroadcast() } - override def inputRDDs(): Seq[RDD[InternalRow]] = { - child.execute() :: Nil - } + override def inputRDD: RDD[InternalRow] = child.execute() - override def doProduce(ctx: CodegenContext): String = { - // Right now, InputAdapter is only used when there is one input RDD. - // Inline mutable state since an InputAdapter is used once in a task for WholeStageCodegen - val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];", - forceInline = true) - val row = ctx.freshName("row") - s""" - | while ($limitNotReachedCond $input.hasNext()) { - | InternalRow $row = (InternalRow) $input.next(); - | ${consume(ctx, null, row).trim} - | if (shouldStop()) return; - | } - """.stripMargin - } + // InputAdapter does not need UnsafeProjection. + protected val createUnsafeProjection: Boolean = false override def generateTreeString( depth: Int, diff --git a/sql/core/src/test/resources/sql-tests/results/operators.sql.out b/sql/core/src/test/resources/sql-tests/results/operators.sql.out index fd1d0db9e3..570b281353 100644 --- a/sql/core/src/test/resources/sql-tests/results/operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/operators.sql.out @@ -201,7 +201,7 @@ struct -- !query 24 output == Physical Plan == *Project [null AS (CAST(concat(a, CAST(1 AS STRING)) AS DOUBLE) + CAST(2 AS DOUBLE))#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 25 @@ -211,7 +211,7 @@ struct -- !query 25 output == Physical Plan == *Project [-1b AS concat(CAST((1 - 2) AS STRING), b)#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 26 @@ -221,7 +221,7 @@ struct -- !query 26 output == Physical Plan == *Project [11b AS concat(CAST(((2 * 4) + 3) AS STRING), b)#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 27 @@ -231,7 +231,7 @@ struct -- !query 27 output == Physical Plan == *Project [4a2.0 AS concat(concat(CAST((3 + 1) AS STRING), a), CAST((CAST(4 AS DOUBLE) / CAST(2 AS DOUBLE)) AS STRING))#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 28 @@ -241,7 +241,7 @@ struct -- !query 28 output == Physical Plan == *Project [true AS ((1 = 1) OR (concat(a, b) = ab))#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 29 @@ -251,7 +251,7 @@ struct -- !query 29 output == Physical Plan == *Project [false AS ((concat(a, c) = ac) AND (2 = 3))#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 30