spark-instrumented-optimizer/core/src/main/scala/spark/Cache.scala

package spark

import java.util.concurrent.atomic.AtomicInteger

private[spark] sealed trait CachePutResponse
private[spark] case class CachePutSuccess(size: Long) extends CachePutResponse
private[spark] case class CachePutFailure() extends CachePutResponse

/**
 * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
 * both partitions of cached RDDs and broadcast variables on Spark executors. Caches are also aware
 * of which entries are part of the same dataset (for example, partitions in the same RDD). The key
 * for each value in a cache is a (datasetId, partition) pair.
 *
 * A single Cache instance gets created on each machine and is shared by all caches (i.e. both the
 * RDD split cache and the broadcast variable cache), to enable global replacement policies.
 * However, because these several independent modules all perform caching, it is important to give
 * them separate key namespaces, so that an RDD and a broadcast variable (for example) do not use
 * the same key. For this purpose, Cache has the notion of KeySpaces. Each client module must first
 * ask for a KeySpace, and then call get() and put() on that space using its own keys.
 *
 * This abstract class handles the creation of key spaces, so that subclasses need only deal with
 * keys that are unique across modules.
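 *
 * For example, a client module would do something like this (a minimal sketch;
 * `cache` stands for any concrete Cache instance):
 * {{{
 *   val space = cache.newKeySpace()
 *   space.put(datasetId, partition, value)  // stored under ((keySpaceId, datasetId), partition)
 *   val v = space.get(datasetId, partition) // null if not cached
 * }}}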
*/
private[spark] abstract class Cache {
  private val nextKeySpaceId = new AtomicInteger(0)
  private def newKeySpaceId() = nextKeySpaceId.getAndIncrement()

  def newKeySpace() = new KeySpace(this, newKeySpaceId())

  /**
   * Get the value for a given (datasetId, partition), or null if it is not
   * found.
   */
  def get(datasetId: Any, partition: Int): Any

  /**
   * Attempt to put a value in the cache; returns CachePutFailure if this was
   * not successful (e.g. because the cache replacement policy forbids it), and
   * CachePutSuccess if successful. If size estimation is available, the cache
   * implementation should set the size field in CachePutSuccess.
   */
  def put(datasetId: Any, partition: Int, value: Any): CachePutResponse

  /**
   * Report the capacity of the cache partition. By default this just reports
   * zero. Specific implementations can choose to provide the capacity number.
   */
  def getCapacity: Long = 0L
}
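// A minimal sketch of a concrete implementation (an illustration, not part of
// the original file): an unbounded cache backed by a ConcurrentHashMap. It
// always accepts puts and does no size estimation; a real implementation would
// add a replacement policy and report real sizes and capacity.
private[spark] class SimpleMapCache extends Cache {
  private val map = new java.util.concurrent.ConcurrentHashMap[(Any, Int), Any]

  // ConcurrentHashMap.get returns null for absent keys, matching get()'s contract.
  override def get(datasetId: Any, partition: Int): Any =
    map.get((datasetId, partition))

  // Every put succeeds because the map is unbounded; the size field is set to
  // zero since this sketch does not estimate sizes.
  override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
    map.put((datasetId, partition), value)
    CachePutSuccess(0L)
  }
}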
/**
 * A key namespace in a Cache. Pairs the caller's datasetId with this space's
 * keySpaceId so that keys from different client modules never collide in the
 * shared Cache.
 */
private[spark] class KeySpace(cache: Cache, val keySpaceId: Int) {
  def get(datasetId: Any, partition: Int): Any =
    cache.get((keySpaceId, datasetId), partition)

  def put(datasetId: Any, partition: Int, value: Any): CachePutResponse =
    cache.put((keySpaceId, datasetId), partition, value)

  def getCapacity: Long = cache.getCapacity
}
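// Hypothetical usage (SimpleMapCache is the sketch above, not original code):
// two key spaces on one cache can reuse the same (datasetId, partition) pair
// without colliding, because each KeySpace prefixes keys with its own ID.
private[spark] object KeySpaceDemo {
  def main(args: Array[String]): Unit = {
    val cache = new SimpleMapCache
    val rddSpace = cache.newKeySpace()       // keySpaceId 0
    val broadcastSpace = cache.newKeySpace() // keySpaceId 1

    rddSpace.put("dataset", 0, "rdd value")
    broadcastSpace.put("dataset", 0, "broadcast value")

    println(rddSpace.get("dataset", 0))       // prints: rdd value
    println(broadcastSpace.get("dataset", 0)) // prints: broadcast value
  }
}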