diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 044f910388..b60319668c 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -114,8 +114,8 @@ statement tableIdentifier partitionSpec? describeColName? #describeTable | REFRESH TABLE tableIdentifier #refreshTable | REFRESH .*? #refreshResource - | CACHE LAZY? TABLE identifier (AS? query)? #cacheTable - | UNCACHE TABLE identifier #uncacheTable + | CACHE LAZY? TABLE tableIdentifier (AS? query)? #cacheTable + | UNCACHE TABLE tableIdentifier #uncacheTable | CLEAR CACHE #clearCache | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE tableIdentifier partitionSpec? #loadData diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a0508ad601..154c25adfa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -221,14 +221,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { */ override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) { val query = Option(ctx.query).map(plan) - CacheTableCommand(ctx.identifier.getText, query, ctx.LAZY != null) + val tableIdent = visitTableIdentifier(ctx.tableIdentifier) + if (query.isDefined && tableIdent.database.isDefined) { + val database = tableIdent.database.get + throw new ParseException(s"It is not allowed to add database prefix `$database` to " + + s"the table name in CACHE TABLE AS SELECT", ctx) + } + CacheTableCommand(tableIdent, query, ctx.LAZY != null) } /** * Create an [[UncacheTableCommand]] logical plan. */ override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) { - UncacheTableCommand(ctx.identifier.getText) + UncacheTableCommand(visitTableIdentifier(ctx.tableIdentifier)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala index 5332366d24..697e2ff211 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -18,15 +18,17 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan case class CacheTableCommand( - tableName: String, - plan: Option[LogicalPlan], - isLazy: Boolean) - extends RunnableCommand { + tableIdent: TableIdentifier, + plan: Option[LogicalPlan], + isLazy: Boolean) extends RunnableCommand { + require(plan.isEmpty || tableIdent.database.isEmpty, + "Database name is not allowed in CACHE TABLE AS SELECT") override protected def innerChildren: Seq[QueryPlan[_]] = { plan.toSeq @@ -34,13 +36,13 @@ case class CacheTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { plan.foreach { logicalPlan => - Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName) + Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString) } - sparkSession.catalog.cacheTable(tableName) + sparkSession.catalog.cacheTable(tableIdent.quotedString) if (!isLazy) { // Performs eager caching - sparkSession.table(tableName).count() + sparkSession.table(tableIdent).count() } Seq.empty[Row] @@ -50,10 +52,10 @@ case class CacheTableCommand( } -case class UncacheTableCommand(tableName: String) extends RunnableCommand { +case class UncacheTableCommand(tableIdent: TableIdentifier) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - sparkSession.catalog.uncacheTable(tableName) + sparkSession.catalog.uncacheTable(tableIdent.quotedString) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index b56c200e9e..088f684365 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -79,7 +79,7 @@ case class CreateViewCommand( if (isTemporary && tableDesc.identifier.database.isDefined) { val database = tableDesc.identifier.database.get throw new AnalysisException( - s"It is not allowed to add database prefix ${database} for the TEMPORARY view name.") + s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.") } override def run(sparkSession: SparkSession): Seq[Row] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index d7df18ae1c..6f6abfa93c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -73,11 +73,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } test("cache temp table") { - testData.select('key).createOrReplaceTempView("tempTable") - assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0) - spark.catalog.cacheTable("tempTable") - assertCached(sql("SELECT COUNT(*) FROM tempTable")) - spark.catalog.uncacheTable("tempTable") + withTempTable("tempTable") { + testData.select('key).createOrReplaceTempView("tempTable") + assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0) + spark.catalog.cacheTable("tempTable") + assertCached(sql("SELECT COUNT(*) FROM tempTable")) + spark.catalog.uncacheTable("tempTable") + } } test("unpersist an uncached table will not raise exception") { @@ -95,9 +97,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } test("cache table as select") { - sql("CACHE TABLE tempTable AS SELECT key FROM testData") - assertCached(sql("SELECT COUNT(*) FROM tempTable")) - spark.catalog.uncacheTable("tempTable") + withTempTable("tempTable") { + sql("CACHE TABLE tempTable AS SELECT key FROM testData") + assertCached(sql("SELECT COUNT(*) FROM tempTable")) + spark.catalog.uncacheTable("tempTable") + } } test("uncaching temp table") { @@ -223,32 +227,36 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } test("CACHE TABLE tableName AS SELECT * FROM anotherTable") { - sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") - assertCached(spark.table("testCacheTable")) + withTempTable("testCacheTable") { + sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") + assertCached(spark.table("testCacheTable")) - val rddId = rddIdOf("testCacheTable") - assert( - isMaterialized(rddId), - "Eagerly cached in-memory table should have already been materialized") + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") - spark.catalog.uncacheTable("testCacheTable") - eventually(timeout(10 seconds)) { - assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + spark.catalog.uncacheTable("testCacheTable") + eventually(timeout(10 seconds)) { + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } } } test("CACHE TABLE tableName AS SELECT ...") { - sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10") - assertCached(spark.table("testCacheTable")) + withTempTable("testCacheTable") { + sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10") + assertCached(spark.table("testCacheTable")) - val rddId = rddIdOf("testCacheTable") - assert( - isMaterialized(rddId), - "Eagerly cached in-memory table should have already been materialized") + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") - spark.catalog.uncacheTable("testCacheTable") - eventually(timeout(10 seconds)) { - assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + spark.catalog.uncacheTable("testCacheTable") + eventually(timeout(10 seconds)) { + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 1d1d5e3f7b..b45be0251d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -449,7 +449,7 @@ private[hive] class TestHiveQueryExecution( override lazy val analyzed: LogicalPlan = { val describedTables = logical match { - case CacheTableCommand(tbl, _, _) => tbl :: Nil + case CacheTableCommand(tbl, _, _) => tbl.table :: Nil case _ => Nil } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index e35a71917f..f7c3e347b6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -21,6 +21,7 @@ import java.io.File import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -128,29 +129,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } test("CACHE TABLE tableName AS SELECT * FROM anotherTable") { - sql("CACHE TABLE testCacheTable AS SELECT * FROM src") - assertCached(table("testCacheTable")) + withTempTable("testCacheTable") { + sql("CACHE TABLE testCacheTable AS SELECT * FROM src") + assertCached(table("testCacheTable")) - val rddId = rddIdOf("testCacheTable") - assert( - isMaterialized(rddId), - "Eagerly cached in-memory table should have already been materialized") + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") - uncacheTable("testCacheTable") - assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } } test("CACHE TABLE tableName AS SELECT ...") { - sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10") - assertCached(table("testCacheTable")) + withTempTable("testCacheTable") { + sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10") + assertCached(table("testCacheTable")) - val rddId = rddIdOf("testCacheTable") - assert( - isMaterialized(rddId), - "Eagerly cached in-memory table should have already been materialized") + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") - uncacheTable("testCacheTable") - assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } } test("CACHE LAZY TABLE tableName") { @@ -172,9 +177,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } test("CACHE TABLE with Hive UDF") { - sql("CACHE TABLE udfTest AS SELECT * FROM src WHERE floor(key) = 1") - assertCached(table("udfTest")) - uncacheTable("udfTest") + withTempTable("udfTest") { + sql("CACHE TABLE udfTest AS SELECT * FROM src WHERE floor(key) = 1") + assertCached(table("udfTest")) + uncacheTable("udfTest") + } } test("REFRESH TABLE also needs to recache the data (data source tables)") { @@ -267,6 +274,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto Utils.deleteRecursively(tempPath) } + test("Cache/Uncache Qualified Tables") { + withTempDatabase { db => + withTempTable("cachedTable") { + sql(s"CREATE TABLE $db.cachedTable STORED AS PARQUET AS SELECT 1") + sql(s"CACHE TABLE $db.cachedTable") + assertCached(spark.table(s"$db.cachedTable")) + + activateDatabase(db) { + assertCached(spark.table("cachedTable")) + sql("UNCACHE TABLE cachedTable") + assert(!spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should not be cached") + sql(s"CACHE TABLE cachedTable") + assert(spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should be cached") + } + + sql(s"UNCACHE TABLE $db.cachedTable") + assert(!spark.catalog.isCached(s"$db.cachedTable"), + "Table 'cachedTable' should not be cached") + } + } + } + + test("Cache Table As Select - having database name") { + withTempDatabase { db => + withTempTable("cachedTable") { + val e = intercept[ParseException] { + sql(s"CACHE TABLE $db.cachedTable AS SELECT 1") + }.getMessage + assert(e.contains("It is not allowed to add database prefix ") && + e.contains("to the table name in CACHE TABLE AS SELECT")) + } + } + } + test("SPARK-11246 cache parquet table") { sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")