[SPARK-14129][SPARK-14128][SQL] Alter table DDL commands

## What changes were proposed in this pull request?

In Spark 2.0, we want to handle the most common `ALTER TABLE` commands ourselves instead of passing the entire query text to Hive. This is done through the recently introduced `SessionCatalog` API.

The commands supported in this patch include:
```
ALTER TABLE ... RENAME TO ...
ALTER TABLE ... SET TBLPROPERTIES ...
ALTER TABLE ... UNSET TBLPROPERTIES ...
ALTER TABLE ... SET LOCATION ...
ALTER TABLE ... SET SERDE ...
```
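These commands are now parsed into `RunnableCommand`s that operate on the `SessionCatalog` directly. A rough sketch of how `DDLSuite` exercises the new path (the `dbx`/`tab1` identifiers are illustrative, and a test `sqlContext` is assumed):
```
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._

// Create a database and an empty external table directly through the catalog,
// then alter the table via SQL and read the result back from the catalog.
val catalog = sqlContext.sessionState.catalog
val ident = TableIdentifier("tab1", Some("dbx"))
catalog.createDatabase(CatalogDatabase("dbx", "", "", Map()), ignoreIfExists = false)
catalog.createTable(CatalogTable(
  identifier = ident,
  tableType = CatalogTableType.EXTERNAL_TABLE,
  storage = CatalogStorageFormat(None, None, None, None, Map()),
  schema = Seq()), ignoreIfExists = false)

sqlContext.sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('owner' = 'andrew')")
assert(catalog.getTable(ident).properties == Map("owner" -> "andrew"))
```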
The commands we explicitly do not support are:
```
ALTER TABLE ... CLUSTERED BY ...
ALTER TABLE ... SKEWED BY ...
ALTER TABLE ... NOT CLUSTERED
ALTER TABLE ... NOT SORTED
ALTER TABLE ... NOT SKEWED
ALTER TABLE ... NOT STORED AS DIRECTORIES
```
For these, we throw an `AnalysisException` stating that the operation is not allowed.
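Attempting one of these now fails at parse time, as checked by `assertUnsupported` in `DDLSuite`. A minimal sketch (ScalaTest's `intercept` and a test `sqlContext` assumed; the table name is illustrative):
```
import org.apache.spark.sql.AnalysisException

// The parser rejects the command outright instead of forwarding it to Hive.
val e = intercept[AnalysisException] {
  sqlContext.sql("ALTER TABLE tab1 NOT CLUSTERED")
}
assert(e.getMessage.contains("Operation not allowed"))
```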

## How was this patch tested?

`DDLSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #12121 from andrewor14/alter-table-ddl.
Authored by Andrew Or on 2016-04-05 14:54:07 -07:00, committed by Yin Huai
commit 45d8cdee39 (parent c59abad052)
9 changed files with 562 additions and 300 deletions


@ -54,6 +54,10 @@ trait FunctionRegistry {
/** Checks if a function with a given name exists. */
def functionExists(name: String): Boolean = lookupFunction(name).isDefined
/** Clear all registered functions. */
def clear(): Unit
}
class SimpleFunctionRegistry extends FunctionRegistry {
@ -93,6 +97,10 @@ class SimpleFunctionRegistry extends FunctionRegistry {
functionBuilders.remove(name).isDefined
}
override def clear(): Unit = {
functionBuilders.clear()
}
def copy(): SimpleFunctionRegistry = synchronized {
val registry = new SimpleFunctionRegistry
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
@ -132,6 +140,10 @@ object EmptyFunctionRegistry extends FunctionRegistry {
throw new UnsupportedOperationException
}
override def clear(): Unit = {
throw new UnsupportedOperationException
}
}


@ -304,11 +304,18 @@ class SessionCatalog(
dbTables ++ _tempTables
}
// TODO: It's strange that we have both refresh and invalidate here.
/**
* Refresh the cache entry for a metastore table, if any.
*/
def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
/**
* Invalidate the cache entry for a metastore table, if any.
*/
def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ }
/**
* Drop all existing temporary tables.
* For testing only.
@ -595,6 +602,11 @@ class SessionCatalog(
}
}
/**
* List all functions in the specified database, including temporary functions.
*/
def listFunctions(db: String): Seq[FunctionIdentifier] = listFunctions(db, "*")
/**
* List all matching functions in the specified database, including temporary functions.
*/
@ -609,4 +621,34 @@ class SessionCatalog(
// So, the returned list may have two entries for the same function.
dbFunctions ++ loadedFunctions
}
// -----------------
// | Other methods |
// -----------------
/**
* Drop all existing databases (except "default") along with all associated tables,
* partitions and functions, and set the current database to "default".
*
* This is mainly used for tests.
*/
private[sql] def reset(): Unit = {
val default = "default"
listDatabases().filter(_ != default).foreach { db =>
dropDatabase(db, ignoreIfNotExists = false, cascade = true)
}
tempTables.clear()
functionRegistry.clear()
// restore built-in functions
FunctionRegistry.builtin.listFunction().foreach { f =>
val expressionInfo = FunctionRegistry.builtin.lookupFunction(f)
val functionBuilder = FunctionRegistry.builtin.lookupFunctionBuilder(f)
require(expressionInfo.isDefined, s"built-in function '$f' is missing expression info")
require(functionBuilder.isDefined, s"built-in function '$f' is missing function builder")
functionRegistry.registerFunction(f, expressionInfo.get, functionBuilder.get)
}
setCurrentDatabase(default)
}
}


@ -41,4 +41,6 @@ class StringKeyHashMap[T](normalizer: (String) => String) {
def remove(key: String): Option[T] = base.remove(normalizer(key))
def iterator: Iterator[(String, T)] = base.toIterator
def clear(): Unit = base.clear()
}


@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
import scala.collection.JavaConverters._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, AstBuilder, ParseException}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@ -378,8 +378,7 @@ class SparkSqlAstBuilder extends AstBuilder {
override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableRename(
visitTableIdentifier(ctx.from),
visitTableIdentifier(ctx.to))(
command(ctx))
visitTableIdentifier(ctx.to))
}
/**
@ -395,8 +394,7 @@ class SparkSqlAstBuilder extends AstBuilder {
ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
AlterTableSetProperties(
visitTableIdentifier(ctx.tableIdentifier),
visitTablePropertyList(ctx.tablePropertyList))(
command(ctx))
visitTablePropertyList(ctx.tablePropertyList))
}
/**
@ -404,17 +402,16 @@ class SparkSqlAstBuilder extends AstBuilder {
*
* For example:
* {{{
* ALTER TABLE table UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
* ALTER VIEW view UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
* ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
* ALTER VIEW view UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
* }}}
*/
override def visitUnsetTableProperties(
ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
AlterTableUnsetProperties(
visitTableIdentifier(ctx.tableIdentifier),
visitTablePropertyList(ctx.tablePropertyList),
ctx.EXISTS != null)(
command(ctx))
visitTablePropertyList(ctx.tablePropertyList).keys.toSeq,
ctx.EXISTS != null)
}
/**
@ -432,116 +429,41 @@ class SparkSqlAstBuilder extends AstBuilder {
Option(ctx.STRING).map(string),
Option(ctx.tablePropertyList).map(visitTablePropertyList),
// TODO a partition spec is allowed to have optional values. This is currently violated.
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
command(ctx))
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}
/**
* Create an [[AlterTableStorageProperties]] command.
*
* For example:
* {{{
* ALTER TABLE table CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
* }}}
*/
// TODO: don't even bother parsing alter table commands related to bucketing and skewing
override def visitBucketTable(ctx: BucketTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableStorageProperties(
visitTableIdentifier(ctx.tableIdentifier),
visitBucketSpec(ctx.bucketSpec))(
command(ctx))
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... CLUSTERED BY ... INTO N BUCKETS")
}
/**
* Create an [[AlterTableNotClustered]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT CLUSTERED;
* }}}
*/
override def visitUnclusterTable(ctx: UnclusterTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotClustered(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT CLUSTERED")
}
/**
* Create an [[AlterTableNotSorted]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT SORTED;
* }}}
*/
override def visitUnsortTable(ctx: UnsortTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotSorted(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT SORTED")
}
/**
* Create an [[AlterTableSkewed]] command.
*
* For example:
* {{{
* ALTER TABLE table SKEWED BY (col1, col2)
* ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
* [STORED AS DIRECTORIES];
* }}}
*/
override def visitSkewTable(ctx: SkewTableContext): LogicalPlan = withOrigin(ctx) {
val table = visitTableIdentifier(ctx.tableIdentifier)
val (cols, values, storedAsDirs) = visitSkewSpec(ctx.skewSpec)
AlterTableSkewed(table, cols, values, storedAsDirs)(command(ctx))
throw new AnalysisException("Operation not allowed: ALTER TABLE ... SKEWED BY ...")
}
/**
* Create an [[AlterTableNotSorted]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT SKEWED;
* }}}
*/
override def visitUnskewTable(ctx: UnskewTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotSkewed(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT SKEWED")
}
/**
* Create an [[AlterTableNotStoredAsDirs]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT STORED AS DIRECTORIES
* }}}
*/
override def visitUnstoreTable(ctx: UnstoreTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotStoredAsDirs(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... NOT STORED AS DIRECTORIES")
}
/**
* Create an [[AlterTableSkewedLocation]] command.
*
* For example:
* {{{
* ALTER TABLE table SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] );
* }}}
*/
override def visitSetTableSkewLocations(
ctx: SetTableSkewLocationsContext): LogicalPlan = withOrigin(ctx) {
val skewedMap = ctx.skewedLocationList.skewedLocation.asScala.flatMap {
slCtx =>
val location = string(slCtx.STRING)
if (slCtx.constant != null) {
Seq(visitStringConstant(slCtx.constant) -> location)
} else {
// TODO this is similar to what was in the original implementation. However this does not
// make to much sense to me since we should be storing a tuple of values (not column
// names) for which we want a dedicated storage location.
visitConstantList(slCtx.constantList).map(_ -> location)
}
}.toMap
AlterTableSkewedLocation(
visitTableIdentifier(ctx.tableIdentifier),
skewedMap)(
command(ctx))
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... SET SKEWED LOCATION ...")
}
/**
@ -703,8 +625,7 @@ class SparkSqlAstBuilder extends AstBuilder {
AlterTableSetLocation(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
visitLocationSpec(ctx.locationSpec))(
command(ctx))
visitLocationSpec(ctx.locationSpec))
}
/**


@ -18,12 +18,11 @@
package org.apache.spark.sql.execution.command
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._
@ -175,67 +174,133 @@ case class DescribeDatabase(
}
}
/** Rename in ALTER TABLE/VIEW: change the name of a table/view to a different name. */
/**
* A command that renames a table/view.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table1 RENAME TO table2;
* ALTER VIEW view1 RENAME TO view2;
* }}}
*/
case class AlterTableRename(
oldName: TableIdentifier,
newName: TableIdentifier)(sql: String)
extends NativeDDLCommand(sql) with Logging
newName: TableIdentifier)
extends RunnableCommand {
/** Set Properties in ALTER TABLE/VIEW: add metadata to a table/view. */
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
catalog.invalidateTable(oldName)
catalog.renameTable(oldName, newName)
Seq.empty[Row]
}
}
/**
* A command that sets table/view properties.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
* ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
* }}}
*/
case class AlterTableSetProperties(
tableName: TableIdentifier,
properties: Map[String, String])(sql: String)
extends NativeDDLCommand(sql) with Logging
properties: Map[String, String])
extends RunnableCommand {
/** Unset Properties in ALTER TABLE/VIEW: remove metadata from a table/view. */
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
val table = catalog.getTable(tableName)
val newProperties = table.properties ++ properties
if (DDLUtils.isDatasourceTable(newProperties)) {
throw new AnalysisException(
"alter table properties is not supported for tables defined using the datasource API")
}
val newTable = table.copy(properties = newProperties)
catalog.alterTable(newTable)
Seq.empty[Row]
}
}
/**
* A command that unsets table/view properties.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
* ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
* }}}
*/
case class AlterTableUnsetProperties(
tableName: TableIdentifier,
properties: Map[String, String],
ifExists: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
propKeys: Seq[String],
ifExists: Boolean)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
val table = catalog.getTable(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"alter table properties is not supported for datasource tables")
}
if (!ifExists) {
propKeys.foreach { k =>
if (!table.properties.contains(k)) {
throw new AnalysisException(
s"attempted to unset non-existent property '$k' in table '$tableName'")
}
}
}
val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
val newTable = table.copy(properties = newProperties)
catalog.alterTable(newTable)
Seq.empty[Row]
}
}
/**
* A command that sets the serde class and/or serde properties of a table/view.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
* ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
* }}}
*/
case class AlterTableSerDeProperties(
tableName: TableIdentifier,
serdeClassName: Option[String],
serdeProperties: Option[Map[String, String]],
partition: Option[Map[String, String]])(sql: String)
extends NativeDDLCommand(sql) with Logging
partition: Option[Map[String, String]])
extends RunnableCommand {
case class AlterTableStorageProperties(
tableName: TableIdentifier,
buckets: BucketSpec)(sql: String)
extends NativeDDLCommand(sql) with Logging
// should never happen if we parsed things correctly
require(serdeClassName.isDefined || serdeProperties.isDefined,
"alter table attempted to set neither serde class name nor serde properties")
case class AlterTableNotClustered(
tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
val table = catalog.getTable(tableName)
// Do not support setting serde for datasource tables
if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"alter table serde is not supported for datasource tables")
}
val newTable = table.withNewStorage(
serde = serdeClassName.orElse(table.storage.serde),
serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
catalog.alterTable(newTable)
Seq.empty[Row]
}
case class AlterTableNotSorted(
tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
case class AlterTableSkewed(
tableName: TableIdentifier,
// e.g. (dt, country)
skewedCols: Seq[String],
// e.g. ('2008-08-08', 'us), ('2009-09-09', 'uk')
skewedValues: Seq[Seq[String]],
storedAsDirs: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging {
require(skewedValues.forall(_.size == skewedCols.size),
"number of columns in skewed values do not match number of skewed columns provided")
}
case class AlterTableNotSkewed(
tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
case class AlterTableNotStoredAsDirs(
tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
case class AlterTableSkewedLocation(
tableName: TableIdentifier,
skewedMap: Map[String, String])(sql: String)
extends NativeDDLCommand(sql) with Logging
/**
* Add Partition in ALTER TABLE/VIEW: add the table/view partitions.
* 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
@ -292,11 +357,53 @@ case class AlterTableSetFileFormat(
genericFormat: Option[String])(sql: String)
extends NativeDDLCommand(sql) with Logging
/**
* A command that sets the location of a table or a partition.
*
* For normal tables, this just sets the location URI in the table/partition's storage format.
* For datasource tables, this sets a "path" parameter in the table/partition's serde properties.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc";
* }}}
*/
case class AlterTableSetLocation(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec],
location: String)(sql: String)
extends NativeDDLCommand(sql) with Logging
location: String)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
val table = catalog.getTable(tableName)
partitionSpec match {
case Some(spec) =>
// Partition spec is specified, so we set the location only for this partition
val part = catalog.getPartition(tableName, spec)
val newPart =
if (DDLUtils.isDatasourceTable(table)) {
part.copy(storage = part.storage.copy(
serdeProperties = part.storage.serdeProperties ++ Map("path" -> location)))
} else {
part.copy(storage = part.storage.copy(locationUri = Some(location)))
}
catalog.alterPartitions(tableName, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
val newTable =
if (DDLUtils.isDatasourceTable(table)) {
table.withNewStorage(
serdeProperties = table.storage.serdeProperties ++ Map("path" -> location))
} else {
table.withNewStorage(locationUri = Some(location))
}
catalog.alterTable(newTable)
}
Seq.empty[Row]
}
}
case class AlterTableTouch(
tableName: TableIdentifier,
@ -341,3 +448,16 @@ case class AlterTableReplaceCol(
restrict: Boolean,
cascade: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
private object DDLUtils {
def isDatasourceTable(props: Map[String, String]): Boolean = {
props.contains("spark.sql.sources.provider")
}
def isDatasourceTable(table: CatalogTable): Boolean = {
isDatasourceTable(table.properties)
}
}


@ -205,10 +205,10 @@ class DDLCommandSuite extends PlanTest {
val parsed_view = parser.parsePlan(sql_view)
val expected_table = AlterTableRename(
TableIdentifier("table_name", None),
TableIdentifier("new_table_name", None))(sql_table)
TableIdentifier("new_table_name", None))
val expected_view = AlterTableRename(
TableIdentifier("table_name", None),
TableIdentifier("new_table_name", None))(sql_view)
TableIdentifier("new_table_name", None))
comparePlans(parsed_table, expected_table)
comparePlans(parsed_view, expected_view)
}
@ -235,14 +235,14 @@ class DDLCommandSuite extends PlanTest {
val tableIdent = TableIdentifier("table_name", None)
val expected1_table = AlterTableSetProperties(
tableIdent, Map("test" -> "test", "comment" -> "new_comment"))(sql1_table)
tableIdent, Map("test" -> "test", "comment" -> "new_comment"))
val expected2_table = AlterTableUnsetProperties(
tableIdent, Map("comment" -> null, "test" -> null), ifExists = false)(sql2_table)
tableIdent, Seq("comment", "test"), ifExists = false)
val expected3_table = AlterTableUnsetProperties(
tableIdent, Map("comment" -> null, "test" -> null), ifExists = true)(sql3_table)
val expected1_view = expected1_table.copy()(sql = sql1_view)
val expected2_view = expected2_table.copy()(sql = sql2_view)
val expected3_view = expected3_table.copy()(sql = sql3_view)
tableIdent, Seq("comment", "test"), ifExists = true)
val expected1_view = expected1_table
val expected2_view = expected2_table
val expected3_view = expected3_table
comparePlans(parsed1_table, expected1_table)
comparePlans(parsed2_table, expected2_table)
@ -282,24 +282,24 @@ class DDLCommandSuite extends PlanTest {
val parsed5 = parser.parsePlan(sql5)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSerDeProperties(
tableIdent, Some("org.apache.class"), None, None)(sql1)
tableIdent, Some("org.apache.class"), None, None)
val expected2 = AlterTableSerDeProperties(
tableIdent,
Some("org.apache.class"),
Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
None)(sql2)
None)
val expected3 = AlterTableSerDeProperties(
tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)(sql3)
tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)
val expected4 = AlterTableSerDeProperties(
tableIdent,
Some("org.apache.class"),
Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql4)
Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))
val expected5 = AlterTableSerDeProperties(
tableIdent,
None,
Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql5)
Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
@ -307,103 +307,6 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed5, expected5)
}
test("alter table: storage properties") {
val sql1 = "ALTER TABLE table_name CLUSTERED BY (dt, country) INTO 10 BUCKETS"
val sql2 = "ALTER TABLE table_name CLUSTERED BY (dt, country) SORTED BY " +
"(dt, country DESC) INTO 10 BUCKETS"
val sql3 = "ALTER TABLE table_name NOT CLUSTERED"
val sql4 = "ALTER TABLE table_name NOT SORTED"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val parsed4 = parser.parsePlan(sql4)
val tableIdent = TableIdentifier("table_name", None)
val cols = List("dt", "country")
// TODO: also test the sort directions once we keep track of that
val expected1 = AlterTableStorageProperties(
tableIdent, BucketSpec(10, cols, Nil))(sql1)
val expected2 = AlterTableStorageProperties(
tableIdent, BucketSpec(10, cols, cols))(sql2)
val expected3 = AlterTableNotClustered(tableIdent)(sql3)
val expected4 = AlterTableNotSorted(tableIdent)(sql4)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
}
test("alter table: skewed") {
val sql1 =
"""
|ALTER TABLE table_name SKEWED BY (dt, country) ON
|(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn')) STORED AS DIRECTORIES
""".stripMargin
val sql2 =
"""
|ALTER TABLE table_name SKEWED BY (dt, country) ON
|('2008-08-08', 'us') STORED AS DIRECTORIES
""".stripMargin
val sql3 =
"""
|ALTER TABLE table_name SKEWED BY (dt, country) ON
|(('2008-08-08', 'us'), ('2009-09-09', 'uk'))
""".stripMargin
val sql4 = "ALTER TABLE table_name NOT SKEWED"
val sql5 = "ALTER TABLE table_name NOT STORED AS DIRECTORIES"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val parsed4 = parser.parsePlan(sql4)
val parsed5 = parser.parsePlan(sql5)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSkewed(
tableIdent,
Seq("dt", "country"),
Seq(List("2008-08-08", "us"), List("2009-09-09", "uk"), List("2010-10-10", "cn")),
storedAsDirs = true)(sql1)
val expected2 = AlterTableSkewed(
tableIdent,
Seq("dt", "country"),
Seq(List("2008-08-08", "us")),
storedAsDirs = true)(sql2)
val expected3 = AlterTableSkewed(
tableIdent,
Seq("dt", "country"),
Seq(List("2008-08-08", "us"), List("2009-09-09", "uk")),
storedAsDirs = false)(sql3)
val expected4 = AlterTableNotSkewed(tableIdent)(sql4)
val expected5 = AlterTableNotStoredAsDirs(tableIdent)(sql5)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
comparePlans(parsed5, expected5)
}
test("alter table: skewed location") {
val sql1 =
"""
|ALTER TABLE table_name SET SKEWED LOCATION
|('123'='location1', 'test'='location2')
""".stripMargin
val sql2 =
"""
|ALTER TABLE table_name SET SKEWED LOCATION
|(('2008-08-08', 'us')='location1', 'test'='location2')
""".stripMargin
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSkewedLocation(
tableIdent,
Map("123" -> "location1", "test" -> "location2"))(sql1)
val expected2 = AlterTableSkewedLocation(
tableIdent,
Map("2008-08-08" -> "location1", "us" -> "location1", "test" -> "location2"))(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
// ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec
// [LOCATION 'location1'] partition_spec [LOCATION 'location2'] ...;
test("alter table: add partition") {
@ -615,11 +518,11 @@ class DDLCommandSuite extends PlanTest {
val expected1 = AlterTableSetLocation(
tableIdent,
None,
"new location")(sql1)
"new location")
val expected2 = AlterTableSetLocation(
tableIdent,
Some(Map("dt" -> "2008-08-08", "country" -> "us")),
"new location")(sql2)
"new location")
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}


@ -19,34 +19,63 @@ package org.apache.spark.sql.execution.command
import java.io.File
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.test.SharedSQLContext
class DDLSuite extends QueryTest with SharedSQLContext {
class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private val escapedIdentifier = "`(.+)`".r
override def afterEach(): Unit = {
try {
// drop all databases, tables and functions after each test
sqlContext.sessionState.catalog.reset()
} finally {
super.afterEach()
}
}
/**
* Strip backticks, if any, from the string.
*/
def cleanIdentifier(ident: String): String = {
private def cleanIdentifier(ident: String): String = {
ident match {
case escapedIdentifier(i) => i
case plainIdent => plainIdent
}
}
/**
* Drops database `databaseName` after calling `f`.
*/
private def withDatabase(dbNames: String*)(f: => Unit): Unit = {
try f finally {
dbNames.foreach { name =>
sqlContext.sql(s"DROP DATABASE IF EXISTS $name CASCADE")
}
sqlContext.sessionState.catalog.setCurrentDatabase("default")
private def assertUnsupported(query: String): Unit = {
val e = intercept[AnalysisException] {
sql(query)
}
assert(e.getMessage.toLowerCase.contains("operation not allowed"))
}
private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
catalog.createDatabase(CatalogDatabase(name, "", "", Map()), ignoreIfExists = false)
}
private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
catalog.createTable(CatalogTable(
identifier = name,
tableType = CatalogTableType.EXTERNAL_TABLE,
storage = CatalogStorageFormat(None, None, None, None, Map()),
schema = Seq()), ignoreIfExists = false)
}
private def createTablePartition(
catalog: SessionCatalog,
spec: TablePartitionSpec,
tableName: TableIdentifier): Unit = {
val part = CatalogTablePartition(spec, CatalogStorageFormat(None, None, None, None, Map()))
catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
}
test("Create/Drop Database") {
@ -55,7 +84,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
val databaseNames = Seq("db1", "`database`")
databaseNames.foreach { dbName =>
withDatabase(dbName) {
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
@ -67,6 +96,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbNameWithoutBackTicks))
} finally {
catalog.reset()
}
}
}
@ -76,8 +107,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
val databaseNames = Seq("db1", "`database`")
databaseNames.foreach { dbName =>
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
withDatabase(dbName) {
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
@ -90,6 +121,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
sql(s"CREATE DATABASE $dbName")
}.getMessage
assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
} finally {
catalog.reset()
}
}
}
@ -99,7 +132,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
val databaseNames = Seq("db1", "`database`")
databaseNames.foreach { dbName =>
withDatabase(dbName) {
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
val location =
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
@ -129,6 +162,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
} finally {
catalog.reset()
}
}
}
@ -159,6 +194,131 @@ class DDLSuite extends QueryTest with SharedSQLContext {
}
}
// TODO: test drop database in restrict mode
test("alter table: rename") {
val catalog = sqlContext.sessionState.catalog
val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
val tableIdent2 = TableIdentifier("tab2", Some("dbx"))
createDatabase(catalog, "dbx")
createDatabase(catalog, "dby")
createTable(catalog, tableIdent1)
assert(catalog.listTables("dbx") == Seq(tableIdent1))
sql("ALTER TABLE dbx.tab1 RENAME TO dbx.tab2")
assert(catalog.listTables("dbx") == Seq(tableIdent2))
catalog.setCurrentDatabase("dbx")
// rename without explicitly specifying database
sql("ALTER TABLE tab2 RENAME TO tab1")
assert(catalog.listTables("dbx") == Seq(tableIdent1))
// table to rename does not exist
intercept[AnalysisException] {
sql("ALTER TABLE dbx.does_not_exist RENAME TO dbx.tab2")
}
// destination database is different
intercept[AnalysisException] {
sql("ALTER TABLE dbx.tab1 RENAME TO dby.tab2")
}
}
test("alter table: set location") {
testSetLocation(isDatasourceTable = false)
}
test("alter table: set location (datasource table)") {
testSetLocation(isDatasourceTable = true)
}
test("alter table: set properties") {
val catalog = sqlContext.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
assert(catalog.getTable(tableIdent).properties.isEmpty)
// set table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
assert(catalog.getTable(tableIdent).properties == Map("andrew" -> "or14", "kor" -> "bel"))
// set table properties without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
assert(catalog.getTable(tableIdent).properties ==
Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")
}
// throw exception for datasource tables
convertToDatasourceTable(catalog, tableIdent)
val e = intercept[AnalysisException] {
sql("ALTER TABLE tab1 SET TBLPROPERTIES ('sora' = 'bol')")
}
assert(e.getMessage.contains("datasource"))
}
test("alter table: unset properties") {
val catalog = sqlContext.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
// unset table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan')")
sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
assert(catalog.getTable(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
// unset table properties without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
assert(catalog.getTable(tableIdent).properties == Map("c" -> "lan"))
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
}
// property to unset does not exist
val e = intercept[AnalysisException] {
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')")
}
assert(e.getMessage.contains("xyz"))
// property to unset does not exist, but "IF EXISTS" is specified
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
assert(catalog.getTable(tableIdent).properties.isEmpty)
// throw exception for datasource tables
convertToDatasourceTable(catalog, tableIdent)
val e1 = intercept[AnalysisException] {
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('sora')")
}
assert(e1.getMessage.contains("datasource"))
}
test("alter table: set serde") {
testSetSerde(isDatasourceTable = false)
}
test("alter table: set serde (datasource table)") {
testSetSerde(isDatasourceTable = true)
}
test("alter table: bucketing is not supported") {
val catalog = sqlContext.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
assertUnsupported("ALTER TABLE dbx.tab1 CLUSTERED BY (blood, lemon, grape) INTO 11 BUCKETS")
assertUnsupported("ALTER TABLE dbx.tab1 CLUSTERED BY (fuji) SORTED BY (grape) INTO 5 BUCKETS")
assertUnsupported("ALTER TABLE dbx.tab1 NOT CLUSTERED")
assertUnsupported("ALTER TABLE dbx.tab1 NOT SORTED")
}
test("alter table: skew is not supported") {
val catalog = sqlContext.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
assertUnsupported("ALTER TABLE dbx.tab1 SKEWED BY (dt, country) ON " +
"(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn'))")
assertUnsupported("ALTER TABLE dbx.tab1 SKEWED BY (dt, country) ON " +
"(('2008-08-08', 'us'), ('2009-09-09', 'uk')) STORED AS DIRECTORIES")
assertUnsupported("ALTER TABLE dbx.tab1 NOT SKEWED")
assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES")
}
// TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext
test("show tables") {
@ -206,29 +366,129 @@ class DDLSuite extends QueryTest with SharedSQLContext {
}
test("show databases") {
withDatabase("showdb1A", "showdb2B") {
sql("CREATE DATABASE showdb1A")
sql("CREATE DATABASE showdb2B")
sql("CREATE DATABASE showdb1A")
sql("CREATE DATABASE showdb2B")
assert(
sql("SHOW DATABASES").count() >= 2)
assert(
sql("SHOW DATABASES").count() >= 2)
checkAnswer(
sql("SHOW DATABASES LIKE '*db1A'"),
Row("showdb1A") :: Nil)
checkAnswer(
sql("SHOW DATABASES LIKE '*db1A'"),
Row("showdb1A") :: Nil)
checkAnswer(
sql("SHOW DATABASES LIKE 'showdb1A'"),
Row("showdb1A") :: Nil)
checkAnswer(
sql("SHOW DATABASES LIKE 'showdb1A'"),
Row("showdb1A") :: Nil)
checkAnswer(
sql("SHOW DATABASES LIKE '*db1A|*db2B'"),
Row("showdb1A") ::
Row("showdb2B") :: Nil)
checkAnswer(
sql("SHOW DATABASES LIKE '*db1A|*db2B'"),
Row("showdb1A") ::
Row("showdb2B") :: Nil)
checkAnswer(
sql("SHOW DATABASES LIKE 'non-existentdb'"),
Nil)
checkAnswer(
sql("SHOW DATABASES LIKE 'non-existentdb'"),
Nil)
}
private def convertToDatasourceTable(
catalog: SessionCatalog,
tableIdent: TableIdentifier): Unit = {
catalog.alterTable(catalog.getTable(tableIdent).copy(
properties = Map("spark.sql.sources.provider" -> "csv")))
}
private def testSetLocation(isDatasourceTable: Boolean): Unit = {
val catalog = sqlContext.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val partSpec = Map("a" -> "1")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, partSpec, tableIdent)
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
assert(catalog.getTable(tableIdent).storage.locationUri.isEmpty)
assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty)
// Verify that the location is set to the expected string
def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
val storageFormat = spec
.map { s => catalog.getPartition(tableIdent, s).storage }
.getOrElse { catalog.getTable(tableIdent).storage }
if (isDatasourceTable) {
assert(storageFormat.serdeProperties.get("path") === Some(expected))
} else {
assert(storageFormat.locationUri === Some(expected))
}
}
// set table location
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
verifyLocation("/path/to/your/lovely/heart")
// set table partition location
sql("ALTER TABLE dbx.tab1 PARTITION (a='1') SET LOCATION '/path/to/part/ways'")
verifyLocation("/path/to/part/ways", Some(partSpec))
// set table location without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'")
verifyLocation("/swanky/steak/place")
// set table partition location without explicitly specifying database
sql("ALTER TABLE tab1 PARTITION (a='1') SET LOCATION 'vienna'")
verifyLocation("vienna", Some(partSpec))
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE dbx.does_not_exist SET LOCATION '/mister/spark'")
}
// partition to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE dbx.tab1 PARTITION (b='2') SET LOCATION '/mister/spark'")
}
}
private def testSetSerde(isDatasourceTable: Boolean): Unit = {
val catalog = sqlContext.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
assert(catalog.getTable(tableIdent).storage.serde.isEmpty)
assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
// set table serde and/or properties (should fail on datasource tables)
if (isDatasourceTable) {
val e1 = intercept[AnalysisException] {
sql("ALTER TABLE dbx.tab1 SET SERDE 'whatever'")
}
val e2 = intercept[AnalysisException] {
sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
}
assert(e1.getMessage.contains("datasource"))
assert(e2.getMessage.contains("datasource"))
} else {
sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.jadoop"))
assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.madoop"))
assert(catalog.getTable(tableIdent).storage.serdeProperties ==
Map("k" -> "v", "kay" -> "vee"))
}
// set serde properties only
sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
assert(catalog.getTable(tableIdent).storage.serdeProperties ==
Map("k" -> "vvv", "kay" -> "vee"))
// set things without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
assert(catalog.getTable(tableIdent).storage.serdeProperties ==
Map("k" -> "vvv", "kay" -> "veee"))
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
}
}
}


@ -360,6 +360,12 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"show_create_table_serde",
"show_create_table_view",
// These tests try to change how a table is bucketed, which we don't support
"alter4",
"sort_merge_join_desc_5",
"sort_merge_join_desc_6",
"sort_merge_join_desc_7",
// Index commands are not supported
"drop_index",
"drop_index_removes_partition_dirs",
@ -381,7 +387,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"alias_casted_column",
"alter2",
"alter3",
"alter4",
"alter5",
"alter_merge_2",
"alter_partition_format_loc",
@ -880,9 +885,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"sort_merge_join_desc_2",
"sort_merge_join_desc_3",
"sort_merge_join_desc_4",
"sort_merge_join_desc_5",
"sort_merge_join_desc_6",
"sort_merge_join_desc_7",
"stats0",
"stats_aggregator_error_1",
"stats_empty_partition",


@ -94,7 +94,7 @@ private[sql] class HiveSessionCatalog(
metastoreCatalog.refreshTable(name)
}
def invalidateTable(name: TableIdentifier): Unit = {
override def invalidateTable(name: TableIdentifier): Unit = {
metastoreCatalog.invalidateTable(name)
}