[SPARK-29834][SQL] DESC DATABASE should look up catalog like v2 commands

### What changes were proposed in this pull request?
Add DescribeNamespaceStatement, DescribeNamespace and DescribeNamespaceExec
to make "DESC DATABASE" look up catalog like v2 commands.

### Why are the changes needed?
It's important to make all the commands have the same catalog/namespace resolution behavior, to avoid confusing end-users.

### Does this PR introduce any user-facing change?
Yes. This adds "DESC NAMESPACE", which behaves the same as "DESC DATABASE" and "DESC SCHEMA".

### How was this patch tested?
New unit test

Closes #26513 from fuwhu/SPARK-29834.

Authored-by: fuwhu <bestwwg@163.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
fuwhu 2019-11-15 18:50:42 -08:00 committed by Dongjoon Hyun
parent 7720781695
commit 16e7195299
12 changed files with 146 additions and 35 deletions

View file

@ -199,7 +199,8 @@ statement
| SHOW CREATE TABLE multipartIdentifier #showCreateTable
| SHOW CURRENT NAMESPACE #showCurrentNamespace
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier #describeDatabase
| (DESC | DESCRIBE) (database | NAMESPACE) EXTENDED?
multipartIdentifier #describeNamespace
| (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
multipartIdentifier partitionSpec? describeColName? #describeTable
| (DESC | DESCRIBE) QUERY? query #describeQuery

View file

@ -172,6 +172,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case DropNamespaceStatement(NonSessionCatalog(catalog, nameParts), ifExists, cascade) =>
DropNamespace(catalog, nameParts, ifExists, cascade)
// A DESCRIBE NAMESPACE statement whose name matches the NonSessionCatalog extractor is
// converted to the v2 DescribeNamespace logical plan, keeping the `extended` flag.
case DescribeNamespaceStatement(NonSessionCatalog(catalog, nameParts), extended) =>
DescribeNamespace(catalog, nameParts, extended)
case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) =>
ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern)

View file

@ -2541,6 +2541,21 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
Option(ctx.pattern).map(string))
}
/**
 * Build a [[DescribeNamespaceStatement]] from a parsed DESCRIBE NAMESPACE statement.
 *
 * The namespace is given as a multi-part identifier, and the statement is marked as
 * extended when the EXTENDED keyword is present.
 *
 * For example:
 * {{{
 *   DESCRIBE (DATABASE|SCHEMA|NAMESPACE) [EXTENDED] database;
 * }}}
 */
override def visitDescribeNamespace(ctx: DescribeNamespaceContext): LogicalPlan =
  withOrigin(ctx) {
    val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier())
    val isExtended = ctx.EXTENDED != null
    DescribeNamespaceStatement(nameParts, isExtended)
  }
/**
* Create a table, returning a [[CreateTableStatement]] logical plan.
*

View file

@ -269,6 +269,13 @@ case class DescribeTableStatement(
partitionSpec: TablePartitionSpec,
isExtended: Boolean) extends ParsedStatement
/**
 * A DESCRIBE NAMESPACE statement, as parsed from SQL.
 *
 * @param namespace the parts of the multi-part identifier given in the statement
 * @param extended whether the EXTENDED keyword was present in the statement
 */
case class DescribeNamespaceStatement(
namespace: Seq[String],
extended: Boolean) extends ParsedStatement
/**
* A DESCRIBE TABLE tbl_name col_name statement, as parsed from SQL.
*/

View file

@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, SupportsNamespaces, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{DataType, StringType, StructType}
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType}
/**
* Base trait for DataSourceV2 write commands
@ -255,6 +255,21 @@ case class DropNamespace(
ifExists: Boolean,
cascade: Boolean) extends Command
/**
 * The logical plan of the DESCRIBE NAMESPACE command that works for v2 catalogs.
 *
 * The command produces two string columns: a non-nullable `name` and a nullable `value`,
 * each carrying a "comment" entry in its column metadata.
 */
case class DescribeNamespace(
    catalog: CatalogPlugin,
    namespace: Seq[String],
    extended: Boolean) extends Command {

  override def output: Seq[Attribute] = {
    // Column metadata holding a single "comment" entry.
    def commentMetadata(text: String) =
      new MetadataBuilder().putString("comment", text).build()

    val nameColumn = AttributeReference(
      "name", StringType, nullable = false, commentMetadata("name of the column"))()
    val valueColumn = AttributeReference(
      "value", StringType, nullable = true, commentMetadata("value of the column"))()

    Seq(nameColumn, valueColumn)
  }
}
/**
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
*/

View file

@ -653,6 +653,13 @@ class DDLParserSuite extends AnalysisTest {
"DESC TABLE COLUMN for a specific partition is not supported"))
}
test("describe database") {
  // The grammar accepts NAMESPACE as an alternative to DATABASE for this statement, so
  // both keywords must parse to the same DescribeNamespaceStatement.
  Seq("DATABASE", "NAMESPACE").foreach { keyword =>
    comparePlans(
      parsePlan(s"DESCRIBE $keyword EXTENDED a.b"),
      DescribeNamespaceStatement(Seq("a", "b"), extended = true))
    comparePlans(
      parsePlan(s"DESCRIBE $keyword a.b"),
      DescribeNamespaceStatement(Seq("a", "b"), extended = false))
  }
}
test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") {
comparePlans(parsePlan("describe t"),
DescribeTableStatement(Seq("t"), Map.empty, isExtended = false))

View file

@ -158,6 +158,13 @@ class ResolveSessionCatalog(
case AlterViewUnsetPropertiesStatement(SessionCatalog(catalog, tableName), keys, ifExists) =>
AlterTableUnsetPropertiesCommand(tableName.asTableIdentifier, keys, ifExists, isView = true)
// DESCRIBE NAMESPACE matched by the SessionCatalog extractor is handled by
// DescribeDatabaseCommand, which takes a single-part database name; any other
// number of name parts is rejected.
case DescribeNamespaceStatement(SessionCatalog(_, nameParts), extended) =>
  nameParts match {
    case Seq(dbName) =>
      DescribeDatabaseCommand(dbName, extended)
    case _ =>
      throw new AnalysisException(
        s"The database name is not valid: ${nameParts.quoted}")
  }
case DescribeTableStatement(
nameParts @ SessionCatalog(catalog, tableName), partitionSpec, isExtended) =>
loadTable(catalog, tableName.asIdentifier).collect {

View file

@ -258,18 +258,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
)
}
/**
 * Build a [[DescribeDatabaseCommand]] from a parsed DESCRIBE DATABASE statement.
 *
 * The database name is taken from the text of the `db` identifier, and the command is
 * marked as extended when the EXTENDED keyword is present.
 *
 * For example:
 * {{{
 *   DESCRIBE DATABASE [EXTENDED] database;
 * }}}
 */
override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) {
  val databaseName = ctx.db.getText
  val isExtended = ctx.EXTENDED != null
  DescribeDatabaseCommand(databaseName, isExtended)
}
/**
* Create a plan for a DESCRIBE FUNCTION command.
*/

View file

@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
@ -192,6 +192,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
Nil
}
// Plan the DescribeNamespace logical command as a DescribeNamespaceExec, passing along
// the logical plan's output attributes together with its catalog, namespace and flag.
case desc @ DescribeNamespace(catalog, namespace, extended) =>
DescribeNamespaceExec(desc.output, catalog, namespace, extended) :: Nil
case desc @ DescribeTable(DataSourceV2Relation(table, _, _), isExtended) =>
DescribeTableExec(desc.output, table, isExtended) :: Nil

View file

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.COMMENT_TABLE_PROP
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.LOCATION_TABLE_PROP
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.RESERVED_PROPERTIES
import org.apache.spark.sql.types.StructType
/**
 * Physical plan node for describing a namespace.
 *
 * Loads the namespace metadata from the catalog and emits one name/value row for each of:
 *  - "Namespace Name": the last part of the namespace,
 *  - "Description": the metadata value stored under `COMMENT_TABLE_PROP`,
 *  - "Location": the metadata value stored under `LOCATION_TABLE_PROP`,
 *  - "Properties": only when `isExtended` is set and the metadata has at least one entry
 *    whose key is not in `RESERVED_PROPERTIES`.
 *
 * @param output the attributes describing the rows produced by this node
 * @param catalog the catalog that owns the namespace; it is used as a namespace catalog
 * @param namespace the parts of the namespace name
 * @param isExtended whether the non-reserved properties are included in the result
 */
case class DescribeNamespaceExec(
    output: Seq[Attribute],
    catalog: CatalogPlugin,
    namespace: Seq[String],
    isExtended: Boolean) extends V2CommandExec {

  private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind()

  override protected def run(): Seq[InternalRow] = {
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

    val rows = new ArrayBuffer[InternalRow]()
    val nsCatalog = catalog.asNamespaceCatalog
    val ns = namespace.toArray
    val metadata = nsCatalog.loadNamespaceMetadata(ns)

    // NOTE(review): `ns.last` throws NoSuchElementException for an empty namespace;
    // confirm whether an empty (root) namespace can reach this point.
    rows += toCatalystRow("Namespace Name", ns.last)
    // `metadata.get` yields null when the key is absent, which ends up as a null value.
    rows += toCatalystRow("Description", metadata.get(COMMENT_TABLE_PROP))
    rows += toCatalystRow("Location", metadata.get(LOCATION_TABLE_PROP))

    if (isExtended) {
      // Sort by key so the rendered string does not depend on the iteration order of the
      // map returned by the catalog.
      val properties = metadata.asScala.toSeq
        .filter { case (key, _) => !RESERVED_PROPERTIES.contains(key) }
        .sortBy { case (key, _) => key }
      if (properties.nonEmpty) {
        rows += toCatalystRow("Properties", properties.mkString("(", ",", ")"))
      }
    }
    rows
  }

  /** Encodes the given strings as one internal row matching this node's schema. */
  private def toCatalystRow(strs: String*): InternalRow = {
    encoder.toRow(new GenericRowWithSchema(strs.toArray, schema)).copy()
  }
}

View file

@ -834,7 +834,6 @@ class DataSourceV2SQLSuite
assert(catalogPath.equals(catalogPath))
}
}
// TODO: Add tests for validating namespace metadata when DESCRIBE NAMESPACE is available.
}
test("CreateNameSpace: test handling of 'IF NOT EXIST'") {
@ -915,6 +914,25 @@ class DataSourceV2SQLSuite
assert(exception.getMessage.contains("Namespace 'ns1' not found"))
}
test("DescribeNamespace using v2 catalog") {
  withNamespace("testcat.ns1.ns2") {
    // Create a nested namespace that carries both a comment and a location.
    sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1.ns2 " +
      "COMMENT 'test namespace' LOCATION '/tmp/ns_test'")

    val described = sql("DESCRIBE NAMESPACE testcat.ns1.ns2")

    // The result has two string columns: name and value.
    val expectedColumns = Seq(
      ("name", StringType),
      ("value", StringType))
    val actualColumns = described.schema.map(field => (field.name, field.dataType))
    assert(actualColumns === expectedColumns)

    // Without EXTENDED only the name, description and location rows are returned.
    val expectedRows = Seq(
      Row("Namespace Name", "ns2"),
      Row("Description", "test namespace"),
      Row("Location", "/tmp/ns_test"))
    assert(described.collect() === expectedRows)
  }
}
test("ShowNamespaces: show root namespaces with default v2 catalog") {
spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat")

View file

@ -108,25 +108,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
comparePlans(parsed1, expected1)
}
test("describe database") {
  // DESCRIBE DATABASE [EXTENDED] db_name;
  val extendedPlan = parser.parsePlan("DESCRIBE DATABASE EXTENDED db_name")
  val plainPlan = parser.parsePlan("DESCRIBE DATABASE db_name")

  // The EXTENDED keyword only toggles the `extended` flag of the resulting command.
  comparePlans(extendedPlan, DescribeDatabaseCommand("db_name", extended = true))
  comparePlans(plainPlan, DescribeDatabaseCommand("db_name", extended = false))
}
test("create function") {
val sql1 =
"""