[SPARK-20495][SQL][CORE] Add StorageLevel to cacheTable API
## What changes were proposed in this pull request? Currently cacheTable API only supports MEMORY_AND_DISK. This PR adds additional API to take different storage levels. ## How was this patch tested? unit tests Author: madhu <phatak.dev@gmail.com> Closes #17802 from phatak-dev/cacheTableAPI.
This commit is contained in:
parent
5773ab121d
commit
9064f1b044
|
@ -36,6 +36,8 @@ object MimaExcludes {
|
|||
|
||||
// Exclude rules for 2.3.x
|
||||
lazy val v23excludes = v22excludes ++ Seq(
|
||||
// [SPARK-20495][SQL] Add StorageLevel to cacheTable API
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable")
|
||||
)
|
||||
|
||||
// Exclude rules for 2.2.x
|
||||
|
|
|
@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
|
|||
import org.apache.spark.annotation.{Experimental, InterfaceStability}
|
||||
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
|
||||
/**
|
||||
* Catalog interface for Spark. To access this, use `SparkSession.catalog`.
|
||||
|
@ -476,6 +476,18 @@ abstract class Catalog {
|
|||
*/
|
||||
def cacheTable(tableName: String): Unit
|
||||
|
||||
/**
|
||||
* Caches the specified table with the given storage level.
|
||||
*
|
||||
* @param tableName is either a qualified or unqualified name that designates a table/view.
|
||||
* If no database identifier is provided, it refers to a temporary view or
|
||||
* a table/view in the current database.
|
||||
* @param storageLevel storage level to cache table.
|
||||
* @since 2.3.0
|
||||
*/
|
||||
def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
|
||||
|
||||
|
||||
/**
|
||||
* Removes the specified table from the in-memory cache.
|
||||
*
|
||||
|
|
|
@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
|
|||
import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
|
||||
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
|
||||
|
||||
|
||||
/**
|
||||
|
@ -419,6 +421,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
|
|||
sparkSession.sharedState.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
|
||||
}
|
||||
|
||||
/**
|
||||
* Caches the specified table or view with the given storage level.
|
||||
*
|
||||
* @group cachemgmt
|
||||
* @since 2.3.0
|
||||
*/
|
||||
override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = {
|
||||
sparkSession.sharedState.cacheManager.cacheQuery(
|
||||
sparkSession.table(tableName), Some(tableName), storageLevel)
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the specified table or view from the in-memory cache.
|
||||
*
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
|
|||
import org.apache.spark.sql.catalyst.plans.logical.Range
|
||||
import org.apache.spark.sql.test.SharedSQLContext
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
|
||||
|
||||
/**
|
||||
|
@ -535,4 +536,11 @@ class CatalogSuite
|
|||
.createTempView("fork_table", Range(1, 2, 3, 4), overrideIfExists = true)
|
||||
assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
|
||||
}
|
||||
|
||||
test("cacheTable with storage level") {
|
||||
createTempTable("my_temp_table")
|
||||
spark.catalog.cacheTable("my_temp_table", StorageLevel.DISK_ONLY)
|
||||
assert(spark.table("my_temp_table").storageLevel == StorageLevel.DISK_ONLY)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue