diff --git a/README.md b/README.md
index 6ffa3f4804..a0f42d5376 100644
--- a/README.md
+++ b/README.md
@@ -17,15 +17,14 @@ Spark requires Scala 2.9.1. This version has been tested with 2.9.1.final.
 The project is built using Simple Build Tool (SBT), which is packaged with it.
 To build Spark and its example programs, run:
 
-    sbt/sbt compile
+    sbt/sbt package
 
 To run Spark, you will need to have Scala's bin in your `PATH`, or you
 will need to set the `SCALA_HOME` environment variable to point to where
 you've installed Scala. Scala must be accessible through one of these
 methods on Mesos slave nodes as well as on the master.
 
-To run one of the examples, first run `sbt/sbt package` to create a JAR with
-the example classes. Then use `./run <class> <params>`. For example:
+To run one of the examples, first run `sbt/sbt package` to build them. Then use `./run <class> <params>`. For example:
 
   ./run spark.examples.SparkLR local[2]
 
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index cc5bed257b..a6e6099d86 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -36,7 +36,16 @@ class Accumulable[T,R] (
     else throw new UnsupportedOperationException("Can't use read value in task")
   }
 
-  private[spark] def localValue = value_
+  /**
+   * Get the current value of this accumulator from within a task.
+   *
+   * This is NOT the global value of the accumulator. To get the global value after a
+   * completed operation on the dataset, call `value`.
+   *
+   * The typical use of this method is to directly mutate the local value, e.g., to add
+   * an element to a Set.
+   */
+  def localValue = value_
 
   def value_= (t: T) {
     if (!deserialized) value_ = t
@@ -156,4 +165,4 @@ private object Accumulators {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
index 1b4af9d84c..27bdbf3224 100644
--- a/core/src/main/scala/spark/DAGScheduler.scala
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -309,6 +309,7 @@ private trait DAGScheduler extends Scheduler with Logging {
               // outputs on the node as dead.
             case _ =>
               // Non-fetch failure -- probably a bug in the job, so bail out
+              eventQueues -= runId
               throw new SparkException("Task failed: " + evt.task + ", reason: " + evt.reason)
               // TODO: Cancel all tasks that are still running
           }
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 01c7efff1e..fa2832bc6e 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -293,6 +293,9 @@ class SimpleJob(
         if (numFailures(index) > MAX_TASK_FAILURES) {
           logError("Task %d:%d failed more than %d times; aborting job".format(
             jobId, index, MAX_TASK_FAILURES))
+          val taskEndReason = ser.deserialize[TaskEndReason](
+            status.getData.toByteArray, getClass.getClassLoader)
+          sched.taskEnded(tasks(index), taskEndReason, null, null) // To make DAGScheduler stop
           abort("Task %d failed more than %d times".format(index, MAX_TASK_FAILURES))
         }
       }
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index 68230c4b92..f63126a45b 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -81,9 +81,9 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers {
     for (nThreads <- List(1, 10)) { //test single & multi-threaded
       val sc = new SparkContext("local[" + nThreads + "]", "test")
-      val setAcc = sc.accumlableCollection(mutable.HashSet[Int]())
-      val bufferAcc = sc.accumlableCollection(mutable.ArrayBuffer[Int]())
-      val mapAcc = sc.accumlableCollection(mutable.HashMap[Int,String]())
+      val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
+      val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
+      val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
       val d = sc.parallelize( (1 to maxI) ++ (1 to maxI))
       d.foreach {
         x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)}
@@ -100,7 +100,21 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers {
       }
       sc.stop()
     }
-  }
-}
\ No newline at end of file
+  }
+
+  test ("localValue readable in tasks") {
+    import SetAccum._
+    val maxI = 1000
+    for (nThreads <- List(1, 10)) { //test single & multi-threaded
+      val sc = new SparkContext("local[" + nThreads + "]", "test")
+      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+      val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
+      val d = sc.parallelize(groupedInts)
+      d.foreach {
+        x => acc.localValue ++= x
+      }
+      acc.value should be ( (0 to maxI).toSet)
+    }
+  }
+
+}
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0b85bbd46f..8879da4b61 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -58,7 +58,9 @@ def parse_args():
            "WARNING: must be 64-bit; small instances won't work")
   parser.add_option("-m", "--master-instance-type", default="",
       help="Master instance type (leave empty for same as instance-type)")
-  parser.add_option("-z", "--zone", default="us-east-1b",
+  parser.add_option("-r", "--region", default="us-east-1",
+      help="EC2 region zone to launch instances in")
+  parser.add_option("-z", "--zone", default="",
       help="Availability zone to launch instances in")
   parser.add_option("-a", "--ami", default="latest",
       help="Amazon Machine Image ID to use, or 'latest' to use latest " +
@@ -438,7 +440,7 @@ def ssh(host, opts, command):
 
 def main():
   (opts, action, cluster_name) = parse_args()
-  conn = boto.connect_ec2()
+  conn = boto.ec2.connect_to_region(opts.region)
 
   # Select an AZ at random if it was not specified.
   if opts.zone == "":
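
For context on how the newly public `Accumulable.localValue` is meant to be used (per the new scaladoc and the "localValue readable in tasks" test above), here is a minimal driver-program sketch. It is not part of the patch: the object name, master string, and numbers are made up for illustration, and it assumes the `accumulableCollection` API exercised by the test suite.

    import scala.collection.mutable
    import spark.SparkContext

    // Illustrative sketch only -- not part of the patch above.
    object LocalValueExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local[2]", "localValueExample")
        // accumulableCollection returns an Accumulable backed by a mutable collection.
        val acc = sc.accumulableCollection(mutable.HashSet[Int]())
        sc.parallelize(1 to 100, 4).foreach { i =>
          // Inside a task, localValue is this task's local set, not the global value.
          acc.localValue += i
        }
        // Back on the driver, value holds the merge of all the task-local sets.
        assert(acc.value.size == 100)
        sc.stop()
      }
    }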