[SPARK-17873][SQL] ALTER TABLE RENAME TO should allow users to specify database in destination table name (but it has to be the same as the source table's)

## What changes were proposed in this pull request?

Unlike Hive, Spark SQL's ALTER TABLE RENAME TO cannot move a table from one database to another (e.g. `ALTER TABLE db1.tbl RENAME TO db2.tbl2`) and reports an error if the source and destination databases differ. So in #14955 we forbade users from specifying the database of the destination table in ALTER TABLE RENAME TO, to be consistent with other database systems and to make it easier to rename tables in a non-current database: users can write `ALTER TABLE db1.tbl RENAME TO tbl2` instead of `ALTER TABLE db1.tbl RENAME TO db1.tbl2`.

However, this was a breaking change: users may already have queries that specify the database of the destination table in ALTER TABLE RENAME TO.

This PR reverts most of #14955 and simplifies the usage of ALTER TABLE RENAME TO by making the database of the source table the default database of the destination table (instead of the current database), so users can still write `ALTER TABLE db1.tbl RENAME TO tbl2`. This is consistent with other databases like MySQL, Postgres, etc.
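
To illustrate, a minimal sketch of the intended semantics after this change (the statements come from the updated DDLSuite test; `dbx`, `dby`, `tab1` and `tab2` are placeholder names, and `spark` is an existing SparkSession):

```scala
// Assuming databases dbx and dby exist, and table dbx.tab1 exists.
spark.sql("ALTER TABLE dbx.tab1 RENAME TO dbx.tab2") // OK: destination database matches the source's
spark.sql("ALTER TABLE dbx.tab2 RENAME TO tab1")     // OK: destination database defaults to dbx, the source's
spark.sql("ALTER TABLE dbx.tab1 RENAME TO dby.tab2") // AnalysisException: databases do not match
```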

## How was this patch tested?

The tests that were added back, plus some new tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15434 from cloud-fan/revert.
Wenchen Fan authored on 2016-10-18 20:23:13 -07:00; committed by Yin Huai
commit 4329c5cea4 (parent 2629cd7460)
6 changed files with 87 additions and 35 deletions


@@ -462,11 +462,20 @@ class SessionCatalog(
    * If a database is specified in `oldName`, this will rename the table in that database.
    * If no database is specified, this will first attempt to rename a temporary table with
    * the same name, then, if that does not exist, rename the table in the current database.
+   *
+   * This assumes the database specified in `newName` matches the one in `oldName`.
    */
-  def renameTable(oldName: TableIdentifier, newName: String): Unit = synchronized {
+  def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized {
     val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
+    newName.database.map(formatDatabaseName).foreach { newDb =>
+      if (db != newDb) {
+        throw new AnalysisException(
+          s"RENAME TABLE source and destination databases do not match: '$db' != '$newDb'")
+      }
+    }
     val oldTableName = formatTableName(oldName.table)
-    val newTableName = formatTableName(newName)
+    val newTableName = formatTableName(newName.table)
     if (db == globalTempViewManager.database) {
       globalTempViewManager.rename(oldTableName, newTableName)
     } else {
@@ -476,6 +485,11 @@ class SessionCatalog(
         requireTableNotExists(TableIdentifier(newTableName, Some(db)))
         externalCatalog.renameTable(db, oldTableName, newTableName)
       } else {
+        if (newName.database.isDefined) {
+          throw new AnalysisException(
+            s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': cannot specify database " +
+              s"name '${newName.database.get}' in the destination table")
+        }
         if (tempTables.contains(newTableName)) {
           throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': " +
             "destination table already exists")


@@ -273,27 +273,34 @@ class SessionCatalogSuite extends SparkFunSuite {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
     assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    sessionCatalog.renameTable(TableIdentifier("tbl1", Some("db2")), "tblone")
+    sessionCatalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier("tblone"))
     assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbl2"))
-    sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), "tbltwo")
+    sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbltwo"))
     assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbltwo"))
     // Rename table without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
-    sessionCatalog.renameTable(TableIdentifier("tbltwo"), "table_two")
+    sessionCatalog.renameTable(TableIdentifier("tbltwo"), TableIdentifier("table_two"))
     assert(externalCatalog.listTables("db2").toSet == Set("tblone", "table_two"))
+    // Renaming "db2.tblone" to "db1.tblones" should fail because databases don't match
+    intercept[AnalysisException] {
+      sessionCatalog.renameTable(
+        TableIdentifier("tblone", Some("db2")), TableIdentifier("tblones", Some("db1")))
+    }
     // The new table already exists
     intercept[TableAlreadyExistsException] {
-      sessionCatalog.renameTable(TableIdentifier("tblone", Some("db2")), "table_two")
+      sessionCatalog.renameTable(
+        TableIdentifier("tblone", Some("db2")),
+        TableIdentifier("table_two"))
     }
   }

   test("rename table when database/table does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[NoSuchDatabaseException] {
-      catalog.renameTable(TableIdentifier("tbl1", Some("unknown_db")), "tbl2")
+      catalog.renameTable(TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2"))
     }
     intercept[NoSuchTableException] {
-      catalog.renameTable(TableIdentifier("unknown_table", Some("db2")), "tbl2")
+      catalog.renameTable(TableIdentifier("unknown_table", Some("db2")), TableIdentifier("tbl2"))
     }
   }
@@ -306,12 +313,12 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(sessionCatalog.getTempView("tbl1") == Option(tempTable))
     assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
     // If database is not specified, temp table should be renamed first
-    sessionCatalog.renameTable(TableIdentifier("tbl1"), "tbl3")
+    sessionCatalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3"))
     assert(sessionCatalog.getTempView("tbl1").isEmpty)
     assert(sessionCatalog.getTempView("tbl3") == Option(tempTable))
     assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
     // If database is specified, temp tables are never renamed
-    sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), "tbl4")
+    sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4"))
     assert(sessionCatalog.getTempView("tbl3") == Option(tempTable))
     assert(sessionCatalog.getTempView("tbl4").isEmpty)
     assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4"))


@@ -689,15 +689,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
-    val fromName = visitTableIdentifier(ctx.from)
-    val toName = visitTableIdentifier(ctx.to)
-    if (toName.database.isDefined) {
-      operationNotAllowed("Can not specify database in table/view name after RENAME TO", ctx)
-    }
     AlterTableRenameCommand(
-      fromName,
-      toName.table,
+      visitTableIdentifier(ctx.from),
+      visitTableIdentifier(ctx.to),
       ctx.VIEW != null)
   }
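
With the parser-side check removed, a qualified destination name now parses; a database mismatch is only rejected later by `SessionCatalog.renameTable`. A sketch of the resulting plan, mirroring the new DDLCommandSuite test further down (`parser` stands for the SQL parser instance used there):

```scala
val plan = parser.parsePlan("ALTER TABLE db1.tbl RENAME TO db1.tbl2")
// plan is equivalent to:
//   AlterTableRenameCommand(
//     TableIdentifier("tbl", Some("db1")),
//     TableIdentifier("tbl2", Some("db1")),
//     isView = false)
```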


@@ -146,7 +146,7 @@ case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends
  */
 case class AlterTableRenameCommand(
     oldName: TableIdentifier,
-    newName: String,
+    newName: TableIdentifier,
     isView: Boolean)
   extends RunnableCommand {
@@ -159,7 +159,6 @@ case class AlterTableRenameCommand(
     } else {
       val table = catalog.getTableMetadata(oldName)
       DDLUtils.verifyAlterTableType(catalog, table, isView)
-      val newTblName = TableIdentifier(newName, oldName.database)
       // If an exception is thrown here we can just assume the table is uncached;
       // this can happen with Hive tables when the underlying catalog is in-memory.
       val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
@@ -172,7 +171,7 @@
       }
       // For datasource tables, we also need to update the "path" serde property
       if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
-        val newPath = catalog.defaultTablePath(newTblName)
+        val newPath = catalog.defaultTablePath(newName)
         val newTable = table.withNewStorage(
           properties = table.storage.properties ++ Map("path" -> newPath))
         catalog.alterTable(newTable)
@@ -182,7 +181,7 @@
       catalog.refreshTable(oldName)
       catalog.renameTable(oldName, newName)
       if (wasCached) {
-        sparkSession.catalog.cacheTable(newTblName.unquotedString)
+        sparkSession.catalog.cacheTable(newName.unquotedString)
       }
     }
     Seq.empty[Row]
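
Because `newName` is now a full `TableIdentifier`, the command derives the new default location and the cache key directly from it, so the intermediate `newTblName` is no longer needed. A rough sketch of those two lookups (the path shown is only an example):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

val catalog = spark.sessionState.catalog
val newName = TableIdentifier("tab2", Some("dbx"))
// For managed datasource tables, the "path" storage property is recomputed from the destination name.
val newPath = catalog.defaultTablePath(newName)  // e.g. <warehouse-dir>/dbx.db/tab2
// If the source table was cached, it is re-cached under the destination's unquoted name.
val cacheKey = newName.unquotedString            // "dbx.tab2"
```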


@@ -387,20 +387,22 @@ class DDLCommandSuite extends PlanTest {
     val parsed_table = parser.parsePlan(sql_table)
     val parsed_view = parser.parsePlan(sql_view)
     val expected_table = AlterTableRenameCommand(
-      TableIdentifier("table_name", None),
-      "new_table_name",
+      TableIdentifier("table_name"),
+      TableIdentifier("new_table_name"),
       isView = false)
     val expected_view = AlterTableRenameCommand(
-      TableIdentifier("table_name", None),
-      "new_table_name",
+      TableIdentifier("table_name"),
+      TableIdentifier("new_table_name"),
       isView = true)
     comparePlans(parsed_table, expected_table)
     comparePlans(parsed_view, expected_view)
+  }

-    val e = intercept[ParseException](
-      parser.parsePlan("ALTER TABLE db1.tbl RENAME TO db1.tbl2")
-    )
-    assert(e.getMessage.contains("Can not specify database in table/view name after RENAME TO"))
+  test("alter table: rename table with database") {
+    val query = "ALTER TABLE db1.tbl RENAME TO db1.tbl2"
+    val plan = parseAs[AlterTableRenameCommand](query)
+    assert(plan.oldName == TableIdentifier("tbl", Some("db1")))
+    assert(plan.newName == TableIdentifier("tbl2", Some("db1")))
   }

   // ALTER TABLE table_name SET TBLPROPERTIES ('comment' = new_comment);


@@ -665,16 +665,27 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     createDatabase(catalog, "dbx")
     createDatabase(catalog, "dby")
     createTable(catalog, tableIdent1)
     assert(catalog.listTables("dbx") == Seq(tableIdent1))
-    sql("ALTER TABLE dbx.tab1 RENAME TO tab2")
+    sql("ALTER TABLE dbx.tab1 RENAME TO dbx.tab2")
     assert(catalog.listTables("dbx") == Seq(tableIdent2))
+    // The database in destination table name can be omitted, and we will use the database of source
+    // table for it.
+    sql("ALTER TABLE dbx.tab2 RENAME TO tab1")
+    assert(catalog.listTables("dbx") == Seq(tableIdent1))
     catalog.setCurrentDatabase("dbx")
     // rename without explicitly specifying database
-    sql("ALTER TABLE tab2 RENAME TO tab1")
-    assert(catalog.listTables("dbx") == Seq(tableIdent1))
+    sql("ALTER TABLE tab1 RENAME TO tab2")
+    assert(catalog.listTables("dbx") == Seq(tableIdent2))
     // table to rename does not exist
     intercept[AnalysisException] {
-      sql("ALTER TABLE dbx.does_not_exist RENAME TO tab2")
+      sql("ALTER TABLE dbx.does_not_exist RENAME TO dbx.tab2")
     }
+    // destination database is different
+    intercept[AnalysisException] {
+      sql("ALTER TABLE dbx.tab1 RENAME TO dby.tab2")
+    }
   }
@@ -696,6 +707,31 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(spark.table("teachers").collect().toSeq == df.collect().toSeq)
   }

+  test("rename temporary table - destination table with database name") {
+    withTempView("tab1") {
+      sql(
+        """
+          |CREATE TEMPORARY TABLE tab1
+          |USING org.apache.spark.sql.sources.DDLScanSource
+          |OPTIONS (
+          |  From '1',
+          |  To '10',
+          |  Table 'test1'
+          |)
+        """.stripMargin)
+
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE tab1 RENAME TO default.tab2")
+      }
+      assert(e.getMessage.contains(
+        "RENAME TEMPORARY TABLE from '`tab1`' to '`default`.`tab2`': " +
+          "cannot specify database name 'default' in the destination table"))
+
+      val catalog = spark.sessionState.catalog
+      assert(catalog.listTables("default") == Seq(TableIdentifier("tab1")))
+    }
+  }
+
   test("rename temporary table") {
     withTempView("tab1", "tab2") {
       spark.range(10).createOrReplaceTempView("tab1")
@@ -736,7 +772,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       sql("ALTER TABLE tab1 RENAME TO tab2")
     }
     assert(e.getMessage.contains(
-      "RENAME TEMPORARY TABLE from '`tab1`' to 'tab2': destination table already exists"))
+      "RENAME TEMPORARY TABLE from '`tab1`' to '`tab2`': destination table already exists"))
     val catalog = spark.sessionState.catalog
     assert(catalog.listTables("default") == Seq(TableIdentifier("tab1"), TableIdentifier("tab2")))