[SPARK-14127][SQL] Makes 'DESC [EXTENDED|FORMATTED] <table>' support data source tables

## What changes were proposed in this pull request?

This is a follow-up to PR #12844. It makes the newly updated `DescribeTableCommand` support data source tables.

## How was this patch tested?

A test case is added to check `DESC [EXTENDED | FORMATTED] <table>` output.

Author: Cheng Lian <lian@databricks.com>

Closes #12934 from liancheng/spark-14127-desc-table-follow-up.
This commit is contained in:
Cheng Lian 2016-05-09 10:53:32 -07:00 committed by Yin Huai
parent b1e01fd519
commit 671b382a80
2 changed files with 47 additions and 30 deletions

View file

@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
@@ -288,45 +288,45 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
override def run(sparkSession: SparkSession): Seq[Row] = {
val result = new ArrayBuffer[Row]
sparkSession.sessionState.catalog.lookupRelation(table) match {
case catalogRelation: CatalogRelation =>
if (isExtended) {
describeExtended(catalogRelation, result)
} else if (isFormatted) {
describeFormatted(catalogRelation, result)
} else {
describe(catalogRelation, result)
}
val catalog = sparkSession.sessionState.catalog
case relation =>
describeSchema(relation.schema, result)
if (catalog.isTemporaryTable(table)) {
describeSchema(catalog.lookupRelation(table).schema, result)
} else {
val metadata = catalog.getTableMetadata(table)
if (isExtended) {
describeExtended(metadata, result)
} else if (isFormatted) {
describeFormatted(metadata, result)
} else {
describe(metadata, result)
}
}
result
}
// Shows data columns and partitioned columns (if any)
private def describe(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
describeSchema(relation.catalogTable.schema, buffer)
private def describe(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
describeSchema(table.schema, buffer)
if (relation.catalogTable.partitionColumns.nonEmpty) {
if (table.partitionColumns.nonEmpty) {
append(buffer, "# Partition Information", "", "")
append(buffer, s"# ${output(0).name}", output(1).name, output(2).name)
describeSchema(relation.catalogTable.partitionColumns, buffer)
describeSchema(table.partitionColumns, buffer)
}
}
private def describeExtended(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
describe(relation, buffer)
private def describeExtended(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
describe(table, buffer)
append(buffer, "", "", "")
append(buffer, "# Detailed Table Information", relation.catalogTable.toString, "")
append(buffer, "# Detailed Table Information", table.toString, "")
}
private def describeFormatted(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
describe(relation, buffer)
val table = relation.catalogTable
private def describeFormatted(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
describe(table, buffer)
append(buffer, "", "", "")
append(buffer, "# Detailed Table Information", "", "")
@@ -358,6 +358,12 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
}
/** Appends one output row per [[CatalogColumn]]: name, lower-cased data type, and comment. */
private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
  for (col <- schema) {
    // comment is optional; orNull renders a missing comment as a null cell
    append(buffer, col.name, col.dataType.toLowerCase, col.comment.orNull)
  }
}
private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
val comment =
@@ -366,12 +372,6 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
}
// Appends one output row per CatalogColumn: name, lower-cased data type, and comment.
private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
// column.comment appears to be an Option; orNull renders a missing comment as null
append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
}
}
private def append(
buffer: ArrayBuffer[Row], column: String, dataType: String, comment: String): Unit = {
buffer += Row(column, dataType, comment)

View file

@@ -365,7 +365,7 @@ class HiveDDLSuite
}
}
test("desc table") {
test("desc table for Hive table") {
withTable("tab1") {
val tabName = "tab1"
sql(s"CREATE TABLE $tabName(c1 int)")
@@ -503,4 +503,21 @@ class HiveDDLSuite
}.getMessage
assert(message.contains("Can not drop default database"))
}
test("desc table for data source table") {
  val tabName = "tab1"
  withTable(tabName) {
    // Create a JSON-backed data source table with a single-column schema.
    sqlContext.range(1).write.format("json").saveAsTable(tabName)

    // Plain DESC lists exactly the one data column.
    assert(sql(s"DESC $tabName").collect().length == 1)

    // FORMATTED / EXTENDED variants must include their respective extra sections.
    def firstColumn(cmd: String) = sql(cmd).collect().map(_.getString(0))
    assert(firstColumn(s"DESC FORMATTED $tabName").contains("# Storage Information"))
    assert(firstColumn(s"DESC EXTENDED $tabName").contains("# Detailed Table Information"))
  }
}
}