Add Java API
Add distinct() method to RDD. Fix bug in DoubleRDDFunctions.
This commit is contained in:
parent
628bb5ca7f
commit
01dce3f569
|
@ -112,6 +112,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
|
||||||
|
|
||||||
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
|
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
|
||||||
|
|
||||||
|
def distinct(): RDD[T] = map(x => (x, "")).reduceByKey((x, y) => x).map(_._1)
|
||||||
|
|
||||||
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
|
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
|
||||||
new SampledRDD(this, withReplacement, fraction, seed)
|
new SampledRDD(this, withReplacement, fraction, seed)
|
||||||
|
|
||||||
|
@ -359,6 +361,14 @@ class MappedRDD[U: ClassManifest, T: ClassManifest](
|
||||||
override def compute(split: Split) = prev.iterator(split).map(f)
|
override def compute(split: Split) = prev.iterator(split).map(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class PartitioningPreservingMappedRDD[U: ClassManifest, T: ClassManifest](
|
||||||
|
prev: RDD[T],
|
||||||
|
f: T => U)
|
||||||
|
extends MappedRDD[U, T](prev, f) {
|
||||||
|
|
||||||
|
override val partitioner = prev.partitioner
|
||||||
|
}
|
||||||
|
|
||||||
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
|
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
|
||||||
prev: RDD[T],
|
prev: RDD[T],
|
||||||
f: T => TraversableOnce[U])
|
f: T => TraversableOnce[U])
|
||||||
|
|
|
@ -48,10 +48,13 @@ import spark.storage.BlockManagerMaster
|
||||||
class SparkContext(
|
class SparkContext(
|
||||||
master: String,
|
master: String,
|
||||||
frameworkName: String,
|
frameworkName: String,
|
||||||
val sparkHome: String = null,
|
val sparkHome: String,
|
||||||
val jars: Seq[String] = Nil)
|
val jars: Seq[String])
|
||||||
extends Logging {
|
extends Logging {
|
||||||
|
|
||||||
|
def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil)
|
||||||
|
|
||||||
|
|
||||||
// Ensure logging is initialized before we spawn any threads
|
// Ensure logging is initialized before we spawn any threads
|
||||||
initLogging()
|
initLogging()
|
||||||
|
|
||||||
|
|
63
core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
Normal file
63
core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
package spark.api.java
|
||||||
|
|
||||||
|
import spark.RDD
|
||||||
|
import spark.SparkContext.doubleRDDToDoubleRDDFunctions
|
||||||
|
import spark.api.java.function.{Function => JFunction}
|
||||||
|
|
||||||
|
import java.lang.Double
|
||||||
|
|
||||||
|
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
|
||||||
|
|
||||||
|
val classManifest = implicitly[ClassManifest[Double]]
|
||||||
|
|
||||||
|
lazy val rdd = srdd.map(x => Double.valueOf(x))
|
||||||
|
|
||||||
|
def wrapRDD: (RDD[Double]) => JavaDoubleRDD = rdd => new JavaDoubleRDD(rdd.map(_.doubleValue))
|
||||||
|
|
||||||
|
// Common RDD functions
|
||||||
|
|
||||||
|
import JavaDoubleRDD.fromRDD
|
||||||
|
|
||||||
|
def cache(): JavaDoubleRDD = fromRDD(srdd.cache())
|
||||||
|
|
||||||
|
// first() has to be overriden here in order for its return type to be Double instead of Object.
|
||||||
|
override def first(): Double = srdd.first()
|
||||||
|
|
||||||
|
// Transformations (return a new RDD)
|
||||||
|
|
||||||
|
def distinct() = fromRDD(srdd.distinct())
|
||||||
|
|
||||||
|
def filter(f: JFunction[Double, java.lang.Boolean]) =
|
||||||
|
fromRDD(srdd.filter(x => f(x).booleanValue()))
|
||||||
|
|
||||||
|
def sample(withReplacement: Boolean, fraction: Double, seed: Int) =
|
||||||
|
fromRDD(srdd.sample(withReplacement, fraction, seed))
|
||||||
|
|
||||||
|
def union(other: JavaDoubleRDD) = fromRDD(srdd.union(other.srdd))
|
||||||
|
|
||||||
|
// Double RDD functions
|
||||||
|
|
||||||
|
def sum() = srdd.sum()
|
||||||
|
|
||||||
|
def stats() = srdd.stats()
|
||||||
|
|
||||||
|
def mean() = srdd.mean()
|
||||||
|
|
||||||
|
def variance() = srdd.variance()
|
||||||
|
|
||||||
|
def stdev() = srdd.stdev()
|
||||||
|
|
||||||
|
def meanApprox(timeout: Long, confidence: Double) = srdd.meanApprox(timeout, confidence)
|
||||||
|
|
||||||
|
def meanApprox(timeout: Long) = srdd.meanApprox(timeout)
|
||||||
|
|
||||||
|
def sumApprox(timeout: Long, confidence: Double) = srdd.sumApprox(timeout, confidence)
|
||||||
|
|
||||||
|
def sumApprox(timeout: Long) = srdd.sumApprox(timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
object JavaDoubleRDD {
|
||||||
|
def fromRDD(rdd: RDD[scala.Double]): JavaDoubleRDD = new JavaDoubleRDD(rdd)
|
||||||
|
|
||||||
|
implicit def toRDD(rdd: JavaDoubleRDD): RDD[scala.Double] = rdd.srdd
|
||||||
|
}
|
272
core/src/main/scala/spark/api/java/JavaPairRDD.scala
Normal file
272
core/src/main/scala/spark/api/java/JavaPairRDD.scala
Normal file
|
@ -0,0 +1,272 @@
|
||||||
|
package spark.api.java
|
||||||
|
|
||||||
|
import spark.SparkContext.rddToPairRDDFunctions
|
||||||
|
import spark.api.java.function.{Function2 => JFunction2}
|
||||||
|
import spark.api.java.function.{Function => JFunction}
|
||||||
|
import spark.partial.BoundedDouble
|
||||||
|
import spark.partial.PartialResult
|
||||||
|
import spark._
|
||||||
|
|
||||||
|
import java.util.{List => JList}
|
||||||
|
import java.util.Comparator
|
||||||
|
|
||||||
|
import scala.Tuple2
|
||||||
|
import scala.collection.Map
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapred.JobConf
|
||||||
|
import org.apache.hadoop.mapred.OutputFormat
|
||||||
|
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
|
||||||
|
import org.apache.hadoop.conf.Configuration
|
||||||
|
|
||||||
|
class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
|
||||||
|
implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
|
||||||
|
|
||||||
|
def wrapRDD = JavaPairRDD.fromRDD _
|
||||||
|
|
||||||
|
def classManifest = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
|
||||||
|
|
||||||
|
import JavaPairRDD._
|
||||||
|
|
||||||
|
// Common RDD functions
|
||||||
|
|
||||||
|
def cache() = new JavaPairRDD[K, V](rdd.cache())
|
||||||
|
|
||||||
|
// Transformations (return a new RDD)
|
||||||
|
|
||||||
|
def distinct() = new JavaPairRDD[K, V](rdd.distinct())
|
||||||
|
|
||||||
|
def filter(f: Function[(K, V), java.lang.Boolean]) =
|
||||||
|
new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
|
||||||
|
|
||||||
|
def sample(withReplacement: Boolean, fraction: Double, seed: Int) =
|
||||||
|
new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))
|
||||||
|
|
||||||
|
def union(other: JavaPairRDD[K, V]) = new JavaPairRDD[K, V](rdd.union(other.rdd))
|
||||||
|
|
||||||
|
// first() has to be overridden here so that the generated method has the signature
|
||||||
|
// 'public scala.Tuple2 first()'; if the trait's definition is used,
|
||||||
|
// then the method has the signature 'public java.lang.Object first()',
|
||||||
|
// causing NoSuchMethodErrors at runtime.
|
||||||
|
override def first(): (K, V) = rdd.first()
|
||||||
|
|
||||||
|
// Pair RDD functions
|
||||||
|
|
||||||
|
def combineByKey[C](createCombiner: Function[V, C],
|
||||||
|
mergeValue: JFunction2[C, V, C],
|
||||||
|
mergeCombiners: JFunction2[C, C, C],
|
||||||
|
partitioner: Partitioner): JavaPairRDD[K, C] = {
|
||||||
|
implicit val cm: ClassManifest[C] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
|
||||||
|
fromRDD(rdd.combineByKey(
|
||||||
|
createCombiner,
|
||||||
|
mergeValue,
|
||||||
|
mergeCombiners,
|
||||||
|
partitioner
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
def combineByKey[C](createCombiner: JFunction[V, C],
|
||||||
|
mergeValue: JFunction2[C, V, C],
|
||||||
|
mergeCombiners: JFunction2[C, C, C],
|
||||||
|
numSplits: Int): JavaPairRDD[K, C] =
|
||||||
|
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
|
||||||
|
|
||||||
|
def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
|
||||||
|
fromRDD(rdd.reduceByKey(partitioner, func))
|
||||||
|
|
||||||
|
def reduceByKeyLocally(func: JFunction2[V, V, V]) = {
|
||||||
|
rdd.reduceByKeyLocally(func)
|
||||||
|
}
|
||||||
|
|
||||||
|
def countByKey() = rdd.countByKey()
|
||||||
|
|
||||||
|
def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
|
||||||
|
rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
|
||||||
|
|
||||||
|
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
|
||||||
|
: PartialResult[java.util.Map[K, BoundedDouble]] =
|
||||||
|
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
|
||||||
|
|
||||||
|
def reduceByKey(func: JFunction2[V, V, V], numSplits: Int): JavaPairRDD[K, V] =
|
||||||
|
fromRDD(rdd.reduceByKey(func, numSplits))
|
||||||
|
|
||||||
|
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] =
|
||||||
|
fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
|
||||||
|
|
||||||
|
def groupByKey(numSplits: Int): JavaPairRDD[K, JList[V]] =
|
||||||
|
fromRDD(groupByResultToJava(rdd.groupByKey(numSplits)))
|
||||||
|
|
||||||
|
def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
|
||||||
|
fromRDD(rdd.partitionBy(partitioner))
|
||||||
|
|
||||||
|
def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
|
||||||
|
fromRDD(rdd.join(other, partitioner))
|
||||||
|
|
||||||
|
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
|
||||||
|
: JavaPairRDD[K, (V, Option[W])] =
|
||||||
|
fromRDD(rdd.leftOuterJoin(other, partitioner))
|
||||||
|
|
||||||
|
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
|
||||||
|
: JavaPairRDD[K, (Option[V], W)] =
|
||||||
|
fromRDD(rdd.rightOuterJoin(other, partitioner))
|
||||||
|
|
||||||
|
def combineByKey[C](createCombiner: JFunction[V, C],
|
||||||
|
mergeValue: JFunction2[C, V, C],
|
||||||
|
mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
|
||||||
|
implicit val cm: ClassManifest[C] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
|
||||||
|
fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners))
|
||||||
|
}
|
||||||
|
|
||||||
|
def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
|
||||||
|
val partitioner = rdd.defaultPartitioner(rdd)
|
||||||
|
fromRDD(reduceByKey(partitioner, func))
|
||||||
|
}
|
||||||
|
|
||||||
|
def groupByKey(): JavaPairRDD[K, JList[V]] =
|
||||||
|
fromRDD(groupByResultToJava(rdd.groupByKey()))
|
||||||
|
|
||||||
|
def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] =
|
||||||
|
fromRDD(rdd.join(other))
|
||||||
|
|
||||||
|
def join[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, W)] =
|
||||||
|
fromRDD(rdd.join(other, numSplits))
|
||||||
|
|
||||||
|
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] =
|
||||||
|
fromRDD(rdd.leftOuterJoin(other))
|
||||||
|
|
||||||
|
def leftOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, Option[W])] =
|
||||||
|
fromRDD(rdd.leftOuterJoin(other, numSplits))
|
||||||
|
|
||||||
|
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] =
|
||||||
|
fromRDD(rdd.rightOuterJoin(other))
|
||||||
|
|
||||||
|
def rightOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (Option[V], W)] =
|
||||||
|
fromRDD(rdd.rightOuterJoin(other, numSplits))
|
||||||
|
|
||||||
|
def collectAsMap(): Map[K, V] = rdd.collectAsMap()
|
||||||
|
|
||||||
|
def mapValues[U](f: Function[V, U]): JavaPairRDD[K, U] = {
|
||||||
|
implicit val cm: ClassManifest[U] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
|
||||||
|
fromRDD(rdd.mapValues(f))
|
||||||
|
}
|
||||||
|
|
||||||
|
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
def fn = (x: V) => f.apply(x).asScala
|
||||||
|
implicit val cm: ClassManifest[U] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
|
||||||
|
fromRDD(rdd.flatMapValues(fn))
|
||||||
|
}
|
||||||
|
|
||||||
|
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
|
||||||
|
: JavaPairRDD[K, (JList[V], JList[W])] =
|
||||||
|
fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
|
||||||
|
|
||||||
|
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
|
||||||
|
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
|
||||||
|
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
|
||||||
|
|
||||||
|
def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
|
||||||
|
fromRDD(cogroupResultToJava(rdd.cogroup(other)))
|
||||||
|
|
||||||
|
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
|
||||||
|
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
|
||||||
|
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
|
||||||
|
|
||||||
|
def cogroup[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (JList[V], JList[W])]
|
||||||
|
= fromRDD(cogroupResultToJava(rdd.cogroup(other, numSplits)))
|
||||||
|
|
||||||
|
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numSplits: Int)
|
||||||
|
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
|
||||||
|
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numSplits)))
|
||||||
|
|
||||||
|
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
|
||||||
|
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
|
||||||
|
|
||||||
|
def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
|
||||||
|
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
|
||||||
|
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
|
||||||
|
|
||||||
|
def lookup(key: K): JList[V] = seqAsJavaList(rdd.lookup(key))
|
||||||
|
|
||||||
|
def saveAsHadoopFile[F <: OutputFormat[_, _]](
|
||||||
|
path: String,
|
||||||
|
keyClass: Class[_],
|
||||||
|
valueClass: Class[_],
|
||||||
|
outputFormatClass: Class[F],
|
||||||
|
conf: JobConf) {
|
||||||
|
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
def saveAsHadoopFile[F <: OutputFormat[_, _]](
|
||||||
|
path: String,
|
||||||
|
keyClass: Class[_],
|
||||||
|
valueClass: Class[_],
|
||||||
|
outputFormatClass: Class[F]) {
|
||||||
|
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
|
||||||
|
}
|
||||||
|
|
||||||
|
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
|
||||||
|
path: String,
|
||||||
|
keyClass: Class[_],
|
||||||
|
valueClass: Class[_],
|
||||||
|
outputFormatClass: Class[F],
|
||||||
|
conf: Configuration) {
|
||||||
|
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
|
||||||
|
path: String,
|
||||||
|
keyClass: Class[_],
|
||||||
|
valueClass: Class[_],
|
||||||
|
outputFormatClass: Class[F]) {
|
||||||
|
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
|
||||||
|
}
|
||||||
|
|
||||||
|
def saveAsHadoopDataset(conf: JobConf) {
|
||||||
|
rdd.saveAsHadoopDataset(conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Ordered RDD Functions
|
||||||
|
def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)
|
||||||
|
|
||||||
|
def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
|
||||||
|
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
|
||||||
|
sortByKey(comp, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V] = sortByKey(comp, true)
|
||||||
|
|
||||||
|
def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
|
||||||
|
class KeyOrdering(val a: K) extends Ordered[K] {
|
||||||
|
override def compare(b: K) = comp.compare(a, b)
|
||||||
|
}
|
||||||
|
implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
|
||||||
|
fromRDD(new OrderedRDDFunctions(rdd).sortByKey(ascending))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object JavaPairRDD {
|
||||||
|
def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassManifest[K],
|
||||||
|
vcm: ClassManifest[T]): RDD[(K, JList[T])] =
|
||||||
|
new PartitioningPreservingMappedRDD(rdd, (x: (K, Seq[T])) => (x._1, seqAsJavaList(x._2)))
|
||||||
|
|
||||||
|
def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])
|
||||||
|
: RDD[(K, (JList[V], JList[W]))] = new PartitioningPreservingMappedRDD(rdd,
|
||||||
|
(x: (K, (Seq[V], Seq[W]))) => (x._1, (seqAsJavaList(x._2._1), seqAsJavaList(x._2._2))))
|
||||||
|
|
||||||
|
def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))])
|
||||||
|
: RDD[(K, (JList[V], JList[W1], JList[W2]))] = new PartitioningPreservingMappedRDD(rdd,
|
||||||
|
(x: (K, (Seq[V], Seq[W1], Seq[W2]))) => (x._1, (seqAsJavaList(x._2._1),
|
||||||
|
seqAsJavaList(x._2._2),
|
||||||
|
seqAsJavaList(x._2._3))))
|
||||||
|
|
||||||
|
def fromRDD[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
|
||||||
|
new JavaPairRDD[K, V](rdd)
|
||||||
|
|
||||||
|
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
|
||||||
|
}
|
34
core/src/main/scala/spark/api/java/JavaRDD.scala
Normal file
34
core/src/main/scala/spark/api/java/JavaRDD.scala
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
package spark.api.java
|
||||||
|
|
||||||
|
import spark._
|
||||||
|
import spark.api.java.function.{Function => JFunction}
|
||||||
|
|
||||||
|
class JavaRDD[T](val rdd: RDD[T])(implicit val classManifest: ClassManifest[T]) extends
|
||||||
|
JavaRDDLike[T, JavaRDD[T]] {
|
||||||
|
|
||||||
|
def wrapRDD = JavaRDD.fromRDD
|
||||||
|
|
||||||
|
// Common RDD functions
|
||||||
|
|
||||||
|
def cache() = wrapRDD(rdd.cache())
|
||||||
|
|
||||||
|
// Transformations (return a new RDD)
|
||||||
|
|
||||||
|
def distinct() = wrapRDD(rdd.distinct())
|
||||||
|
|
||||||
|
def filter(f: JFunction[T, java.lang.Boolean]) = wrapRDD(rdd.filter((x => f(x).booleanValue())))
|
||||||
|
|
||||||
|
def sample(withReplacement: Boolean, fraction: Double, seed: Int) =
|
||||||
|
wrapRDD(rdd.sample(withReplacement, fraction, seed))
|
||||||
|
|
||||||
|
def union(other: JavaRDD[T]) = wrapRDD(rdd.union(other.rdd))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
object JavaRDD {
|
||||||
|
|
||||||
|
implicit def fromRDD[T: ClassManifest](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
|
||||||
|
|
||||||
|
implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
|
||||||
|
}
|
||||||
|
|
149
core/src/main/scala/spark/api/java/JavaRDDLike.scala
Normal file
149
core/src/main/scala/spark/api/java/JavaRDDLike.scala
Normal file
|
@ -0,0 +1,149 @@
|
||||||
|
package spark.api.java
|
||||||
|
|
||||||
|
import spark.{PartitioningPreservingMappedRDD, Split, RDD}
|
||||||
|
import spark.api.java.JavaPairRDD._
|
||||||
|
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
|
||||||
|
import spark.partial.{PartialResult, BoundedDouble}
|
||||||
|
|
||||||
|
import java.util.{List => JList}
|
||||||
|
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
import java.lang
|
||||||
|
import scala.Tuple2
|
||||||
|
|
||||||
|
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
||||||
|
def wrapRDD: (RDD[T] => This)
|
||||||
|
|
||||||
|
implicit def classManifest: ClassManifest[T]
|
||||||
|
|
||||||
|
def rdd: RDD[T]
|
||||||
|
|
||||||
|
def context = rdd.context
|
||||||
|
|
||||||
|
def id = rdd.id
|
||||||
|
|
||||||
|
def getStorageLevel = rdd.getStorageLevel
|
||||||
|
|
||||||
|
def iterator(split: Split): java.util.Iterator[T] = asJavaIterator(rdd.iterator(split))
|
||||||
|
|
||||||
|
// Transformations (return a new RDD)
|
||||||
|
|
||||||
|
def map[R](f: JFunction[T, R]): JavaRDD[R] =
|
||||||
|
new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())
|
||||||
|
|
||||||
|
def map[R](f: DoubleFunction[T]): JavaDoubleRDD =
|
||||||
|
new JavaDoubleRDD(rdd.map(x => f(x).doubleValue()))
|
||||||
|
|
||||||
|
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
|
||||||
|
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
|
||||||
|
new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
|
||||||
|
}
|
||||||
|
|
||||||
|
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
def fn = (x: T) => f.apply(x).asScala
|
||||||
|
JavaRDD.fromRDD(rdd.flatMap(fn)(f.elementType()))(f.elementType())
|
||||||
|
}
|
||||||
|
|
||||||
|
def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
def fn = (x: T) => f.apply(x).asScala
|
||||||
|
new JavaDoubleRDD(new PartitioningPreservingMappedRDD(rdd.flatMap(fn),
|
||||||
|
((x: java.lang.Double) => x.doubleValue())))
|
||||||
|
}
|
||||||
|
|
||||||
|
def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
def fn = (x: T) => f.apply(x).asScala
|
||||||
|
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
|
||||||
|
new JavaPairRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
|
||||||
|
}
|
||||||
|
|
||||||
|
def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] =
|
||||||
|
JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest,
|
||||||
|
other.classManifest)
|
||||||
|
|
||||||
|
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
|
||||||
|
implicit val kcm: ClassManifest[K] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
|
||||||
|
implicit val vcm: ClassManifest[JList[T]] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
|
||||||
|
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm)
|
||||||
|
}
|
||||||
|
|
||||||
|
def groupBy[K](f: JFunction[T, K], numSplits: Int): JavaPairRDD[K, JList[T]] = {
|
||||||
|
implicit val kcm: ClassManifest[K] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
|
||||||
|
implicit val vcm: ClassManifest[JList[T]] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
|
||||||
|
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numSplits)(f.returnType)))(kcm, vcm)
|
||||||
|
}
|
||||||
|
|
||||||
|
def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
|
||||||
|
|
||||||
|
def pipe(command: JList[String]): JavaRDD[String] =
|
||||||
|
rdd.pipe(asScalaBuffer(command))
|
||||||
|
|
||||||
|
def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
|
||||||
|
rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env))
|
||||||
|
|
||||||
|
// Actions (launch a job to return a value to the user program)
|
||||||
|
|
||||||
|
def foreach(f: VoidFunction[T]) {
|
||||||
|
val cleanF = rdd.context.clean(f)
|
||||||
|
rdd.foreach(cleanF)
|
||||||
|
}
|
||||||
|
|
||||||
|
def collect(): JList[T] = {
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
val arr: java.util.Collection[T] = rdd.collect().toSeq
|
||||||
|
new java.util.ArrayList(arr)
|
||||||
|
}
|
||||||
|
|
||||||
|
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
|
||||||
|
|
||||||
|
def fold(zeroValue: T)(f: JFunction2[T, T, T]): T =
|
||||||
|
rdd.fold(zeroValue)(f)
|
||||||
|
|
||||||
|
def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],
|
||||||
|
combOp: JFunction2[U, U, U]): U =
|
||||||
|
rdd.aggregate(zeroValue)(seqOp, combOp)(seqOp.returnType)
|
||||||
|
|
||||||
|
def count() = rdd.count()
|
||||||
|
|
||||||
|
def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
|
||||||
|
rdd.countApprox(timeout, confidence)
|
||||||
|
|
||||||
|
def countApprox(timeout: Long): PartialResult[BoundedDouble] =
|
||||||
|
rdd.countApprox(timeout)
|
||||||
|
|
||||||
|
def countByValue(): java.util.Map[T, java.lang.Long] =
|
||||||
|
mapAsJavaMap(rdd.countByValue().map((x => (x._1, new lang.Long(x._2)))))
|
||||||
|
|
||||||
|
def countByValueApprox(
|
||||||
|
timeout: Long,
|
||||||
|
confidence: Double
|
||||||
|
): PartialResult[java.util.Map[T, BoundedDouble]] =
|
||||||
|
rdd.countByValueApprox(timeout, confidence).map(mapAsJavaMap)
|
||||||
|
|
||||||
|
def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
|
||||||
|
rdd.countByValueApprox(timeout).map(mapAsJavaMap)
|
||||||
|
|
||||||
|
def take(num: Int): JList[T] = {
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
val arr: java.util.Collection[T] = rdd.take(num).toSeq
|
||||||
|
new java.util.ArrayList(arr)
|
||||||
|
}
|
||||||
|
|
||||||
|
def takeSample(withReplacement: Boolean, num: Int, seed: Int): JList[T] = {
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
val arr: java.util.Collection[T] = rdd.takeSample(withReplacement, num, seed).toSeq
|
||||||
|
new java.util.ArrayList(arr)
|
||||||
|
}
|
||||||
|
|
||||||
|
def first(): T = rdd.first()
|
||||||
|
|
||||||
|
def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
|
||||||
|
|
||||||
|
def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
|
||||||
|
}
|
219
core/src/main/scala/spark/api/java/JavaSparkContext.scala
Normal file
219
core/src/main/scala/spark/api/java/JavaSparkContext.scala
Normal file
|
@ -0,0 +1,219 @@
|
||||||
|
package spark.api.java
|
||||||
|
|
||||||
|
import spark.{AccumulatorParam, RDD, SparkContext}
|
||||||
|
import spark.SparkContext.IntAccumulatorParam
|
||||||
|
import spark.SparkContext.DoubleAccumulatorParam
|
||||||
|
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration
|
||||||
|
import org.apache.hadoop.mapred.InputFormat
|
||||||
|
import org.apache.hadoop.mapred.JobConf
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
|
||||||
|
|
||||||
|
|
||||||
|
import scala.collection.JavaConversions
|
||||||
|
|
||||||
|
class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
|
||||||
|
|
||||||
|
def this(master: String, frameworkName: String) = this(new SparkContext(master, frameworkName))
|
||||||
|
|
||||||
|
val env = sc.env
|
||||||
|
|
||||||
|
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
|
||||||
|
implicit val cm: ClassManifest[T] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
|
||||||
|
sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
|
||||||
|
}
|
||||||
|
|
||||||
|
def parallelize[T](list: java.util.List[T]): JavaRDD[T] =
|
||||||
|
parallelize(list, sc.defaultParallelism)
|
||||||
|
|
||||||
|
|
||||||
|
def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
|
||||||
|
: JavaPairRDD[K, V] = {
|
||||||
|
implicit val kcm: ClassManifest[K] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
|
||||||
|
implicit val vcm: ClassManifest[V] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
|
||||||
|
JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
|
||||||
|
}
|
||||||
|
|
||||||
|
def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]]): JavaPairRDD[K, V] =
|
||||||
|
parallelizePairs(list, sc.defaultParallelism)
|
||||||
|
|
||||||
|
def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
|
||||||
|
JavaDoubleRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list).map(_.doubleValue()),
|
||||||
|
numSlices))
|
||||||
|
|
||||||
|
def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
|
||||||
|
parallelizeDoubles(list, sc.defaultParallelism)
|
||||||
|
|
||||||
|
def textFile(path: String): JavaRDD[String] = sc.textFile(path)
|
||||||
|
|
||||||
|
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
|
||||||
|
|
||||||
|
/**Get an RDD for a Hadoop SequenceFile with given key and value types */
|
||||||
|
def sequenceFile[K, V](path: String,
|
||||||
|
keyClass: Class[K],
|
||||||
|
valueClass: Class[V],
|
||||||
|
minSplits: Int
|
||||||
|
): JavaPairRDD[K, V] = {
|
||||||
|
implicit val kcm = ClassManifest.fromClass(keyClass)
|
||||||
|
implicit val vcm = ClassManifest.fromClass(valueClass)
|
||||||
|
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
|
||||||
|
}
|
||||||
|
|
||||||
|
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
|
||||||
|
JavaPairRDD[K, V] = {
|
||||||
|
implicit val kcm = ClassManifest.fromClass(keyClass)
|
||||||
|
implicit val vcm = ClassManifest.fromClass(valueClass)
|
||||||
|
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
|
||||||
|
* BytesWritable values that contain a serialized partition. This is still an experimental storage
|
||||||
|
* format and may not be supported exactly as is in future Spark releases. It will also be pretty
|
||||||
|
* slow if you use the default serializer (Java serialization), though the nice thing about it is
|
||||||
|
* that there's very little effort required to save arbitrary objects.
|
||||||
|
*/
|
||||||
|
def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
|
||||||
|
implicit val cm: ClassManifest[T] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
|
||||||
|
sc.objectFile(path, minSplits)(cm)
|
||||||
|
}
|
||||||
|
|
||||||
|
def objectFile[T](path: String): JavaRDD[T] = {
|
||||||
|
implicit val cm: ClassManifest[T] =
|
||||||
|
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
|
||||||
|
sc.objectFile(path)(cm)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
|
||||||
|
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
|
||||||
|
* etc).
|
||||||
|
*/
|
||||||
|
def hadoopRDD[K, V, F <: InputFormat[K, V]](
|
||||||
|
conf: JobConf,
|
||||||
|
inputFormatClass: Class[F],
|
||||||
|
keyClass: Class[K],
|
||||||
|
valueClass: Class[V],
|
||||||
|
minSplits: Int
|
||||||
|
): JavaPairRDD[K, V] = {
|
||||||
|
implicit val kcm = ClassManifest.fromClass(keyClass)
|
||||||
|
implicit val vcm = ClassManifest.fromClass(valueClass)
|
||||||
|
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
|
||||||
|
}
|
||||||
|
|
||||||
|
def hadoopRDD[K, V, F <: InputFormat[K, V]](
|
||||||
|
conf: JobConf,
|
||||||
|
inputFormatClass: Class[F],
|
||||||
|
keyClass: Class[K],
|
||||||
|
valueClass: Class[V]
|
||||||
|
): JavaPairRDD[K, V] = {
|
||||||
|
implicit val kcm = ClassManifest.fromClass(keyClass)
|
||||||
|
implicit val vcm = ClassManifest.fromClass(valueClass)
|
||||||
|
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**Get an RDD for a Hadoop file with an arbitrary InputFormat */
|
||||||
|
def hadoopFile[K, V, F <: InputFormat[K, V]](
|
||||||
|
path: String,
|
||||||
|
inputFormatClass: Class[F],
|
||||||
|
keyClass: Class[K],
|
||||||
|
valueClass: Class[V],
|
||||||
|
minSplits: Int
|
||||||
|
): JavaPairRDD[K, V] = {
|
||||||
|
implicit val kcm = ClassManifest.fromClass(keyClass)
|
||||||
|
implicit val vcm = ClassManifest.fromClass(valueClass)
|
||||||
|
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
|
||||||
|
}
|
||||||
|
|
||||||
|
def hadoopFile[K, V, F <: InputFormat[K, V]](
|
||||||
|
path: String,
|
||||||
|
inputFormatClass: Class[F],
|
||||||
|
keyClass: Class[K],
|
||||||
|
valueClass: Class[V]
|
||||||
|
): JavaPairRDD[K, V] = {
|
||||||
|
implicit val kcm = ClassManifest.fromClass(keyClass)
|
||||||
|
implicit val vcm = ClassManifest.fromClass(valueClass)
|
||||||
|
new JavaPairRDD(sc.hadoopFile(path,
|
||||||
|
inputFormatClass, keyClass, valueClass))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
|
||||||
|
* and extra configuration options to pass to the input format.
|
||||||
|
*/
|
||||||
|
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
|
||||||
|
path: String,
|
||||||
|
fClass: Class[F],
|
||||||
|
kClass: Class[K],
|
||||||
|
vClass: Class[V],
|
||||||
|
conf: Configuration): JavaPairRDD[K, V] = {
|
||||||
|
implicit val kcm = ClassManifest.fromClass(kClass)
|
||||||
|
implicit val vcm = ClassManifest.fromClass(vClass)
|
||||||
|
new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
|
||||||
|
* and extra configuration options to pass to the input format.
|
||||||
|
*/
|
||||||
|
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
|
||||||
|
conf: Configuration,
|
||||||
|
fClass: Class[F],
|
||||||
|
kClass: Class[K],
|
||||||
|
vClass: Class[V]): JavaPairRDD[K, V] = {
|
||||||
|
implicit val kcm = ClassManifest.fromClass(kClass)
|
||||||
|
implicit val vcm = ClassManifest.fromClass(vClass)
|
||||||
|
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
def union[T](jrdds: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
|
||||||
|
val rdds: Seq[RDD[T]] = asScalaBuffer(jrdds).map(_.rdd)
|
||||||
|
implicit val cm: ClassManifest[T] = jrdds.head.classManifest
|
||||||
|
sc.union(rdds: _*)(cm)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
def union[K, V](jrdds: java.util.List[JavaPairRDD[K, V]]): JavaPairRDD[K, V] = {
|
||||||
|
val rdds: Seq[RDD[(K, V)]] = asScalaBuffer(jrdds).map(_.rdd)
|
||||||
|
implicit val cm: ClassManifest[(K, V)] = jrdds.head.classManifest
|
||||||
|
implicit val kcm: ClassManifest[K] = jrdds.head.kManifest
|
||||||
|
implicit val vcm: ClassManifest[V] = jrdds.head.vManifest
|
||||||
|
new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
def union(jrdds: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
|
||||||
|
val rdds: Seq[RDD[Double]] = asScalaBuffer(jrdds).map(_.srdd)
|
||||||
|
new JavaDoubleRDD(sc.union(rdds: _*))
|
||||||
|
}
|
||||||
|
|
||||||
|
def intAccumulator(initialValue: Int) = sc.accumulator(initialValue)(IntAccumulatorParam)
|
||||||
|
|
||||||
|
def doubleAccumulator(initialValue: Double) =
|
||||||
|
sc.accumulator(initialValue)(DoubleAccumulatorParam)
|
||||||
|
|
||||||
|
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]) =
|
||||||
|
sc.accumulator(initialValue)(accumulatorParam)
|
||||||
|
|
||||||
|
def broadcast[T](value: T) = sc.broadcast(value)
|
||||||
|
|
||||||
|
def stop() {
|
||||||
|
sc.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
def getSparkHome() = sc.getSparkHome()
|
||||||
|
}
|
||||||
|
|
||||||
|
object JavaSparkContext {
|
||||||
|
implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
|
||||||
|
|
||||||
|
implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
package spark.api.java;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
// See
|
||||||
|
// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
|
||||||
|
abstract class JavaSparkContextVarargsWorkaround {
|
||||||
|
public <T> JavaRDD<T> union(JavaRDD<T> ... rdds) {
|
||||||
|
return union(Arrays.asList(rdds));
|
||||||
|
}
|
||||||
|
|
||||||
|
public JavaDoubleRDD union(JavaDoubleRDD ... rdds) {
|
||||||
|
return union(Arrays.asList(rdds));
|
||||||
|
}
|
||||||
|
|
||||||
|
public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> ... rdds) {
|
||||||
|
return union(Arrays.asList(rdds));
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract public <T> JavaRDD<T> union(List<JavaRDD<T>> rdds);
|
||||||
|
abstract public JavaDoubleRDD union(List<JavaDoubleRDD> rdds);
|
||||||
|
abstract public <K, V> JavaPairRDD<K, V> union(List<JavaPairRDD<K, V>> rdds);
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package spark.api.java.function;
|
||||||
|
|
||||||
|
|
||||||
|
import scala.runtime.AbstractFunction1;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
|
||||||
|
// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
|
||||||
|
public abstract class DoubleFlatMapFunction<T> extends AbstractFunction1<T, Iterable<Double>>
|
||||||
|
implements Serializable {
|
||||||
|
public abstract Iterable<Double> apply(T t);
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package spark.api.java.function;
|
||||||
|
|
||||||
|
|
||||||
|
import scala.runtime.AbstractFunction1;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
// DoubleFunction does not extend Function because some UDF functions, like map,
|
||||||
|
// are overloaded for both Function and DoubleFunction.
|
||||||
|
public abstract class DoubleFunction<T> extends AbstractFunction1<T, Double>
|
||||||
|
implements Serializable {
|
||||||
|
public abstract Double apply(T t);
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
package spark.api.java.function
|
||||||
|
|
||||||
|
abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
|
||||||
|
def apply(x: T) : java.lang.Iterable[R]
|
||||||
|
|
||||||
|
def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]]
|
||||||
|
}
|
21
core/src/main/scala/spark/api/java/function/Function.java
Normal file
21
core/src/main/scala/spark/api/java/function/Function.java
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
package spark.api.java.function;
|
||||||
|
|
||||||
|
import scala.reflect.ClassManifest;
|
||||||
|
import scala.reflect.ClassManifest$;
|
||||||
|
import scala.runtime.AbstractFunction1;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class for functions whose return types do not have special RDDs; DoubleFunction is
|
||||||
|
* handled separately, to allow DoubleRDDs to be constructed when mapping RDDs to doubles.
|
||||||
|
*/
|
||||||
|
public abstract class Function<T, R> extends AbstractFunction1<T, R> implements Serializable {
|
||||||
|
public abstract R apply(T t);
|
||||||
|
|
||||||
|
public ClassManifest<R> returnType() {
|
||||||
|
return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
17
core/src/main/scala/spark/api/java/function/Function2.java
Normal file
17
core/src/main/scala/spark/api/java/function/Function2.java
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
package spark.api.java.function;
|
||||||
|
|
||||||
|
import scala.reflect.ClassManifest;
|
||||||
|
import scala.reflect.ClassManifest$;
|
||||||
|
import scala.runtime.AbstractFunction2;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public abstract class Function2<T1, T2, R> extends AbstractFunction2<T1, T2, R>
|
||||||
|
implements Serializable {
|
||||||
|
public ClassManifest<R> returnType() {
|
||||||
|
return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract R apply(T1 t1, T2 t2);
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
package spark.api.java.function;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
import scala.reflect.ClassManifest;
|
||||||
|
import scala.reflect.ClassManifest$;
|
||||||
|
import scala.runtime.AbstractFunction1;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
|
||||||
|
// overloaded for both FlatMapFunction and PairFlatMapFunction.
|
||||||
|
public abstract class PairFlatMapFunction<T, K, V> extends AbstractFunction1<T, Iterable<Tuple2<K,
|
||||||
|
V>>> implements Serializable {
|
||||||
|
|
||||||
|
public ClassManifest<K> keyType() {
|
||||||
|
return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClassManifest<V> valueType() {
|
||||||
|
return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract Iterable<Tuple2<K, V>> apply(T t);
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
package spark.api.java.function;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
import scala.reflect.ClassManifest;
|
||||||
|
import scala.reflect.ClassManifest$;
|
||||||
|
import scala.runtime.AbstractFunction1;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
// PairFunction does not extend Function because some UDF functions, like map,
|
||||||
|
// are overloaded for both Function and PairFunction.
|
||||||
|
public abstract class PairFunction<T, K, V> extends AbstractFunction1<T, Tuple2<K,
|
||||||
|
V>> implements Serializable {
|
||||||
|
|
||||||
|
public ClassManifest<K> keyType() {
|
||||||
|
return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClassManifest<V> valueType() {
|
||||||
|
return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract Tuple2<K, V> apply(T t);
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
package spark.api.java.function
|
||||||
|
|
||||||
|
// This allows Java users to write void methods without having to return Unit.
|
||||||
|
abstract class VoidFunction[T] extends Serializable {
|
||||||
|
def apply(t: T) : Unit
|
||||||
|
}
|
||||||
|
|
||||||
|
// VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly
|
||||||
|
// return Unit), so it is implicitly converted to a Function1[T, Unit]:
|
||||||
|
object VoidFunction {
|
||||||
|
implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f(x))
|
||||||
|
}
|
|
@ -57,6 +57,32 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transform this PartialResult into a PartialResult of type T.
|
||||||
|
*/
|
||||||
|
def map[T](f: R => T) : PartialResult[T] = {
|
||||||
|
new PartialResult[T](f(initialVal), isFinal) {
|
||||||
|
override def getFinalValue() : T = synchronized {
|
||||||
|
f(PartialResult.this.getFinalValue())
|
||||||
|
}
|
||||||
|
override def onComplete(handler: T => Unit): PartialResult[T] = synchronized {
|
||||||
|
PartialResult.this.onComplete(handler.compose(f)).map(f)
|
||||||
|
}
|
||||||
|
override def onFail(handler: Exception => Unit) {
|
||||||
|
synchronized {
|
||||||
|
PartialResult.this.onFail(handler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
override def toString : String = synchronized {
|
||||||
|
PartialResult.this.getFinalValueInternal() match {
|
||||||
|
case Some(value) => "(final: " + f(value) + ")"
|
||||||
|
case None => "(partial: " + initialValue + ")"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
def getFinalValueInternal() = PartialResult.this.getFinalValueInternal().map(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private[spark] def setFinalValue(value: R) {
|
private[spark] def setFinalValue(value: R) {
|
||||||
synchronized {
|
synchronized {
|
||||||
if (finalValue != None) {
|
if (finalValue != None) {
|
||||||
|
@ -70,6 +96,8 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def getFinalValueInternal() = finalValue
|
||||||
|
|
||||||
private[spark] def setFailure(exception: Exception) {
|
private[spark] def setFailure(exception: Exception) {
|
||||||
synchronized {
|
synchronized {
|
||||||
if (failure != None) {
|
if (failure != None) {
|
||||||
|
|
|
@ -5,7 +5,7 @@ package spark.util
|
||||||
* numerically robust way. Includes support for merging two StatCounters. Based on Welford and
|
* numerically robust way. Includes support for merging two StatCounters. Based on Welford and
|
||||||
* Chan's algorithms described at http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance.
|
* Chan's algorithms described at http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance.
|
||||||
*/
|
*/
|
||||||
class StatCounter(values: TraversableOnce[Double]) {
|
class StatCounter(values: TraversableOnce[Double]) extends Serializable {
|
||||||
private var n: Long = 0 // Running count of our values
|
private var n: Long = 0 // Running count of our values
|
||||||
private var mu: Double = 0 // Running mean of our values
|
private var mu: Double = 0 // Running mean of our values
|
||||||
private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
|
private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
|
||||||
|
|
465
core/src/test/scala/spark/JavaAPISuite.java
Normal file
465
core/src/test/scala/spark/JavaAPISuite.java
Normal file
|
@ -0,0 +1,465 @@
|
||||||
|
package spark;
|
||||||
|
|
||||||
|
import com.google.common.base.Charsets;
|
||||||
|
import com.google.common.io.Files;
|
||||||
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.SequenceFileInputFormat;
|
||||||
|
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import spark.api.java.JavaDoubleRDD;
|
||||||
|
import spark.api.java.JavaPairRDD;
|
||||||
|
import spark.api.java.JavaRDD;
|
||||||
|
import spark.api.java.JavaSparkContext;
|
||||||
|
import spark.api.java.function.*;
|
||||||
|
import spark.partial.BoundedDouble;
|
||||||
|
import spark.partial.PartialResult;
|
||||||
|
import spark.util.StatCounter;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
// The test suite itself is Serializable so that anonymous Function implementations can be
|
||||||
|
// serialized, as an alternative to converting these anonymous classes to static inner classes;
|
||||||
|
// see http://stackoverflow.com/questions/758570/.
|
||||||
|
public class JavaAPISuite implements Serializable {
|
||||||
|
private transient JavaSparkContext sc;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
sc = new JavaSparkContext("local", "JavaAPISuite");
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
sc.stop();
|
||||||
|
sc = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ReverseIntComparator implements Comparator<Integer>, Serializable {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(Integer a, Integer b) {
|
||||||
|
if (a > b) return -1;
|
||||||
|
else if (a < b) return 1;
|
||||||
|
else return 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void sparkContextUnion() {
|
||||||
|
// Union of non-specialized JavaRDDs
|
||||||
|
List<String> strings = Arrays.asList("Hello", "World");
|
||||||
|
JavaRDD<String> s1 = sc.parallelize(strings);
|
||||||
|
JavaRDD<String> s2 = sc.parallelize(strings);
|
||||||
|
// Varargs
|
||||||
|
JavaRDD<String> sUnion = sc.union(s1, s2);
|
||||||
|
Assert.assertEquals(4, sUnion.count());
|
||||||
|
// List
|
||||||
|
List<JavaRDD<String>> srdds = new ArrayList<JavaRDD<String>>();
|
||||||
|
srdds.add(s1);
|
||||||
|
srdds.add(s2);
|
||||||
|
sUnion = sc.union(srdds);
|
||||||
|
Assert.assertEquals(4, sUnion.count());
|
||||||
|
|
||||||
|
// Union of JavaDoubleRDDs
|
||||||
|
List<Double> doubles = Arrays.asList(1.0, 2.0);
|
||||||
|
JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
|
||||||
|
JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
|
||||||
|
JavaDoubleRDD dUnion = sc.union(d1, d2);
|
||||||
|
Assert.assertEquals(4, dUnion.count());
|
||||||
|
|
||||||
|
// Union of JavaPairRDDs
|
||||||
|
List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
|
||||||
|
pairs.add(new Tuple2<Integer, Integer>(1, 2));
|
||||||
|
pairs.add(new Tuple2<Integer, Integer>(3, 4));
|
||||||
|
JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
|
||||||
|
JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
|
||||||
|
JavaPairRDD<Integer, Integer> pUnion = sc.union(p1, p2);
|
||||||
|
Assert.assertEquals(4, pUnion.count());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void sortByKey() {
|
||||||
|
List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
|
||||||
|
pairs.add(new Tuple2<Integer, Integer>(0, 4));
|
||||||
|
pairs.add(new Tuple2<Integer, Integer>(3, 2));
|
||||||
|
pairs.add(new Tuple2<Integer, Integer>(-1, 1));
|
||||||
|
|
||||||
|
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
|
||||||
|
|
||||||
|
// Default comparator
|
||||||
|
JavaPairRDD<Integer, Integer> sortedRDD = rdd.sortByKey();
|
||||||
|
Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
|
||||||
|
List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
|
||||||
|
Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
|
||||||
|
Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
|
||||||
|
|
||||||
|
// Custom comparator
|
||||||
|
sortedRDD = rdd.sortByKey(new ReverseIntComparator(), false);
|
||||||
|
Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
|
||||||
|
sortedPairs = sortedRDD.collect();
|
||||||
|
Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
|
||||||
|
Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void foreach() {
|
||||||
|
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
|
||||||
|
rdd.foreach(new VoidFunction<String>() {
|
||||||
|
@Override
|
||||||
|
public void apply(String s) {
|
||||||
|
System.out.println(s);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void groupBy() {
|
||||||
|
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
|
||||||
|
Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean apply(Integer x) {
|
||||||
|
return x % 2 == 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
|
||||||
|
Assert.assertEquals(2, oddsAndEvens.count());
|
||||||
|
Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
|
||||||
|
Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
|
||||||
|
|
||||||
|
oddsAndEvens = rdd.groupBy(isOdd, 1);
|
||||||
|
Assert.assertEquals(2, oddsAndEvens.count());
|
||||||
|
Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
|
||||||
|
Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void cogroup() {
|
||||||
|
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
|
||||||
|
new Tuple2<String, String>("Apples", "Fruit"),
|
||||||
|
new Tuple2<String, String>("Oranges", "Fruit"),
|
||||||
|
new Tuple2<String, String>("Oranges", "Citrus")
|
||||||
|
));
|
||||||
|
JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
|
||||||
|
new Tuple2<String, Integer>("Oranges", 2),
|
||||||
|
new Tuple2<String, Integer>("Apples", 3)
|
||||||
|
));
|
||||||
|
JavaPairRDD<String, Tuple2<List<String>, List<Integer>>> cogrouped = categories.cogroup(prices);
|
||||||
|
Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString());
|
||||||
|
Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString());
|
||||||
|
|
||||||
|
cogrouped.collect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void foldReduce() {
|
||||||
|
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
|
||||||
|
Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer apply(Integer a, Integer b) {
|
||||||
|
return a + b;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
int sum = rdd.fold(0, add);
|
||||||
|
Assert.assertEquals(33, sum);
|
||||||
|
|
||||||
|
sum = rdd.reduce(add);
|
||||||
|
Assert.assertEquals(33, sum);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void approximateResults() {
|
||||||
|
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
|
||||||
|
Map<Integer, Long> countsByValue = rdd.countByValue();
|
||||||
|
Assert.assertEquals(2, countsByValue.get(1).longValue());
|
||||||
|
Assert.assertEquals(1, countsByValue.get(13).longValue());
|
||||||
|
|
||||||
|
PartialResult<Map<Integer, BoundedDouble>> approx = rdd.countByValueApprox(1);
|
||||||
|
Map<Integer, BoundedDouble> finalValue = approx.getFinalValue();
|
||||||
|
Assert.assertEquals(2.0, finalValue.get(1).mean(), 0.01);
|
||||||
|
Assert.assertEquals(1.0, finalValue.get(13).mean(), 0.01);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void take() {
|
||||||
|
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
|
||||||
|
Assert.assertEquals(1, rdd.first().intValue());
|
||||||
|
List<Integer> firstTwo = rdd.take(2);
|
||||||
|
List<Integer> sample = rdd.takeSample(false, 2, 42);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void cartesian() {
|
||||||
|
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
|
||||||
|
JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
|
||||||
|
JavaPairRDD<String, Double> cartesian = stringRDD.cartesian(doubleRDD);
|
||||||
|
Assert.assertEquals(new Tuple2<String, Double>("Hello", 1.0), cartesian.first());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void javaDoubleRDD() {
|
||||||
|
JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
|
||||||
|
JavaDoubleRDD distinct = rdd.distinct();
|
||||||
|
Assert.assertEquals(5, distinct.count());
|
||||||
|
JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean apply(Double x) {
|
||||||
|
return x > 2.0;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Assert.assertEquals(3, filter.count());
|
||||||
|
JavaDoubleRDD union = rdd.union(rdd);
|
||||||
|
Assert.assertEquals(12, union.count());
|
||||||
|
union = union.cache();
|
||||||
|
Assert.assertEquals(12, union.count());
|
||||||
|
|
||||||
|
Assert.assertEquals(20, rdd.sum(), 0.01);
|
||||||
|
StatCounter stats = rdd.stats();
|
||||||
|
Assert.assertEquals(20, stats.sum(), 0.01);
|
||||||
|
Assert.assertEquals(20/6.0, rdd.mean(), 0.01);
|
||||||
|
Assert.assertEquals(20/6.0, rdd.mean(), 0.01);
|
||||||
|
Assert.assertEquals(6.22222, rdd.variance(), 0.01);
|
||||||
|
Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
|
||||||
|
|
||||||
|
Double first = rdd.first();
|
||||||
|
List<Double> take = rdd.take(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void map() {
|
||||||
|
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
|
||||||
|
JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Double apply(Integer x) {
|
||||||
|
return 1.0 * x;
|
||||||
|
}
|
||||||
|
}).cache();
|
||||||
|
JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
|
||||||
|
@Override
|
||||||
|
public Tuple2<Integer, Integer> apply(Integer x) {
|
||||||
|
return new Tuple2<Integer, Integer>(x, x);
|
||||||
|
}
|
||||||
|
}).cache();
|
||||||
|
JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
|
||||||
|
@Override
|
||||||
|
public String apply(Integer x) {
|
||||||
|
return x.toString();
|
||||||
|
}
|
||||||
|
}).cache();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void flatMap() {
|
||||||
|
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
|
||||||
|
"The quick brown fox jumps over the lazy dog."));
|
||||||
|
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
|
||||||
|
@Override
|
||||||
|
public Iterable<String> apply(String x) {
|
||||||
|
return Arrays.asList(x.split(" "));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Assert.assertEquals("Hello", words.first());
|
||||||
|
Assert.assertEquals(11, words.count());
|
||||||
|
|
||||||
|
JavaPairRDD<String, String> pairs = rdd.flatMap(
|
||||||
|
new PairFlatMapFunction<String, String, String>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterable<Tuple2<String, String>> apply(String s) {
|
||||||
|
List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
|
||||||
|
for (String word : s.split(" ")) pairs.add(new Tuple2<String, String>(word, word));
|
||||||
|
return pairs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
|
||||||
|
Assert.assertEquals(11, pairs.count());
|
||||||
|
|
||||||
|
JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
|
||||||
|
@Override
|
||||||
|
public Iterable<Double> apply(String s) {
|
||||||
|
List<Double> lengths = new LinkedList<Double>();
|
||||||
|
for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
|
||||||
|
return lengths;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Double x = doubles.first();
|
||||||
|
Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
|
||||||
|
Assert.assertEquals(11, pairs.count());
|
||||||
|
}
|
||||||
|
|
||||||
|
// File input / output tests are largely adapted from FileSuite:
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void textFiles() throws IOException {
|
||||||
|
File tempDir = Files.createTempDir();
|
||||||
|
String outputDir = new File(tempDir, "output").getAbsolutePath();
|
||||||
|
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
|
||||||
|
rdd.saveAsTextFile(outputDir);
|
||||||
|
// Read the plain text file and check it's OK
|
||||||
|
File outputFile = new File(outputDir, "part-00000");
|
||||||
|
String content = Files.toString(outputFile, Charsets.UTF_8);
|
||||||
|
Assert.assertEquals("1\n2\n3\n4\n", content);
|
||||||
|
// Also try reading it in as a text file RDD
|
||||||
|
List<String> expected = Arrays.asList("1", "2", "3", "4");
|
||||||
|
JavaRDD<String> readRDD = sc.textFile(outputDir);
|
||||||
|
Assert.assertEquals(expected, readRDD.collect());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void sequenceFile() {
|
||||||
|
File tempDir = Files.createTempDir();
|
||||||
|
String outputDir = new File(tempDir, "output").getAbsolutePath();
|
||||||
|
List<Tuple2<Integer, String>> pairs = Arrays.asList(
|
||||||
|
new Tuple2<Integer, String>(1, "a"),
|
||||||
|
new Tuple2<Integer, String>(2, "aa"),
|
||||||
|
new Tuple2<Integer, String>(3, "aaa")
|
||||||
|
);
|
||||||
|
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
|
||||||
|
|
||||||
|
rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
|
||||||
|
@Override
|
||||||
|
public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
|
||||||
|
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
|
||||||
|
}
|
||||||
|
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
|
||||||
|
|
||||||
|
// Try reading the output back as an object file
|
||||||
|
JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
|
||||||
|
Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
|
||||||
|
@Override
|
||||||
|
public Tuple2<Integer, String> apply(Tuple2<IntWritable, Text> pair) {
|
||||||
|
return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Assert.assertEquals(pairs, readRDD.collect());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void writeWithNewAPIHadoopFile() {
|
||||||
|
File tempDir = Files.createTempDir();
|
||||||
|
String outputDir = new File(tempDir, "output").getAbsolutePath();
|
||||||
|
List<Tuple2<Integer, String>> pairs = Arrays.asList(
|
||||||
|
new Tuple2<Integer, String>(1, "a"),
|
||||||
|
new Tuple2<Integer, String>(2, "aa"),
|
||||||
|
new Tuple2<Integer, String>(3, "aaa")
|
||||||
|
);
|
||||||
|
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
|
||||||
|
|
||||||
|
rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
|
||||||
|
@Override
|
||||||
|
public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
|
||||||
|
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
|
||||||
|
}
|
||||||
|
}).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
|
||||||
|
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
|
||||||
|
|
||||||
|
JavaPairRDD<IntWritable, Text> output = sc.sequenceFile(outputDir, IntWritable.class,
|
||||||
|
Text.class);
|
||||||
|
Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
|
||||||
|
String>() {
|
||||||
|
@Override
|
||||||
|
public String apply(Tuple2<IntWritable, Text> x) {
|
||||||
|
return x.toString();
|
||||||
|
}
|
||||||
|
}).collect().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void readWithNewAPIHadoopFile() throws IOException {
|
||||||
|
File tempDir = Files.createTempDir();
|
||||||
|
String outputDir = new File(tempDir, "output").getAbsolutePath();
|
||||||
|
List<Tuple2<Integer, String>> pairs = Arrays.asList(
|
||||||
|
new Tuple2<Integer, String>(1, "a"),
|
||||||
|
new Tuple2<Integer, String>(2, "aa"),
|
||||||
|
new Tuple2<Integer, String>(3, "aaa")
|
||||||
|
);
|
||||||
|
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
|
||||||
|
|
||||||
|
rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
|
||||||
|
@Override
|
||||||
|
public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
|
||||||
|
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
|
||||||
|
}
|
||||||
|
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
|
||||||
|
|
||||||
|
JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
|
||||||
|
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class,
|
||||||
|
Text.class, new Job().getConfiguration());
|
||||||
|
Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
|
||||||
|
String>() {
|
||||||
|
@Override
|
||||||
|
public String apply(Tuple2<IntWritable, Text> x) {
|
||||||
|
return x.toString();
|
||||||
|
}
|
||||||
|
}).collect().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void objectFilesOfInts() {
|
||||||
|
File tempDir = Files.createTempDir();
|
||||||
|
String outputDir = new File(tempDir, "output").getAbsolutePath();
|
||||||
|
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
|
||||||
|
rdd.saveAsObjectFile(outputDir);
|
||||||
|
// Try reading the output back as an object file
|
||||||
|
List<Integer> expected = Arrays.asList(1, 2, 3, 4);
|
||||||
|
JavaRDD<Integer> readRDD = sc.objectFile(outputDir);
|
||||||
|
Assert.assertEquals(expected, readRDD.collect());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void objectFilesOfComplexTypes() {
|
||||||
|
File tempDir = Files.createTempDir();
|
||||||
|
String outputDir = new File(tempDir, "output").getAbsolutePath();
|
||||||
|
List<Tuple2<Integer, String>> pairs = Arrays.asList(
|
||||||
|
new Tuple2<Integer, String>(1, "a"),
|
||||||
|
new Tuple2<Integer, String>(2, "aa"),
|
||||||
|
new Tuple2<Integer, String>(3, "aaa")
|
||||||
|
);
|
||||||
|
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
|
||||||
|
rdd.saveAsObjectFile(outputDir);
|
||||||
|
// Try reading the output back as an object file
|
||||||
|
JavaRDD<Tuple2<Integer, String>> readRDD = sc.objectFile(outputDir);
|
||||||
|
Assert.assertEquals(pairs, readRDD.collect());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void hadoopFile() {
|
||||||
|
File tempDir = Files.createTempDir();
|
||||||
|
String outputDir = new File(tempDir, "output").getAbsolutePath();
|
||||||
|
List<Tuple2<Integer, String>> pairs = Arrays.asList(
|
||||||
|
new Tuple2<Integer, String>(1, "a"),
|
||||||
|
new Tuple2<Integer, String>(2, "aa"),
|
||||||
|
new Tuple2<Integer, String>(3, "aaa")
|
||||||
|
);
|
||||||
|
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
|
||||||
|
|
||||||
|
rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
|
||||||
|
@Override
|
||||||
|
public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
|
||||||
|
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
|
||||||
|
}
|
||||||
|
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
|
||||||
|
|
||||||
|
JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
|
||||||
|
SequenceFileInputFormat.class, IntWritable.class, Text.class);
|
||||||
|
Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
|
||||||
|
String>() {
|
||||||
|
@Override
|
||||||
|
public String apply(Tuple2<IntWritable, Text> x) {
|
||||||
|
return x.toString();
|
||||||
|
}
|
||||||
|
}).collect().toString());
|
||||||
|
}
|
||||||
|
}
|
127
examples/src/main/java/spark/examples/JavaLR.java
Normal file
127
examples/src/main/java/spark/examples/JavaLR.java
Normal file
|
@ -0,0 +1,127 @@
|
||||||
|
package spark.examples;
|
||||||
|
|
||||||
|
import scala.util.Random;
|
||||||
|
import spark.api.java.JavaSparkContext;
|
||||||
|
import spark.api.java.function.Function;
|
||||||
|
import spark.api.java.function.Function2;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class JavaLR {
|
||||||
|
|
||||||
|
static int N = 10000; // Number of data points
|
||||||
|
static int D = 10; // Number of dimensions
|
||||||
|
static double R = 0.7; // Scaling factor
|
||||||
|
static int ITERATIONS = 5;
|
||||||
|
static Random rand = new Random(42);
|
||||||
|
|
||||||
|
static class DataPoint implements Serializable {
|
||||||
|
public DataPoint(double[] x, int y) {
|
||||||
|
this.x = x;
|
||||||
|
this.y = y;
|
||||||
|
}
|
||||||
|
double[] x;
|
||||||
|
int y;
|
||||||
|
}
|
||||||
|
|
||||||
|
static DataPoint generatePoint(int i) {
|
||||||
|
int y = (i % 2 == 0) ? -1 : 1;
|
||||||
|
double[] x = new double[D];
|
||||||
|
for (int j = 0; j < D; j++) {
|
||||||
|
x[j] = rand.nextGaussian() + y * R;
|
||||||
|
}
|
||||||
|
return new DataPoint(x, y);
|
||||||
|
}
|
||||||
|
|
||||||
|
static List<DataPoint> generateData() {
|
||||||
|
List<DataPoint> points = new ArrayList<DataPoint>(N);
|
||||||
|
for (int i = 0; i < N; i++) {
|
||||||
|
points.add(generatePoint(i));
|
||||||
|
}
|
||||||
|
return points;
|
||||||
|
}
|
||||||
|
|
||||||
|
static class VectorSum extends Function2<double[], double[], double[]> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public double[] apply(double[] a, double[] b) {
|
||||||
|
double[] result = new double[D];
|
||||||
|
for (int j = 0; j < D; j++) {
|
||||||
|
result[j] = a[j] + b[j];
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ComputeGradient extends Function<DataPoint, double[]> {
|
||||||
|
|
||||||
|
double[] weights;
|
||||||
|
|
||||||
|
public ComputeGradient(double[] weights) {
|
||||||
|
this.weights = weights;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public double[] apply(DataPoint p) {
|
||||||
|
double[] gradient = new double[D];
|
||||||
|
for (int i = 0; i < D; i++) {
|
||||||
|
double dot = dot(weights, p.x);
|
||||||
|
gradient[i] = (1 / (1 + Math.exp(-p.y * dot)) - 1) * p.y * p.x[i];
|
||||||
|
}
|
||||||
|
return gradient;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static double dot(double[] a, double[] b) {
|
||||||
|
double x = 0;
|
||||||
|
for (int i = 0; i < D; i++) {
|
||||||
|
x += a[i] * b[i];
|
||||||
|
}
|
||||||
|
return x;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void printWeights(double[] a) {
|
||||||
|
System.out.println(Arrays.toString(a));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
|
||||||
|
if (args.length == 0) {
|
||||||
|
System.err.println("Usage: JavaLR <host> [<slices>]");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR");
|
||||||
|
Integer numSlices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
|
||||||
|
List<DataPoint> data = generateData();
|
||||||
|
|
||||||
|
// Initialize w to a random value
|
||||||
|
double[] w = new double[D];
|
||||||
|
for (int i = 0; i < D; i++) {
|
||||||
|
w[i] = 2 * rand.nextDouble() - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.print("Initial w: ");
|
||||||
|
printWeights(w);
|
||||||
|
|
||||||
|
for (int i = 1; i <= ITERATIONS; i++) {
|
||||||
|
System.out.println("On iteration " + i);
|
||||||
|
|
||||||
|
double[] gradient = sc.parallelize(data, numSlices).map(
|
||||||
|
new ComputeGradient(w)
|
||||||
|
).reduce(new VectorSum());
|
||||||
|
|
||||||
|
for (int j = 0; j < D; j++) {
|
||||||
|
w[j] -= gradient[j];
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.print("Final w: ");
|
||||||
|
printWeights(w);
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
}
|
76
examples/src/main/java/spark/examples/JavaTC.java
Normal file
76
examples/src/main/java/spark/examples/JavaTC.java
Normal file
|
@ -0,0 +1,76 @@
|
||||||
|
package spark.examples;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
import scala.util.Random;
|
||||||
|
import spark.api.java.JavaPairRDD;
|
||||||
|
import spark.api.java.JavaSparkContext;
|
||||||
|
import spark.api.java.function.PairFunction;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class JavaTC {
|
||||||
|
|
||||||
|
static int numEdges = 200;
|
||||||
|
static int numVertices = 100;
|
||||||
|
static Random rand = new Random(42);
|
||||||
|
|
||||||
|
static List<Tuple2<Integer, Integer>> generateGraph() {
|
||||||
|
Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges);
|
||||||
|
while (edges.size() < numEdges) {
|
||||||
|
int from = rand.nextInt(numVertices);
|
||||||
|
int to = rand.nextInt(numVertices);
|
||||||
|
Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
|
||||||
|
if (from != to) edges.add(e);
|
||||||
|
}
|
||||||
|
return new ArrayList(edges);
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
|
||||||
|
Integer, Integer> {
|
||||||
|
static ProjectFn INSTANCE = new ProjectFn();
|
||||||
|
@Override
|
||||||
|
public Tuple2<Integer, Integer> apply(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
|
||||||
|
return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
if (args.length == 0) {
|
||||||
|
System.err.println("Usage: JavaTC <host> [<slices>]");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC");
|
||||||
|
Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
|
||||||
|
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices);
|
||||||
|
|
||||||
|
// Linear transitive closure: each round grows paths by one edge,
|
||||||
|
// by joining the graph's edges with the already-discovered paths.
|
||||||
|
// e.g. join the path (y, z) from the TC with the edge (x, y) from
|
||||||
|
// the graph to obtain the path (x, z).
|
||||||
|
|
||||||
|
// Because join() joins on keys, the edges are stored in reversed order.
|
||||||
|
JavaPairRDD<Integer, Integer> edges = tc.map(new PairFunction<Tuple2<Integer, Integer>,
|
||||||
|
Integer, Integer>() {
|
||||||
|
@Override
|
||||||
|
public Tuple2<Integer, Integer> apply(Tuple2<Integer, Integer> e) {
|
||||||
|
return new Tuple2<Integer, Integer>(e._2(), e._1());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
long oldCount = 0;
|
||||||
|
do {
|
||||||
|
oldCount = tc.count();
|
||||||
|
// Perform the join, obtaining an RDD of (y, (z, x)) pairs,
|
||||||
|
// then project the result to obtain the new (x, z) paths.
|
||||||
|
|
||||||
|
tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct();
|
||||||
|
} while (tc.count() != oldCount);
|
||||||
|
|
||||||
|
System.out.println("TC has " + tc.count() + " edges.");
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
}
|
38
examples/src/main/java/spark/examples/JavaTest.java
Normal file
38
examples/src/main/java/spark/examples/JavaTest.java
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
package spark.examples;
|
||||||
|
|
||||||
|
import spark.api.java.JavaDoubleRDD;
|
||||||
|
import spark.api.java.JavaRDD;
|
||||||
|
import spark.api.java.JavaSparkContext;
|
||||||
|
import spark.api.java.function.DoubleFunction;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class JavaTest {
|
||||||
|
|
||||||
|
public static class MapFunction extends DoubleFunction<String> {
|
||||||
|
@Override
|
||||||
|
public Double apply(String s) {
|
||||||
|
return java.lang.Double.parseDouble(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
JavaSparkContext ctx = new JavaSparkContext("local", "JavaTest");
|
||||||
|
JavaRDD<String> lines = ctx.textFile("numbers.txt", 1).cache();
|
||||||
|
List<String> lineArr = lines.collect();
|
||||||
|
|
||||||
|
for (String line : lineArr) {
|
||||||
|
System.out.println(line);
|
||||||
|
}
|
||||||
|
|
||||||
|
JavaDoubleRDD data = lines.map(new MapFunction()).cache();
|
||||||
|
|
||||||
|
System.out.println("output");
|
||||||
|
List<Double> output = data.collect();
|
||||||
|
for (Double num : output) {
|
||||||
|
System.out.println(num);
|
||||||
|
}
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
}
|
61
examples/src/main/java/spark/examples/JavaWordCount.java
Normal file
61
examples/src/main/java/spark/examples/JavaWordCount.java
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
package spark.examples;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
import scala.collection.immutable.StringOps;
|
||||||
|
import spark.api.java.JavaPairRDD;
|
||||||
|
import spark.api.java.JavaRDD;
|
||||||
|
import spark.api.java.JavaSparkContext;
|
||||||
|
import spark.api.java.function.FlatMapFunction;
|
||||||
|
import spark.api.java.function.Function2;
|
||||||
|
import spark.api.java.function.PairFunction;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class JavaWordCount {
|
||||||
|
|
||||||
|
public static class SplitFunction extends FlatMapFunction<String, String> {
|
||||||
|
@Override
|
||||||
|
public Iterable<String> apply(String s) {
|
||||||
|
StringOps op = new StringOps(s);
|
||||||
|
return Arrays.asList(op.split(' '));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MapFunction extends PairFunction<String, String, Integer> {
|
||||||
|
@Override
|
||||||
|
public Tuple2<String, Integer> apply(String s) {
|
||||||
|
return new Tuple2(s, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ReduceFunction extends Function2<Integer, Integer, Integer> {
|
||||||
|
@Override
|
||||||
|
public Integer apply(Integer i1, Integer i2) {
|
||||||
|
return i1 + i2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
JavaSparkContext ctx = new JavaSparkContext("local", "JavaWordCount");
|
||||||
|
JavaRDD<String> lines = ctx.textFile("numbers.txt", 1).cache();
|
||||||
|
List<String> lineArr = lines.collect();
|
||||||
|
|
||||||
|
for (String line : lineArr) {
|
||||||
|
System.out.println(line);
|
||||||
|
}
|
||||||
|
|
||||||
|
JavaRDD<String> words = lines.flatMap(new SplitFunction());
|
||||||
|
|
||||||
|
JavaPairRDD<String, Integer> splits = words.map(new MapFunction());
|
||||||
|
|
||||||
|
JavaPairRDD<String, Integer> counts = splits.reduceByKey(new ReduceFunction());
|
||||||
|
|
||||||
|
System.out.println("output");
|
||||||
|
List<Tuple2<String, Integer>> output = counts.collect();
|
||||||
|
for (Tuple2 tuple : output) {
|
||||||
|
System.out.print(tuple._1 + ": ");
|
||||||
|
System.out.println(tuple._2);
|
||||||
|
}
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
}
|
53
examples/src/main/scala/spark/examples/SparkTC.scala
Normal file
53
examples/src/main/scala/spark/examples/SparkTC.scala
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
package spark.examples
|
||||||
|
|
||||||
|
import spark._
|
||||||
|
import SparkContext._
|
||||||
|
import scala.util.Random
|
||||||
|
import scala.collection.mutable
|
||||||
|
|
||||||
|
/**
 * Transitive closure of a random directed graph, computed iteratively
 * with joins until the edge set stops growing.
 */
object SparkTC {

  val numEdges = 200
  val numVertices = 100
  val rand = new Random(42)

  /** Draws random (from, to) pairs until numEdges distinct,
   *  self-loop-free edges have been collected. */
  def generateGraph = {
    val edges = mutable.Set.empty[(Int, Int)]
    while (edges.size < numEdges) {
      val src = rand.nextInt(numVertices)
      val dst = rand.nextInt(numVertices)
      if (src != dst) edges += ((src, dst))
    }
    edges.toSeq
  }

  def main(args: Array[String]) {
    if (args.length == 0) {
      System.err.println("Usage: SparkTC <host> [<slices>]")
      System.exit(1)
    }
    val sc = new SparkContext(args(0), "SparkTC")
    val slices = if (args.length > 1) args(1).toInt else 2
    var tc = sc.parallelize(generateGraph, slices)

    // Linear transitive closure: each round grows paths by one edge, by
    // joining the graph's edges with the already-discovered paths; e.g.
    // the path (y, z) joined with the edge (x, y) yields the path (x, z).
    // Because join() matches on keys, edges are stored destination-first.
    val edges = tc.map(x => (x._2, x._1))

    // This join is iterated until a fixed point is reached.
    var oldCount = 0L
    do {
      oldCount = tc.count()
      // The join yields (y, (z, x)) triples; project each to the new
      // path (x, z) and merge it into the closure, dropping duplicates.
      tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct()
    } while (tc.count() != oldCount)

    println("TC has " + tc.count() + " edges.")
    System.exit(0)
  }
}
|
|
@ -31,7 +31,8 @@ object SparkBuild extends Build {
|
||||||
libraryDependencies ++= Seq(
|
libraryDependencies ++= Seq(
|
||||||
"org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
|
"org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
|
||||||
"org.scalatest" %% "scalatest" % "1.6.1" % "test",
|
"org.scalatest" %% "scalatest" % "1.6.1" % "test",
|
||||||
"org.scalacheck" %% "scalacheck" % "1.9" % "test"
|
"org.scalacheck" %% "scalacheck" % "1.9" % "test",
|
||||||
|
"com.novocode" % "junit-interface" % "0.8" % "test"
|
||||||
),
|
),
|
||||||
parallelExecution := false,
|
parallelExecution := false,
|
||||||
/* Workaround for issue #206 (fixed after SBT 0.11.0) */
|
/* Workaround for issue #206 (fixed after SBT 0.11.0) */
|
||||||
|
|
Loading…
Reference in a new issue