[SPARK-2189][SQL] Adds dropTempTable API

This PR adds an API for unregistering temporary tables. If a temporary table has been cached before, it's unpersisted as well.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #3039 from liancheng/unregister-temp-table and squashes the following commits:

54ae99f [Cheng Lian] Fixes Scala styling issue
1948c14 [Cheng Lian] Removes the unpersist argument
aca41d3 [Cheng Lian] Ensures thread safety
7d4fb2b [Cheng Lian] Adds unregisterTempTable API
This commit is contained in:
Cheng Lian 2014-11-02 16:00:24 -08:00 committed by Michael Armbrust
parent 06232d23ff
commit 9081b9f9f7
3 changed files with 46 additions and 0 deletions

View file

@ -103,6 +103,19 @@ private[sql] trait CacheManager {
cachedData.remove(dataIndex)
}
/** Tries to remove the data for the given SchemaRDD from the cache if it's cached */
private[sql] def tryUncacheQuery(
query: SchemaRDD,
blocking: Boolean = true): Boolean = writeLock {
val planToCache = query.queryExecution.analyzed
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
val found = dataIndex >= 0
if (found) {
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
cachedData.remove(dataIndex)
}
found
}
/** Optionally returns cached data for the given SchemaRDD */
private[sql] def lookupCachedData(query: SchemaRDD): Option[CachedData] = readLock {

View file

@ -276,6 +276,19 @@ class SQLContext(@transient val sparkContext: SparkContext)
catalog.registerTable(None, tableName, rdd.queryExecution.logical)
}
/**
* Drops the temporary table with the given table name in the catalog. If the table has been
* cached/persisted before, it's also unpersisted.
*
* @param tableName the name of the table to be unregistered.
*
* @group userf
*/
def dropTempTable(tableName: String): Unit = {
tryUncacheQuery(table(tableName))
catalog.unregisterTable(None, tableName)
}
/**
* Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is
* used for SQL parsing can be configured with 'spark.sql.dialect'.

View file

@ -231,4 +231,24 @@ class CachedTableSuite extends QueryTest {
assert(cached.statistics.sizeInBytes === actualSizeInBytes)
}
}
test("Drops temporary table") {
testData.select('key).registerTempTable("t1")
table("t1")
dropTempTable("t1")
assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found"))
}
test("Drops cached temporary table") {
testData.select('key).registerTempTable("t1")
testData.select('key).registerTempTable("t2")
cacheTable("t1")
assert(isCached("t1"))
assert(isCached("t2"))
dropTempTable("t1")
assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found"))
assert(!isCached("t2"))
}
}