From e46d547ccd43c0fb3a79a30a7c43a78afba6f93f Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Tue, 30 Apr 2013 16:15:56 +0530 Subject: [PATCH 01/16] Fix issues reported by Reynold --- .../spark/network/ConnectionManager.scala | 64 +++++++++++++++---- run | 7 +- 2 files changed, 56 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 0c6bdb1559..a79fce8697 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -188,6 +188,38 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } ) } + // MUST be called within selector loop - else deadlock. + private def triggerForceCloseByException(key: SelectionKey, e: Exception) { + try { + key.interestOps(0) + } catch { + // ignore exceptions + case e: Exception => logDebug("Ignoring exception", e) + } + + val conn = connectionsByKey.getOrElse(key, null) + if (conn == null) return + + // Pushing to connect threadpool + handleConnectExecutor.execute(new Runnable { + override def run() { + try { + conn.callOnExceptionCallback(e) + } catch { + // ignore exceptions + case e: Exception => logDebug("Ignoring exception", e) + } + try { + conn.close() + } catch { + // ignore exceptions + case e: Exception => logDebug("Ignoring exception", e) + } + } + }) + } + + def run() { try { while(!selectorThread.isInterrupted) { @@ -235,18 +267,26 @@ private[spark] class ConnectionManager(port: Int) extends Logging { while (selectedKeys.hasNext()) { val key = selectedKeys.next selectedKeys.remove() - if (key.isValid) { - if (key.isAcceptable) { - acceptConnection(key) - } else - if (key.isConnectable) { - triggerConnect(key) - } else - if (key.isReadable) { - triggerRead(key) - } else - if (key.isWritable) { - triggerWrite(key) + try { + if (key.isValid) { + if (key.isAcceptable) { + acceptConnection(key) + } else + if (key.isConnectable) { + triggerConnect(key) + } else + if (key.isReadable) { + triggerRead(key) + } else + if (key.isWritable) { + triggerWrite(key) + } + } + } catch { + // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException. + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) } } } diff --git a/run b/run index 756f8703f2..0a58ac4a36 100755 --- a/run +++ b/run @@ -95,6 +95,7 @@ export JAVA_OPTS CORE_DIR="$FWDIR/core" REPL_DIR="$FWDIR/repl" +REPL_BIN_DIR="$FWDIR/repl-bin" EXAMPLES_DIR="$FWDIR/examples" BAGEL_DIR="$FWDIR/bagel" STREAMING_DIR="$FWDIR/streaming" @@ -125,8 +126,8 @@ if [ -e "$FWDIR/lib_managed" ]; then CLASSPATH+=":$FWDIR/lib_managed/bundles/*" fi CLASSPATH+=":$REPL_DIR/lib/*" -if [ -e repl-bin/target ]; then - for jar in `find "repl-bin/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do +if [ -e $REPL_BIN_DIR/target ]; then + for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do CLASSPATH+=":$jar" done fi @@ -134,7 +135,6 @@ CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do CLASSPATH+=":$jar" done -export CLASSPATH # Needed for spark-shell # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack # to avoid the -sources and -doc packages that are built by publish-local. @@ -163,4 +163,5 @@ else EXTRA_ARGS="$JAVA_OPTS" fi +export CLASSPATH # Needed for spark-shell exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@" From 48854e1dbf1d02e1e19f59d0aee0e281d41b3b45 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Tue, 30 Apr 2013 23:59:33 +0530 Subject: [PATCH 02/16] If key is not valid, close connection --- .gitignore | 2 -- core/src/main/scala/spark/network/ConnectionManager.scala | 3 +++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 155e785b01..5bb2f33574 100644 --- a/.gitignore +++ b/.gitignore @@ -29,8 +29,6 @@ project/build/target/ project/plugins/target/ project/plugins/lib_managed/ project/plugins/src_managed/ -logs/ -log/ spark-tests.log streaming-tests.log dependency-reduced-pom.xml diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index a79fce8697..2d9b4be4b3 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -281,6 +281,9 @@ private[spark] class ConnectionManager(port: Int) extends Logging { if (key.isWritable) { triggerWrite(key) } + } else { + logInfo("Key not valid ? " + key) + throw new CancelledKeyException() } } catch { // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException. From 538614acfe95b0c064679122af3bc990b669e4e0 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Wed, 1 May 2013 00:05:32 +0530 Subject: [PATCH 03/16] Be more aggressive and defensive in select also --- .../spark/network/ConnectionManager.scala | 83 ++++++++++++------- 1 file changed, 55 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 2d9b4be4b3..9b00fddd40 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -254,7 +254,32 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } } - val selectedKeysCount = selector.select() + val selectedKeysCount = + try { + selector.select() + } catch { + case e: CancelledKeyException => { + // Some keys within the selectors list are invalid/closed. clear them. + val allKeys = selector.keys().iterator() + + while (allKeys.hasNext()) { + val key = allKeys.next() + try { + if (! key.isValid) { + logInfo("Key not valid ? " + key) + throw new CancelledKeyException() + } + } catch { + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) + } + } + } + } + 0 + } + if (selectedKeysCount == 0) { logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys") } @@ -262,34 +287,36 @@ private[spark] class ConnectionManager(port: Int) extends Logging { logInfo("Selector thread was interrupted!") return } - - val selectedKeys = selector.selectedKeys().iterator() - while (selectedKeys.hasNext()) { - val key = selectedKeys.next - selectedKeys.remove() - try { - if (key.isValid) { - if (key.isAcceptable) { - acceptConnection(key) - } else - if (key.isConnectable) { - triggerConnect(key) - } else - if (key.isReadable) { - triggerRead(key) - } else - if (key.isWritable) { - triggerWrite(key) + + if (0 != selectedKeysCount) { + val selectedKeys = selector.selectedKeys().iterator() + while (selectedKeys.hasNext()) { + val key = selectedKeys.next + selectedKeys.remove() + try { + if (key.isValid) { + if (key.isAcceptable) { + acceptConnection(key) + } else + if (key.isConnectable) { + triggerConnect(key) + } else + if (key.isReadable) { + triggerRead(key) + } else + if (key.isWritable) { + triggerWrite(key) + } + } else { + logInfo("Key not valid ? " + key) + throw new CancelledKeyException() + } + } catch { + // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException. + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) } - } else { - logInfo("Key not valid ? " + key) - throw new CancelledKeyException() - } - } catch { - // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException. - case e: CancelledKeyException => { - logInfo("key already cancelled ? " + key, e) - triggerForceCloseByException(key, e) } } } From 0f45477be16254971763cbc07feac7460cffd0bd Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Wed, 1 May 2013 00:10:02 +0530 Subject: [PATCH 04/16] Change indentation --- .../spark/network/ConnectionManager.scala | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 9b00fddd40..925d076951 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -255,30 +255,30 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } val selectedKeysCount = - try { - selector.select() - } catch { - case e: CancelledKeyException => { - // Some keys within the selectors list are invalid/closed. clear them. - val allKeys = selector.keys().iterator() + try { + selector.select() + } catch { + case e: CancelledKeyException => { + // Some keys within the selectors list are invalid/closed. clear them. + val allKeys = selector.keys().iterator() - while (allKeys.hasNext()) { - val key = allKeys.next() - try { - if (! key.isValid) { - logInfo("Key not valid ? " + key) - throw new CancelledKeyException() - } - } catch { - case e: CancelledKeyException => { - logInfo("key already cancelled ? " + key, e) - triggerForceCloseByException(key, e) + while (allKeys.hasNext()) { + val key = allKeys.next() + try { + if (! key.isValid) { + logInfo("Key not valid ? " + key) + throw new CancelledKeyException() + } + } catch { + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) + } } } } + 0 } - 0 - } if (selectedKeysCount == 0) { logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys") From 3b748ced2258246bd9b7c250363645cea27cf622 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Wed, 1 May 2013 00:30:30 +0530 Subject: [PATCH 05/16] Be more aggressive and defensive in all uses of SelectionKey in select loop --- .../spark/network/ConnectionManager.scala | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 925d076951..03926a6038 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -232,24 +232,37 @@ private[spark] class ConnectionManager(port: Int) extends Logging { while(!keyInterestChangeRequests.isEmpty) { val (key, ops) = keyInterestChangeRequests.dequeue - val connection = connectionsByKey.getOrElse(key, null) - if (connection != null) { - val lastOps = key.interestOps() - key.interestOps(ops) - // hot loop - prevent materialization of string if trace not enabled. - if (isTraceEnabled()) { - def intToOpStr(op: Int): String = { - val opStrs = ArrayBuffer[String]() - if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ" - if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE" - if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT" - if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT" - if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " " + try { + if (key.isValid) { + val connection = connectionsByKey.getOrElse(key, null) + if (connection != null) { + val lastOps = key.interestOps() + key.interestOps(ops) + + // hot loop - prevent materialization of string if trace not enabled. + if (isTraceEnabled()) { + def intToOpStr(op: Int): String = { + val opStrs = ArrayBuffer[String]() + if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ" + if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE" + if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT" + if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT" + if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " " + } + + logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() + + "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]") + } } - - logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() + - "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]") + } else { + logInfo("Key not valid ? " + key) + throw new CancelledKeyException() + } + } catch { + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) } } } From c446ac31d7065d227f168a7f27010bdf98ef7ad1 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Wed, 1 May 2013 00:32:30 +0530 Subject: [PATCH 06/16] Spurious commit, reverting gitignore change --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 5bb2f33574..155e785b01 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,8 @@ project/build/target/ project/plugins/target/ project/plugins/lib_managed/ project/plugins/src_managed/ +logs/ +log/ spark-tests.log streaming-tests.log dependency-reduced-pom.xml From 60cabb35cbfd2af0e5ba34c4a416aa2640091acc Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Wed, 1 May 2013 01:17:14 +0530 Subject: [PATCH 07/16] Add addition catch block for exception too --- .../scala/spark/network/ConnectionManager.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 03926a6038..0eb03630d0 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -264,6 +264,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging { logInfo("key already cancelled ? " + key, e) triggerForceCloseByException(key, e) } + case e: Exception => { + logError("Exception processing key " + key, e) + triggerForceCloseByException(key, e) + } } } @@ -271,6 +275,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { try { selector.select() } catch { + // Explicitly only dealing with CancelledKeyException here since other exceptions should be dealt with differently. case e: CancelledKeyException => { // Some keys within the selectors list are invalid/closed. clear them. val allKeys = selector.keys().iterator() @@ -287,6 +292,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging { logInfo("key already cancelled ? " + key, e) triggerForceCloseByException(key, e) } + case e: Exception => { + logError("Exception processing key " + key, e) + triggerForceCloseByException(key, e) + } } } } @@ -330,6 +339,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging { logInfo("key already cancelled ? " + key, e) triggerForceCloseByException(key, e) } + case e: Exception => { + logError("Exception processing key " + key, e) + triggerForceCloseByException(key, e) + } } } } From 848156273178bed5763bcbc91baa788bd4a57f6e Mon Sep 17 00:00:00 2001 From: harshars Date: Mon, 25 Mar 2013 20:09:07 -0700 Subject: [PATCH 08/16] Merged Ram's commit on removing RDDs. Conflicts: core/src/main/scala/spark/SparkContext.scala --- core/src/main/scala/spark/SparkContext.scala | 62 ++++++++++++------- .../test/scala/spark/DistributedSuite.scala | 12 ++++ core/src/test/scala/spark/RDDSuite.scala | 7 +++ 3 files changed, 59 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 5f5ec0b0f4..8bee1d65a2 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -1,47 +1,50 @@ package spark import java.io._ -import java.util.concurrent.atomic.AtomicInteger import java.net.URI +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConversions._ import scala.collection.Map import scala.collection.generic.Growable -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ +import scala.collection.mutable.{ConcurrentMap, HashMap} + +import akka.actor.Actor._ -import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.InputFormat -import org.apache.hadoop.mapred.SequenceFileInputFormat -import org.apache.hadoop.io.Writable -import org.apache.hadoop.io.IntWritable -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.FloatWritable -import org.apache.hadoop.io.DoubleWritable +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.ArrayWritable import org.apache.hadoop.io.BooleanWritable import org.apache.hadoop.io.BytesWritable -import org.apache.hadoop.io.ArrayWritable +import org.apache.hadoop.io.DoubleWritable +import org.apache.hadoop.io.FloatWritable +import org.apache.hadoop.io.IntWritable +import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.NullWritable import org.apache.hadoop.io.Text +import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.FileInputFormat +import org.apache.hadoop.mapred.InputFormat import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.SequenceFileInputFormat import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.hadoop.mapreduce.{Job => NewHadoopJob} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} + import org.apache.mesos.MesosNativeLibrary -import spark.deploy.{SparkHadoopUtil, LocalSparkCluster} -import spark.partial.ApproximateEvaluator -import spark.partial.PartialResult +import spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import spark.partial.{ApproximateEvaluator, PartialResult} import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} -import spark.scheduler._ +import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler} +import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler} import spark.scheduler.local.LocalScheduler -import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} -import spark.storage.BlockManagerUI +import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo} import spark.util.{MetadataCleaner, TimeStampedHashMap} -import spark.storage.{StorageStatus, StorageUtils, RDDInfo} + /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -97,7 +100,7 @@ class SparkContext( private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]() + private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]]() private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup) @@ -520,6 +523,21 @@ class SparkContext( env.blockManager.master.getStorageStatus } + def removeRDD(id: Int): Unit = { + val storageStatusList = getExecutorStorageStatus + val groupedRddBlocks = storageStatusList.flatMap(_.blocks).toMap + logInfo("RDD to remove: " + id) + groupedRddBlocks.foreach(x => { + val k = x._1.substring(0,x._1.lastIndexOf('_')) + val rdd_id = "rdd_" + id + logInfo("RDD to check: " + rdd_id) + if(k.equals(rdd_id)) { + env.blockManager.master.removeBlock(x._1) + } + }) + persistentRdds.remove(id) + } + /** * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. @@ -743,7 +761,7 @@ class SparkContext( /** Called by MetadataCleaner to clean up the persistentRdds map periodically */ private[spark] def cleanup(cleanupTime: Long) { - persistentRdds.clearOldValues(cleanupTime) + // do nothing. this needs to be removed. } } diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index c9b4707def..c7f6ab3133 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -252,6 +252,18 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(data2.count === 2) } } + + test("remove RDDs cleanly") { + DistributedSuite.amMaster = true + sc = new SparkContext("local-cluster[3,1,512]", "test") + val data = sc.parallelize(Seq(true, false, false, false), 4) + data.persist(StorageLevel.MEMORY_ONLY_2) + data.count + sc.removeRDD(data.id) + assert(sc.persistentRdds.isEmpty == true) + assert(sc.getRDDStorageInfo.isEmpty == true) + + } } object DistributedSuite { diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 7fbdd44340..88b7ab9f52 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -100,6 +100,13 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(rdd.collect().toList === List(1, 2, 3, 4)) } + test("remove RDD") { + sc = new SparkContext("local", "test") + val rdd = sc.makeRDD(Array(1,2,3,4), 2).cache() + sc.removeRDD(rdd.id) + assert(sc.persistentRdds.empty == true) + } + test("caching with failures") { sc = new SparkContext("local", "test") val onlySplit = new Partition { override def index: Int = 0 } From 3227ec8edde05cff27c1f9de8861d18b3cda1aae Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 1 May 2013 16:07:44 -0700 Subject: [PATCH 09/16] Cleaned up Ram's code. Moved SparkContext.remove to RDD.unpersist. Also updated unit tests to make sure they are properly testing for concurrency. --- core/src/main/scala/spark/RDD.scala | 17 +++++++++++ core/src/main/scala/spark/SparkContext.scala | 25 ++++------------ .../scala/spark/storage/BlockManagerUI.scala | 4 +-- .../test/scala/spark/DistributedSuite.scala | 30 ++++++++++++++----- core/src/test/scala/spark/RDDSuite.scala | 27 +++++++++++++---- 5 files changed, 68 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 09e52ebf3e..c77f9915c0 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -140,6 +140,23 @@ abstract class RDD[T: ClassManifest]( /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): RDD[T] = persist() + /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */ + def unpersist(): RDD[T] = { + logInfo("Removing RDD " + id + " from persistence list") + val rddBlockPrefix = "rdd_" + id + "_" + // Get the list of blocks in block manager, and remove ones that are part of this RDD. + // The runtime complexity is linear to the number of blocks persisted in the cluster. + // It could be expensive if the cluster is large and has a lot of blocks persisted. + sc.getExecutorStorageStatus().flatMap(_.blocks).foreach { case(blockId, status) => + if (blockId.startsWith(rddBlockPrefix)) { + sc.env.blockManager.master.removeBlock(blockId) + } + } + sc.persistentRdds.remove(id) + storageLevel = StorageLevel.NONE + this + } + /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */ def getStorageLevel = storageLevel diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 8bee1d65a2..b686c595b8 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -100,7 +100,7 @@ class SparkContext( private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]]() + private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]] private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup) @@ -508,36 +508,21 @@ class SparkContext( * Return information about what RDDs are cached, if they are in mem or on disk, how much space * they take, etc. */ - def getRDDStorageInfo : Array[RDDInfo] = { - StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this) + def getRDDStorageInfo(): Array[RDDInfo] = { + StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus(), this) } - def getStageInfo: Map[Stage,StageInfo] = { + def getStageInfo(): Map[Stage,StageInfo] = { dagScheduler.stageToInfos } /** * Return information about blocks stored in all of the slaves */ - def getExecutorStorageStatus : Array[StorageStatus] = { + def getExecutorStorageStatus(): Array[StorageStatus] = { env.blockManager.master.getStorageStatus } - def removeRDD(id: Int): Unit = { - val storageStatusList = getExecutorStorageStatus - val groupedRddBlocks = storageStatusList.flatMap(_.blocks).toMap - logInfo("RDD to remove: " + id) - groupedRddBlocks.foreach(x => { - val k = x._1.substring(0,x._1.lastIndexOf('_')) - val rdd_id = "rdd_" + id - logInfo("RDD to check: " + rdd_id) - if(k.equals(rdd_id)) { - env.blockManager.master.removeBlock(x._1) - } - }) - persistentRdds.remove(id) - } - /** * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index 07da572044..c9e4519efe 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -45,7 +45,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, path("") { completeWith { // Request the current storage status from the Master - val storageStatusList = sc.getExecutorStorageStatus + val storageStatusList = sc.getExecutorStorageStatus() // Calculate macro-level statistics val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) @@ -60,7 +60,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, parameter("id") { id => completeWith { val prefix = "rdd_" + id.toString - val storageStatusList = sc.getExecutorStorageStatus + val storageStatusList = sc.getExecutorStorageStatus() val filteredStorageStatusList = StorageUtils. filterStorageStatusByPrefix(storageStatusList, prefix) val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index c7f6ab3133..ab3e197035 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -3,8 +3,10 @@ package spark import network.ConnectionManagerId import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Timeouts._ import org.scalatest.matchers.ShouldMatchers import org.scalatest.prop.Checkers +import org.scalatest.time.{Span, Millis} import org.scalacheck.Arbitrary._ import org.scalacheck.Gen import org.scalacheck.Prop._ @@ -252,24 +254,36 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(data2.count === 2) } } - - test("remove RDDs cleanly") { + + test("unpersist RDDs") { DistributedSuite.amMaster = true sc = new SparkContext("local-cluster[3,1,512]", "test") val data = sc.parallelize(Seq(true, false, false, false), 4) data.persist(StorageLevel.MEMORY_ONLY_2) data.count - sc.removeRDD(data.id) + assert(sc.persistentRdds.isEmpty == false) + data.unpersist() assert(sc.persistentRdds.isEmpty == true) + + failAfter(Span(3000, Millis)) { + try { + while (! sc.getRDDStorageInfo.isEmpty) { + Thread.sleep(200) + } + } catch { + case e: Exception => + // Do nothing. We might see exceptions because block manager + // is racing this thread to remove entries from the driver. + } + } assert(sc.getRDDStorageInfo.isEmpty == true) - } } object DistributedSuite { // Indicates whether this JVM is marked for failure. var mark = false - + // Set by test to remember if we are in the driver program so we can assert // that we are not. var amMaster = false @@ -286,9 +300,9 @@ object DistributedSuite { // Act like an identity function, but if mark was set to true previously, fail, // crashing the entire JVM. def failOnMarkedIdentity(item: Boolean): Boolean = { - if (mark) { + if (mark) { System.exit(42) - } + } item - } + } } diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 88b7ab9f52..cee6312572 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -2,6 +2,8 @@ package spark import scala.collection.mutable.HashMap import org.scalatest.FunSuite +import org.scalatest.concurrent.Timeouts._ +import org.scalatest.time.{Span, Millis} import spark.SparkContext._ import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD} @@ -100,11 +102,26 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(rdd.collect().toList === List(1, 2, 3, 4)) } - test("remove RDD") { - sc = new SparkContext("local", "test") - val rdd = sc.makeRDD(Array(1,2,3,4), 2).cache() - sc.removeRDD(rdd.id) - assert(sc.persistentRdds.empty == true) + test("unpersist RDD") { + sc = new SparkContext("local", "test") + val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() + rdd.count + assert(sc.persistentRdds.isEmpty == false) + rdd.unpersist() + assert(sc.persistentRdds.isEmpty == true) + + failAfter(Span(3000, Millis)) { + try { + while (! sc.getRDDStorageInfo.isEmpty) { + Thread.sleep(200) + } + } catch { + case e: Exception => + // Do nothing. We might see exceptions because block manager + // is racing this thread to remove entries from the driver. + } + } + assert(sc.getRDDStorageInfo.isEmpty == true) } test("caching with failures") { From 34637b97ec7ebdd356653324f15345b00b3a2ac2 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 1 May 2013 16:12:37 -0700 Subject: [PATCH 10/16] Added SparkContext.cleanup back. Not sure why it was removed before ... --- core/src/main/scala/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index b686c595b8..401e55d615 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -746,7 +746,7 @@ class SparkContext( /** Called by MetadataCleaner to clean up the persistentRdds map periodically */ private[spark] def cleanup(cleanupTime: Long) { - // do nothing. this needs to be removed. + persistentRdds.clearOldValues(cleanupTime) } } From 204eb32e14e8fce5e4b4cf602375ae9b4ed136c9 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 1 May 2013 16:14:58 -0700 Subject: [PATCH 11/16] Changed the type of the persistentRdds hashmap back to TimeStampedHashMap. --- core/src/main/scala/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 401e55d615..d7d450d958 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -100,7 +100,7 @@ class SparkContext( private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]] + private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]] private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup) From 207afe4088219a0c7350b3f80eb60e86c97e140f Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 18 Apr 2013 12:08:11 -0700 Subject: [PATCH 12/16] Remove spark-repl's extraneous dependency on spark-streaming --- project/SparkBuild.scala | 2 +- repl/pom.xml | 14 -------------- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f2410085d8..190d723435 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -29,7 +29,7 @@ object SparkBuild extends Build { lazy val core = Project("core", file("core"), settings = coreSettings) - lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn (streaming) + lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) diff --git a/repl/pom.xml b/repl/pom.xml index 038da5d988..92a2020b48 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -96,13 +96,6 @@ hadoop1 runtime - - org.spark-project - spark-streaming - ${project.version} - hadoop1 - runtime - org.apache.hadoop hadoop-core @@ -147,13 +140,6 @@ hadoop2 runtime - - org.spark-project - spark-streaming - ${project.version} - hadoop2 - runtime - org.apache.hadoop hadoop-core From c047f0e3adae59d7e388a1d42d940c3cd5714f82 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 26 Apr 2013 13:28:21 +0800 Subject: [PATCH 13/16] filter out Spark streaming block RDD and sort RDDInfo with id --- .../scala/spark/storage/StorageUtils.scala | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index dec47a9d41..8f52168c24 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -4,9 +4,9 @@ import spark.{Utils, SparkContext} import BlockManagerMasterActor.BlockStatus private[spark] -case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, +case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, blocks: Map[String, BlockStatus]) { - + def memUsed(blockPrefix: String = "") = { blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize). reduceOption(_+_).getOrElse(0l) @@ -22,35 +22,40 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, } case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, - numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) { + numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) + extends Ordered[RDDInfo] { override def toString = { import Utils.memoryBytesToString "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id, storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize)) } + + override def compare(that: RDDInfo) = { + this.id - that.id + } } /* Helper methods for storage-related objects */ private[spark] object StorageUtils { - /* Given the current storage status of the BlockManager, returns information for each RDD */ - def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus], + /* Given the current storage status of the BlockManager, returns information for each RDD */ + def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus], sc: SparkContext) : Array[RDDInfo] = { - rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc) + rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc) } - /* Given a list of BlockStatus objets, returns information for each RDD */ - def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus], + /* Given a list of BlockStatus objets, returns information for each RDD */ + def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus], sc: SparkContext) : Array[RDDInfo] = { // Group by rddId, ignore the partition name - val groupedRddBlocks = infos.groupBy { case(k, v) => + val groupedRddBlocks = infos.filterKeys(_.startsWith("rdd_")).groupBy { case(k, v) => k.substring(0,k.lastIndexOf('_')) }.mapValues(_.values.toArray) // For each RDD, generate an RDDInfo object - groupedRddBlocks.map { case(rddKey, rddBlocks) => + val rddInfos = groupedRddBlocks.map { case(rddKey, rddBlocks) => // Add up memory and disk sizes val memSize = rddBlocks.map(_.memSize).reduce(_ + _) @@ -65,10 +70,14 @@ object StorageUtils { RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize) }.toArray + + scala.util.Sorting.quickSort(rddInfos) + + rddInfos } - /* Removes all BlockStatus object that are not part of a block prefix */ - def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus], + /* Removes all BlockStatus object that are not part of a block prefix */ + def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus], prefix: String) : Array[StorageStatus] = { storageStatusList.map { status => From 98df9d28536f5208530488a316df9401e16490bd Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 1 May 2013 20:17:09 -0700 Subject: [PATCH 14/16] Added removeRdd function in BlockManager. --- core/src/main/scala/spark/RDD.scala | 15 +++-------- core/src/main/scala/spark/SparkContext.scala | 8 +++--- .../spark/storage/BlockManagerMaster.scala | 16 ++++++++++++ .../scala/spark/storage/BlockManagerUI.scala | 4 +-- .../spark/storage/BlockManagerSuite.scala | 25 +++++++++++++++++++ 5 files changed, 51 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index c77f9915c0..fd14ef17f1 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -107,7 +107,7 @@ abstract class RDD[T: ClassManifest]( // ======================================================================= /** A unique ID for this RDD (within its SparkContext). */ - val id = sc.newRddId() + val id: Int = sc.newRddId() /** A friendly name for this RDD */ var name: String = null @@ -120,7 +120,8 @@ abstract class RDD[T: ClassManifest]( /** * Set this RDD's storage level to persist its values across operations after the first time - * it is computed. Can only be called once on each RDD. + * it is computed. This can only be used to assign a new storage level if the RDD does not + * have a storage level set yet.. */ def persist(newLevel: StorageLevel): RDD[T] = { // TODO: Handle changes of StorageLevel @@ -143,15 +144,7 @@ abstract class RDD[T: ClassManifest]( /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */ def unpersist(): RDD[T] = { logInfo("Removing RDD " + id + " from persistence list") - val rddBlockPrefix = "rdd_" + id + "_" - // Get the list of blocks in block manager, and remove ones that are part of this RDD. - // The runtime complexity is linear to the number of blocks persisted in the cluster. - // It could be expensive if the cluster is large and has a lot of blocks persisted. - sc.getExecutorStorageStatus().flatMap(_.blocks).foreach { case(blockId, status) => - if (blockId.startsWith(rddBlockPrefix)) { - sc.env.blockManager.master.removeBlock(blockId) - } - } + sc.env.blockManager.master.removeRdd(id) sc.persistentRdds.remove(id) storageLevel = StorageLevel.NONE this diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index d7d450d958..2ae4ad8659 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -508,18 +508,18 @@ class SparkContext( * Return information about what RDDs are cached, if they are in mem or on disk, how much space * they take, etc. */ - def getRDDStorageInfo(): Array[RDDInfo] = { - StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus(), this) + def getRDDStorageInfo: Array[RDDInfo] = { + StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this) } - def getStageInfo(): Map[Stage,StageInfo] = { + def getStageInfo: Map[Stage,StageInfo] = { dagScheduler.stageToInfos } /** * Return information about blocks stored in all of the slaves */ - def getExecutorStorageStatus(): Array[StorageStatus] = { + def getExecutorStorageStatus: Array[StorageStatus] = { env.blockManager.master.getStorageStatus } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 6fae62d373..ac26c16867 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -15,6 +15,7 @@ import akka.util.duration._ import spark.{Logging, SparkException, Utils} + private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging { val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt @@ -87,6 +88,21 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi askDriverWithReply(RemoveBlock(blockId)) } + /** + * Remove all blocks belonging to the given RDD. + */ + def removeRdd(rddId: Int) { + val rddBlockPrefix = "rdd_" + rddId + "_" + // Get the list of blocks in block manager, and remove ones that are part of this RDD. + // The runtime complexity is linear to the number of blocks persisted in the cluster. + // It could be expensive if the cluster is large and has a lot of blocks persisted. + getStorageStatus.flatMap(_.blocks).foreach { case(blockId, status) => + if (blockId.startsWith(rddBlockPrefix)) { + removeBlock(blockId) + } + } + } + /** * Return the memory status for each block manager, in the form of a map from * the block manager's id to two long values. The first value is the maximum diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index c9e4519efe..07da572044 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -45,7 +45,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, path("") { completeWith { // Request the current storage status from the Master - val storageStatusList = sc.getExecutorStorageStatus() + val storageStatusList = sc.getExecutorStorageStatus // Calculate macro-level statistics val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) @@ -60,7 +60,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, parameter("id") { id => completeWith { val prefix = "rdd_" + id.toString - val storageStatusList = sc.getExecutorStorageStatus() + val storageStatusList = sc.getExecutorStorageStatus val filteredStorageStatusList = StorageUtils. filterStorageStatusByPrefix(storageStatusList, prefix) val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 5a11a4483b..9fe0de665c 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -207,6 +207,31 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } } + test("removing rdd") { + store = new BlockManager("", actorSystem, master, serializer, 2000) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + // Putting a1, a2 and a3 in memory. + store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY) + master.removeRdd(0) + + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("rdd_0_0") should be (None) + master.getLocations("rdd_0_0") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("rdd_0_1") should be (None) + master.getLocations("rdd_0_1") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("nonrddblock") should not be (None) + master.getLocations("nonrddblock") should have size (1) + } + } + test("reregistration on heart beat") { val heartBeat = PrivateMethod[Unit]('heartBeat) store = new BlockManager("", actorSystem, master, serializer, 2000) From 4a318774088f829fe54c3ef0b5f565a845631b4e Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 1 May 2013 20:31:54 -0700 Subject: [PATCH 15/16] Added the unpersist api to JavaRDD. --- core/src/main/scala/spark/api/java/JavaRDD.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index e29f1e5899..eb81ed64cd 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -14,12 +14,18 @@ JavaRDDLike[T, JavaRDD[T]] { /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): JavaRDD[T] = wrapRDD(rdd.cache()) - /** + /** * Set this RDD's storage level to persist its values across operations after the first time - * it is computed. Can only be called once on each RDD. + * it is computed. This can only be used to assign a new storage level if the RDD does not + * have a storage level set yet.. */ def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel)) + /** + * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. + */ + def unpersist(): JavaRDD[T] = wrapRDD(rdd.unpersist()) + // Transformations (return a new RDD) /** @@ -31,7 +37,7 @@ JavaRDDLike[T, JavaRDD[T]] { * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numPartitions)) - + /** * Return a new RDD containing only the elements that satisfy a predicate. */ @@ -54,7 +60,7 @@ JavaRDDLike[T, JavaRDD[T]] { */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] = wrapRDD(rdd.sample(withReplacement, fraction, seed)) - + /** * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). @@ -63,7 +69,7 @@ JavaRDDLike[T, JavaRDD[T]] { /** * Return an RDD with the elements from `this` that are not in `other`. - * + * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ From c847dd3da29483fede326cb9821b0d33f735137e Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Tue, 19 Mar 2013 15:08:22 -0700 Subject: [PATCH 16/16] Don't accept generated temp directory names that can't be created successfully. --- core/src/main/scala/spark/storage/DiskStore.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index c9553d2e0f..215c25132b 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -168,8 +168,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) localDir = new File(rootDir, "spark-local-" + localDirId) if (!localDir.exists) { - localDir.mkdirs() - foundLocalDir = true + foundLocalDir = localDir.mkdirs() } } catch { case e: Exception =>