[SPARK-29482][SQL] ANALYZE TABLE should look up catalog/table like v2 commands

### What changes were proposed in this pull request?

Add `AnalyzeTableStatement` and `AnalyzeColumnStatement`, and make ANALYZE TABLE go through the same catalog/table resolution framework of v2 commands.

### Why are the changes needed?

It's important to make all the commands have the same table resolution behavior, to avoid confusing end-users. For example:
```
USE my_catalog
DESC t // success and describe the table t from my_catalog
ANALYZE TABLE t // report table not found as there is no table t in the session catalog
```

### Does this PR introduce any user-facing change?

Yes. When running ANALYZE TABLE, Spark fails the command if the current catalog is set to a v2 catalog, or if the table name specifies a v2 catalog.

### How was this patch tested?

new tests

Closes #26129 from cloud-fan/analyze-table.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
This commit is contained in:
Wenchen Fan 2019-10-18 12:55:49 +02:00 committed by Gengliang Wang
parent dba673f0e3
commit 74351468de
8 changed files with 180 additions and 113 deletions

View file

@ -124,7 +124,7 @@ statement
(COMMENT comment=STRING) |
(TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #replaceTable
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
| ANALYZE TABLE multipartIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze
| ALTER TABLE multipartIdentifier
ADD (COLUMN | COLUMNS)

View file

@ -2660,4 +2660,52 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
isExtended)
}
}
/**
* Create an [[AnalyzeTableStatement]], or an [[AnalyzeColumnStatement]].
* Example SQL for analyzing a table or a set of partitions :
* {{{
* ANALYZE TABLE multi_part_name [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
* COMPUTE STATISTICS [NOSCAN];
* }}}
*
* Example SQL for analyzing columns :
* {{{
* ANALYZE TABLE multi_part_name COMPUTE STATISTICS FOR COLUMNS column1, column2;
* }}}
*
* Example SQL for analyzing all columns of a table:
* {{{
* ANALYZE TABLE multi_part_name COMPUTE STATISTICS FOR ALL COLUMNS;
* }}}
*/
override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
def checkPartitionSpec(): Unit = {
if (ctx.partitionSpec != null) {
logWarning("Partition specification is ignored when collecting column statistics: " +
ctx.partitionSpec.getText)
}
}
if (ctx.identifier != null &&
ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
}
val tableName = visitMultipartIdentifier(ctx.multipartIdentifier())
if (ctx.ALL() != null) {
checkPartitionSpec()
AnalyzeColumnStatement(tableName, None, allColumns = true)
} else if (ctx.identifierSeq() == null) {
val partitionSpec = if (ctx.partitionSpec != null) {
visitPartitionSpec(ctx.partitionSpec)
} else {
Map.empty[String, Option[String]]
}
AnalyzeTableStatement(tableName, partitionSpec, noScan = ctx.identifier != null)
} else {
checkPartitionSpec()
AnalyzeColumnStatement(
tableName, Option(visitIdentifierSeq(ctx.identifierSeq())), allColumns = false)
}
}
}

View file

@ -292,3 +292,22 @@ case class ShowNamespacesStatement(namespace: Option[Seq[String]], pattern: Opti
* A USE statement, as parsed from SQL.
*/
case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends ParsedStatement
/**
* An ANALYZE TABLE statement, as parsed from SQL.
*/
case class AnalyzeTableStatement(
tableName: Seq[String],
partitionSpec: Map[String, Option[String]],
noScan: Boolean) extends ParsedStatement
/**
* An ANALYZE TABLE FOR COLUMNS statement, as parsed from SQL.
*/
case class AnalyzeColumnStatement(
tableName: Seq[String],
columnNames: Option[Seq[String]],
allColumns: Boolean) extends ParsedStatement {
require(columnNames.isDefined ^ allColumns, "Parameter `columnNames` or `allColumns` are " +
"mutually exclusive. Only one of them should be specified.")
}

View file

@ -879,6 +879,82 @@ class DDLParserSuite extends AnalysisTest {
ShowNamespacesStatement(Some(Seq("testcat", "ns1")), Some("*pattern*")))
}
test("analyze table statistics") {
comparePlans(parsePlan("analyze table a.b.c compute statistics"),
AnalyzeTableStatement(Seq("a", "b", "c"), Map.empty, noScan = false))
comparePlans(parsePlan("analyze table a.b.c compute statistics noscan"),
AnalyzeTableStatement(Seq("a", "b", "c"), Map.empty, noScan = true))
comparePlans(parsePlan("analyze table a.b.c partition (a) compute statistics nOscAn"),
AnalyzeTableStatement(Seq("a", "b", "c"), Map("a" -> None), noScan = true))
// Partitions specified
comparePlans(
parsePlan("ANALYZE TABLE a.b.c PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS"),
AnalyzeTableStatement(
Seq("a", "b", "c"), Map("ds" -> Some("2008-04-09"), "hr" -> Some("11")), noScan = false))
comparePlans(
parsePlan("ANALYZE TABLE a.b.c PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan"),
AnalyzeTableStatement(
Seq("a", "b", "c"), Map("ds" -> Some("2008-04-09"), "hr" -> Some("11")), noScan = true))
comparePlans(
parsePlan("ANALYZE TABLE a.b.c PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan"),
AnalyzeTableStatement(Seq("a", "b", "c"), Map("ds" -> Some("2008-04-09")), noScan = true))
comparePlans(
parsePlan("ANALYZE TABLE a.b.c PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS"),
AnalyzeTableStatement(
Seq("a", "b", "c"), Map("ds" -> Some("2008-04-09"), "hr" -> None), noScan = false))
comparePlans(
parsePlan("ANALYZE TABLE a.b.c PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan"),
AnalyzeTableStatement(
Seq("a", "b", "c"), Map("ds" -> Some("2008-04-09"), "hr" -> None), noScan = true))
comparePlans(
parsePlan("ANALYZE TABLE a.b.c PARTITION(ds, hr=11) COMPUTE STATISTICS noscan"),
AnalyzeTableStatement(
Seq("a", "b", "c"), Map("ds" -> None, "hr" -> Some("11")), noScan = true))
comparePlans(
parsePlan("ANALYZE TABLE a.b.c PARTITION(ds, hr) COMPUTE STATISTICS"),
AnalyzeTableStatement(Seq("a", "b", "c"), Map("ds" -> None, "hr" -> None), noScan = false))
comparePlans(
parsePlan("ANALYZE TABLE a.b.c PARTITION(ds, hr) COMPUTE STATISTICS noscan"),
AnalyzeTableStatement(Seq("a", "b", "c"), Map("ds" -> None, "hr" -> None), noScan = true))
intercept("analyze table a.b.c compute statistics xxxx",
"Expected `NOSCAN` instead of `xxxx`")
intercept("analyze table a.b.c partition (a) compute statistics xxxx",
"Expected `NOSCAN` instead of `xxxx`")
}
test("analyze table column statistics") {
intercept("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR COLUMNS", "")
comparePlans(
parsePlan("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR COLUMNS key, value"),
AnalyzeColumnStatement(Seq("a", "b", "c"), Option(Seq("key", "value")), allColumns = false))
// Partition specified - should be ignored
comparePlans(
parsePlan(
s"""
|ANALYZE TABLE a.b.c PARTITION(ds='2017-06-10')
|COMPUTE STATISTICS FOR COLUMNS key, value
""".stripMargin),
AnalyzeColumnStatement(Seq("a", "b", "c"), Option(Seq("key", "value")), allColumns = false))
// Partition specified should be ignored in case of COMPUTE STATISTICS FOR ALL COLUMNS
comparePlans(
parsePlan(
s"""
|ANALYZE TABLE a.b.c PARTITION(ds='2017-06-10')
|COMPUTE STATISTICS FOR ALL COLUMNS
""".stripMargin),
AnalyzeColumnStatement(Seq("a", "b", "c"), None, allColumns = true))
intercept("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR ALL COLUMNS key, value",
"mismatched input 'key' expecting <EOF>")
intercept("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR ALL",
"missing 'COLUMNS' at '<EOF>'")
}
private case class TableSpec(
name: Seq[String],
schema: Option[StructType],

View file

@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
@ -265,6 +265,24 @@ class ResolveSessionCatalog(
// TODO (SPARK-29014): we should check if the current catalog is session catalog here.
case ShowTablesStatement(None, pattern) if defaultCatalog.isEmpty =>
ShowTablesCommand(None, pattern)
case AnalyzeTableStatement(tableName, partitionSpec, noScan) =>
val CatalogAndIdentifierParts(catalog, parts) = tableName
if (!isSessionCatalog(catalog)) {
throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
}
if (partitionSpec.isEmpty) {
AnalyzeTableCommand(parts.asTableIdentifier, noScan)
} else {
AnalyzePartitionCommand(parts.asTableIdentifier, partitionSpec, noScan)
}
case AnalyzeColumnStatement(tableName, columnNames, allColumns) =>
val CatalogAndIdentifierParts(catalog, parts) = tableName
if (!isSessionCatalog(catalog)) {
throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
}
AnalyzeColumnCommand(parts.asTableIdentifier, columnNames, allColumns)
}
private def buildCatalogTable(

View file

@ -89,55 +89,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
ResetCommand
}
/**
* Create an [[AnalyzeTableCommand]] command, or an [[AnalyzePartitionCommand]]
* or an [[AnalyzeColumnCommand]] command.
* Example SQL for analyzing a table or a set of partitions :
* {{{
* ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
* COMPUTE STATISTICS [NOSCAN];
* }}}
*
* Example SQL for analyzing columns :
* {{{
* ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;
* }}}
*
* Example SQL for analyzing all columns of a table:
* {{{
* ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR ALL COLUMNS;
* }}}
*/
override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
def checkPartitionSpec(): Unit = {
if (ctx.partitionSpec != null) {
logWarning("Partition specification is ignored when collecting column statistics: " +
ctx.partitionSpec.getText)
}
}
if (ctx.identifier != null &&
ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
}
val table = visitTableIdentifier(ctx.tableIdentifier)
if (ctx.ALL() != null) {
checkPartitionSpec()
AnalyzeColumnCommand(table, None, allColumns = true)
} else if (ctx.identifierSeq() == null) {
if (ctx.partitionSpec != null) {
AnalyzePartitionCommand(table, visitPartitionSpec(ctx.partitionSpec),
noscan = ctx.identifier != null)
} else {
AnalyzeTableCommand(table, noscan = ctx.identifier != null)
}
} else {
checkPartitionSpec()
AnalyzeColumnCommand(table,
Option(visitIdentifierSeq(ctx.identifierSeq())), allColumns = false)
}
}
/**
* Create a [[ShowTablesCommand]] logical plan.
* Example SQL :

View file

@ -1164,6 +1164,23 @@ class DataSourceV2SQLSuite
}
}
test("ANALYZE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
val e = intercept[AnalysisException] {
sql(s"ANALYZE TABLE $t COMPUTE STATISTICS")
}
assert(e.message.contains("ANALYZE TABLE is only supported with v1 tables"))
val e2 = intercept[AnalysisException] {
sql(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS")
}
assert(e2.message.contains("ANALYZE TABLE is only supported with v1 tables"))
}
}
private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
val errMsg = intercept[AnalysisException] {
sql(sqlStatement)

View file

@ -216,68 +216,6 @@ class SparkSqlParserSuite extends AnalysisTest {
assertEqual("DESCRIBE " + query, DescribeQueryCommand(query, parser.parsePlan(query)))
}
test("analyze table statistics") {
assertEqual("analyze table t compute statistics",
AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
assertEqual("analyze table t compute statistics noscan",
AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
assertEqual("analyze table t partition (a) compute statistics nOscAn",
AnalyzePartitionCommand(TableIdentifier("t"), Map("a" -> None), noscan = true))
// Partitions specified
assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11"))))
assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11"))))
assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
partitionSpec = Map("ds" -> Some("2008-04-09"))))
assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None)))
assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None)))
assertEqual("ANALYZE TABLE t PARTITION(ds, hr=11) COMPUTE STATISTICS noscan",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
partitionSpec = Map("ds" -> None, "hr" -> Some("11"))))
assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
partitionSpec = Map("ds" -> None, "hr" -> None)))
assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
partitionSpec = Map("ds" -> None, "hr" -> None)))
intercept("analyze table t compute statistics xxxx",
"Expected `NOSCAN` instead of `xxxx`")
intercept("analyze table t partition (a) compute statistics xxxx",
"Expected `NOSCAN` instead of `xxxx`")
}
test("analyze table column statistics") {
intercept("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS", "")
assertEqual("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value",
AnalyzeColumnCommand(TableIdentifier("t"), Option(Seq("key", "value")), allColumns = false))
// Partition specified - should be ignored
assertEqual("ANALYZE TABLE t PARTITION(ds='2017-06-10') " +
"COMPUTE STATISTICS FOR COLUMNS key, value",
AnalyzeColumnCommand(TableIdentifier("t"), Option(Seq("key", "value")), allColumns = false))
// Partition specified should be ignored in case of COMPUTE STATISTICS FOR ALL COLUMNS
assertEqual("ANALYZE TABLE t PARTITION(ds='2017-06-10') " +
"COMPUTE STATISTICS FOR ALL COLUMNS",
AnalyzeColumnCommand(TableIdentifier("t"), None, allColumns = true))
intercept("ANALYZE TABLE t COMPUTE STATISTICS FOR ALL COLUMNS key, value",
"mismatched input 'key' expecting <EOF>")
intercept("ANALYZE TABLE t COMPUTE STATISTICS FOR ALL",
"missing 'COLUMNS' at '<EOF>'")
}
test("query organization") {
// Test all valid combinations of order by/sort by/distribute by/cluster by/limit/windows
val baseSql = "select * from t"