[SPARK-15596][SPARK-15635][SQL] ALTER TABLE RENAME fixes
## What changes were proposed in this pull request? **SPARK-15596**: Even after we renamed a cached table, the plan would remain in the cache with the old table name. If I created a new table using the old name then the old table would return incorrect data. Note that this applies only to Hive tables. **SPARK-15635**: Renaming a datasource table would render the table not query-able. This is because we store the location of the table in a "path" property, which was not updated to reflect Hive's change in table location following a rename. ## How was this patch tested? DDLSuite Author: Andrew Or <andrew@databricks.com> Closes #13416 from andrewor14/rename-table.
This commit is contained in:
parent
5b08ee6396
commit
9e2643b21d
|
@ -23,6 +23,7 @@ import java.util.Date
|
|||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.Try
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
|
||||
|
@ -145,8 +146,38 @@ case class AlterTableRenameCommand(
|
|||
override def run(sparkSession: SparkSession): Seq[Row] = {
|
||||
val catalog = sparkSession.sessionState.catalog
|
||||
DDLUtils.verifyAlterTableType(catalog, oldName, isView)
|
||||
catalog.invalidateTable(oldName)
|
||||
catalog.renameTable(oldName, newName)
|
||||
// If this is a temp view, just rename the view.
|
||||
// Otherwise, if this is a real table, we also need to uncache and invalidate the table.
|
||||
val isTemporary = catalog.isTemporaryTable(oldName)
|
||||
if (isTemporary) {
|
||||
catalog.renameTable(oldName, newName)
|
||||
} else {
|
||||
// If an exception is thrown here we can just assume the table is uncached;
|
||||
// this can happen with Hive tables when the underlying catalog is in-memory.
|
||||
val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
|
||||
if (wasCached) {
|
||||
try {
|
||||
sparkSession.catalog.uncacheTable(oldName.unquotedString)
|
||||
} catch {
|
||||
case NonFatal(e) => log.warn(e.toString, e)
|
||||
}
|
||||
}
|
||||
// For datasource tables, we also need to update the "path" serde property
|
||||
val table = catalog.getTableMetadata(oldName)
|
||||
if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
|
||||
val newPath = catalog.defaultTablePath(newName)
|
||||
val newTable = table.withNewStorage(
|
||||
serdeProperties = table.storage.serdeProperties ++ Map("path" -> newPath))
|
||||
catalog.alterTable(newTable)
|
||||
}
|
||||
// Invalidate the table last, otherwise uncaching the table would load the logical plan
|
||||
// back into the hive metastore cache
|
||||
catalog.invalidateTable(oldName)
|
||||
catalog.renameTable(oldName, newName)
|
||||
if (wasCached) {
|
||||
sparkSession.catalog.cacheTable(newName.unquotedString)
|
||||
}
|
||||
}
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
||||
|
|
|
@ -446,6 +446,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|
|||
}
|
||||
}
|
||||
|
||||
test("alter table: rename cached table") {
|
||||
import testImplicits._
|
||||
sql("CREATE TABLE students (age INT, name STRING) USING parquet")
|
||||
val df = (1 to 2).map { i => (i, i.toString) }.toDF("age", "name")
|
||||
df.write.insertInto("students")
|
||||
spark.catalog.cacheTable("students")
|
||||
assume(spark.table("students").collect().toSeq == df.collect().toSeq, "bad test: wrong data")
|
||||
assume(spark.catalog.isCached("students"), "bad test: table was not cached in the first place")
|
||||
sql("ALTER TABLE students RENAME TO teachers")
|
||||
sql("CREATE TABLE students (age INT, name STRING) USING parquet")
|
||||
// Now we have both students and teachers.
|
||||
// The cached data for the old students table should not be read by the new students table.
|
||||
assert(!spark.catalog.isCached("students"))
|
||||
assert(spark.catalog.isCached("teachers"))
|
||||
assert(spark.table("students").collect().isEmpty)
|
||||
assert(spark.table("teachers").collect().toSeq == df.collect().toSeq)
|
||||
}
|
||||
|
||||
test("rename temporary table - destination table with database name") {
|
||||
withTempTable("tab1") {
|
||||
sql(
|
||||
|
|
Loading…
Reference in a new issue