[SPARK-26159] Codegen for LocalTableScanExec and RDDScanExec

## What changes were proposed in this pull request?

Implement codegen for `LocalTableScanExec` and `RDDScanExec`. Refactor to share code between `LocalTableScanExec`, `RDDScanExec`, `InputAdapter`, and `RowDataSourceScanExec`.

The only difference in `doProduce` among these four operators was that `RDDScanExec` and `RowDataSourceScanExec` triggered the addition of an `UnsafeProjection`, while `InputAdapter` and `LocalTableScanExec` did not.

In the new trait `InputRDDCodegen`, I added a flag, `createUnsafeProjection`, which each operator sets accordingly.
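For orientation, here is the shape of the shared trait, condensed from the diff below (bodies abridged; only the members discussed here are shown):

```scala
// Condensed sketch of the new trait; see the full diff below for doProduce().
trait InputRDDCodegen extends CodegenSupport {

  // The single RDD this leaf node reads from.
  def inputRDD: RDD[InternalRow]

  // Set to true when the input may be non-Unsafe InternalRows, so that the
  // parent's consume() adds an UnsafeProjection.
  protected val createUnsafeProjection: Boolean

  override def inputRDDs(): Seq[RDD[InternalRow]] = inputRDD :: Nil
}
```

`RowDataSourceScanExec` and `RDDScanExec` set the flag to `true`; `InputAdapter` and `LocalTableScanExec` set it to `false`.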

Note: `LocalTableScanExec` explicitly creates its input as `UnsafeRows`, so it is clear why it needs no `UnsafeProjection`. An `InputAdapter`, by contrast, may take input that is `InternalRows` but not `UnsafeRows`; I think it still needs no unsafe projection, simply because whatever operator sits above it as its parent would add one. This assumes that any parent operator eventually causes some `UnsafeProjection` to be added, and hence that the output of the `WholeStageCodegen` unit is `UnsafeRows`. If these assumptions hold, `createUnsafeProjection` could be set to `(parent == null)`.
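If that change were made, the `InputAdapter` override would look like the following (hypothetical; the patch keeps the flag hardcoded to `false`):

```scala
// Hypothetical alternative, not part of this patch: create an UnsafeProjection
// only when the InputAdapter is itself the root of the WholeStageCodegen unit.
override protected val createUnsafeProjection: Boolean = (parent == null)
```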

Note: `LocalTableScanExec` is not codegened when it is the only operator in the plan. It has optimized driver-only `executeCollect` and `executeTake` code paths that are used to return `Command` results without launching Spark jobs, and those paths could no longer be used if the `LocalTableScanExec` were compiled by whole-stage codegen.
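The guard, as it appears in the `LocalTableScanExec` portion of the diff below:

```scala
// Skip codegen when there is no parent operator, so the driver-only
// executeCollect/executeTake paths remain usable for Command results.
override def supportCodegen: Boolean = (parent != null)
```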

## How was this patch tested?

Covered and used in existing tests.

Closes #23127 from juliuszsompolski/SPARK-26159.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
6 changed files with 86 additions and 51 deletions


```diff
@@ -257,7 +257,7 @@ class DataFrame(object):
         >>> df.explain()
         == Physical Plan ==
-        Scan ExistingRDD[age#0,name#1]
+        *(1) Scan ExistingRDD[age#0,name#1]
         >>> df.explain(True)
         == Parsed Logical Plan ==
```


```diff
@@ -84,7 +84,7 @@ case class RowDataSourceScanExec(
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val tableIdentifier: Option[TableIdentifier])
-  extends DataSourceScanExec {
+  extends DataSourceScanExec with InputRDDCodegen {
 
   def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
@@ -104,30 +104,10 @@ case class RowDataSourceScanExec(
     }
   }
 
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    rdd :: Nil
-  }
+  // Input can be InternalRows, which have to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true
 
-  override protected def doProduce(ctx: CodegenContext): String = {
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    // PhysicalRDD always just has one input
-    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];")
-    val exprRows = output.zipWithIndex.map { case (a, i) =>
-      BoundReference(i, a.dataType, a.nullable)
-    }
-    val row = ctx.freshName("row")
-    ctx.INPUT_ROW = row
-    ctx.currentVars = null
-    val columnsRowInput = exprRows.map(_.genCode(ctx))
-    s"""
-       |while ($input.hasNext()) {
-       |  InternalRow $row = (InternalRow) $input.next();
-       |  $numOutputRows.add(1);
-       |  ${consume(ctx, columnsRowInput).trim}
-       |  if (shouldStop()) return;
-       |}
-     """.stripMargin
-  }
+  override def inputRDD: RDD[InternalRow] = rdd
 
   override val metadata: Map[String, String] = {
     val markedFilters = for (filter <- filters) yield {
```


```diff
@@ -175,7 +175,7 @@ case class RDDScanExec(
     rdd: RDD[InternalRow],
     name: String,
     override val outputPartitioning: Partitioning = UnknownPartitioning(0),
-    override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode {
+    override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode with InputRDDCodegen {
 
   private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("")
@@ -199,4 +199,9 @@ case class RDDScanExec(
   override def simpleString: String = {
     s"$nodeName${truncatedString(output, "[", ",", "]")}"
   }
+
+  // Input can be InternalRows, which have to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true
+
+  override def inputRDD: RDD[InternalRow] = rdd
 }
```


```diff
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
  */
 case class LocalTableScanExec(
     output: Seq[Attribute],
-    @transient rows: Seq[InternalRow]) extends LeafExecNode {
+    @transient rows: Seq[InternalRow]) extends LeafExecNode with InputRDDCodegen {
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -76,4 +76,12 @@ case class LocalTableScanExec(
     longMetric("numOutputRows").add(taken.size)
     taken
   }
+
+  // Input is already UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = false
+
+  // Do not codegen when there is no parent - to support the fast driver-local collect/take paths.
+  override def supportCodegen: Boolean = (parent != null)
+
+  override def inputRDD: RDD[InternalRow] = rdd
 }
```


```diff
@@ -350,6 +350,15 @@ trait CodegenSupport extends SparkPlan {
    */
   def needStopCheck: Boolean = parent.needStopCheck
 
+  /**
+   * Helper that generates the default shouldStop() check code.
+   */
+  def shouldStopCheckCode: String = if (needStopCheck) {
+    "if (shouldStop()) return;"
+  } else {
+    "// shouldStop check is eliminated"
+  }
+
   /**
    * A sequence of checks which evaluate to true if the downstream Limit operators have not received
    * enough records and reached the limit. If current node is a data producing node, it can leverage
@@ -406,6 +415,53 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input can be InternalRows, an UnsafeProjection needs to be created.
+  protected val createUnsafeProjection: Boolean
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
+    // Inline mutable state since an InputRDDCodegen is used once in a task for WholeStageCodegen
+    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
+      forceInline = true)
+    val row = ctx.freshName("row")
+
+    val outputVars = if (createUnsafeProjection) {
+      // Creating the vars will make the parent's consume() add an UnsafeProjection.
+      ctx.INPUT_ROW = row
+      ctx.currentVars = null
+      output.zipWithIndex.map { case (a, i) =>
+        BoundReference(i, a.dataType, a.nullable).genCode(ctx)
+      }
+    } else {
+      null
+    }
+
+    val updateNumOutputRowsMetrics = if (metrics.contains("numOutputRows")) {
+      val numOutputRows = metricTerm(ctx, "numOutputRows")
+      s"$numOutputRows.add(1);"
+    } else {
+      ""
+    }
+    s"""
+       | while ($limitNotReachedCond $input.hasNext()) {
+       |   InternalRow $row = (InternalRow) $input.next();
+       |   ${updateNumOutputRowsMetrics}
+       |   ${consume(ctx, outputVars, if (createUnsafeProjection) null else row).trim}
+       |   ${shouldStopCheckCode}
+       | }
+     """.stripMargin
+  }
+}
+
 /**
  * InputAdapter is used to hide a SparkPlan from a subtree that supports codegen.
@@ -413,7 +469,7 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
  * This is the leaf node of a tree with WholeStageCodegen that is used to generate code
  * that consumes an RDD iterator of InternalRow.
  */
-case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCodegen {
 
   override def output: Seq[Attribute] = child.output
@@ -429,24 +485,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
     child.doExecuteBroadcast()
   }
 
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    child.execute() :: Nil
-  }
+  override def inputRDD: RDD[InternalRow] = child.execute()
 
-  override def doProduce(ctx: CodegenContext): String = {
-    // Right now, InputAdapter is only used when there is one input RDD.
-    // Inline mutable state since an InputAdapter is used once in a task for WholeStageCodegen
-    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
-      forceInline = true)
-    val row = ctx.freshName("row")
-    s"""
-       | while ($limitNotReachedCond $input.hasNext()) {
-       |   InternalRow $row = (InternalRow) $input.next();
-       |   ${consume(ctx, null, row).trim}
-       |   if (shouldStop()) return;
-       | }
-     """.stripMargin
-  }
+  // InputAdapter does not need UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = false
 
   override def generateTreeString(
       depth: Int,
```


```diff
@@ -201,7 +201,7 @@ struct<plan:string>
 -- !query 24 output
 == Physical Plan ==
 *Project [null AS (CAST(concat(a, CAST(1 AS STRING)) AS DOUBLE) + CAST(2 AS DOUBLE))#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]
 
 -- !query 25
@@ -211,7 +211,7 @@ struct<plan:string>
 -- !query 25 output
 == Physical Plan ==
 *Project [-1b AS concat(CAST((1 - 2) AS STRING), b)#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]
 
 -- !query 26
@@ -221,7 +221,7 @@ struct<plan:string>
 -- !query 26 output
 == Physical Plan ==
 *Project [11b AS concat(CAST(((2 * 4) + 3) AS STRING), b)#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]
 
 -- !query 27
@@ -231,7 +231,7 @@ struct<plan:string>
 -- !query 27 output
 == Physical Plan ==
 *Project [4a2.0 AS concat(concat(CAST((3 + 1) AS STRING), a), CAST((CAST(4 AS DOUBLE) / CAST(2 AS DOUBLE)) AS STRING))#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]
 
 -- !query 28
@@ -241,7 +241,7 @@ struct<plan:string>
 -- !query 28 output
 == Physical Plan ==
 *Project [true AS ((1 = 1) OR (concat(a, b) = ab))#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]
 
 -- !query 29
@@ -251,7 +251,7 @@ struct<plan:string>
 -- !query 29 output
 == Physical Plan ==
 *Project [false AS ((concat(a, c) = ac) AND (2 = 3))#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]
 
 -- !query 30
```