From 6790908b119484be88dec47bd833bd78e27ec516 Mon Sep 17 00:00:00 2001
From: Shivaram Venkataraman
Date: Thu, 2 Aug 2012 12:05:05 -0700
Subject: [PATCH 1/2] Use maxMemory to better estimate memory available for
 BlockManager cache

---
 core/src/main/scala/spark/storage/BlockManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 5067601198..cde74e5805 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -580,6 +580,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
 object BlockManager {
   def getMaxMemoryFromSystemProperties(): Long = {
     val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
-    (Runtime.getRuntime.totalMemory * memoryFraction).toLong
+    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
 }

From 1a07bb9ba42df39260db1c3a222433fb756fe036 Mon Sep 17 00:00:00 2001
From: Shivaram Venkataraman
Date: Thu, 2 Aug 2012 12:22:33 -0700
Subject: [PATCH 2/2] Avoid an extra partition copy by passing an iterator to
 blockManager.put

---
 core/src/main/scala/spark/CacheTracker.scala | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 19870408d3..22110832f8 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -222,11 +222,16 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
         // TODO: also register a listener for when it unloads
         logInfo("Computing partition " + split)
         try {
-          val values = new ArrayBuffer[Any]
-          values ++= rdd.compute(split)
-          blockManager.put(key, values.iterator, storageLevel, false)
+          // BlockManager will iterate over results from compute to create RDD
+          blockManager.put(key, rdd.compute(split), storageLevel, false)
           //future.apply() // Wait for the reply from the cache tracker
-          return values.iterator.asInstanceOf[Iterator[T]]
+          blockManager.get(key) match {
+            case Some(values) =>
+              return values.asInstanceOf[Iterator[T]]
+            case None =>
+              logWarning("loading partition failed after computing it " + key)
+              return null
+          }
         } finally {
           loading.synchronized {
             loading.remove(key