Merge branch 'master' into collection_accumulators

Conflicts:
	core/src/test/scala/spark/AccumulatorSuite.scala
Author: Imran Rashid
Date:   2012-09-11 11:20:56 -07:00
Commit: 1490d09b5d

6 changed files with 40 additions and 12 deletions

@@ -17,15 +17,14 @@ Spark requires Scala 2.9.1. This version has been tested with 2.9.1.final.
The project is built using Simple Build Tool (SBT), which is packaged with it.
To build Spark and its example programs, run:
sbt/sbt compile
sbt/sbt package
To run Spark, you will need to have Scala's bin in your `PATH`, or you
will need to set the `SCALA_HOME` environment variable to point to where
you've installed Scala. Scala must be accessible through one of these
methods on Mesos slave nodes as well as on the master.
To run one of the examples, first run `sbt/sbt package` to create a JAR with
the example classes. Then use `./run <class> <params>`. For example:
To run one of the examples, first run `sbt/sbt package` to build them. Then use `./run <class> <params>`. For example:
./run spark.examples.SparkLR local[2]

@@ -36,7 +36,16 @@ class Accumulable[T,R] (
else throw new UnsupportedOperationException("Can't use read value in task")
}
private[spark] def localValue = value_
/**
* Get the current value of this accumulator from within a task.
*
* This is NOT the global value of the accumulator. To get the global value after a
* completed operation on the dataset, call `value`.
*
* The typical use of this method is to directly mutate the local value, e.g., to add
* an element to a Set.
*/
def localValue = value_
def value_= (t: T) {
if (!deserialized) value_ = t
@@ -156,4 +165,4 @@ private object Accumulators {
}
}
}
}
}
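
The doc comment above is the heart of this change: localValue exposes the task-local partial result, while value may only be read on the driver (reading it inside a task hits the UnsupportedOperationException guard shown earlier in the hunk). A minimal driver-program sketch of that contrast, reusing the accumulableCollection helper exercised by the test suite further down this diff; the object name and the println are invented for illustration:

import scala.collection.mutable
import spark.SparkContext

object LocalValueSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "localValue sketch")
    val acc = sc.accumulableCollection(mutable.HashSet[Int]())
    sc.parallelize(1 to 100).foreach { x =>
      // Inside a task, localValue is this task's partial collection; mutating it
      // directly is the use the doc comment describes.
      acc.localValue += x
      // Reading acc.value here would throw UnsupportedOperationException.
    }
    // On the driver, after the action completes, value is the merged result.
    println(acc.value.size)
    sc.stop()
  }
}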

@@ -309,6 +309,7 @@ private trait DAGScheduler extends Scheduler with Logging {
// outputs on the node as dead.
case _ =>
// Non-fetch failure -- probably a bug in the job, so bail out
eventQueues -= runId
throw new SparkException("Task failed: " + evt.task + ", reason: " + evt.reason)
// TODO: Cancel all tasks that are still running
}
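
The one added line, eventQueues -= runId, discards the failed run's event queue before the exception propagates, so the scheduler does not keep a stale entry for a job it has just given up on. A generic sketch of that cleanup-before-throw pattern; the names below are illustrative, not the DAGScheduler's own:

import scala.collection.mutable

// Illustrative only: per-run event queues that must be dropped when a run fails,
// mirroring the eventQueues -= runId added above.
object RunEventQueues {
  private val eventQueues = mutable.HashMap[Int, mutable.Queue[String]]()

  def register(runId: Int) { eventQueues(runId) = mutable.Queue[String]() }

  def fail(runId: Int, reason: String): Nothing = {
    eventQueues -= runId  // clean up first so no stale queue outlives the failed run
    throw new RuntimeException("Task failed: " + reason)
  }
}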

@@ -293,6 +293,9 @@ class SimpleJob(
if (numFailures(index) > MAX_TASK_FAILURES) {
logError("Task %d:%d failed more than %d times; aborting job".format(
jobId, index, MAX_TASK_FAILURES))
val taskEndReason = ser.deserialize[TaskEndReason](
status.getData.toByteArray, getClass.getClassLoader)
sched.taskEnded(tasks(index), taskEndReason, null, null) // To make DAGScheduler stop
abort("Task %d failed more than %d times".format(index, MAX_TASK_FAILURES))
}
}
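
These added lines deserialize the task's TaskEndReason and report it via sched.taskEnded before aborting, so the DAGScheduler stops instead of waiting on a job that will never finish. The surrounding, unchanged logic is a per-index failure counter; a stripped-down sketch of that pattern, with hypothetical names:

// Illustrative sketch: count failures per task index and trigger an abort once
// a task has failed more than the allowed number of times.
class FailureCounter(numTasks: Int, maxTaskFailures: Int) {
  private val numFailures = new Array[Int](numTasks)

  def taskFailed(index: Int, onAbort: String => Unit) {
    numFailures(index) += 1
    if (numFailures(index) > maxTaskFailures) {
      // This is the point where the real code also forwards the deserialized
      // TaskEndReason to the scheduler before aborting.
      onAbort("Task %d failed more than %d times".format(index, maxTaskFailures))
    }
  }
}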

@@ -81,9 +81,9 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers {
for (nThreads <- List(1, 10)) {
//test single & multi-threaded
val sc = new SparkContext("local[" + nThreads + "]", "test")
val setAcc = sc.accumlableCollection(mutable.HashSet[Int]())
val bufferAcc = sc.accumlableCollection(mutable.ArrayBuffer[Int]())
val mapAcc = sc.accumlableCollection(mutable.HashMap[Int,String]())
val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
val d = sc.parallelize( (1 to maxI) ++ (1 to maxI))
d.foreach {
x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)}
@@ -100,7 +100,21 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers {
}
sc.stop()
}
}
}
test ("localValue readable in tasks") {
import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
val sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
val d = sc.parallelize(groupedInts)
d.foreach {
x => acc.localValue ++= x
}
acc.value should be ( (0 to maxI).toSet)
}
}
}
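
The test exercises three collection types; the map case shows that += on an accumulableCollection over a HashMap adds a (key, value) pair. A short driver-side usage sketch along the same lines, with names invented for illustration:

import scala.collection.mutable
import spark.SparkContext

object MapAccumulatorSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "map accumulator sketch")
    val mapAcc = sc.accumulableCollection(mutable.HashMap[Int, String]())
    sc.parallelize(1 to 50).foreach { x =>
      // Each task grows its own local map; += adds one (key, value) pair.
      mapAcc += (x -> x.toString)
    }
    // After the action finishes, value on the driver is the union of the task-local maps.
    println(mapAcc.value.size)
    sc.stop()
  }
}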

@@ -58,7 +58,9 @@ def parse_args():
"WARNING: must be 64-bit; small instances won't work")
parser.add_option("-m", "--master-instance-type", default="",
help="Master instance type (leave empty for same as instance-type)")
parser.add_option("-z", "--zone", default="us-east-1b",
parser.add_option("-r", "--region", default="us-east-1",
help="EC2 region zone to launch instances in")
parser.add_option("-z", "--zone", default="",
help="Availability zone to launch instances in")
parser.add_option("-a", "--ami", default="latest",
help="Amazon Machine Image ID to use, or 'latest' to use latest " +
@@ -438,7 +440,7 @@ def ssh(host, opts, command):
def main():
(opts, action, cluster_name) = parse_args()
conn = boto.connect_ec2()
conn = boto.ec2.connect_to_region(opts.region)
# Select an AZ at random if it was not specified.
if opts.zone == "":