[SPARK-15495][SQL] Improve the explain output for Aggregation operator

## What changes were proposed in this pull request?

This PR improves the explain output of the Aggregate operator: it renames `TungstenAggregate` to `Aggregate` and `SortBasedAggregate` to `SortAggregate` in the plan string, and makes `StorageLevel.toString` print only the flags that are enabled.

Example (spark-shell):

```
Seq((1,2,3)).toDF("a", "b", "c").createTempView("df1")
spark.sql("cache table df1")
spark.sql("select count(a), count(c), b from df1 group by b").explain()
```

**Before change:**

```
*TungstenAggregate(key=[b#8], functions=[count(1),count(1)], output=[count(a)#79L,count(c)#80L,b#8])
+- Exchange hashpartitioning(b#8, 200), None
   +- *TungstenAggregate(key=[b#8], functions=[partial_count(1),partial_count(1)], output=[b#8,count#98L,count#99L])
      +- InMemoryTableScan [b#8], InMemoryRelation [a#7,b#8,c#9], true, 10000, StorageLevel(disk=true, memory=true, offheap=false, deserialized=true, replication=1), LocalTableScan [a#7,b#8,c#9], [[1,2,3]], Some(df1)
```

**After change:**

```
*Aggregate(key=[b#8], functions=[count(1),count(1)], output=[count(a)#79L,count(c)#80L,b#8])
+- Exchange hashpartitioning(b#8, 200), None
   +- *Aggregate(key=[b#8], functions=[partial_count(1),partial_count(1)], output=[b#8,count#98L,count#99L])
      +- InMemoryTableScan [b#8], InMemoryRelation [a#7,b#8,c#9], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), LocalTableScan [a#7,b#8,c#9], [[1,2,3]], Some(df1)
```

## How was this patch tested?

Manual test and existing UT.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13363 from clockfly/verbose3.
Commit d5012c2740 (parent 1f43562daf), authored by Sean Zhong on 2016-06-01 09:58:01 -07:00, committed by Wenchen Fan.
4 changed files with 12 additions and 6 deletions.

```diff
@@ -120,8 +120,14 @@ class StorageLevel private(
   private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)

   override def toString: String = {
-    s"StorageLevel(disk=$useDisk, memory=$useMemory, offheap=$useOffHeap, " +
-      s"deserialized=$deserialized, replication=$replication)"
+    val disk = if (useDisk) "disk" else ""
+    val memory = if (useMemory) "memory" else ""
+    val heap = if (useOffHeap) "offheap" else ""
+    val deserialize = if (deserialized) "deserialized" else ""
+    val output =
+      Seq(disk, memory, heap, deserialize, s"$replication replicas").filter(_.nonEmpty)
+    s"StorageLevel(${output.mkString(", ")})"
   }

   override def hashCode(): Int = toInt * 41 + replication
```
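The new `toString` keeps only the flags that are set, then joins them. A minimal standalone sketch of the same filter-and-join technique (the object name and method signature here are hypothetical, not Spark's API):

```scala
// Hypothetical standalone version of the compact-description technique:
// render each boolean flag as its name or as an empty string, drop the
// empty strings, and join the survivors with ", ".
object StorageLevelFormat {
  def describe(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
               deserialized: Boolean, replication: Int): String = {
    val parts = Seq(
      if (useDisk) "disk" else "",
      if (useMemory) "memory" else "",
      if (useOffHeap) "offheap" else "",
      if (deserialized) "deserialized" else "",
      s"$replication replicas"
    ).filter(_.nonEmpty)
    s"StorageLevel(${parts.mkString(", ")})"
  }

  def main(args: Array[String]): Unit = {
    // MEMORY_AND_DISK-style level: disk + memory + deserialized, 1 replica
    println(describe(useDisk = true, useMemory = true, useOffHeap = false,
                     deserialized = true, replication = 1))
  }
}
```

With the flags above this prints `StorageLevel(disk, memory, deserialized, 1 replicas)`, matching the "After change" explain output.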

```diff
@@ -434,7 +434,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case other => other :: Nil
   }.mkString(", ")

-  /** String representation of this node without any children. */
+  /** ONE line description of this node. */
   def simpleString: String = s"$nodeName $argString".trim

   override def toString: String = treeString
```

```diff
@@ -106,6 +106,6 @@ case class SortBasedAggregateExec(
     val keyString = groupingExpressions.mkString("[", ",", "]")
     val functionString = allAggregateExpressions.mkString("[", ",", "]")
     val outputString = output.mkString("[", ",", "]")
-    s"SortBasedAggregate(key=$keyString, functions=$functionString, output=$outputString)"
+    s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)"
   }
 }
```
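The key/functions/output strings in these `toString` methods rely on Scala's three-argument `mkString(start, sep, end)`. A tiny standalone illustration (the expression strings below are made-up sample data):

```scala
object MkStringDemo {
  def main(args: Array[String]): Unit = {
    // Sample grouping/output expressions, rendered the way the
    // aggregate operators build their one-line descriptions.
    val groupingExpressions = Seq("b#8")
    val output = Seq("count(a)#79L", "count(c)#80L", "b#8")

    val keyString = groupingExpressions.mkString("[", ",", "]")
    val outputString = output.mkString("[", ",", "]")

    println(s"SortAggregate(key=$keyString, output=$outputString)")
  }
}
```

This prints `SortAggregate(key=[b#8], output=[count(a)#79L,count(c)#80L,b#8])`, the same bracketed style seen in the explain output above.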

```diff
@@ -769,9 +769,9 @@ case class TungstenAggregate(
       val keyString = groupingExpressions.mkString("[", ",", "]")
       val functionString = allAggregateExpressions.mkString("[", ",", "]")
       val outputString = output.mkString("[", ",", "]")
-      s"TungstenAggregate(key=$keyString, functions=$functionString, output=$outputString)"
+      s"Aggregate(key=$keyString, functions=$functionString, output=$outputString)"
     case Some(fallbackStartsAt) =>
-      s"TungstenAggregateWithControlledFallback $groupingExpressions " +
+      s"AggregateWithControlledFallback $groupingExpressions " +
         s"$allAggregateExpressions $resultExpressions fallbackStartsAt=$fallbackStartsAt"
   }
 }
```