From b8cd6811697f10cd969cfe750f7fd815e0545177 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 2 Oct 2012 11:52:12 -0700 Subject: [PATCH 1/2] Allow whitespaces in cluster URL configuration for local cluster. --- core/src/main/scala/spark/SparkContext.scala | 68 +++++++++---------- .../test/scala/spark/DistributedSuite.scala | 22 +++++- 2 files changed, 53 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 79a9e8e34e..0a0a5d772b 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -55,7 +55,7 @@ class SparkContext( val sparkHome: String, val jars: Seq[String]) extends Logging { - + def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil) // Ensure logging is initialized before we spawn any threads @@ -78,30 +78,30 @@ class SparkContext( true, isLocal) SparkEnv.set(env) - + // Used to store a URL for each static file/jar together with the file's local timestamp val addedFiles = HashMap[String, Long]() val addedJars = HashMap[String, Long]() - + // Add each JAR given through the constructor jars.foreach { addJar(_) } - + // Create and start the scheduler private var taskScheduler: TaskScheduler = { // Regular expression used for local[N] master format val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r // Regular expression for local[N, maxRetries], used in tests with failing tasks - val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r + val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r // Regular expression for simulating a Spark cluster of [N, cores, memory] locally - val LOCAL_CLUSTER_REGEX = """local-cluster\[([0-9]+),([0-9]+),([0-9]+)]""".r + val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r // Regular expression for connecting to Spark deploy clusters val SPARK_REGEX = """(spark://.*)""".r - + master match { - case "local" => + case "local" => new LocalScheduler(1, 0, this) - case LOCAL_N_REGEX(threads) => + case LOCAL_N_REGEX(threads) => new LocalScheduler(threads.toInt, 0, this) case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => @@ -112,7 +112,7 @@ class SparkContext( val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) scheduler.initialize(backend) scheduler - + case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) => val scheduler = new ClusterScheduler(this) val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt) @@ -140,13 +140,13 @@ class SparkContext( taskScheduler.start() private var dagScheduler = new DAGScheduler(taskScheduler) - + // Methods for creating RDDs def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = { new ParallelCollection[T](this, seq, numSlices) } - + def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = { parallelize(seq, numSlices) } @@ -187,14 +187,14 @@ class SparkContext( } /** - * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, + * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, * values and the InputFormat so that users don't need to pass them directly. 
*/ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int) (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]) : RDD[(K, V)] = { hadoopFile(path, - fm.erasure.asInstanceOf[Class[F]], + fm.erasure.asInstanceOf[Class[F]], km.erasure.asInstanceOf[Class[K]], vm.erasure.asInstanceOf[Class[V]], minSplits) @@ -215,7 +215,7 @@ class SparkContext( new Configuration) } - /** + /** * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat * and extra configuration options to pass to the input format. */ @@ -231,7 +231,7 @@ class SparkContext( new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf) } - /** + /** * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat * and extra configuration options to pass to the input format. */ @@ -257,14 +257,14 @@ class SparkContext( sequenceFile(path, keyClass, valueClass, defaultMinSplits) /** - * Version of sequenceFile() for types implicitly convertible to Writables through a + * Version of sequenceFile() for types implicitly convertible to Writables through a * WritableConverter. * * WritableConverters are provided in a somewhat strange way (by an implicit function) to support - * both subclasses of Writable and types for which we define a converter (e.g. Int to + * both subclasses of Writable and types for which we define a converter (e.g. Int to * IntWritable). The most natural thing would've been to have implicit objects for the * converters, but then we couldn't have an object for every subclass of Writable (you can't - * have a parameterized singleton object). We use functions instead to create a new converter + * have a parameterized singleton object). We use functions instead to create a new converter * for the appropriate type. In addition, we pass the converter a ClassManifest of its type to * allow it to figure out the Writable class to use in the subclass case. */ @@ -289,7 +289,7 @@ class SparkContext( * that there's very little effort required to save arbitrary objects. */ def objectFile[T: ClassManifest]( - path: String, + path: String, minSplits: Int = defaultMinSplits ): RDD[T] = { sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits) @@ -318,7 +318,7 @@ class SparkContext( /** * Create an accumulator from a "mutable collection" type. - * + * * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by * standard mutable collections. So you can use this with mutable Map, Set, etc. */ @@ -329,7 +329,7 @@ class SparkContext( // Keep around a weak hash map of values to Cached versions? def broadcast[T](value: T) = SparkEnv.get.broadcastManager.newBroadcast[T] (value, isLocal) - + // Adds a file dependency to all Tasks executed in the future. def addFile(path: String) { val uri = new URI(path) @@ -338,11 +338,11 @@ class SparkContext( case _ => path } addedFiles(key) = System.currentTimeMillis - + // Fetch the file locally in case the task is executed locally val filename = new File(path.split("/").last) Utils.fetchFile(path, new File(".")) - + logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) } @@ -350,7 +350,7 @@ class SparkContext( addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() } addedFiles.clear() } - + // Adds a jar dependency to all Tasks executed in the future. 
def addJar(path: String) { val uri = new URI(path) @@ -366,7 +366,7 @@ class SparkContext( addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() } addedJars.clear() } - + // Stop the SparkContext def stop() { dagScheduler.stop() @@ -400,7 +400,7 @@ class SparkContext( /** * Run a function on a given set of partitions in an RDD and return the results. This is the main * entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies - * whether the scheduler can run the computation on the master rather than shipping it out to the + * whether the scheduler can run the computation on the master rather than shipping it out to the * cluster, for short actions like first(). */ def runJob[T, U: ClassManifest]( @@ -419,13 +419,13 @@ class SparkContext( def runJob[T, U: ClassManifest]( rdd: RDD[T], - func: Iterator[T] => U, + func: Iterator[T] => U, partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal) } - + /** * Run a job on all partitions in an RDD and return the results in an array. */ @@ -472,7 +472,7 @@ class SparkContext( private[spark] def newShuffleId(): Int = { nextShuffleId.getAndIncrement() } - + private var nextRddId = new AtomicInteger(0) // Register a new RDD, returning its RDD ID @@ -500,7 +500,7 @@ object SparkContext { implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) = new PairRDDFunctions(rdd) - + implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest]( rdd: RDD[(K, V)]) = new SequenceFileRDDFunctions(rdd) @@ -521,7 +521,7 @@ object SparkContext { implicit def longToLongWritable(l: Long) = new LongWritable(l) implicit def floatToFloatWritable(f: Float) = new FloatWritable(f) - + implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d) implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b) @@ -532,7 +532,7 @@ object SparkContext { private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = { def anyToWritable[U <% Writable](u: U): Writable = u - + new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], arr.map(x => anyToWritable(x)).toArray) } @@ -576,7 +576,7 @@ object SparkContext { Nil } } - + // Find the JAR that contains the class of a particular object def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass) } diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index fce1deaa5c..48c0a830e0 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -18,9 +18,9 @@ import storage.StorageLevel class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { val clusterUrl = "local-cluster[2,1,512]" - + @transient var sc: SparkContext = _ - + after { if (sc != null) { sc.stop() @@ -28,6 +28,22 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter } } + test("local-cluster format") { + sc = new SparkContext("local-cluster[2,1,512]", "test") + assert(sc.parallelize(1 to 2, 2).count == 2) + sc.stop() + sc = new SparkContext("local-cluster[2 , 1 , 512]", "test") + assert(sc.parallelize(1 to 2, 2).count == 2) + sc.stop() + sc = new SparkContext("local-cluster[2, 1, 512]", "test") + assert(sc.parallelize(1 to 2, 2).count == 2) + sc.stop() + sc = new SparkContext("local-cluster[ 2, 
1, 512 ]", "test") + assert(sc.parallelize(1 to 2, 2).count == 2) + sc.stop() + sc = null + } + test("simple groupByKey") { sc = new SparkContext(clusterUrl, "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5) @@ -38,7 +54,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter val valuesFor2 = groups.find(_._1 == 2).get._2 assert(valuesFor2.toList.sorted === List(1)) } - + test("accumulators") { sc = new SparkContext(clusterUrl, "test") val accum = sc.accumulator(0) From 79975856165099c74ee04e0e5443a9832014eaff Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 2 Oct 2012 15:45:25 -0700 Subject: [PATCH 2/2] Added a check to make sure SPARK_MEM <= memoryPerSlave for local cluster mode. --- core/src/main/scala/spark/SparkContext.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 0a0a5d772b..1ef1712c56 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -113,9 +113,20 @@ class SparkContext( scheduler.initialize(backend) scheduler - case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) => + case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => + // Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang. + val memoryPerSlaveInt = memoryPerSlave.toInt + val sparkMemEnv = System.getenv("SPARK_MEM") + val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512 + if (sparkMemEnvInt > memoryPerSlaveInt) { + throw new SparkException( + "Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format( + memoryPerSlaveInt, sparkMemEnvInt)) + } + val scheduler = new ClusterScheduler(this) - val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt) + val localCluster = new LocalSparkCluster( + numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) val sparkUrl = localCluster.start() val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) scheduler.initialize(backend)