[SPARK-9733][SQL] Improve physical plan explain for data sources

All data sources currently show up as "PhysicalRDD" in the physical plan explain output. It'd be better if we could show the name of the data source instead.

Without this patch:
```
== Physical Plan ==
NewAggregate with UnsafeHybridAggregationIterator ArrayBuffer(date#0, cat#1) ArrayBuffer((sum(CAST((CAST(count#2, IntegerType) + 1), LongType))2,mode=Final,isDistinct=false))
 Exchange hashpartitioning(date#0,cat#1)
  NewAggregate with UnsafeHybridAggregationIterator ArrayBuffer(date#0, cat#1) ArrayBuffer((sum(CAST((CAST(count#2, IntegerType) + 1), LongType))2,mode=Partial,isDistinct=false))
   PhysicalRDD [date#0,cat#1,count#2], MapPartitionsRDD[3] at
```

With this patch:
```
== Physical Plan ==
TungstenAggregate(key=[date#0,cat#1], value=[(sum(CAST((CAST(count#2, IntegerType) + 1), LongType)),mode=Final,isDistinct=false)]
 Exchange hashpartitioning(date#0,cat#1)
  TungstenAggregate(key=[date#0,cat#1], value=[(sum(CAST((CAST(count#2, IntegerType) + 1), LongType)),mode=Partial,isDistinct=false)]
   ConvertToUnsafe
    Scan ParquetRelation[file:/scratch/rxin/spark/sales4][date#0,cat#1,count#2]
```
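
For reference, a query along the following lines would produce plans like the ones above (a sketch only: the Parquet path and column names are taken from the plan output, while the exact query shape and the `sqlContext` in scope are assumptions):

```scala
import org.apache.spark.sql.functions.sum

// Hypothetical repro: an aggregation over a Parquet scan. The path and the
// date/cat/count column names match the plan output above; the query itself
// is a guess at what produced it.
val sales = sqlContext.read.parquet("file:/scratch/rxin/spark/sales4")
sales.groupBy("date", "cat")
  .agg(sum(sales("count").cast("int") + 1))
  .explain()
```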

Author: Reynold Xin <rxin@databricks.com>

Closes #8024 from rxin/SPARK-9733 and squashes the following commits:

811b90e [Reynold Xin] Fixed Python test case.
52cab77 [Reynold Xin] Cast.
eea9ccc [Reynold Xin] Fix test case.
fcecb22 [Reynold Xin] [SPARK-9733][SQL] Improve explain message for data source scan node.
11 changed files with 45 additions and 27 deletions

```diff
@@ -212,8 +212,7 @@ class DataFrame(object):
         :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.

         >>> df.explain()
-        PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\
-        NativeMethodAccessorImpl.java:...
+        Scan PhysicalRDD[age#0,name#1]

         >>> df.explain(True)
         == Parsed Logical Plan ==
@@ -224,7 +223,6 @@ class DataFrame(object):
         ...
         == Physical Plan ==
         ...
-        == RDD ==
         """
         if extended:
             print(self._jdf.queryExecution().toString())
```

```diff
@@ -107,6 +107,8 @@ object Cast
 case class Cast(child: Expression, dataType: DataType)
   extends UnaryExpression with CodegenFallback {

+  override def toString: String = s"cast($child as ${dataType.simpleString})"
+
   override def checkInputDataTypes(): TypeCheckResult = {
     if (Cast.canCast(child.dataType, dataType)) {
       TypeCheckResult.TypeCheckSuccess
@@ -118,8 +120,6 @@ case class Cast(child: Expression, dataType: DataType)
   override def nullable: Boolean = Cast.forceNullable(child.dataType, dataType) || child.nullable

-  override def toString: String = s"CAST($child, $dataType)"
-
   // [[func]] assumes the input is no longer null because eval already does the null check.
   @inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])
```

```diff
@@ -93,7 +93,7 @@ private[sql] case class AggregateExpression2(
     AttributeSet(childReferences)
   }

-  override def toString: String = s"(${aggregateFunction}2,mode=$mode,isDistinct=$isDistinct)"
+  override def toString: String = s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)"
 }

 abstract class AggregateFunction2
```

```diff
@@ -1011,9 +1011,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def output =
       analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")

-    // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
-    // however, the `toRdd` will cause the real execution, which is not what we want.
-    // We need to think about how to avoid the side effect.
     s"""== Parsed Logical Plan ==
        |${stringOrError(logical)}
        |== Analyzed Logical Plan ==
@@ -1024,7 +1021,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
        |== Physical Plan ==
        |${stringOrError(executedPlan)}
        |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
-       |== RDD ==
     """.stripMargin.trim
   }
 }
```

```diff
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Row, SQLContext}
@@ -95,11 +96,23 @@ private[sql] case class LogicalRDD(

 /** Physical plan node for scanning data from an RDD. */
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
-    rdd: RDD[InternalRow]) extends LeafNode {
+    rdd: RDD[InternalRow],
+    extraInformation: String) extends LeafNode {

   override protected[sql] val trackNumOfRowsEnabled = true

   protected override def doExecute(): RDD[InternalRow] = rdd
+
+  override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
 }
+
+private[sql] object PhysicalRDD {
+  def createFromDataSource(
+      output: Seq[Attribute],
+      rdd: RDD[InternalRow],
+      relation: BaseRelation): PhysicalRDD = {
+    PhysicalRDD(output, rdd, relation.toString)
+  }
+}

 /** Logical plan node for scanning data from a local collection. */
```

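Seen in isolation, the new scan label is just the relation description concatenated with the output schema. A standalone sketch of that composition (not the real Spark classes; `FakeAttr` is illustrative):

```scala
// Mimics how the patched PhysicalRDD.simpleString assembles the explain line:
// "Scan " + extraInformation + the output attributes rendered as [name#id,...].
case class FakeAttr(name: String, exprId: Int) {
  override def toString: String = s"$name#$exprId"
}

val output = Seq(FakeAttr("date", 0), FakeAttr("cat", 1), FakeAttr("count", 2))
val extraInformation = "ParquetRelation[file:/scratch/rxin/spark/sales4]"

println("Scan " + extraInformation + output.mkString("[", ",", "]"))
// Scan ParquetRelation[file:/scratch/rxin/spark/sales4][date#0,cat#1,count#2]
```
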
```diff
@@ -363,12 +363,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Generate(
           generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
       case logical.OneRowRelation =>
-        execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
+        execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil
       case logical.RepartitionByExpression(expressions, child) =>
         execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
       case e @ EvaluatePython(udf, child, _) =>
         BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
-      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
+      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil
       case BroadcastHint(child) => apply(child)
       case _ => Nil
     }
```

```diff
@@ -93,10 +93,13 @@ case class TungstenAggregate(
     val allAggregateExpressions = nonCompleteAggregateExpressions ++ completeAggregateExpressions

     testFallbackStartsAt match {
-      case None => s"TungstenAggregate ${groupingExpressions} ${allAggregateExpressions}"
+      case None =>
+        val keyString = groupingExpressions.mkString("[", ",", "]")
+        val valueString = allAggregateExpressions.mkString("[", ",", "]")
+        s"TungstenAggregate(key=$keyString, value=$valueString"
       case Some(fallbackStartsAt) =>
-        s"TungstenAggregateWithControlledFallback ${groupingExpressions} " +
-          s"${allAggregateExpressions} fallbackStartsAt=$fallbackStartsAt"
+        s"TungstenAggregateWithControlledFallback $groupingExpressions " +
+          s"$allAggregateExpressions fallbackStartsAt=$fallbackStartsAt"
     }
   }
 }
```

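The aggregate label in the example plans is assembled the same way, from two `mkString` calls over the grouping keys and the aggregate expressions. A standalone sketch with illustrative values:

```scala
// Standalone sketch of the new TungstenAggregate label. Note that the format
// string in the patch omits a closing ")", which is why the sample plan lines
// above end with "]" rather than "])".
val groupingExpressions = Seq("date#0", "cat#1")
val allAggregateExpressions = Seq("(sum(...),mode=Final,isDistinct=false)")
val keyString = groupingExpressions.mkString("[", ",", "]")
val valueString = allAggregateExpressions.mkString("[", ",", "]")
println(s"TungstenAggregate(key=$keyString, value=$valueString")
// TungstenAggregate(key=[date#0,cat#1], value=[(sum(...),mode=Final,isDistinct=false)]
```
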
```diff
@@ -101,8 +101,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         (a, f) =>
           toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil

-    case l @ LogicalRelation(t: TableScan) =>
-      execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil
+    case l @ LogicalRelation(baseRelation: TableScan) =>
+      execution.PhysicalRDD.createFromDataSource(
+        l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil

     case i @ logical.InsertIntoTable(
       l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
@@ -169,7 +170,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
     }

-    execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows)
+    execution.PhysicalRDD.createFromDataSource(
+      projections.map(_.toAttribute),
+      unionedRows,
+      logicalRelation.relation)
   }

   // TODO: refactor this thing. It is very complicated because it does projection internally.
@@ -299,14 +303,18 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
        projects.asInstanceOf[Seq[Attribute]] // Safe due to if above.
          .map(relation.attributeMap) // Match original case of attributes.

-      val scan = execution.PhysicalRDD(projects.map(_.toAttribute),
-        scanBuilder(requestedColumns, pushedFilters))
+      val scan = execution.PhysicalRDD.createFromDataSource(
+        projects.map(_.toAttribute),
+        scanBuilder(requestedColumns, pushedFilters),
+        relation.relation)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
-      val scan = execution.PhysicalRDD(requestedColumns,
-        scanBuilder(requestedColumns, pushedFilters))
+      val scan = execution.PhysicalRDD.createFromDataSource(
+        requestedColumns,
+        scanBuilder(requestedColumns, pushedFilters),
+        relation.relation)
       execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
   }
```

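Because `createFromDataSource` passes `relation.toString` through to the scan node, a custom data source can control its explain label simply by overriding `toString`. A minimal sketch against the public sources API (the class and label names here are made up):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical relation: its toString becomes the explain label, so a scan of
// this relation would render as e.g. "Scan MyRelation(demo)[id#0]".
class MyRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType = StructType(StructField("id", LongType) :: Nil)
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row(1L), Row(2L)))
  override def toString: String = "MyRelation(demo)"
}

// Usage sketch:
//   sqlContext.baseRelationToDataFrame(new MyRelation(sqlContext)).explain()
```
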
```diff
@@ -383,7 +383,7 @@ private[sql] abstract class OutputWriterInternal extends OutputWriter {
 abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
   extends BaseRelation with Logging {

   logInfo("Constructing HadoopFsRelation")

+  override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
+
   def this() = this(None)
```

```diff
@@ -32,9 +32,9 @@ class RowFormatConvertersSuite extends SparkPlanTest {
     case c: ConvertToSafe => c
   }

-  private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
+  private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(!outputsSafe.outputsUnsafeRows)
-  private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null))
+  private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(outputsUnsafe.outputsUnsafeRows)

   test("planner should insert unsafe->safe conversions when required") {
```

```diff
@@ -36,7 +36,7 @@ class HiveExplainSuite extends QueryTest {
                    "== Analyzed Logical Plan ==",
                    "== Optimized Logical Plan ==",
                    "== Physical Plan ==",
-                   "Code Generation", "== RDD ==")
+                   "Code Generation")
   }

   test("explain create table command") {
```