Merge pull request #162 from shivaram/dev
Use maxMemory to better estimate memory available for BlockManager cache
This commit is contained in:
commit
b8fe672399
|
@ -222,11 +222,16 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
|
|||
// TODO: also register a listener for when it unloads
|
||||
logInfo("Computing partition " + split)
|
||||
try {
|
||||
val values = new ArrayBuffer[Any]
|
||||
values ++= rdd.compute(split)
|
||||
blockManager.put(key, values.iterator, storageLevel, false)
|
||||
// BlockManager will iterate over results from compute to create RDD
|
||||
blockManager.put(key, rdd.compute(split), storageLevel, false)
|
||||
//future.apply() // Wait for the reply from the cache tracker
|
||||
return values.iterator.asInstanceOf[Iterator[T]]
|
||||
blockManager.get(key) match {
|
||||
case Some(values) =>
|
||||
return values.asInstanceOf[Iterator[T]]
|
||||
case None =>
|
||||
logWarning("loading partition failed after computing it " + key)
|
||||
return null
|
||||
}
|
||||
} finally {
|
||||
loading.synchronized {
|
||||
loading.remove(key)
|
||||
|
|
|
@ -580,6 +580,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
object BlockManager {
|
||||
def getMaxMemoryFromSystemProperties(): Long = {
|
||||
val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
|
||||
(Runtime.getRuntime.totalMemory * memoryFraction).toLong
|
||||
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue