[SPARK-15862][SQL] Better Error Message When Having Database Name in CACHE TABLE AS SELECT

#### What changes were proposed in this pull request?
~~If the temp table already exists, we should not silently replace it when doing `CACHE TABLE AS SELECT`. This is inconsistent with the behavior of `CREATE VIEW` or `CREATE TABLE`. This PR is to fix this silent replacement.~~

~~Alternatively, we could introduce new syntax for replacing an existing table. For example, in Hive, a view is replaced with `ALTER VIEW AS SELECT` or `CREATE OR REPLACE VIEW AS SELECT`.~~

The table name in `CACHE TABLE AS SELECT` should NOT contain a database prefix like `database.table`. This PR catches such names in the parser and emits a clear error message, instead of reporting that the view already exists.

In addition, this PR refactors the parser to generate table identifiers (`TableIdentifier`) instead of returning the table name as a string.
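
For illustration, a minimal sketch of the user-visible behavior after this change (the `mydb.t` names below are hypothetical):

```scala
// Caching an existing table by its qualified name is still allowed.
spark.sql("CACHE TABLE mydb.t")

// CACHE TABLE AS SELECT creates a temporary view, which cannot be
// database-qualified, so the parser now rejects the prefix up front:
spark.sql("CACHE TABLE mydb.t AS SELECT 1")
// org.apache.spark.sql.catalyst.parser.ParseException:
// It is not allowed to add database prefix `mydb` to the table name in
// CACHE TABLE AS SELECT
```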

#### How was this patch tested?
- Added a test case for caching and uncaching qualified table names
- Fixed a few test cases that did not drop their temp tables at the end
- Added the related test case for the issue resolved in this PR

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13572 from gatorsmile/cacheTableAsSelect.
Authored by gatorsmile on 2016-06-16 10:01:59 -07:00, committed by Cheng Lian
parent 7c6c692637
commit 6451cf9270
7 changed files with 117 additions and 60 deletions


```diff
@@ -114,8 +114,8 @@ statement
         tableIdentifier partitionSpec? describeColName?            #describeTable
     | REFRESH TABLE tableIdentifier                                #refreshTable
     | REFRESH .*?                                                  #refreshResource
-    | CACHE LAZY? TABLE identifier (AS? query)?                    #cacheTable
-    | UNCACHE TABLE identifier                                     #uncacheTable
+    | CACHE LAZY? TABLE tableIdentifier (AS? query)?               #cacheTable
+    | UNCACHE TABLE tableIdentifier                                #uncacheTable
     | CLEAR CACHE                                                  #clearCache
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         tableIdentifier partitionSpec?                             #loadData
```
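
The grammar now uses the `tableIdentifier` rule, which accepts an optionally database-qualified name, instead of the single-part `identifier` rule. A hedged sketch of the difference, assuming the `CatalystSqlParser` helper object:

```scala
// Illustrative only: `tableIdentifier` yields a TableIdentifier with an
// optional database part, which the old string-based `identifier` rule
// could not represent.
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

assert(CatalystSqlParser.parseTableIdentifier("tab") ==
  TableIdentifier("tab", None))
assert(CatalystSqlParser.parseTableIdentifier("mydb.tab") ==
  TableIdentifier("tab", Some("mydb")))
```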


```diff
@@ -221,14 +221,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
     val query = Option(ctx.query).map(plan)
-    CacheTableCommand(ctx.identifier.getText, query, ctx.LAZY != null)
+    val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
+    if (query.isDefined && tableIdent.database.isDefined) {
+      val database = tableIdent.database.get
+      throw new ParseException(s"It is not allowed to add database prefix `$database` to " +
+        s"the table name in CACHE TABLE AS SELECT", ctx)
+    }
+    CacheTableCommand(tableIdent, query, ctx.LAZY != null)
   }

   /**
    * Create an [[UncacheTableCommand]] logical plan.
    */
   override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
-    UncacheTableCommand(ctx.identifier.getText)
+    UncacheTableCommand(visitTableIdentifier(ctx.tableIdentifier))
   }

   /**
```


```diff
@@ -18,15 +18,17 @@
 package org.apache.spark.sql.execution.command

 import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

 case class CacheTableCommand(
-    tableName: String,
+    tableIdent: TableIdentifier,
     plan: Option[LogicalPlan],
-    isLazy: Boolean)
-  extends RunnableCommand {
+    isLazy: Boolean) extends RunnableCommand {
+  require(plan.isEmpty || tableIdent.database.isEmpty,
+    "Database name is not allowed in CACHE TABLE AS SELECT")

   override protected def innerChildren: Seq[QueryPlan[_]] = {
     plan.toSeq
@@ -34,13 +36,13 @@ case class CacheTableCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {
     plan.foreach { logicalPlan =>
-      Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
+      Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
     }
-    sparkSession.catalog.cacheTable(tableName)
+    sparkSession.catalog.cacheTable(tableIdent.quotedString)

     if (!isLazy) {
       // Performs eager caching
-      sparkSession.table(tableName).count()
+      sparkSession.table(tableIdent).count()
     }

     Seq.empty[Row]
@@ -50,10 +52,10 @@ case class CacheTableCommand(
 }

-case class UncacheTableCommand(tableName: String) extends RunnableCommand {
+case class UncacheTableCommand(tableIdent: TableIdentifier) extends RunnableCommand {

   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.catalog.uncacheTable(tableName)
+    sparkSession.catalog.uncacheTable(tableIdent.quotedString)
     Seq.empty[Row]
   }
```
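
Two details worth noting in this hunk. First, the command now carries a `TableIdentifier` and talks to the catalog via `quotedString`, so qualified names survive the string-based catalog API; the new `require` guard makes the invariant explicit that a query plan (`CACHE TABLE ... AS SELECT`) and a database prefix cannot coexist. Second, `createOrReplaceTempView` becomes `createTempView`, so an existing temp view with the same name now raises an error instead of being silently replaced. A small sketch of the identifier behavior (assumed values, for illustration only):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// quotedString backtick-quotes each name part; the database is included
// only when present.
assert(TableIdentifier("t", None).quotedString == "`t`")
assert(TableIdentifier("t", Some("mydb")).quotedString == "`mydb`.`t`")
```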


```diff
@@ -79,7 +79,7 @@ case class CreateViewCommand(
     if (isTemporary && tableDesc.identifier.database.isDefined) {
       val database = tableDesc.identifier.database.get
       throw new AnalysisException(
-        s"It is not allowed to add database prefix ${database} for the TEMPORARY view name.")
+        s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
     }

   override def run(sparkSession: SparkSession): Seq[Row] = {
```


```diff
@@ -73,11 +73,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }

   test("cache temp table") {
-    testData.select('key).createOrReplaceTempView("tempTable")
-    assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
-    spark.catalog.cacheTable("tempTable")
-    assertCached(sql("SELECT COUNT(*) FROM tempTable"))
-    spark.catalog.uncacheTable("tempTable")
+    withTempTable("tempTable") {
+      testData.select('key).createOrReplaceTempView("tempTable")
+      assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
+      spark.catalog.cacheTable("tempTable")
+      assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+      spark.catalog.uncacheTable("tempTable")
+    }
   }

   test("unpersist an uncached table will not raise exception") {
@@ -95,9 +97,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }

   test("cache table as select") {
-    sql("CACHE TABLE tempTable AS SELECT key FROM testData")
-    assertCached(sql("SELECT COUNT(*) FROM tempTable"))
-    spark.catalog.uncacheTable("tempTable")
+    withTempTable("tempTable") {
+      sql("CACHE TABLE tempTable AS SELECT key FROM testData")
+      assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+      spark.catalog.uncacheTable("tempTable")
+    }
   }

   test("uncaching temp table") {
@@ -223,32 +227,36 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }

   test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
-    sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
-    assertCached(spark.table("testCacheTable"))
+    withTempTable("testCacheTable") {
+      sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+      assertCached(spark.table("testCacheTable"))

-    val rddId = rddIdOf("testCacheTable")
-    assert(
-      isMaterialized(rddId),
-      "Eagerly cached in-memory table should have already been materialized")
+      val rddId = rddIdOf("testCacheTable")
+      assert(
+        isMaterialized(rddId),
+        "Eagerly cached in-memory table should have already been materialized")

-    spark.catalog.uncacheTable("testCacheTable")
-    eventually(timeout(10 seconds)) {
-      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      spark.catalog.uncacheTable("testCacheTable")
+      eventually(timeout(10 seconds)) {
+        assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      }
     }
   }

   test("CACHE TABLE tableName AS SELECT ...") {
-    sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
-    assertCached(spark.table("testCacheTable"))
+    withTempTable("testCacheTable") {
+      sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
+      assertCached(spark.table("testCacheTable"))

-    val rddId = rddIdOf("testCacheTable")
-    assert(
-      isMaterialized(rddId),
-      "Eagerly cached in-memory table should have already been materialized")
+      val rddId = rddIdOf("testCacheTable")
+      assert(
+        isMaterialized(rddId),
+        "Eagerly cached in-memory table should have already been materialized")

-    spark.catalog.uncacheTable("testCacheTable")
-    eventually(timeout(10 seconds)) {
-      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      spark.catalog.uncacheTable("testCacheTable")
+      eventually(timeout(10 seconds)) {
+        assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      }
     }
   }
```
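
These tests are now wrapped in `withTempTable`, a `SQLTestUtils` helper that guarantees cleanup even when an assertion fails. Roughly what the helper does (a hedged sketch of its shape, not the exact Spark source):

```scala
import org.apache.spark.sql.SparkSession

trait TempTableHelper {
  def spark: SparkSession

  // Run the body, then drop the named temp tables whether or not it threw.
  protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {
    try f finally tableNames.foreach(spark.catalog.dropTempView)
  }
}
```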


```diff
@@ -449,7 +449,7 @@ private[hive] class TestHiveQueryExecution(
   override lazy val analyzed: LogicalPlan = {
     val describedTables = logical match {
-      case CacheTableCommand(tbl, _, _) => tbl :: Nil
+      case CacheTableCommand(tbl, _, _) => tbl.table :: Nil
       case _ => Nil
     }
```


```diff
@@ -21,6 +21,7 @@ import java.io.File

 import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -128,29 +129,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
   }

   test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
-    sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
-    assertCached(table("testCacheTable"))
+    withTempTable("testCacheTable") {
+      sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
+      assertCached(table("testCacheTable"))

-    val rddId = rddIdOf("testCacheTable")
-    assert(
-      isMaterialized(rddId),
-      "Eagerly cached in-memory table should have already been materialized")
+      val rddId = rddIdOf("testCacheTable")
+      assert(
+        isMaterialized(rddId),
+        "Eagerly cached in-memory table should have already been materialized")

-    uncacheTable("testCacheTable")
-    assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      uncacheTable("testCacheTable")
+      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+    }
   }

   test("CACHE TABLE tableName AS SELECT ...") {
-    sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
-    assertCached(table("testCacheTable"))
+    withTempTable("testCacheTable") {
+      sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
+      assertCached(table("testCacheTable"))

-    val rddId = rddIdOf("testCacheTable")
-    assert(
-      isMaterialized(rddId),
-      "Eagerly cached in-memory table should have already been materialized")
+      val rddId = rddIdOf("testCacheTable")
+      assert(
+        isMaterialized(rddId),
+        "Eagerly cached in-memory table should have already been materialized")

-    uncacheTable("testCacheTable")
-    assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      uncacheTable("testCacheTable")
+      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+    }
   }

   test("CACHE LAZY TABLE tableName") {
@@ -172,9 +177,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
   }

   test("CACHE TABLE with Hive UDF") {
-    sql("CACHE TABLE udfTest AS SELECT * FROM src WHERE floor(key) = 1")
-    assertCached(table("udfTest"))
-    uncacheTable("udfTest")
+    withTempTable("udfTest") {
+      sql("CACHE TABLE udfTest AS SELECT * FROM src WHERE floor(key) = 1")
+      assertCached(table("udfTest"))
+      uncacheTable("udfTest")
+    }
   }

   test("REFRESH TABLE also needs to recache the data (data source tables)") {
@@ -267,6 +274,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     Utils.deleteRecursively(tempPath)
   }

+  test("Cache/Uncache Qualified Tables") {
+    withTempDatabase { db =>
+      withTempTable("cachedTable") {
+        sql(s"CREATE TABLE $db.cachedTable STORED AS PARQUET AS SELECT 1")
+        sql(s"CACHE TABLE $db.cachedTable")
+        assertCached(spark.table(s"$db.cachedTable"))
+
+        activateDatabase(db) {
+          assertCached(spark.table("cachedTable"))
+          sql("UNCACHE TABLE cachedTable")
+          assert(!spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should not be cached")
+          sql(s"CACHE TABLE cachedTable")
+          assert(spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should be cached")
+        }
+
+        sql(s"UNCACHE TABLE $db.cachedTable")
+        assert(!spark.catalog.isCached(s"$db.cachedTable"),
+          "Table 'cachedTable' should not be cached")
+      }
+    }
+  }
+
+  test("Cache Table As Select - having database name") {
+    withTempDatabase { db =>
+      withTempTable("cachedTable") {
+        val e = intercept[ParseException] {
+          sql(s"CACHE TABLE $db.cachedTable AS SELECT 1")
+        }.getMessage
+        assert(e.contains("It is not allowed to add database prefix ") &&
+          e.contains("to the table name in CACHE TABLE AS SELECT"))
+      }
+    }
+  }
+
   test("SPARK-11246 cache parquet table") {
     sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")
```