Merge remote-tracking branch 'mesos/master' into ec2-updates

This commit is contained in:
Patrick Wendell 2013-08-24 14:50:58 -07:00
commit 4879685910
24 changed files with 763 additions and 121 deletions

View file

@ -16,7 +16,7 @@ Spark requires Scala 2.9.3 (Scala 2.10 is not yet supported). The project is
built using Simple Build Tool (SBT), which is packaged with it. To build
Spark and its example programs, run:
sbt/sbt package
sbt/sbt package assembly
Spark also supports building using Maven. If you would like to build using Maven,
see the [instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html)
@ -43,10 +43,47 @@ locally with one thread, or "local[N]" to run locally with N threads.
## A Note About Hadoop Versions
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
storage systems. Because the HDFS API has changed in different versions of
storage systems. Because the protocols have changed in different versions of
Hadoop, you must build Spark against the same version that your cluster runs.
You can change the version by setting the `HADOOP_VERSION` variable at the top
of `project/SparkBuild.scala`, then rebuilding Spark.
You can change the version by setting the `SPARK_HADOOP_VERSION` environment
when building Spark.
For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop
versions without YARN, use:
# Apache Hadoop 1.2.1
$ SPARK_HADOOP_VERSION=1.2.1 sbt/sbt package assembly
# Cloudera CDH 4.2.0 with MapReduce v1
$ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt package assembly
For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_WITH_YARN=true`:
# Apache Hadoop 2.0.5-alpha
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_WITH_YARN=true sbt/sbt package assembly
# Cloudera CDH 4.2.0 with MapReduce v2
$ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_WITH_YARN=true sbt/sbt package assembly
For convenience, these variables may also be set through the `conf/spark-env.sh` file
described below.
When developing a Spark application, specify the Hadoop version by adding the
"hadoop-client" artifact to your project's dependencies. For example, if you're
using Hadoop 1.2.1 and build your application using SBT, add this entry to
`libraryDependencies`:
"org.apache.hadoop" % "hadoop-client" % "1.2.1"
If your project is built with Maven, add this to your POM file's `<dependencies>` section:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<!-- the brackets are needed to tell Maven that this is a hard dependency on version "1.2.1" exactly -->
<version>[1.2.1]</version>
</dependency>
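For reference, a complete minimal SBT build definition combining a Spark dependency with the matching `hadoop-client` might look like the following sketch (the project name and the Spark coordinates/version shown here are illustrative placeholders):

    // build.sbt -- illustrative sketch only
    name := "my-spark-app"

    scalaVersion := "2.9.3"

    libraryDependencies ++= Seq(
      "org.spark-project" %% "spark-core" % "0.7.3",  // placeholder Spark version
      "org.apache.hadoop" % "hadoop-client" % "1.2.1"
    )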
## Configuration

View file

@ -52,6 +52,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View file

@ -56,8 +56,7 @@ import spark.deploy.LocalSparkCluster
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
OrderedRDDFunctions}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler._
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
@ -65,6 +64,10 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
import spark.ui.SparkUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import scala.Some
import spark.scheduler.StageInfo
import spark.storage.RDDInfo
import spark.storage.StorageStatus
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@ -614,6 +617,16 @@ class SparkContext(
addedFiles.clear()
}
/**
* Gets the locality information associated with the partition in a particular rdd
* @param rdd of interest
* @param partition to be looked up for locality
* @return list of preferred locations for the partition
*/
private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
dagScheduler.getPreferredLocs(rdd, partition)
}
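A minimal sketch of how this new hook gets used (it is `private[spark]`, so only internal callers such as the partition coalescer introduced below can reach it; the `sc` and `rdd` values here are assumed to already exist):

    // Ask the DAGScheduler for the *current* preferred hosts of partition 0.
    val hosts: Seq[String] = sc.getPreferredLocs(rdd, 0).map(_.host)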
/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported

View file

@ -17,53 +17,76 @@
package spark.rdd
import spark.{Dependency, OneToOneDependency, NarrowDependency, RDD, Partition, TaskContext}
import spark._
import java.io.{ObjectOutputStream, IOException}
import scala.collection.mutable
import scala.Some
import scala.collection.mutable.ArrayBuffer
private[spark] case class CoalescedRDDPartition(
index: Int,
@transient rdd: RDD[_],
parentsIndices: Array[Int]
) extends Partition {
/**
* Class that captures a coalesced RDD by essentially keeping track of parent partitions
* @param index of this coalesced partition
* @param rdd which it belongs to
* @param parentsIndices list of indices in the parent that have been coalesced into this partition
* @param preferredLocation the preferred location for this partition
*/
case class CoalescedRDDPartition(
index: Int,
@transient rdd: RDD[_],
parentsIndices: Array[Int],
@transient preferredLocation: String = ""
) extends Partition {
var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
// Update the reference to parent split at the time of task serialization
// Update the reference to parent partition at the time of task serialization
parents = parentsIndices.map(rdd.partitions(_))
oos.defaultWriteObject()
}
/**
* Computes the fraction of the parent partitions that have getPreferredLocation
* as one of their preferredLocations
* @return locality of this coalesced partition between 0 and 1
*/
def localFraction: Double = {
val loc = parents.count(p =>
rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
}
}
/**
* Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
* this RDD computes one or more of the parent ones. Will produce exactly `maxPartitions` if the
* parent had more than this many partitions, or fewer if the parent had fewer.
*
* This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
* or to avoid having a large number of small tasks when processing a directory with many files.
* Represents a coalesced RDD that has fewer partitions than its parent RDD.
* This class uses the PartitionCoalescer class to find a good partitioning of the parent RDD
* so that each new partition has roughly the same number of parent partitions and that
* the preferred location of each new partition overlaps with as many preferred locations of its
* parent partitions as possible.
* @param prev RDD to be coalesced
* @param maxPartitions number of desired partitions in the coalesced RDD
* @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
*/
class CoalescedRDD[T: ClassManifest](
@transient var prev: RDD[T],
maxPartitions: Int)
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
override def getPartitions: Array[Partition] = {
val prevSplits = prev.partitions
if (prevSplits.length < maxPartitions) {
prevSplits.map(_.index).map{idx => new CoalescedRDDPartition(idx, prev, Array(idx)) }
} else {
(0 until maxPartitions).map { i =>
val rangeStart = ((i.toLong * prevSplits.length) / maxPartitions).toInt
val rangeEnd = (((i.toLong + 1) * prevSplits.length) / maxPartitions).toInt
new CoalescedRDDPartition(i, prev, (rangeStart until rangeEnd).toArray)
}.toArray
val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
pc.run().zipWithIndex.map {
case (pg, i) =>
val ids = pg.arr.map(_.index).toArray
new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
}
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
split.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentSplit =>
firstParent[T].iterator(parentSplit, context)
override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
firstParent[T].iterator(parentPartition, context)
}
}
@ -78,4 +101,242 @@ class CoalescedRDD[T: ClassManifest](
super.clearDependencies()
prev = null
}
/**
* Returns the preferred machine for the partition. If the partition is of type CoalescedRDDPartition,
* then the preferred machine will be one which most parent partitions prefer too.
* @param partition the partition to look up
* @return the machine most preferred by the partition
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
}
}
/**
* Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
* this RDD computes one or more of the parent ones. It will produce exactly `maxPartitions` if the
* parent had more than maxPartitions, or fewer if the parent had fewer.
*
* This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
* or to avoid having a large number of small tasks when processing a directory with many files.
*
* If there is no locality information (no preferredLocations) in the parent, then the coalescing
* is very simple: chunk parents that are close in the Array in chunks.
* If there is locality information, it proceeds to pack them with the following four goals:
*
* (1) Balance the groups so they roughly have the same number of parent partitions
* (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer
* (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)
* (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
*
* Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000.
* We assume the final number of desired partitions is small, e.g. less than 1000.
*
* The algorithm tries to assign unique preferred machines to each partition. If the number of
* desired partitions is greater than the number of preferred machines (can happen), it needs to
* start picking duplicate preferred machines. This is determined using coupon collector estimation
* (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist:
* it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two
* bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions
* according to locality. (contact alig for questions)
*
*/
private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
val rnd = new scala.util.Random(7919) // keep this class deterministic
// each element of groupArr represents one coalesced partition
val groupArr = ArrayBuffer[PartitionGroup]()
// hash used to check whether some machine is already in groupArr
val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]()
// hash used for the first maxPartitions (to avoid duplicates)
val initialHash = mutable.Set[Partition]()
// determines the tradeoff between load-balancing the partitions sizes and their locality
// e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
val slack = (balanceSlack * prev.partitions.size).toInt
var noLocality = true // true if no preferredLocations exist for the parent RDD
// gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
def currPrefLocs(part: Partition): Seq[String] = {
prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
}
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
// next() returns the next preferred machine that a partition is replicated on
// the rotator first goes through the first replica copy of each partition, then second, third
// the iterator's return type is a tuple: (replicaString, partition)
class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
var it: Iterator[(String, Partition)] = resetIterator()
override val isEmpty = !it.hasNext
// initializes/resets to start iterating from the beginning
def resetIterator() = {
val iterators = (0 to 2).map( x =>
prev.partitions.iterator.flatMap(p => {
if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
} )
)
iterators.reduceLeft((x, y) => x ++ y)
}
// hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
def hasNext(): Boolean = { !isEmpty }
// return the next preferredLocation of some partition of the RDD
def next(): (String, Partition) = {
if (it.hasNext)
it.next()
else {
it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
it.next()
}
}
}
/**
* Sorts and gets the least element of the list associated with key in groupHash
* The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
* @param key string representing the preferred machine (host) of a partition group
* @return Option of the PartitionGroup with the fewest elements for that key
*/
def getLeastGroupHash(key: String): Option[PartitionGroup] = {
groupHash.get(key).map(_.sortWith(compare).head)
}
def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
if (!initialHash.contains(part)) {
pgroup.arr += part // assign this element to this group
initialHash += part // needed to avoid assigning partitions to multiple buckets
true
} else { false }
}
/**
* Initializes targetLen partition groups and assigns a preferredLocation
* This uses coupon collector to estimate how many preferredLocations it must rotate through
* until it has seen most of the preferred locations (2 * n log(n))
* @param targetLen the number of partition groups to create
*/
def setupGroups(targetLen: Int) {
val rotIt = new LocationIterator(prev)
// deal with empty case, just create targetLen partition groups with no preferred location
if (!rotIt.hasNext()) {
(1 to targetLen).foreach(x => groupArr += PartitionGroup())
return
}
noLocality = false
// number of iterations needed to be certain that we've seen most preferred locations
val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
var numCreated = 0
var tries = 0
// rotate through until either targetLen unique/distinct preferred locations have been created
// OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
// i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
while (numCreated < targetLen && tries < expectedCoupons2) {
tries += 1
val (nxt_replica, nxt_part) = rotIt.next()
if (!groupHash.contains(nxt_replica)) {
val pgroup = PartitionGroup(nxt_replica)
groupArr += pgroup
addPartToPGroup(nxt_part, pgroup)
groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
numCreated += 1
}
}
while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates
var (nxt_replica, nxt_part) = rotIt.next()
val pgroup = PartitionGroup(nxt_replica)
groupArr += pgroup
groupHash.get(nxt_replica).get += pgroup
var tries = 0
while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
nxt_part = rotIt.next()._2
tries += 1
}
numCreated += 1
}
}
/**
* Takes a parent RDD partition and decides which of the partition groups to put it in
* Takes locality into account, but also uses power of 2 choices to load balance
* It strikes a balance between the two using the balanceSlack variable
* @param p partition (ball to be thrown)
* @return partition group (bin to be put in)
*/
def pickBin(p: Partition): PartitionGroup = {
val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
val prefPart = if (pref == Nil) None else pref.head
val r1 = rnd.nextInt(groupArr.size)
val r2 = rnd.nextInt(groupArr.size)
val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
if (prefPart == None) // if no preferred locations, just use basic power of two
return minPowerOfTwo
val prefPartActual = prefPart.get
if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows
return minPowerOfTwo // prefer balance over locality
else {
return prefPartActual // prefer locality over balance
}
}
def throwBalls() {
if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
if (maxPartitions > groupArr.size) { // just return prev.partitions
for ((p,i) <- prev.partitions.zipWithIndex) {
groupArr(i).arr += p
}
} else { // no locality available, then simply split partitions based on positions in array
for(i <- 0 until maxPartitions) {
val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
(rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
}
}
} else {
for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
pickBin(p).arr += p
}
}
}
def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
/**
* Runs the packing algorithm and returns an array of PartitionGroups that if possible are
* load balanced and grouped by locality
* @return array of partition groups
*/
def run(): Array[PartitionGroup] = {
setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
throwBalls() // assign partitions (balls) to each group (bins)
getPartitions
}
}
private[spark] case class PartitionGroup(prefLoc: String = "") {
var arr = mutable.ArrayBuffer[Partition]()
def size = arr.size
}
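To make the balance-versus-locality rule in `pickBin` concrete, here is a standalone sketch (hypothetical names, not part of the change) of power-of-two choices with a locality slack:

    import scala.util.Random

    object PowerOfTwoWithSlack {
      case class Bin(name: String, var load: Int)

      // Compare two randomly chosen bins and keep the lighter one ("power of two
      // choices"); the locality-preferred bin still wins as long as it is less
      // than `slack` elements heavier than that lighter bin.
      def pickBin(bins: Array[Bin], preferred: Option[Bin], slack: Int, rnd: Random): Bin = {
        val a = bins(rnd.nextInt(bins.length))
        val b = bins(rnd.nextInt(bins.length))
        val lighter = if (a.load < b.load) a else b
        preferred match {
          case Some(p) if p.load < lighter.load + slack => p  // locality wins within the slack
          case _ => lighter                                   // otherwise prefer balance
        }
      }
    }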

View file

@ -435,23 +435,24 @@ class DAGScheduler(
if (event != null) {
logDebug("Got event of type " + event.getClass.getName)
}
if (event != null) {
if (processEvent(event)) {
return
this.synchronized { // needed in case other threads make calls into methods of this class
if (event != null) {
if (processEvent(event)) {
return
}
}
}
val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
// Periodically resubmit failed stages if some map output fetches have failed and we have
// waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
// tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
// the same time, so we want to make sure we've identified all the reduce tasks that depend
// on the failed node.
if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
resubmitFailedStages()
} else {
submitWaitingStages()
val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
// Periodically resubmit failed stages if some map output fetches have failed and we have
// waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
// tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
// the same time, so we want to make sure we've identified all the reduce tasks that depend
// on the failed node.
if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
resubmitFailedStages()
} else {
submitWaitingStages()
}
}
}
}
@ -789,7 +790,14 @@ class DAGScheduler(
visitedRdds.contains(target.rdd)
}
private def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
/**
* Synchronized method that might be called from other threads.
* @param rdd whose partitions are to be looked at
* @param partition to lookup locality information for
* @return list of machines that are preferred by the partition
*/
private[spark]
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
if (!cached.isEmpty) {

View file

@ -22,7 +22,8 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
import spark.rdd._
import scala.collection.parallel.mutable
class RDDSuite extends FunSuite with SharedSparkContext {
@ -173,6 +174,66 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
null)
}
test("cogrouped RDDs with locality") {
val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
val coal3 = data3.coalesce(3)
val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
// RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i+2)).map{ j => "m" + (j%6)})))
val coalesced1 = data.coalesce(3)
assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
val splits = coalesced1.glom().collect().map(_.toList).toList
assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length)
assert(splits.forall(_.length >= 1) === true, "Some partitions were empty")
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.
val coalesced4 = data.coalesce(20)
val listOfLists = coalesced4.glom().collect().map(_.toList).toList
val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
assert( sortedList === (1 to 9).
map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
}
test("cogrouped RDDs with locality, large scale (10K partitions)") {
// large scale experiment
import collection.mutable
val rnd = scala.util.Random
val partitions = 10000
val numMachines = 50
val machines = mutable.ListBuffer[String]()
(1 to numMachines).foreach(machines += "m"+_)
val blocks = (1 to partitions).map(i =>
{ (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList) } )
val data2 = sc.makeRDD(blocks)
val coalesced2 = data2.coalesce(numMachines*2)
// test that you get over 90% locality in each group
val minLocality = coalesced2.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
.foldLeft(1.)((perc, loc) => math.min(perc,loc))
assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
// test that the groups are load balanced with 100 +/- 20 elements in each
val maxImbalance = coalesced2.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
.foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
val data3 = sc.makeRDD(blocks).map(i => i*2) // derived RDD to test *current* pref locs
val coalesced3 = data3.coalesce(numMachines*2)
val minLocality2 = coalesced3.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
.foldLeft(1.)((perc, loc) => math.min(perc,loc))
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
(minLocality2*100.).toInt + "%")
}
test("zipped RDDs") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)

View file

@ -18,7 +18,7 @@
require 'fileutils'
include FileUtils
if ENV['SKIP_API'] != '1'
if not (ENV['SKIP_API'] == '1' or ENV['SKIP_SCALADOC'] == '1')
# Build Scaladoc for Java/Scala
projects = ["core", "examples", "repl", "bagel", "streaming", "mllib"]

View file

@ -8,22 +8,26 @@ title: Building Spark with Maven
Building Spark using Maven requires Maven 3 (the build process is tested with Maven 3.0.4) and Java 1.6 or newer.
Building with Maven requires that a Hadoop profile be specified explicitly at the command line, there is no default. There are two profiles to choose from, one for building for Hadoop 1 or Hadoop 2.
## Specifying the Hadoop version ##
for Hadoop 1 (using 0.20.205.0) use:
To enable support for HDFS and other Hadoop-supported storage systems, specify the exact Hadoop version by setting the "hadoop.version" property. If unset, Spark will build against Hadoop 1.0.4 by default.
$ mvn -Phadoop1 clean install
For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions without YARN, use:
# Apache Hadoop 1.2.1
$ mvn -Dhadoop.version=1.2.1 clean install
for Hadoop 2 (using 2.0.0-mr1-cdh4.1.1) use:
# Cloudera CDH 4.2.0 with MapReduce v1
$ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 clean install
$ mvn -Phadoop2 clean install
For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, enable the "hadoop2-yarn" profile:
It uses the scala-maven-plugin which supports incremental and continuous compilation. E.g.
# Apache Hadoop 2.0.5-alpha
$ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha clean install
$ mvn -Phadoop2 scala:cc
# Cloudera CDH 4.2.0 with MapReduce v2
$ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 clean install
…should run continuous compilation (i.e. wait for changes). However, this has not been tested extensively.
## Spark Tests in Maven ##
@ -31,11 +35,11 @@ Tests are run by default via the scalatest-maven-plugin. With this you can do th
Skip test execution (but not compilation):
$ mvn -DskipTests -Phadoop2 clean install
$ mvn -Dhadoop.version=... -DskipTests clean install
To run a specific test suite:
$ mvn -Phadoop2 -Dsuites=spark.repl.ReplSuite test
$ mvn -Dhadoop.version=... -Dsuites=spark.repl.ReplSuite test
## Setting up JVM Memory Usage Via Maven ##
@ -53,6 +57,15 @@ To fix these, you can do the following:
export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=128M"
## Continuous Compilation ##
We use the scala-maven-plugin which supports incremental and continuous compilation. E.g.
$ mvn scala:cc
…should run continuous compilation (i.e. wait for changes). However, this has not been tested extensively.
## Using With IntelliJ IDEA ##
This setup works fine in IntelliJ IDEA 11.1.4. After opening the project via the pom.xml file in the project root folder, you only need to activate either the hadoop1 or hadoop2 profile in the "Maven Properties" popout. We have not tried Eclipse/Scala IDE with this.
@ -61,6 +74,6 @@ This setup works fine in IntelliJ IDEA 11.1.4. After opening the project via the
It includes support for building a Debian package containing a 'fat-jar' which includes the repl, the examples and bagel. This can be created by specifying the deb profile:
$ mvn -Phadoop2,deb clean install
$ mvn -Pdeb clean install
The debian package can then be found under repl/target. We added the short commit hash to the file name so that we can distinguish individual packages build for SNAPSHOT versions.

View file

@ -146,7 +146,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td>spark.ui.port</td>
<td>33000</td>
<td>3030</td>
<td>
Port for your application's dashboard, which shows memory and workload data
</td>

View file

@ -6,7 +6,7 @@ title: Launching Spark on YARN
Experimental support for running over a [YARN (Hadoop
NextGen)](http://hadoop.apache.org/docs/r2.0.2-alpha/hadoop-yarn/hadoop-yarn-site/YARN.html)
cluster was added to Spark in version 0.6.0. This was merged into master as part of the 0.7 effort.
To build spark core with YARN support, please use the hadoop2-yarn profile.
To build spark with YARN support, please use the hadoop2-yarn profile.
Ex: mvn -Phadoop2-yarn clean install
# Building spark core consolidated jar.
@ -15,18 +15,12 @@ We need a consolidated spark core jar (which bundles all the required dependenci
This can be built either through sbt or via maven.
- Building spark assembled jar via sbt.
It is a manual process of enabling it in project/SparkBuild.scala.
Please comment out the
HADOOP_VERSION, HADOOP_MAJOR_VERSION and HADOOP_YARN
variables before the line 'For Hadoop 2 YARN support'
Next, uncomment the subsequent 3 variable declaration lines (for these three variables) which enable hadoop yarn support.
Enable YARN support by setting `SPARK_WITH_YARN=true` when invoking sbt:
For example, to assemble the jar:
./sbt/sbt clean assembly
SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_WITH_YARN=true ./sbt/sbt clean assembly
The assembled jar would typically be something like :
`./core/target/spark-core-assembly-0.8.0-SNAPSHOT.jar`
`./yarn/target/spark-yarn-assembly-0.8.0-SNAPSHOT.jar`
- Building spark assembled jar via Maven.
@ -34,16 +28,16 @@ The assembled jar would typically be something like :
For example:
mvn -Phadoop2-yarn clean package -DskipTests=true
mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha clean package -DskipTests=true
This will build the shaded (consolidated) jar. Typically something like :
`./repl-bin/target/spark-repl-bin-<VERSION>-shaded-hadoop2-yarn.jar`
`./yarn/target/spark-yarn-bin-<VERSION>-shaded.jar`
# Preparations
- Building spark core assembled jar (see above).
- Building spark-yarn assembly (see above).
- Your application code must be packaged into a separate JAR file.
If you want to test out the YARN deployment mode, you can use the current Spark examples. A `spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}` file can be generated by running `sbt/sbt package`. NOTE: since the documentation you're reading is for Spark version {{site.SPARK_VERSION}}, we are assuming here that you have downloaded Spark {{site.SPARK_VERSION}} or checked it out of source control. If you are using a different version of Spark, the version numbers in the jar generated by the sbt package command will obviously be different.

View file

@ -7,11 +7,46 @@ A "Spark Streaming" receiver can be a simple network stream, streams of messages
This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
## A quick and naive walk-through
### Write a simple receiver
This starts with implementing [NetworkReceiver](#References)
Following is a simple socket text-stream receiver.
{% highlight scala %}
class SocketTextStreamReceiver(host: String,
port: Int
) extends NetworkReceiver[String] {
protected lazy val blocksGenerator: BlockGenerator =
new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
protected def onStart() = {
blocksGenerator.start()
val socket = new Socket(host, port)
val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
var data: String = dataInputStream.readLine()
while (data != null) {
blocksGenerator += data
data = dataInputStream.readLine()
}
}
protected def onStop() {
blocksGenerator.stop()
}
}
{% endhighlight %}
All we did here is extend NetworkReceiver and call blockGenerator's API method (i.e. +=) to push our blocks of data. Please refer to the scala-docs of NetworkReceiver for more details.
### An Actor as Receiver.
This starts with implementing [Actor](#References)
Following is a simple socket text-stream receiver, which is apparently overly simplified using Akka's socket.io api.
@ -46,7 +81,16 @@ All we did here is mixed in trait Receiver and called pushBlock api method to pu
{% endhighlight %}
* Plug-in the actor configuration into the spark streaming context and create a DStream.
* Plug in the custom receiver into the spark streaming context and create a DStream.
{% highlight scala %}
val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
"localhost", 8445))
{% endhighlight %}
* OR plug in the actor as receiver into the spark streaming context and create a DStream.
{% highlight scala %}
@ -99,3 +143,4 @@ _A more comprehensive example is provided in the spark streaming examples_
## References
1. [Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
2. [NetworkReceiver](http://spark-project.org/docs/latest/api/streaming/index.html#spark.streaming.dstream.NetworkReceiver)

View file

@ -301,6 +301,9 @@ dstream.checkpoint(checkpointInterval) // checkpointInterval must be a multiple
For DStreams that must be checkpointed (that is, DStreams created by `updateStateByKey` and `reduceByKeyAndWindow` with inverse function), the checkpoint interval of the DStream is by default set to a multiple of the DStream's sliding interval such that it is at least 10 seconds.
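For example, a sketch of setting this interval explicitly (the DStream and update function names below are placeholders):

{% highlight scala %}
// updateStateByKey produces a DStream that must be checkpointed; pick an
// interval that is a multiple of its sliding interval, here 10 seconds.
val runningCounts = wordCounts.updateStateByKey[Int](updateFunction)
runningCounts.checkpoint(Seconds(10))
{% endhighlight %}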
## Customizing Receiver
Spark comes with built-in support for the most common usage scenarios, where the input stream source can be anything from a network socket stream to one of a few supported message queues. Beyond these, it is also possible to supply your own custom receiver via a convenient API. Find more details at the [Custom Receiver Guide](streaming-custom-receivers.html)
# Performance Tuning
Getting the best performance of a Spark Streaming application on a cluster requires a bit of tuning. This section explains a number of the parameters and configurations that can be tuned to improve the performance of your application. At a high level, you need to consider two things:
<ol>

View file

@ -46,7 +46,7 @@ export TERM=dumb # Prevents color codes in SBT output
VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z0-9.-]*\).*/\1/')
# Initialize defaults
SPARK_HADOOP_VERSION=1.2.1
SPARK_HADOOP_VERSION=1.0.4
SPARK_WITH_YARN=false
MAKE_TGZ=false

View file

@ -123,10 +123,28 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
// Initialize user and product factors randomly
val seed = new Random().nextInt()
var users = userOutLinks.mapValues(_.elementIds.map(u => randomFactor(rank, seed ^ u)))
var products = productOutLinks.mapValues(_.elementIds.map(p => randomFactor(rank, seed ^ ~p)))
// Initialize user and product factors randomly, but use a deterministic seed for each partition
// so that fault recovery works
val seedGen = new Random()
val seed1 = seedGen.nextInt()
val seed2 = seedGen.nextInt()
// Hash an integer to propagate random bits at all positions, similar to java.util.HashTable
def hash(x: Int): Int = {
val r = x ^ (x >>> 20) ^ (x >>> 12)
r ^ (r >>> 7) ^ (r >>> 4)
}
var users = userOutLinks.mapPartitionsWithIndex { (index, itr) =>
val rand = new Random(hash(seed1 ^ index))
itr.map { case (x, y) =>
(x, y.elementIds.map(_ => randomFactor(rank, rand)))
}
}
var products = productOutLinks.mapPartitionsWithIndex { (index, itr) =>
val rand = new Random(hash(seed2 ^ index))
itr.map { case (x, y) =>
(x, y.elementIds.map(_ => randomFactor(rank, rand)))
}
}
for (iter <- 0 until iterations) {
// perform ALS update
@ -213,11 +231,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
}
/**
* Make a random factor vector with the given seed.
* TODO: Initialize things using mapPartitionsWithIndex to make it faster?
* Make a random factor vector with the given random.
*/
private def randomFactor(rank: Int, seed: Int): Array[Double] = {
val rand = new Random(seed)
private def randomFactor(rank: Int, rand: Random): Array[Double] = {
Array.fill(rank)(rand.nextDouble)
}
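As a standalone illustration of why this per-partition seeding supports fault recovery (a sketch; the seed, rank, and partition index are arbitrary): recomputing a lost partition from the same (seed, partition index) pair reproduces exactly the same initial factors.

    import scala.util.Random

    // Same bit-spreading hash as in the change above.
    def hash(x: Int): Int = {
      val r = x ^ (x >>> 20) ^ (x >>> 12)
      r ^ (r >>> 7) ^ (r >>> 4)
    }

    val seed1 = 42  // would normally come from `new Random().nextInt()`
    val rank = 3

    // Factors depend only on the seed and the partition index, so a recomputed
    // partition gets identical initial factors after a failure.
    def initialFactors(partitionIndex: Int): Array[Double] = {
      val rand = new Random(hash(seed1 ^ partitionIndex))
      Array.fill(rank)(rand.nextDouble)
    }

    assert(initialFactors(7).toSeq == initialFactors(7).toSeq)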

pom.xml (28 changed lines)
View file

@ -71,10 +71,10 @@
<java.version>1.5</java.version>
<scala.version>2.9.3</scala.version>
<mesos.version>0.12.1</mesos.version>
<akka.version>2.0.3</akka.version>
<akka.version>2.0.5</akka.version>
<slf4j.version>1.7.2</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<hadoop.version>1.2.1</hadoop.version>
<hadoop.version>1.0.4</hadoop.version>
<!-- <hadoop.version>2.0.0-mr1-cdh4.1.2</hadoop.version> -->
<PermGen>64m</PermGen>
@ -157,12 +157,17 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>7.5.3.v20111011</version>
<version>7.6.8.v20121106</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.1</version>
<version>14.0.1</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>1.3.9</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@ -318,7 +323,7 @@
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
<version>0.8</version>
<version>0.9</version>
<scope>test</scope>
</dependency>
<dependency>
@ -437,6 +442,7 @@
<args>
<arg>-unchecked</arg>
<arg>-optimise</arg>
<arg>-deprecation</arg>
</args>
<jvmArgs>
<jvmArg>-Xms64m</jvmArg>
@ -579,8 +585,8 @@
<properties>
<hadoop.major.version>2</hadoop.major.version>
<!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
<!-- <yarn.version>0.23.7</yarn.version> -->
<yarn.version>2.0.5-alpha</yarn.version>
<!-- <hadoop.version>0.23.7</hadoop.version> -->
<hadoop.version>2.0.5-alpha</hadoop.version>
</properties>
<modules>
@ -607,7 +613,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${yarn.version}</version>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
@ -638,7 +644,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${yarn.version}</version>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
@ -669,7 +675,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${yarn.version}</version>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
@ -700,7 +706,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${yarn.version}</version>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>

View file

@ -24,11 +24,10 @@ import AssemblyKeys._
//import com.jsuereth.pgp.sbtplugin.PgpKeys._
object SparkBuild extends Build {
// Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
// "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop.
// Note that these variables can be set through the environment variables
// SPARK_HADOOP_VERSION and SPARK_WITH_YARN.
val DEFAULT_HADOOP_VERSION = "1.2.1"
// Hadoop version to build against. For example, "1.0.4" for Apache releases, or
// "2.0.0-mr1-cdh4.2.0" for Cloudera Hadoop. Note that these variables can be set
// through the environment variables SPARK_HADOOP_VERSION and SPARK_WITH_YARN.
val DEFAULT_HADOOP_VERSION = "1.0.4"
val DEFAULT_WITH_YARN = false
// HBase version; set as appropriate.
@ -58,14 +57,14 @@ object SparkBuild extends Build {
// Allows build configuration to be set through environment variables
lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION)
lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match {
lazy val isYarnEnabled = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match {
case None => DEFAULT_WITH_YARN
case Some(v) => v.toBoolean
}
// Conditionally include the yarn sub-project
lazy val maybeYarn = if(isYarnMode) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if(isYarnMode) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
lazy val maybeYarn = if(isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if(isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarnRef
def sharedSettings = Defaults.defaultSettings ++ Seq(
@ -134,7 +133,6 @@ object SparkBuild extends Build {
*/
libraryDependencies ++= Seq(
"io.netty" % "netty" % "3.5.3.Final",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
@ -165,17 +163,16 @@ object SparkBuild extends Build {
name := "spark-core",
resolvers ++= Seq(
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
"Spray Repository" at "http://repo.spray.cc/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
),
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "14.0.1",
"com.google.code.findbugs" % "jsr305" % "1.3.9",
"log4j" % "log4j" % "1.2.16",
"log4j" % "log4j" % "1.2.17",
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"commons-daemon" % "commons-daemon" % "1.0.10",
"commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
"com.ning" % "compress-lzf" % "0.8.4",
"org.xerial.snappy" % "snappy-java" % "1.0.5",
"org.ow2.asm" % "asm" % "4.0",
@ -256,7 +253,14 @@ object SparkBuild extends Build {
) ++ assemblySettings ++ extraAssemblySettings
def yarnSettings = sharedSettings ++ Seq(
name := "spark-yarn",
name := "spark-yarn"
) ++ extraYarnSettings ++ assemblySettings ++ extraAssemblySettings
// Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain
// if we refer to nonexistent dependencies (e.g. hadoop-yarn-api from a Hadoop version without YARN).
def extraYarnSettings = if(isYarnEnabled) yarnEnabledSettings else Seq()
def yarnEnabledSettings = Seq(
libraryDependencies ++= Seq(
// Exclude rule required for all ?
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
@ -264,7 +268,7 @@ object SparkBuild extends Build {
"org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm)
)
) ++ assemblySettings ++ extraAssemblySettings
)
def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
mergeStrategy in assembly := {

View file

@ -6,9 +6,9 @@ resolvers += "Spray Repository" at "http://repo.spray.cc/"
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.1")
// For Sonatype publishing
//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)

View file

@ -31,6 +31,7 @@ from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
read_from_pickle_file
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from py4j.java_collections import ListConverter, MapConverter
@ -357,6 +358,63 @@ class RDD(object):
3
"""
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
def stats(self):
"""
Return a L{StatCounter} object that captures the mean, variance
and count of the RDD's elements in one operation.
"""
def redFunc(left_counter, right_counter):
return left_counter.mergeStats(right_counter)
return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
def mean(self):
"""
Compute the mean of this RDD's elements.
>>> sc.parallelize([1, 2, 3]).mean()
2.0
"""
return self.stats().mean()
def variance(self):
"""
Compute the variance of this RDD's elements.
>>> sc.parallelize([1, 2, 3]).variance()
0.666...
"""
return self.stats().variance()
def stdev(self):
"""
Compute the standard deviation of this RDD's elements.
>>> sc.parallelize([1, 2, 3]).stdev()
0.816...
"""
return self.stats().stdev()
def sampleStdev(self):
"""
Compute the sample standard deviation of this RDD's elements (which corrects for bias in
estimating the standard deviation by dividing by N-1 instead of N).
>>> sc.parallelize([1, 2, 3]).sampleStdev()
1.0
"""
return self.stats().sampleStdev()
def sampleVariance(self):
"""
Compute the sample variance of this RDD's elements (which corrects for bias in
estimating the variance by dividing by N-1 instead of N).
>>> sc.parallelize([1, 2, 3]).sampleVariance()
1.0
"""
return self.stats().sampleVariance()
def countByValue(self):
"""
@ -777,7 +835,7 @@ def _test():
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs)
(failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)

View file

@ -0,0 +1,109 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file is ported from spark/util/StatCounter.scala
import copy
import math
class StatCounter(object):
def __init__(self, values=[]):
self.n = 0L # Running count of our values
self.mu = 0.0 # Running mean of our values
self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2)
for v in values:
self.merge(v)
# Add a value into this StatCounter, updating the internal statistics.
def merge(self, value):
delta = value - self.mu
self.n += 1
self.mu += delta / self.n
self.m2 += delta * (value - self.mu)
return self
# Merge another StatCounter into this one, adding up the internal statistics.
def mergeStats(self, other):
if not isinstance(other, StatCounter):
raise Exception("Can only merge Statcounters!")
if other is self: # reference equality holds
self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order
else:
if self.n == 0:
self.mu = other.mu
self.m2 = other.m2
self.n = other.n
elif other.n != 0:
delta = other.mu - self.mu
if other.n * 10 < self.n:
self.mu = self.mu + (delta * other.n) / (self.n + other.n)
elif self.n * 10 < other.n:
self.mu = other.mu - (delta * self.n) / (self.n + other.n)
else:
self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)
self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
self.n += other.n
return self
# Clone this StatCounter
def copy(self):
return copy.deepcopy(self)
def count(self):
return self.n
def mean(self):
return self.mu
def sum(self):
return self.n * self.mu
# Return the variance of the values.
def variance(self):
if self.n == 0:
return float('nan')
else:
return self.m2 / self.n
#
# Return the sample variance, which corrects for bias in estimating the variance by dividing
# by N-1 instead of N.
#
def sampleVariance(self):
if self.n <= 1:
return float('nan')
else:
return self.m2 / (self.n - 1)
# Return the standard deviation of the values.
def stdev(self):
return math.sqrt(self.variance())
#
# Return the sample standard deviation of the values, which corrects for bias in estimating the
# variance by dividing by N-1 instead of N.
#
def sampleStdev(self):
return math.sqrt(self.sampleVariance())
def __repr__(self):
return "(count: %s, mean: %s, stdev: %s)" % (self.count(), self.mean(), self.stdev())

View file

@ -48,6 +48,12 @@
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-mllib</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>

View file

@ -25,4 +25,4 @@ fi
export SPARK_HOME=$(cd "$(dirname $0)/.." 2>&1 >/dev/null ; pwd)
export SPARK_TESTING=1 # To put test classes on classpath
java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=128m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"

View file

@ -183,6 +183,7 @@ class StreamingContext private (
/**
* Create an input stream with any arbitrary user implemented network receiver.
* Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of NetworkReceiver
*/
def networkStream[T: ClassManifest](
@ -195,6 +196,7 @@ class StreamingContext private (
/**
* Create an input stream with any arbitrary user implemented actor receiver.
* Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
* @param props Props object defining creation of the actor
* @param name Name of the actor
* @param storageLevel RDD storage level. Defaults to memory-only.

View file

@ -145,8 +145,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
}
/**
* Stops the receiver and reports to exception to the tracker.
* This should be called whenever an exception has happened on any thread
* Stops the receiver and reports the exception to the tracker.
* This should be called whenever an exception is to be handled on any thread
* of the receiver.
*/
protected def stopOnError(e: Exception) {
@ -202,7 +202,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
}
/**
* Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
* Batches objects created by a [[spark.streaming.dstream.NetworkReceiver]] and puts them into
* appropriately named blocks at regular intervals. This class starts two threads,
* one to periodically start a new batch and prepare the previous batch as a block,
* the other to push the blocks into the block manager.

View file

@ -45,6 +45,8 @@ object ReceiverSupervisorStrategy {
* A receiver trait to be mixed in with your Actor to gain access to
* pushBlock API.
*
* Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
*
* @example {{{
* class MyActor extends Actor with Receiver{
* def receive {