[SPARK-5643][SQL] Add a show method to print the content of a DataFrame in tabular format.
An example:

```
year  month  AVG('Adj Close)  MAX('Adj Close)
1980  12     0.503218         0.595103
1981  01     0.523289         0.570307
1982  02     0.436504         0.475256
1983  03     0.410516         0.442194
1984  04     0.450090         0.483521
```

Author: Reynold Xin <rxin@databricks.com>

Closes #4416 from rxin/SPARK-5643 and squashes the following commits:

d0e0d6e [Reynold Xin] [SQL] Minor update to data source and statistics documentation.
269da83 [Reynold Xin] Updated isLocal comment.
2cf3c27 [Reynold Xin] Moved logic into optimizer.
1a04d8b [Reynold Xin] [SPARK-5643][SQL] Add a show method to print the content of a DataFrame in columnar format.
Parent: 56aff4bd6c
Commit: a052ed4250
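For context, a minimal usage sketch of the new method; the table and column names are illustrative, and only `show()` itself comes from this patch:

```scala
// Hedged sketch: assumes a SQLContext named sqlContext with a registered
// "stocks" table containing year, month, and adjClose columns.
val df = sqlContext.sql(
  "SELECT year, month, avg(adjClose), max(adjClose) FROM stocks GROUP BY year, month")
df.show()  // prints at most the first 20 rows, padded into aligned columns
```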
Optimizer.scala:

@@ -50,7 +50,9 @@ object DefaultOptimizer extends Optimizer {
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughJoin,
-      ColumnPruning) :: Nil
+      ColumnPruning) ::
+    Batch("LocalRelation", FixedPoint(100),
+      ConvertToLocalRelation) :: Nil
 }
 
 /**
@@ -610,3 +612,17 @@ object DecimalAggregates extends Rule[LogicalPlan] {
         DecimalType(prec + 4, scale + 4))
   }
 }
+
+/**
+ * Converts local operations (i.e. ones that don't require data exchange) on LocalRelation to
+ * another LocalRelation.
+ *
+ * This is relatively simple as it currently handles only a single case: Project.
+ */
+object ConvertToLocalRelation extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Project(projectList, LocalRelation(output, data)) =>
+      val projection = new InterpretedProjection(projectList, output)
+      LocalRelation(projectList.map(_.toAttribute), data.map(projection))
+  }
+}
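For a quick feel of the wiring, a hedged, self-contained sketch of driving the new rule through a one-batch RuleExecutor, the same shape the test suite added later in this commit uses; `LocalRelationOnly` is an illustrative name:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// One batch that applies ConvertToLocalRelation until the plan stops changing
// (or 100 iterations pass), mirroring the Batch("LocalRelation", ...) added above.
object LocalRelationOnly extends RuleExecutor[LogicalPlan] {
  val batches = Batch("LocalRelation", FixedPoint(100), ConvertToLocalRelation) :: Nil
}

// Applied to an analyzed plan with a Project over a LocalRelation,
// LocalRelationOnly(plan) returns a plan whose projection is already evaluated.
```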
LogicalPlan.scala:

@@ -29,12 +29,15 @@ import org.apache.spark.sql.catalyst.trees
 /**
  * Estimates of various statistics. The default estimation logic simply lazily multiplies the
  * corresponding statistic produced by the children. To override this behavior, override
- * `statistics` and assign it an overriden version of `Statistics`.
+ * `statistics` and assign it an overridden version of `Statistics`.
  *
- * '''NOTE''': concrete and/or overriden versions of statistics fields should pay attention to the
+ * '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
  * performance of the implementations. The reason is that estimations might get triggered in
  * performance-critical processes, such as query plan planning.
  *
+ * Note that we are using a BigInt here since it is easy to overflow a 64-bit integer in
+ * cardinality estimation (e.g. cartesian joins).
+ *
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
  */
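The BigInt rationale is easy to check concretely; a small sketch with row counts chosen only for illustration:

```scala
// Cardinality of a cartesian join is the product of the children's row counts.
// Four billion rows per side is already enough to wrap a 64-bit signed integer.
val rows = 4000000000L
val asLong = rows * rows                      // wraps to -2446744073709551616
val asBigInt = BigInt(rows) * BigInt(rows)    // 16000000000000000000, kept exact
assert(asLong < 0 && asBigInt > BigInt(Long.MaxValue))
```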
ConvertToLocalRelationSuite.scala (new file):

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+
+class ConvertToLocalRelationSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("LocalRelation", FixedPoint(100),
+        ConvertToLocalRelation) :: Nil
+  }
+
+  test("Project on LocalRelation should be turned into a single LocalRelation") {
+    val testRelation = LocalRelation(
+      LocalRelation('a.int, 'b.int).output,
+      Row(1, 2) ::
+      Row(4, 5) :: Nil)
+
+    val correctAnswer = LocalRelation(
+      LocalRelation('a1.int, 'b1.int).output,
+      Row(1, 3) ::
+      Row(4, 6) :: Nil)
+
+    val projectOnLocal = testRelation.select(
+      UnresolvedAttribute("a").as("a1"),
+      (UnresolvedAttribute("b") + 1).as("b1"))
+
+    val optimized = Optimize(projectOnLocal.analyze)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+}
DataFrame.scala:

@@ -102,7 +102,7 @@ trait DataFrame extends RDDApi[Row] {
   * }}}
   */
  @scala.annotation.varargs
-  def toDataFrame(colName: String, colNames: String*): DataFrame
+  def toDataFrame(colNames: String*): DataFrame
 
  /** Returns the schema of this [[DataFrame]]. */
  def schema: StructType
@@ -116,6 +116,25 @@ trait DataFrame extends RDDApi[Row] {
  /** Prints the schema to the console in a nice tree format. */
  def printSchema(): Unit
 
+  /**
+   * Returns true if the `collect` and `take` methods can be run locally
+   * (without any Spark executors).
+   */
+  def isLocal: Boolean
+
+  /**
+   * Displays the [[DataFrame]] in a tabular form. For example:
+   * {{{
+   *   year  month AVG('Adj Close) MAX('Adj Close)
+   *   1980  12    0.503218        0.595103
+   *   1981  01    0.523289        0.570307
+   *   1982  02    0.436504        0.475256
+   *   1983  03    0.410516        0.442194
+   *   1984  04    0.450090        0.483521
+   * }}}
+   */
+  def show(): Unit
+
  /**
   * Cartesian join with another [[DataFrame]].
   *
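A hedged sketch of how a caller might combine the two new methods; `df` stands for any DataFrame:

```scala
// If the plan is just a LocalRelation, collect() runs without any executors and
// is cheap; otherwise settle for the truncated 20-row tabular preview.
if (df.isLocal) {
  df.collect().foreach(println)
} else {
  df.show()
}
```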
DataFrameImpl.scala:

@@ -90,14 +90,13 @@ private[sql] class DataFrameImpl protected[sql](
     }
   }
 
-  override def toDataFrame(colName: String, colNames: String*): DataFrame = {
-    val newNames = colName +: colNames
-    require(schema.size == newNames.size,
+  override def toDataFrame(colNames: String*): DataFrame = {
+    require(schema.size == colNames.size,
       "The number of columns doesn't match.\n" +
         "Old column names: " + schema.fields.map(_.name).mkString(", ") + "\n" +
-        "New column names: " + newNames.mkString(", "))
+        "New column names: " + colNames.mkString(", "))
 
-    val newCols = schema.fieldNames.zip(newNames).map { case (oldName, newName) =>
+    val newCols = schema.fieldNames.zip(colNames).map { case (oldName, newName) =>
       apply(oldName).as(newName)
     }
     select(newCols :_*)
@@ -113,6 +112,38 @@ private[sql] class DataFrameImpl protected[sql](
 
   override def printSchema(): Unit = println(schema.treeString)
 
+  override def isLocal: Boolean = {
+    logicalPlan.isInstanceOf[LocalRelation]
+  }
+
+  override def show(): Unit = {
+    val data = take(20)
+    val numCols = schema.fieldNames.length
+
+    // For cells that are beyond 20 characters, replace it with the first 17 and "..."
+    val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row =>
+      row.toSeq.map { cell =>
+        val str = if (cell == null) "null" else cell.toString
+        if (str.length > 20) str.substring(0, 17) + "..." else str
+      } : Seq[String]
+    }
+
+    // Compute the width of each column
+    val colWidths = Array.fill(numCols)(0)
+    for (row <- rows) {
+      for ((cell, i) <- row.zipWithIndex) {
+        colWidths(i) = math.max(colWidths(i), cell.length)
+      }
+    }
+
+    // Pad the cells and print them
+    println(rows.map { row =>
+      row.zipWithIndex.map { case (cell, i) =>
+        String.format(s"%-${colWidths(i)}s", cell)
+      }.mkString(" ")
+    }.mkString("\n"))
+  }
+
   override def join(right: DataFrame): DataFrame = {
     Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
   }
 
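The body of show() is plain string munging, so it can be exercised outside Spark; a standalone sketch of the same measure-then-pad pipeline, with sample rows borrowed from the example above:

```scala
// Size each column to its widest cell, then left-justify with String.format,
// much as the cell-padding loop in show() does.
val rows: Seq[Seq[String]] = Seq(
  Seq("year", "month", "AVG('Adj Close)"),
  Seq("1980", "12", "0.503218"),
  Seq("1981", "01", "0.523289"))
val widths = rows.transpose.map(_.map(_.length).max)
println(rows.map { row =>
  row.zip(widths).map { case (cell, w) => String.format(s"%-${w}s", cell) }.mkString(" ")
}.mkString("\n"))
```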
IncomputableColumn.scala:

@@ -48,7 +48,7 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   protected[sql] override def logicalPlan: LogicalPlan = err()
 
-  override def toDataFrame(colName: String, colNames: String*): DataFrame = err()
+  override def toDataFrame(colNames: String*): DataFrame = err()
 
   override def schema: StructType = err()
 
@@ -58,6 +58,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   override def printSchema(): Unit = err()
 
+  override def show(): Unit = err()
+
+  override def isLocal: Boolean = false
+
   override def join(right: DataFrame): DataFrame = err()
 
   override def join(right: DataFrame, joinExprs: Column): DataFrame = err()
basicOperators.scala:

@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.runtime.universe.TypeTag
-
 import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
@@ -40,7 +37,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
 
   @transient lazy val buildProjection = newMutableProjection(projectList, child.output)
 
-  def execute() = child.execute().mapPartitions { iter =>
+  override def execute() = child.execute().mapPartitions { iter =>
     val resuableProjection = buildProjection()
     iter.map(resuableProjection)
   }
@@ -55,7 +52,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
 
   @transient lazy val conditionEvaluator = newPredicate(condition, child.output)
 
-  def execute() = child.execute().mapPartitions { iter =>
+  override def execute() = child.execute().mapPartitions { iter =>
     iter.filter(conditionEvaluator)
   }
 }
interfaces.scala:

@@ -87,13 +87,13 @@ trait CreatableRelationProvider {
 
 /**
  * ::DeveloperApi::
- * Represents a collection of tuples with a known schema.  Classes that extend BaseRelation must
- * be able to produce the schema of their data in the form of a [[StructType]]  Concrete
+ * Represents a collection of tuples with a known schema. Classes that extend BaseRelation must
+ * be able to produce the schema of their data in the form of a [[StructType]]. Concrete
  * implementation should inherit from one of the descendant `Scan` classes, which define various
  * abstract methods for execution.
  *
  * BaseRelations must also define a equality function that only returns true when the two
- * instances will return the same data.  This equality function is used when determining when
+ * instances will return the same data. This equality function is used when determining when
  * it is safe to substitute cached results for a given relation.
  */
 @DeveloperApi
@@ -102,13 +102,16 @@ abstract class BaseRelation {
   def schema: StructType
 
   /**
-   * Returns an estimated size of this relation in bytes.  This information is used by the planner
+   * Returns an estimated size of this relation in bytes. This information is used by the planner
    * to decided when it is safe to broadcast a relation and can be overridden by sources that
    * know the size ahead of time. By default, the system will assume that tables are too
-   * large to broadcast.  This method will be called multiple times during query planning
+   * large to broadcast. This method will be called multiple times during query planning
    * and thus should not perform expensive operations for each invocation.
+   *
+   * Note that it is always better to overestimate size than underestimate, because underestimation
+   * could lead to execution plans that are suboptimal (i.e. broadcasting a very large table).
    */
-  def sizeInBytes = sqlContext.conf.defaultSizeInBytes
+  def sizeInBytes: Long = sqlContext.conf.defaultSizeInBytes
 }
 
 /**
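A hedged sketch of a data source overriding the now explicitly Long-typed sizeInBytes; the relation class and the file-length heuristic are invented for illustration:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical relation backed by one local file. Reporting a realistic size lets
// the planner consider broadcasting it; per the doc above, err on the high side.
class SingleFileRelation(path: String, override val sqlContext: SQLContext)
  extends BaseRelation {
  override def schema: StructType = StructType(StructField("value", LongType) :: Nil)
  override def sizeInBytes: Long = 2L * new java.io.File(path).length()  // overestimate
}
```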