2012-10-09 21:38:36 -04:00
|
|
|
package spark
|
2012-06-07 03:25:47 -04:00
|
|
|
|
|
|
|
import spark.partial.BoundedDouble
|
|
|
|
import spark.partial.MeanEvaluator
|
|
|
|
import spark.partial.PartialResult
|
|
|
|
import spark.partial.SumEvaluator
|
|
|
|
import spark.util.StatCounter
|
|
|
|
|
|
|
|
/**
 * Extra functions available on RDDs of Doubles through an implicit conversion.
 * Import `spark.SparkContext._` at the top of your program to use these functions.
 *
 * @param self the RDD of Doubles that these operations are computed over
 */
class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
  /**
   * Add up the elements in this RDD.
   *
   * The sum is folded from a zero of 0.0 rather than reduced, so an RDD with no elements
   * yields 0.0 instead of failing.
   */
  def sum(): Double = {
    self.fold(0.0)(_ + _)
  }

  /**
   * Return a [[spark.util.StatCounter]] object that captures the mean, variance and count
   * of the RDD's elements in one operation.
   *
   * One StatCounter is built per partition and the per-partition counters are then merged
   * pairwise, so the data is traversed only once.
   */
  def stats(): StatCounter = {
    val perPartition = self.mapPartitions(nums => Iterator(StatCounter(nums)))
    perPartition.reduce((a, b) => a.merge(b))
  }

  /** Compute the mean of this RDD's elements. */
  def mean(): Double = stats().mean

  /** Compute the variance of this RDD's elements. */
  def variance(): Double = stats().variance

  /** Compute the standard deviation of this RDD's elements. */
  def stdev(): Double = stats().stdev

  /**
   * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
   * estimating the standard deviation by dividing by N-1 instead of N).
   */
  // Must read the sample estimator: `stats().stdev` is the population value already
  // exposed by stdev(), which would make the two methods indistinguishable.
  def sampleStdev(): Double = stats().sampleStdev

  /**
   * (Experimental) Approximate operation to return the mean within a timeout.
   *
   * @param timeout    time limit handed to the approximate job (presumably milliseconds;
   *                   confirm against `SparkContext.runApproximateJob`)
   * @param confidence confidence level passed to the [[spark.partial.MeanEvaluator]]
   * @return a partial result holding a bounded estimate of the mean
   */
  def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
    // Each task summarises its partition into a single StatCounter.
    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
    val evaluator = new MeanEvaluator(self.partitions.size, confidence)
    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
  }

  /**
   * (Experimental) Approximate operation to return the sum within a timeout.
   *
   * @param timeout    time limit handed to the approximate job (presumably milliseconds;
   *                   confirm against `SparkContext.runApproximateJob`)
   * @param confidence confidence level passed to the [[spark.partial.SumEvaluator]]
   * @return a partial result holding a bounded estimate of the sum
   */
  def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
    // Each task summarises its partition into a single StatCounter.
    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
    val evaluator = new SumEvaluator(self.partitions.size, confidence)
    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
  }
}
|