Merge branch 'master' of github.com:mesos/spark into blockmanager

commit 93091f6936
Author: Reynold Xin
Date:   2013-05-03 01:02:32 -07:00
13 changed files with 301 additions and 100 deletions

View file

@@ -107,7 +107,7 @@ abstract class RDD[T: ClassManifest](
// =======================================================================
/** A unique ID for this RDD (within its SparkContext). */
val id = sc.newRddId()
val id: Int = sc.newRddId()
/** A friendly name for this RDD */
var name: String = null
@@ -120,7 +120,8 @@ abstract class RDD[T: ClassManifest](
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet.
*/
def persist(newLevel: StorageLevel): RDD[T] = {
// TODO: Handle changes of StorageLevel
@@ -140,6 +141,15 @@ abstract class RDD[T: ClassManifest](
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): RDD[T] = persist()
/** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */
def unpersist(): RDD[T] = {
logInfo("Removing RDD " + id + " from persistence list")
sc.env.blockManager.master.removeRdd(id)
sc.persistentRdds.remove(id)
storageLevel = StorageLevel.NONE
this
}
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel = storageLevel
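For context, a minimal usage sketch of the new unpersist() API (assumes an existing SparkContext named sc; illustrative only, not part of the patch):

    import spark.storage.StorageLevel

    val nums = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)
    nums.count()      // materializes the RDD and caches its partitions as blocks
    nums.unpersist()  // removes those blocks from memory and disk and resets the level to NONE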

View file

@@ -1,47 +1,50 @@
package spark
import java.io._
import java.util.concurrent.atomic.AtomicInteger
import java.net.URI
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import scala.collection.mutable.{ConcurrentMap, HashMap}
import akka.actor.Actor._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.SequenceFileInputFormat
import org.apache.hadoop.io.Writable
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.FloatWritable
import org.apache.hadoop.io.DoubleWritable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.ArrayWritable
import org.apache.hadoop.io.BooleanWritable
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.ArrayWritable
import org.apache.hadoop.io.DoubleWritable
import org.apache.hadoop.io.FloatWritable
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.SequenceFileInputFormat
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{SparkHadoopUtil, LocalSparkCluster}
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler._
import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.BlockManagerUI
import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -97,7 +100,7 @@ class SparkContext(
private[spark] val addedJars = HashMap[String, Long]()
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)

View file

@@ -16,10 +16,16 @@ JavaRDDLike[T, JavaRDD[T]] {
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet.
*/
def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel))
/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
*/
def unpersist(): JavaRDD[T] = wrapRDD(rdd.unpersist())
// Transformations (return a new RDD)
/**

View file

@@ -188,6 +188,38 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
} )
}
// MUST be called within selector loop - else deadlock.
private def triggerForceCloseByException(key: SelectionKey, e: Exception) {
try {
key.interestOps(0)
} catch {
// ignore exceptions
case e: Exception => logDebug("Ignoring exception", e)
}
val conn = connectionsByKey.getOrElse(key, null)
if (conn == null) return
// Pushing to connect threadpool
handleConnectExecutor.execute(new Runnable {
override def run() {
try {
conn.callOnExceptionCallback(e)
} catch {
// ignore exceptions
case e: Exception => logDebug("Ignoring exception", e)
}
try {
conn.close()
} catch {
// ignore exceptions
case e: Exception => logDebug("Ignoring exception", e)
}
}
})
}
def run() {
try {
while(!selectorThread.isInterrupted) {
@@ -200,6 +232,9 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
while(!keyInterestChangeRequests.isEmpty) {
val (key, ops) = keyInterestChangeRequests.dequeue
try {
if (key.isValid) {
val connection = connectionsByKey.getOrElse(key, null)
if (connection != null) {
val lastOps = key.interestOps()
@@ -220,9 +255,53 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
"] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
}
}
} else {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
}
case e: Exception => {
logError("Exception processing key " + key, e)
triggerForceCloseByException(key, e)
}
}
}
val selectedKeysCount =
try {
selector.select()
} catch {
// Explicitly only dealing with CancelledKeyException here since other exceptions should be dealt with differently.
case e: CancelledKeyException => {
// Some keys within the selector's key set are invalid/closed. Clear them.
val allKeys = selector.keys().iterator()
while (allKeys.hasNext()) {
val key = allKeys.next()
try {
if (! key.isValid) {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
}
case e: Exception => {
logError("Exception processing key " + key, e)
triggerForceCloseByException(key, e)
}
}
}
}
0
}
val selectedKeysCount = selector.select()
if (selectedKeysCount == 0) {
logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
}
@@ -231,10 +310,12 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
return
}
if (0 != selectedKeysCount) {
val selectedKeys = selector.selectedKeys().iterator()
while (selectedKeys.hasNext()) {
val key = selectedKeys.next
selectedKeys.remove()
try {
if (key.isValid) {
if (key.isAcceptable) {
acceptConnection(key)
@@ -248,6 +329,21 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
if (key.isWritable) {
triggerWrite(key)
}
} else {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
// weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
}
case e: Exception => {
logError("Exception processing key " + key, e)
triggerForceCloseByException(key, e)
}
}
}
}
}
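The same defensive shape appears three times above (interest-op changes, the select() call, and key dispatch): check key.isValid, treat an invalid key as a CancelledKeyException, and escalate any failure to triggerForceCloseByException. As a rough illustration, not code from the patch, the guard can be factored as a hypothetical helper:

    import java.nio.channels.{CancelledKeyException, SelectionKey}

    // Run an operation against a key that another thread may have cancelled,
    // and hand any failure to a caller-supplied force-close handler.
    def guardKey(key: SelectionKey)(op: => Unit)(onFailure: Exception => Unit) {
      try {
        if (key.isValid) op else throw new CancelledKeyException()
      } catch {
        case e: CancelledKeyException => onFailure(e)  // key already cancelled elsewhere
        case e: Exception => onFailure(e)              // anything else is also fatal for this key
      }
    }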

View file

@@ -15,6 +15,7 @@ import akka.util.duration._
import spark.{Logging, SparkException, Utils}
private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
@@ -87,6 +88,21 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
askDriverWithReply(RemoveBlock(blockId))
}
/**
* Remove all blocks belonging to the given RDD.
*/
def removeRdd(rddId: Int) {
val rddBlockPrefix = "rdd_" + rddId + "_"
// Get the list of blocks in block manager, and remove ones that are part of this RDD.
// The runtime complexity is linear to the number of blocks persisted in the cluster.
// It could be expensive if the cluster is large and has a lot of blocks persisted.
getStorageStatus.flatMap(_.blocks).foreach { case(blockId, status) =>
if (blockId.startsWith(rddBlockPrefix)) {
removeBlock(blockId)
}
}
}
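This prefix scan relies on the convention that cached partitions are stored under block ids of the form "rdd_<rddId>_<splitIndex>". A tiny illustrative check (not from the patch) of why the trailing underscore matters:

    val rddId = 42
    val prefix = "rdd_" + rddId + "_"
    assert("rdd_42_3".startsWith(prefix))    // partition 3 of RDD 42: removed
    assert(!"rdd_420_0".startsWith(prefix))  // RDD 420 is untouched thanks to the trailing '_'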
/**
* Return the memory status for each block manager, in the form of a map from
* the block manager's id to two long values. The first value is the maximum

View file

@@ -236,8 +236,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
localDir = new File(rootDir, "spark-local-" + localDirId)
if (!localDir.exists) {
localDir.mkdirs()
foundLocalDir = true
foundLocalDir = localDir.mkdirs()
}
} catch {
case e: Exception =>
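The change above records what mkdirs() actually returns instead of assuming the directory was created. A standalone sketch of the same idea (hypothetical temp path, not from the patch):

    import java.io.File

    val dir = new File(System.getProperty("java.io.tmpdir"),
      "spark-local-example-" + scala.util.Random.nextInt(65536))
    var foundLocalDir = false
    if (!dir.exists) {
      // mkdirs() returns false if the directory could not be created, so its
      // return value, not a hard-coded true, decides whether the dir is usable.
      foundLocalDir = dir.mkdirs()
    }
    println("found local dir: " + foundLocalDir + " (" + dir + ")")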

View file

@@ -22,12 +22,17 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
}
case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) {
numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
extends Ordered[RDDInfo] {
override def toString = {
import Utils.memoryBytesToString
"RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
}
override def compare(that: RDDInfo) = {
this.id - that.id
}
}
/* Helper methods for storage-related objects */
@@ -45,12 +50,12 @@ object StorageUtils {
sc: SparkContext) : Array[RDDInfo] = {
// Group by rddId, ignore the partition name
val groupedRddBlocks = infos.groupBy { case(k, v) =>
val groupedRddBlocks = infos.filterKeys(_.startsWith("rdd_")).groupBy { case(k, v) =>
k.substring(0,k.lastIndexOf('_'))
}.mapValues(_.values.toArray)
// For each RDD, generate an RDDInfo object
groupedRddBlocks.map { case(rddKey, rddBlocks) =>
val rddInfos = groupedRddBlocks.map { case(rddKey, rddBlocks) =>
// Add up memory and disk sizes
val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
@@ -65,6 +70,10 @@ object StorageUtils {
RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize)
}.toArray
scala.util.Sorting.quickSort(rddInfos)
rddInfos
}
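Because RDDInfo now extends Ordered[RDDInfo] and compares by id, the in-place quickSort above leaves the array ordered by RDD id. A self-contained sketch of the same mechanism (hypothetical case class, not the real RDDInfo):

    case class Info(id: Int) extends Ordered[Info] {
      override def compare(that: Info) = this.id - that.id  // fine for small non-negative ids
    }
    val infos = Array(Info(3), Info(1), Info(2))
    scala.util.Sorting.quickSort(infos)  // sorts in place via compare: Info(1), Info(2), Info(3)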
/* Removes all BlockStatus objects that are not part of a block prefix */

View file

@@ -3,8 +3,10 @@ package spark
import network.ConnectionManagerId
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Timeouts._
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.prop.Checkers
import org.scalatest.time.{Span, Millis}
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
import org.scalacheck.Prop._
@@ -252,6 +254,30 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(data2.count === 2)
}
}
test("unpersist RDDs") {
DistributedSuite.amMaster = true
sc = new SparkContext("local-cluster[3,1,512]", "test")
val data = sc.parallelize(Seq(true, false, false, false), 4)
data.persist(StorageLevel.MEMORY_ONLY_2)
data.count
assert(sc.persistentRdds.isEmpty == false)
data.unpersist()
assert(sc.persistentRdds.isEmpty == true)
failAfter(Span(3000, Millis)) {
try {
while (! sc.getRDDStorageInfo.isEmpty) {
Thread.sleep(200)
}
} catch {
case e: Exception =>
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
}
assert(sc.getRDDStorageInfo.isEmpty == true)
}
}
object DistributedSuite {

View file

@@ -2,6 +2,8 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
@@ -100,6 +102,28 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
test("unpersist RDD") {
sc = new SparkContext("local", "test")
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
rdd.count
assert(sc.persistentRdds.isEmpty == false)
rdd.unpersist()
assert(sc.persistentRdds.isEmpty == true)
failAfter(Span(3000, Millis)) {
try {
while (! sc.getRDDStorageInfo.isEmpty) {
Thread.sleep(200)
}
} catch {
case e: Exception =>
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
}
assert(sc.getRDDStorageInfo.isEmpty == true)
}
test("caching with failures") {
sc = new SparkContext("local", "test")
val onlySplit = new Partition { override def index: Int = 0 }

View file

@@ -207,6 +207,31 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
test("removing rdd") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
// Putting a1, a2 and a3 in memory.
store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
master.removeRdd(0)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
store.getSingle("rdd_0_0") should be (None)
master.getLocations("rdd_0_0") should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
store.getSingle("rdd_0_1") should be (None)
master.getLocations("rdd_0_1") should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
store.getSingle("nonrddblock") should not be (None)
master.getLocations("nonrddblock") should have size (1)
}
}
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)

View file

@@ -29,7 +29,7 @@ object SparkBuild extends Build {
lazy val core = Project("core", file("core"), settings = coreSettings)
lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn (streaming)
lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming)

View file

@@ -96,13 +96,6 @@
<classifier>hadoop1</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
@@ -147,13 +140,6 @@
<classifier>hadoop2</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>

run (7 changed lines)
View file

@@ -95,6 +95,7 @@ export JAVA_OPTS
CORE_DIR="$FWDIR/core"
REPL_DIR="$FWDIR/repl"
REPL_BIN_DIR="$FWDIR/repl-bin"
EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel"
STREAMING_DIR="$FWDIR/streaming"
@@ -125,8 +126,8 @@ if [ -e "$FWDIR/lib_managed" ]; then
CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
fi
CLASSPATH+=":$REPL_DIR/lib/*"
if [ -e repl-bin/target ]; then
for jar in `find "repl-bin/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
if [ -e $REPL_BIN_DIR/target ]; then
for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
CLASSPATH+=":$jar"
done
fi
@@ -134,7 +135,6 @@ CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
CLASSPATH+=":$jar"
done
export CLASSPATH # Needed for spark-shell
# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
@@ -163,4 +163,5 @@ else
EXTRA_ARGS="$JAVA_OPTS"
fi
export CLASSPATH # Needed for spark-shell
exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"