Merge branch 'dev' of github.com:mesos/spark into dev
This commit is contained in:
commit
1183b30941
|
@ -208,23 +208,19 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
|
|||
// TODO: fetch any remote copy of the split that may be available
|
||||
// TODO: also register a listener for when it unloads
|
||||
logInfo("Computing partition " + split)
|
||||
val elements = new ArrayBuffer[Any]
|
||||
elements ++= rdd.compute(split)
|
||||
try {
|
||||
// BlockManager will iterate over results from compute to create RDD
|
||||
blockManager.put(key, rdd.compute(split), storageLevel, true)
|
||||
// Try to put this block in the blockManager
|
||||
blockManager.put(key, elements, storageLevel, true)
|
||||
//future.apply() // Wait for the reply from the cache tracker
|
||||
blockManager.get(key) match {
|
||||
case Some(values) =>
|
||||
return values.asInstanceOf[Iterator[T]]
|
||||
case None =>
|
||||
logWarning("loading partition failed after computing it " + key)
|
||||
return null
|
||||
}
|
||||
} finally {
|
||||
loading.synchronized {
|
||||
loading.remove(key)
|
||||
loading.notifyAll()
|
||||
}
|
||||
}
|
||||
return elements.iterator.asInstanceOf[Iterator[T]]
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -461,7 +461,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
|
|||
return buf.toArray
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Return the first element in this RDD.
|
||||
*/
|
||||
def first(): T = take(1) match {
|
||||
|
|
|
@ -22,8 +22,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
|
|||
|
||||
import JavaDoubleRDD.fromRDD
|
||||
|
||||
/** Persist this RDD with the default storage level (MEMORY_ONLY). */
|
||||
def cache(): JavaDoubleRDD = fromRDD(srdd.cache())
|
||||
|
||||
/**
|
||||
* Set this RDD's storage level to persist its values across operations after the first time
|
||||
* it is computed. Can only be called once on each RDD.
|
||||
*/
|
||||
def persist(newLevel: StorageLevel): JavaDoubleRDD = fromRDD(srdd.persist(newLevel))
|
||||
|
||||
// first() has to be overridden here in order for its return type to be Double instead of Object.
|
||||
|
@ -31,38 +36,63 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
|
|||
|
||||
// Transformations (return a new RDD)
|
||||
|
||||
/**
|
||||
* Return a new RDD containing the distinct elements in this RDD.
|
||||
*/
|
||||
def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct())
|
||||
|
||||
/**
|
||||
* Return a new RDD containing the distinct elements in this RDD.
|
||||
*/
|
||||
def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits))
|
||||
|
||||
/**
|
||||
* Return a new RDD containing only the elements that satisfy a predicate.
|
||||
*/
|
||||
def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD =
|
||||
fromRDD(srdd.filter(x => f(x).booleanValue()))
|
||||
|
||||
/**
|
||||
* Return a sampled subset of this RDD.
|
||||
*/
|
||||
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD =
|
||||
fromRDD(srdd.sample(withReplacement, fraction, seed))
|
||||
|
||||
/**
|
||||
* Return the union of this RDD and another one. Any identical elements will appear multiple
|
||||
* times (use `.distinct()` to eliminate them).
|
||||
*/
|
||||
def union(other: JavaDoubleRDD): JavaDoubleRDD = fromRDD(srdd.union(other.srdd))
|
||||
|
||||
// Double RDD functions
|
||||
|
||||
/** Return the sum of the elements in this RDD. */
|
||||
def sum(): Double = srdd.sum()
|
||||
|
||||
/** Return a [[spark.StatCounter]] describing the elements in this RDD. */
|
||||
def stats(): StatCounter = srdd.stats()
|
||||
|
||||
/** Return the mean of the elements in this RDD. */
|
||||
def mean(): Double = srdd.mean()
|
||||
|
||||
/** Return the variance of the elements in this RDD. */
|
||||
def variance(): Double = srdd.variance()
|
||||
|
||||
/** Return the standard deviation of the elements in this RDD. */
|
||||
def stdev(): Double = srdd.stdev()
|
||||
|
||||
/** Return the approximate mean of the elements in this RDD. */
|
||||
def meanApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
|
||||
srdd.meanApprox(timeout, confidence)
|
||||
|
||||
/** Return the approximate mean of the elements in this RDD. */
|
||||
def meanApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.meanApprox(timeout)
|
||||
|
||||
/** Return the approximate sum of the elements in this RDD. */
|
||||
def sumApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
|
||||
srdd.sumApprox(timeout, confidence)
|
||||
|
||||
|
||||
/** Return the approximate sum of the elements in this RDD. */
|
||||
def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout)
|
||||
}
|
||||
|
||||
|
|
|
@ -34,23 +34,44 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
|
|||
|
||||
// Common RDD functions
|
||||
|
||||
/** Persist this RDD with the default storage level (MEMORY_ONLY). */
|
||||
def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())
|
||||
|
||||
/**
|
||||
* Set this RDD's storage level to persist its values across operations after the first time
|
||||
* it is computed. Can only be called once on each RDD.
|
||||
*/
|
||||
def persist(newLevel: StorageLevel): JavaPairRDD[K, V] =
|
||||
new JavaPairRDD[K, V](rdd.persist(newLevel))
|
||||
|
||||
// Transformations (return a new RDD)
|
||||
|
||||
/**
|
||||
* Return a new RDD containing the distinct elements in this RDD.
|
||||
*/
|
||||
def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct())
|
||||
|
||||
/**
|
||||
* Return a new RDD containing the distinct elements in this RDD.
|
||||
*/
|
||||
def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits))
|
||||
|
||||
/**
|
||||
* Return a new RDD containing only the elements that satisfy a predicate.
|
||||
*/
|
||||
def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
|
||||
new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
|
||||
|
||||
/**
|
||||
* Return a sampled subset of this RDD.
|
||||
*/
|
||||
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaPairRDD[K, V] =
|
||||
new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))
|
||||
|
||||
/**
|
||||
* Return the union of this RDD and another one. Any identical elements will appear multiple
|
||||
* times (use `.distinct()` to eliminate them).
|
||||
*/
|
||||
def union(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
|
||||
new JavaPairRDD[K, V](rdd.union(other.rdd))
|
||||
|
||||
|
@ -61,7 +82,21 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
|
|||
override def first(): (K, V) = rdd.first()
|
||||
|
||||
// Pair RDD functions
|
||||
|
||||
|
||||
/**
|
||||
* Generic function to combine the elements for each key using a custom set of aggregation
|
||||
* functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
|
||||
* "combined type" C. Note that V and C can be different -- for example, one might group an
|
||||
* RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
|
||||
* functions:
|
||||
*
|
||||
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
|
||||
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
|
||||
* - `mergeCombiners`, to combine two C's into a single one.
|
||||
*
|
||||
* In addition, users can control the partitioning of the output RDD, and whether to perform
|
||||
* map-side aggregation (if a mapper can produce multiple items with the same key).
|
||||
*/
|
||||
def combineByKey[C](createCombiner: Function[V, C],
|
||||
mergeValue: JFunction2[C, V, C],
|
||||
mergeCombiners: JFunction2[C, C, C],
|
||||
|
@ -76,50 +111,113 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
|
|||
))
|
||||
}
|
||||
|
||||
/**
|
||||
* Simplified version of combineByKey that hash-partitions the output RDD.
|
||||
*/
|
||||
def combineByKey[C](createCombiner: JFunction[V, C],
|
||||
mergeValue: JFunction2[C, V, C],
|
||||
mergeCombiners: JFunction2[C, C, C],
|
||||
numSplits: Int): JavaPairRDD[K, C] =
|
||||
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
|
||||
|
||||
/**
|
||||
* Merge the values for each key using an associative reduce function. This will also perform
|
||||
* the merging locally on each mapper before sending results to a reducer, similarly to a
|
||||
* "combiner" in MapReduce.
|
||||
*/
|
||||
def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
|
||||
fromRDD(rdd.reduceByKey(partitioner, func))
|
||||
|
||||
/**
|
||||
* Merge the values for each key using an associative reduce function, but return the results
|
||||
* immediately to the master as a Map. This will also perform the merging locally on each mapper
|
||||
* before sending results to a reducer, similarly to a "combiner" in MapReduce.
|
||||
*/
|
||||
def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
|
||||
mapAsJavaMap(rdd.reduceByKeyLocally(func))
|
||||
|
||||
/** Count the number of elements for each key, and return the result to the master as a Map. */
|
||||
def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey())
|
||||
|
||||
/**
|
||||
* (Experimental) Approximate version of countByKey that can return a partial result if it does
|
||||
* not finish within a timeout.
|
||||
*/
|
||||
def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
|
||||
rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
|
||||
|
||||
/**
|
||||
* (Experimental) Approximate version of countByKey that can return a partial result if it does
|
||||
* not finish within a timeout.
|
||||
*/
|
||||
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
|
||||
: PartialResult[java.util.Map[K, BoundedDouble]] =
|
||||
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
|
||||
|
||||
/**
|
||||
* Merge the values for each key using an associative reduce function. This will also perform
|
||||
* the merging locally on each mapper before sending results to a reducer, similarly to a
|
||||
* "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits.
|
||||
*/
|
||||
def reduceByKey(func: JFunction2[V, V, V], numSplits: Int): JavaPairRDD[K, V] =
|
||||
fromRDD(rdd.reduceByKey(func, numSplits))
|
||||
|
||||
/**
|
||||
* Group the values for each key in the RDD into a single sequence. Allows controlling the
|
||||
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
|
||||
*/
|
||||
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] =
|
||||
fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
|
||||
|
||||
/**
|
||||
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
|
||||
* resulting RDD into `numSplits` partitions.
|
||||
*/
|
||||
def groupByKey(numSplits: Int): JavaPairRDD[K, JList[V]] =
|
||||
fromRDD(groupByResultToJava(rdd.groupByKey(numSplits)))
|
||||
|
||||
/**
|
||||
* Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
|
||||
* is true, Spark will group values of the same key together on the map side before the
|
||||
* repartitioning, to only send each key over the network once. If a large number of
|
||||
* duplicated keys are expected, and the size of the keys is large, `mapSideCombine` should
|
||||
* be set to true.
|
||||
*/
|
||||
def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
|
||||
fromRDD(rdd.partitionBy(partitioner))
|
||||
|
||||
/**
|
||||
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
|
||||
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
|
||||
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
|
||||
*/
|
||||
def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
|
||||
fromRDD(rdd.join(other, partitioner))
|
||||
|
||||
/**
|
||||
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
|
||||
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
|
||||
* pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
|
||||
* partition the output RDD.
|
||||
*/
|
||||
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
|
||||
: JavaPairRDD[K, (V, Option[W])] =
|
||||
fromRDD(rdd.leftOuterJoin(other, partitioner))
|
||||
|
||||
/**
|
||||
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
|
||||
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
|
||||
* pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
|
||||
* partition the output RDD.
|
||||
*/
|
||||
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
|
||||
: JavaPairRDD[K, (Option[V], W)] =
|
||||
fromRDD(rdd.rightOuterJoin(other, partitioner))
|
||||
|
||||
/**
|
||||
* Simplified version of combineByKey that hash-partitions the resulting RDD using the default
|
||||
* parallelism level.
|
||||
*/
|
||||
def combineByKey[C](createCombiner: JFunction[V, C],
|
||||
mergeValue: JFunction2[C, V, C],
|
||||
mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
|
||||
|
@ -128,40 +226,94 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
|
|||
fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners))
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge the values for each key using an associative reduce function. This will also perform
|
||||
* the merging locally on each mapper before sending results to a reducer, similarly to a
|
||||
* "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level.
|
||||
*/
|
||||
def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
|
||||
val partitioner = rdd.defaultPartitioner(rdd)
|
||||
fromRDD(reduceByKey(partitioner, func))
|
||||
}
|
||||
|
||||
/**
|
||||
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
|
||||
* resulting RDD with the default parallelism level.
|
||||
*/
|
||||
def groupByKey(): JavaPairRDD[K, JList[V]] =
|
||||
fromRDD(groupByResultToJava(rdd.groupByKey()))
|
||||
|
||||
/**
|
||||
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
|
||||
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
|
||||
* (k, v2) is in `other`. Performs a hash join across the cluster.
|
||||
*/
|
||||
def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] =
|
||||
fromRDD(rdd.join(other))
|
||||
|
||||
/**
|
||||
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
|
||||
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
|
||||
* (k, v2) is in `other`. Performs a hash join across the cluster.
|
||||
*/
|
||||
def join[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, W)] =
|
||||
fromRDD(rdd.join(other, numSplits))
|
||||
|
||||
/**
|
||||
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
|
||||
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
|
||||
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
|
||||
* using the default level of parallelism.
|
||||
*/
|
||||
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] =
|
||||
fromRDD(rdd.leftOuterJoin(other))
|
||||
|
||||
/**
|
||||
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
|
||||
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
|
||||
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
|
||||
* into `numSplits` partitions.
|
||||
*/
|
||||
def leftOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, Option[W])] =
|
||||
fromRDD(rdd.leftOuterJoin(other, numSplits))
|
||||
|
||||
/**
|
||||
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
|
||||
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
|
||||
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
|
||||
* RDD using the default parallelism level.
|
||||
*/
|
||||
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] =
|
||||
fromRDD(rdd.rightOuterJoin(other))
|
||||
|
||||
/**
|
||||
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
|
||||
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
|
||||
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
|
||||
* RDD into the given number of partitions.
|
||||
*/
|
||||
def rightOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (Option[V], W)] =
|
||||
fromRDD(rdd.rightOuterJoin(other, numSplits))
|
||||
|
||||
/**
|
||||
* Return the key-value pairs in this RDD to the master as a Map.
|
||||
*/
|
||||
def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())
|
||||
|
||||
/**
|
||||
* Pass each value in the key-value pair RDD through a map function without changing the keys;
|
||||
* this also retains the original RDD's partitioning.
|
||||
*/
|
||||
def mapValues[U](f: Function[V, U]): JavaPairRDD[K, U] = {
|
||||
implicit val cm: ClassManifest[U] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
|
||||
fromRDD(rdd.mapValues(f))
|
||||
}
|
||||
|
||||
/**
|
||||
* Pass each value in the key-value pair RDD through a flatMap function without changing the
|
||||
* keys; this also retains the original RDD's partitioning.
|
||||
*/
|
||||
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
|
||||
import scala.collection.JavaConverters._
|
||||
def fn = (x: V) => f.apply(x).asScala
|
||||
|
@ -170,37 +322,68 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
|
|||
fromRDD(rdd.flatMapValues(fn))
|
||||
}
|
||||
|
||||
/**
|
||||
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
|
||||
* list of values for that key in `this` as well as `other`.
|
||||
*/
|
||||
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
|
||||
: JavaPairRDD[K, (JList[V], JList[W])] =
|
||||
fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
|
||||
|
||||
/**
|
||||
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
|
||||
* tuple with the list of values for that key in `this`, `other1` and `other2`.
|
||||
*/
|
||||
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
|
||||
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
|
||||
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
|
||||
|
||||
/**
|
||||
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
|
||||
* list of values for that key in `this` as well as `other`.
|
||||
*/
|
||||
def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
|
||||
fromRDD(cogroupResultToJava(rdd.cogroup(other)))
|
||||
|
||||
/**
|
||||
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
|
||||
* tuple with the list of values for that key in `this`, `other1` and `other2`.
|
||||
*/
|
||||
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
|
||||
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
|
||||
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
|
||||
|
||||
/**
|
||||
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
|
||||
* list of values for that key in `this` as well as `other`.
|
||||
*/
|
||||
def cogroup[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (JList[V], JList[W])]
|
||||
= fromRDD(cogroupResultToJava(rdd.cogroup(other, numSplits)))
|
||||
|
||||
/**
|
||||
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
|
||||
* tuple with the list of values for that key in `this`, `other1` and `other2`.
|
||||
*/
|
||||
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numSplits: Int)
|
||||
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
|
||||
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numSplits)))
|
||||
|
||||
/** Alias for cogroup. */
|
||||
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
|
||||
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
|
||||
|
||||
/** Alias for cogroup. */
|
||||
def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
|
||||
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
|
||||
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
|
||||
|
||||
/**
|
||||
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
|
||||
* RDD has a known partitioner by only searching the partition that the key maps to.
|
||||
*/
|
||||
def lookup(key: K): JList[V] = seqAsJavaList(rdd.lookup(key))
|
||||
|
||||
/** Output the RDD to any Hadoop-supported file system. */
|
||||
def saveAsHadoopFile[F <: OutputFormat[_, _]](
|
||||
path: String,
|
||||
keyClass: Class[_],
|
||||
|
@ -210,6 +393,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
|
|||
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
|
||||
}
|
||||
|
||||
/** Output the RDD to any Hadoop-supported file system. */
|
||||
def saveAsHadoopFile[F <: OutputFormat[_, _]](
|
||||
path: String,
|
||||
keyClass: Class[_],
|
||||
|
@ -218,6 +402,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
|
|||
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
|
||||
}
|
||||
|
||||
/** Output the RDD to any Hadoop-supported file system. */
|
||||
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
|
||||
path: String,
|
||||
keyClass: Class[_],
|
||||
|
@ -227,6 +412,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
|
|||
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
|
||||
}
|
||||
|
||||
/** Output the RDD to any Hadoop-supported file system. */
|
||||
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
|
||||
path: String,
|
||||
keyClass: Class[_],
|
||||
|
@ -235,6 +421,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
|
|||
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
|
||||
}
|
||||
|
||||
/**
|
||||
* Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
|
||||
* that storage system. The JobConf should set an OutputFormat and any output paths required
|
||||
* (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
|
||||
* MapReduce job.
|
||||
*/
|
||||
def saveAsHadoopDataset(conf: JobConf) {
|
||||
rdd.saveAsHadoopDataset(conf)
|
||||
}
|
||||
|
@ -243,13 +435,31 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
|
|||
// Ordered RDD Functions
|
||||
def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)
|
||||
|
||||
/**
|
||||
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
|
||||
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
|
||||
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
|
||||
* order of the keys).
|
||||
*/
|
||||
def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
  // Keys are compared using their natural ordering. The cast is unchecked, so this
  // assumes K implements Comparable -- TODO confirm callers only use it with such keys.
  val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
  // Forward the caller's requested direction; previously this was hard-coded to `true`,
  // which made sortByKey(false) sort ascending.
  sortByKey(comp, ascending)
}
|
||||
|
||||
/**
|
||||
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
|
||||
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
|
||||
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
|
||||
* order of the keys).
|
||||
*/
|
||||
def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V] = sortByKey(comp, true)
|
||||
|
||||
/**
|
||||
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
|
||||
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
|
||||
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
|
||||
* order of the keys).
|
||||
*/
|
||||
def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
|
||||
class KeyOrdering(val a: K) extends Ordered[K] {
|
||||
override def compare(b: K) = comp.compare(a, b)
|
||||
|
|
|
@ -11,22 +11,43 @@ JavaRDDLike[T, JavaRDD[T]] {
|
|||
|
||||
// Common RDD functions
|
||||
|
||||
/** Persist this RDD with the default storage level (MEMORY_ONLY). */
|
||||
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
|
||||
|
||||
/**
|
||||
* Set this RDD's storage level to persist its values across operations after the first time
|
||||
* it is computed. Can only be called once on each RDD.
|
||||
*/
|
||||
def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel))
|
||||
|
||||
// Transformations (return a new RDD)
|
||||
|
||||
/**
|
||||
* Return a new RDD containing the distinct elements in this RDD.
|
||||
*/
|
||||
def distinct(): JavaRDD[T] = wrapRDD(rdd.distinct())
|
||||
|
||||
/**
|
||||
* Return a new RDD containing the distinct elements in this RDD.
|
||||
*/
|
||||
def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits))
|
||||
|
||||
|
||||
/**
|
||||
* Return a new RDD containing only the elements that satisfy a predicate.
|
||||
*/
|
||||
def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
|
||||
wrapRDD(rdd.filter((x => f(x).booleanValue())))
|
||||
|
||||
/**
|
||||
* Return a sampled subset of this RDD.
|
||||
*/
|
||||
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
|
||||
wrapRDD(rdd.sample(withReplacement, fraction, seed))
|
||||
|
||||
|
||||
/**
|
||||
* Return the union of this RDD and another one. Any identical elements will appear multiple
|
||||
* times (use `.distinct()` to eliminate them).
|
||||
*/
|
||||
def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd))
|
||||
|
||||
}
|
||||
|
|
|
@ -19,41 +19,71 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
|||
|
||||
def rdd: RDD[T]
|
||||
|
||||
/** Set of partitions in this RDD. */
|
||||
def splits: JList[Split] = new java.util.ArrayList(rdd.splits.toSeq)
|
||||
|
||||
/** The [[spark.SparkContext]] that this RDD was created on. */
|
||||
def context: SparkContext = rdd.context
|
||||
|
||||
|
||||
/** A unique ID for this RDD (within its SparkContext). */
|
||||
def id: Int = rdd.id
|
||||
|
||||
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
|
||||
def getStorageLevel: StorageLevel = rdd.getStorageLevel
|
||||
|
||||
/**
|
||||
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
|
||||
* This should ''not'' be called by users directly, but is available for implementors of custom
|
||||
* subclasses of RDD.
|
||||
*/
|
||||
def iterator(split: Split): java.util.Iterator[T] = asJavaIterator(rdd.iterator(split))
|
||||
|
||||
// Transformations (return a new RDD)
|
||||
|
||||
/**
|
||||
* Return a new RDD by applying a function to all elements of this RDD.
|
||||
*/
|
||||
def map[R](f: JFunction[T, R]): JavaRDD[R] =
|
||||
new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())
|
||||
|
||||
/**
|
||||
* Return a new RDD by applying a function to all elements of this RDD.
|
||||
*/
|
||||
def map[R](f: DoubleFunction[T]): JavaDoubleRDD =
|
||||
new JavaDoubleRDD(rdd.map(x => f(x).doubleValue()))
|
||||
|
||||
/**
|
||||
* Return a new RDD by applying a function to all elements of this RDD.
|
||||
*/
|
||||
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
|
||||
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
|
||||
new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new RDD by first applying a function to all elements of this
|
||||
* RDD, and then flattening the results.
|
||||
*/
|
||||
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
|
||||
import scala.collection.JavaConverters._
|
||||
def fn = (x: T) => f.apply(x).asScala
|
||||
JavaRDD.fromRDD(rdd.flatMap(fn)(f.elementType()))(f.elementType())
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new RDD by first applying a function to all elements of this
|
||||
* RDD, and then flattening the results.
|
||||
*/
|
||||
def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
|
||||
import scala.collection.JavaConverters._
|
||||
def fn = (x: T) => f.apply(x).asScala
|
||||
new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new RDD by first applying a function to all elements of this
|
||||
* RDD, and then flattening the results.
|
||||
*/
|
||||
def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
|
||||
import scala.collection.JavaConverters._
|
||||
def fn = (x: T) => f.apply(x).asScala
|
||||
|
@ -61,22 +91,35 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
|||
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new RDD by applying a function to each partition of this RDD.
|
||||
*/
|
||||
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
|
||||
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
|
||||
JavaRDD.fromRDD(rdd.mapPartitions(fn)(f.elementType()))(f.elementType())
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return a new RDD by applying a function to each partition of this RDD.
|
||||
*/
|
||||
def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
|
||||
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
|
||||
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new RDD by applying a function to each partition of this RDD.
|
||||
*/
|
||||
def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]):
|
||||
JavaPairRDD[K, V] = {
|
||||
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
|
||||
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an RDD created by coalescing all elements within each partition into an array.
|
||||
*/
|
||||
def glom(): JavaRDD[JList[T]] =
|
||||
new JavaRDD(rdd.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
|
||||
|
||||
|
@ -84,6 +127,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
|||
JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest,
|
||||
other.classManifest)
|
||||
|
||||
/**
|
||||
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
|
||||
* mapping to that key.
|
||||
*/
|
||||
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
|
||||
implicit val kcm: ClassManifest[K] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
|
||||
|
@ -92,6 +139,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
|||
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm)
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
|
||||
* mapping to that key.
|
||||
*/
|
||||
def groupBy[K](f: JFunction[T, K], numSplits: Int): JavaPairRDD[K, JList[T]] = {
|
||||
implicit val kcm: ClassManifest[K] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
|
||||
|
@ -100,56 +151,114 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
|||
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numSplits)(f.returnType)))(kcm, vcm)
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
*/
|
||||
def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
|
||||
|
||||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
*/
|
||||
def pipe(command: JList[String]): JavaRDD[String] =
|
||||
rdd.pipe(asScalaBuffer(command))
|
||||
|
||||
/**
|
||||
* Return an RDD created by piping elements to a forked external process.
|
||||
*/
|
||||
def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
|
||||
rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env))
|
||||
|
||||
// Actions (launch a job to return a value to the user program)
|
||||
|
||||
|
||||
/**
|
||||
* Applies a function f to all elements of this RDD.
|
||||
*/
|
||||
def foreach(f: VoidFunction[T]) {
|
||||
val cleanF = rdd.context.clean(f)
|
||||
rdd.foreach(cleanF)
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a list that contains all of the elements in this RDD.
|
||||
*/
|
||||
def collect(): JList[T] = {
|
||||
import scala.collection.JavaConversions._
|
||||
val arr: java.util.Collection[T] = rdd.collect().toSeq
|
||||
new java.util.ArrayList(arr)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reduces the elements of this RDD using the specified associative binary operator.
|
||||
*/
|
||||
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
|
||||
|
||||
/**
|
||||
* Aggregate the elements of each partition, and then the results for all the partitions, using a
|
||||
* given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
|
||||
* modify t1 and return it as its result value to avoid object allocation; however, it should not
|
||||
* modify t2.
|
||||
*/
|
||||
def fold(zeroValue: T)(f: JFunction2[T, T, T]): T =
|
||||
rdd.fold(zeroValue)(f)
|
||||
|
||||
/**
|
||||
* Aggregate the elements of each partition, and then the results for all the partitions, using
|
||||
* given combine functions and a neutral "zero value". This function can return a different result
|
||||
* type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
|
||||
* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
|
||||
* allowed to modify and return their first argument instead of creating a new U to avoid memory
|
||||
* allocation.
|
||||
*/
|
||||
def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],
|
||||
combOp: JFunction2[U, U, U]): U =
|
||||
rdd.aggregate(zeroValue)(seqOp, combOp)(seqOp.returnType)
|
||||
|
||||
/**
|
||||
* Return the number of elements in the RDD.
|
||||
*/
|
||||
def count(): Long = rdd.count()
|
||||
|
||||
/**
|
||||
* (Experimental) Approximate version of count() that returns a potentially incomplete result
|
||||
* within a timeout, even if not all tasks have finished.
|
||||
*/
|
||||
def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
|
||||
rdd.countApprox(timeout, confidence)
|
||||
|
||||
/**
|
||||
* (Experimental) Approximate version of count() that returns a potentially incomplete result
|
||||
* within a timeout, even if not all tasks have finished.
|
||||
*/
|
||||
def countApprox(timeout: Long): PartialResult[BoundedDouble] =
|
||||
rdd.countApprox(timeout)
|
||||
|
||||
/**
|
||||
* Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
|
||||
* combine step happens locally on the master, equivalent to running a single reduce task.
|
||||
*/
|
||||
def countByValue(): java.util.Map[T, java.lang.Long] =
|
||||
mapAsJavaMap(rdd.countByValue().map((x => (x._1, new lang.Long(x._2)))))
|
||||
|
||||
/**
|
||||
* (Experimental) Approximate version of countByValue().
|
||||
*/
|
||||
def countByValueApprox(
|
||||
timeout: Long,
|
||||
confidence: Double
|
||||
): PartialResult[java.util.Map[T, BoundedDouble]] =
|
||||
rdd.countByValueApprox(timeout, confidence).map(mapAsJavaMap)
|
||||
|
||||
/**
|
||||
* (Experimental) Approximate version of countByValue().
|
||||
*/
|
||||
def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
|
||||
rdd.countByValueApprox(timeout).map(mapAsJavaMap)
|
||||
|
||||
/**
|
||||
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
|
||||
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
|
||||
* whole RDD instead.
|
||||
*/
|
||||
def take(num: Int): JList[T] = {
|
||||
import scala.collection.JavaConversions._
|
||||
val arr: java.util.Collection[T] = rdd.take(num).toSeq
|
||||
|
@ -162,9 +271,18 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
|
|||
new java.util.ArrayList(arr)
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the first element in this RDD.
|
||||
*/
|
||||
def first(): T = rdd.first()
|
||||
|
||||
/**
|
||||
* Save this RDD as a text file, using string representations of elements.
|
||||
*/
|
||||
def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
|
||||
|
||||
/**
|
||||
* Save this RDD as a SequenceFile of serialized objects.
|
||||
*/
|
||||
def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
|
||||
}
|
||||
|
|
|
@ -18,26 +18,47 @@ import scala.collection.JavaConversions
|
|||
|
||||
class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
|
||||
|
||||
/**
|
||||
* @constructor Returns a new SparkContext.
|
||||
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
|
||||
* @param frameworkName A name for your job, to display on the cluster web UI
|
||||
*/
|
||||
def this(master: String, frameworkName: String) = this(new SparkContext(master, frameworkName))
|
||||
|
||||
/**
|
||||
* @constructor Returns a new SparkContext.
|
||||
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
|
||||
* @param frameworkName A name for your job, to display on the cluster web UI
|
||||
* @param sparkHome The SPARK_HOME directory on the slave nodes
|
||||
* @param jarFile A path to a local jar file containing this job
|
||||
*/
|
||||
def this(master: String, frameworkName: String, sparkHome: String, jarFile: String) =
|
||||
this(new SparkContext(master, frameworkName, sparkHome, Seq(jarFile)))
|
||||
|
||||
/**
|
||||
* @constructor Returns a new SparkContext.
|
||||
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
|
||||
* @param frameworkName A name for your job, to display on the cluster web UI
|
||||
* @param sparkHome The SPARK_HOME directory on the slave nodes
|
||||
* @param jars A set of jar files relating to this job
|
||||
*/
|
||||
def this(master: String, frameworkName: String, sparkHome: String, jars: Array[String]) =
|
||||
this(new SparkContext(master, frameworkName, sparkHome, jars.toSeq))
|
||||
|
||||
val env = sc.env
|
||||
|
||||
/** Distribute a local Scala collection to form an RDD. */
|
||||
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
|
||||
implicit val cm: ClassManifest[T] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
|
||||
sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
|
||||
}
|
||||
|
||||
/** Distribute a local Scala collection to form an RDD. */
|
||||
def parallelize[T](list: java.util.List[T]): JavaRDD[T] =
|
||||
parallelize(list, sc.defaultParallelism)
|
||||
|
||||
|
||||
/** Distribute a local Scala collection to form an RDD. */
|
||||
def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
|
||||
: JavaPairRDD[K, V] = {
|
||||
implicit val kcm: ClassManifest[K] =
|
||||
|
@ -47,21 +68,32 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
|
|||
JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
|
||||
}
|
||||
|
||||
/** Distribute a local Scala collection to form an RDD. */
|
||||
def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]]): JavaPairRDD[K, V] =
|
||||
parallelizePairs(list, sc.defaultParallelism)
|
||||
|
||||
/** Distribute a local Scala collection to form an RDD. */
|
||||
def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
|
||||
JavaDoubleRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list).map(_.doubleValue()),
|
||||
numSlices))
|
||||
|
||||
/** Distribute a local Scala collection to form an RDD. */
|
||||
def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
|
||||
parallelizeDoubles(list, sc.defaultParallelism)
|
||||
|
||||
/**
|
||||
* Read a text file from HDFS, a local file system (available on all nodes), or any
|
||||
* Hadoop-supported file system URI, and return it as an RDD of Strings.
|
||||
*/
|
||||
def textFile(path: String): JavaRDD[String] = sc.textFile(path)
|
||||
|
||||
/**
|
||||
* Read a text file from HDFS, a local file system (available on all nodes), or any
|
||||
* Hadoop-supported file system URI, and return it as an RDD of Strings.
|
||||
*/
|
||||
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
|
||||
|
||||
/**Get an RDD for a Hadoop SequenceFile with given key and value types */
|
||||
/**Get an RDD for a Hadoop SequenceFile with given key and value types. */
|
||||
def sequenceFile[K, V](path: String,
|
||||
keyClass: Class[K],
|
||||
valueClass: Class[V],
|
||||
|
@ -72,6 +104,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
|
|||
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
|
||||
}
|
||||
|
||||
/**Get an RDD for a Hadoop SequenceFile. */
|
||||
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
|
||||
JavaPairRDD[K, V] = {
|
||||
implicit val kcm = ClassManifest.fromClass(keyClass)
|
||||
|
@ -92,6 +125,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
|
|||
sc.objectFile(path, minSplits)(cm)
|
||||
}
|
||||
|
||||
/**
|
||||
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
|
||||
* BytesWritable values that contain a serialized partition. This is still an experimental storage
|
||||
* format and may not be supported exactly as is in future Spark releases. It will also be pretty
|
||||
* slow if you use the default serializer (Java serialization), though the nice thing about it is
|
||||
* that there's very little effort required to save arbitrary objects.
|
||||
*/
|
||||
def objectFile[T](path: String): JavaRDD[T] = {
|
||||
implicit val cm: ClassManifest[T] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
|
||||
|
@ -180,12 +220,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
|
|||
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
|
||||
}
|
||||
|
||||
/** Build the union of two or more RDDs. */
|
||||
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
|
||||
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
|
||||
implicit val cm: ClassManifest[T] = first.classManifest
|
||||
sc.union(rdds)(cm)
|
||||
}
|
||||
|
||||
/** Build the union of two or more RDDs. */
|
||||
override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
|
||||
: JavaPairRDD[K, V] = {
|
||||
val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
|
||||
|
@ -195,26 +237,49 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
|
|||
new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
|
||||
}
|
||||
|
||||
/** Build the union of two or more RDDs. */
|
||||
override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
|
||||
val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
|
||||
new JavaDoubleRDD(sc.union(rdds))
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an [[spark.Accumulator]] integer variable, which tasks can "add" values
|
||||
* to using the `+=` method. Only the master can access the accumulator's `value`.
|
||||
*/
|
||||
def intAccumulator(initialValue: Int): Accumulator[Int] =
|
||||
sc.accumulator(initialValue)(IntAccumulatorParam)
|
||||
|
||||
/**
|
||||
* Create an [[spark.Accumulator]] double variable, which tasks can "add" values
|
||||
* to using the `+=` method. Only the master can access the accumulator's `value`.
|
||||
*/
|
||||
def doubleAccumulator(initialValue: Double): Accumulator[Double] =
|
||||
sc.accumulator(initialValue)(DoubleAccumulatorParam)
|
||||
|
||||
/**
|
||||
* Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
|
||||
* to using the `+=` method. Only the master can access the accumulator's `value`.
|
||||
*/
|
||||
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
|
||||
sc.accumulator(initialValue)(accumulatorParam)
|
||||
|
||||
/**
|
||||
* Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
|
||||
* reading it in distributed functions. The variable will be sent to each cluster only once.
|
||||
*/
|
||||
def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value)
|
||||
|
||||
/** Shut down the SparkContext. */
|
||||
def stop() {
|
||||
sc.stop()
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Spark's home location from either a value set through the constructor,
|
||||
* or the spark.home Java property, or the SPARK_HOME environment variable
|
||||
* (in that order of preference). If none of these is set, return None.
|
||||
*/
|
||||
def getSparkHome(): Option[String] = sc.getSparkHome()
|
||||
}
|
||||
|
||||
|
|
|
@ -141,12 +141,12 @@ private[spark] class Executor extends Logging {
|
|||
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
|
||||
// Fetch missing dependencies
|
||||
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
|
||||
logInfo("Fetching " + name)
|
||||
logInfo("Fetching " + name + " with timestamp " + timestamp)
|
||||
Utils.fetchFile(name, new File("."))
|
||||
currentFiles(name) = timestamp
|
||||
}
|
||||
for ((name, timestamp) <- newJars if currentFiles.getOrElse(name, -1L) < timestamp) {
|
||||
logInfo("Fetching " + name)
|
||||
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
|
||||
logInfo("Fetching " + name + " with timestamp " + timestamp)
|
||||
Utils.fetchFile(name, new File("."))
|
||||
currentJars(name) = timestamp
|
||||
// Add it to our class loader
|
||||
|
|
|
@ -109,12 +109,12 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
|
|||
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
|
||||
// Fetch missing dependencies
|
||||
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
|
||||
logInfo("Fetching " + name)
|
||||
logInfo("Fetching " + name + " with timestamp " + timestamp)
|
||||
Utils.fetchFile(name, new File("."))
|
||||
currentFiles(name) = timestamp
|
||||
}
|
||||
for ((name, timestamp) <- newJars if currentFiles.getOrElse(name, -1L) < timestamp) {
|
||||
logInfo("Fetching " + name)
|
||||
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
|
||||
logInfo("Fetching " + name + " with timestamp " + timestamp)
|
||||
Utils.fetchFile(name, new File("."))
|
||||
currentJars(name) = timestamp
|
||||
// Add it to our class loader
|
||||
|
|
|
@ -237,7 +237,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
diskStore.getValues(blockId) match {
|
||||
case Some(iterator) =>
|
||||
// Put the block back in memory before returning it
|
||||
memoryStore.putValues(blockId, iterator, level, true).data match {
|
||||
// TODO: Consider creating a putValues that also takes in an iterator?
|
||||
val elements = new ArrayBuffer[Any]
|
||||
elements ++= iterator
|
||||
memoryStore.putValues(blockId, elements, level, true).data match {
|
||||
case Left(iterator2) =>
|
||||
return Some(iterator2)
|
||||
case _ =>
|
||||
|
@ -529,11 +532,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
}
|
||||
}
|
||||
|
||||
def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
|
||||
: Long = {
|
||||
val elements = new ArrayBuffer[Any]
|
||||
elements ++= values
|
||||
put(blockId, elements, level, tellMaster)
|
||||
}
|
||||
|
||||
/**
|
||||
* Put a new block of values to the block manager. Returns its (estimated) size in bytes.
|
||||
*/
|
||||
def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true)
|
||||
: Long = {
|
||||
def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
|
||||
tellMaster: Boolean = true) : Long = {
|
||||
|
||||
if (blockId == null) {
|
||||
throw new IllegalArgumentException("Block Id is null")
|
||||
|
@ -766,7 +776,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
|
||||
* store reaches its limit and needs to free up space.
|
||||
*/
|
||||
def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
|
||||
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
|
||||
logInfo("Dropping block " + blockId + " from memory")
|
||||
locker.getLock(blockId).synchronized {
|
||||
val info = blockInfo.get(blockId)
|
||||
|
@ -774,8 +784,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
if (level.useDisk && !diskStore.contains(blockId)) {
|
||||
logInfo("Writing block " + blockId + " to disk")
|
||||
data match {
|
||||
case Left(iterator) =>
|
||||
diskStore.putValues(blockId, iterator, level, false)
|
||||
case Left(elements) =>
|
||||
diskStore.putValues(blockId, elements, level, false)
|
||||
case Right(bytes) =>
|
||||
diskStore.putBytes(blockId, bytes, level)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package spark.storage
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import spark.Logging
|
||||
|
||||
|
@ -18,8 +19,8 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
|
|||
* @return a PutResult that contains the size of the data, as well as the values put if
|
||||
* returnValues is true (if not, the result's data field can be null)
|
||||
*/
|
||||
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel, returnValues: Boolean)
|
||||
: PutResult
|
||||
def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
|
||||
returnValues: Boolean) : PutResult
|
||||
|
||||
/**
|
||||
* Return the size of a block in bytes.
|
||||
|
|
|
@ -3,11 +3,15 @@ package spark.storage
|
|||
import java.nio.ByteBuffer
|
||||
import java.io.{File, FileOutputStream, RandomAccessFile}
|
||||
import java.nio.channels.FileChannel.MapMode
|
||||
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
|
||||
import java.util.{Random, Date}
|
||||
import spark.Utils
|
||||
import java.text.SimpleDateFormat
|
||||
|
||||
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import spark.Utils
|
||||
|
||||
/**
|
||||
* Stores BlockManager blocks on disk.
|
||||
*/
|
||||
|
@ -45,7 +49,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
|
||||
override def putValues(
|
||||
blockId: String,
|
||||
values: Iterator[Any],
|
||||
values: ArrayBuffer[Any],
|
||||
level: StorageLevel,
|
||||
returnValues: Boolean)
|
||||
: PutResult = {
|
||||
|
@ -56,7 +60,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
val fileOut = blockManager.wrapForCompression(blockId,
|
||||
new FastBufferedOutputStream(new FileOutputStream(file)))
|
||||
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
|
||||
objOut.writeAll(values)
|
||||
objOut.writeAll(values.iterator)
|
||||
objOut.close()
|
||||
val length = file.length()
|
||||
logDebug("Block %s stored as %s file on disk in %d ms".format(
|
||||
|
|
|
@ -46,19 +46,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
|
|||
|
||||
override def putValues(
|
||||
blockId: String,
|
||||
values: Iterator[Any],
|
||||
values: ArrayBuffer[Any],
|
||||
level: StorageLevel,
|
||||
returnValues: Boolean)
|
||||
: PutResult = {
|
||||
|
||||
if (level.deserialized) {
|
||||
val elements = new ArrayBuffer[Any]
|
||||
elements ++= values
|
||||
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
|
||||
tryToPut(blockId, elements, sizeEstimate, true)
|
||||
PutResult(sizeEstimate, Left(elements.iterator))
|
||||
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
|
||||
tryToPut(blockId, values, sizeEstimate, true)
|
||||
PutResult(sizeEstimate, Left(values.iterator))
|
||||
} else {
|
||||
val bytes = blockManager.dataSerialize(blockId, values)
|
||||
val bytes = blockManager.dataSerialize(blockId, values.iterator)
|
||||
tryToPut(blockId, bytes, bytes.limit, false)
|
||||
PutResult(bytes.limit(), Right(bytes))
|
||||
}
|
||||
|
@ -146,7 +144,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
|
|||
// Tell the block manager that we couldn't put it in memory so that it can drop it to
|
||||
// disk if the block allows disk storage.
|
||||
val data = if (deserialized) {
|
||||
Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
|
||||
Left(value.asInstanceOf[ArrayBuffer[Any]])
|
||||
} else {
|
||||
Right(value.asInstanceOf[ByteBuffer].duplicate())
|
||||
}
|
||||
|
@ -199,7 +197,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
|
|||
for (blockId <- selectedBlocks) {
|
||||
val entry = entries.get(blockId)
|
||||
val data = if (entry.deserialized) {
|
||||
Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
|
||||
Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
|
||||
} else {
|
||||
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ class CacheTrackerSuite extends FunSuite {
|
|||
} catch {
|
||||
case e: Exception =>
|
||||
throw new SparkException("Error communicating with actor", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("CacheTrackerActor slave initialization & cache status") {
|
||||
|
|
|
@ -27,6 +27,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
|
|||
sc = null
|
||||
}
|
||||
System.clearProperty("spark.reducer.maxMbInFlight")
|
||||
System.clearProperty("spark.storage.memoryFraction")
|
||||
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
|
||||
System.clearProperty("spark.master.port")
|
||||
}
|
||||
|
@ -156,4 +157,13 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
|
|||
assert(data.count() === 1000)
|
||||
assert(data.count() === 1000)
|
||||
}
|
||||
|
||||
test("compute without caching with low memory") {
|
||||
System.setProperty("spark.storage.memoryFraction", "0.0001")
|
||||
sc = new SparkContext(clusterUrl, "test")
|
||||
val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY)
|
||||
assert(data.count() === 4000000)
|
||||
assert(data.count() === 4000000)
|
||||
assert(data.count() === 4000000)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -268,9 +268,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
|||
val list1 = List(new Array[Byte](200), new Array[Byte](200))
|
||||
val list2 = List(new Array[Byte](200), new Array[Byte](200))
|
||||
val list3 = List(new Array[Byte](200), new Array[Byte](200))
|
||||
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
|
||||
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY)
|
||||
store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY)
|
||||
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
|
||||
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true)
|
||||
store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true)
|
||||
assert(store.get("list2") != None, "list2 was not in store")
|
||||
assert(store.get("list2").get.size == 2)
|
||||
assert(store.get("list3") != None, "list3 was not in store")
|
||||
|
@ -279,7 +279,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
|||
assert(store.get("list2") != None, "list2 was not in store")
|
||||
assert(store.get("list2").get.size == 2)
|
||||
// At this point list2 was gotten last, so LRU will get rid of list3
|
||||
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
|
||||
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
|
||||
assert(store.get("list1") != None, "list1 was not in store")
|
||||
assert(store.get("list1").get.size == 2)
|
||||
assert(store.get("list2") != None, "list2 was not in store")
|
||||
|
@ -294,9 +294,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
|||
val list3 = List(new Array[Byte](200), new Array[Byte](200))
|
||||
val list4 = List(new Array[Byte](200), new Array[Byte](200))
|
||||
// First store list1 and list2, both in memory, and list3, on disk only
|
||||
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER)
|
||||
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER)
|
||||
store.put("list3", list3.iterator, StorageLevel.DISK_ONLY)
|
||||
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true)
|
||||
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true)
|
||||
store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true)
|
||||
// At this point LRU should not kick in because list3 is only on disk
|
||||
assert(store.get("list1") != None, "list2 was not in store")
|
||||
assert(store.get("list1").get.size === 2)
|
||||
|
@ -311,7 +311,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
|||
assert(store.get("list3") != None, "list1 was not in store")
|
||||
assert(store.get("list3").get.size === 2)
|
||||
// Now let's add in list4, which uses both disk and memory; list1 should drop out
|
||||
store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER)
|
||||
store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true)
|
||||
assert(store.get("list1") === None, "list1 was in store")
|
||||
assert(store.get("list2") != None, "list3 was not in store")
|
||||
assert(store.get("list2").get.size === 2)
|
||||
|
|
|
@ -3,6 +3,6 @@ markdown: kramdown
|
|||
|
||||
# These allow the documentation to be updated with new releases
|
||||
# of Spark, Scala, and Mesos.
|
||||
SPARK_VERSION: 0.6.0
|
||||
SPARK_VERSION: 0.6.0-SNAPSHOT
|
||||
SCALA_VERSION: 2.9.2
|
||||
MESOS_VERSION: 0.9.0-incubating
|
||||
|
|
|
@ -14,8 +14,9 @@ This guide shows the programming model and features of Bagel by walking through
|
|||
To write a Bagel application, you will need to add Spark, its dependencies, and Bagel to your CLASSPATH:
|
||||
|
||||
1. Run `sbt/sbt update` to fetch Spark's dependencies, if you haven't already done so.
|
||||
2. Run `sbt/sbt assembly` to build Spark and its dependencies into one JAR (`core/target/scala_2.8.1/Spark Core-assembly-0.3-SNAPSHOT.jar`) and Bagel into a second JAR (`bagel/target/scala_2.8.1/Bagel-assembly-0.3-SNAPSHOT.jar`).
|
||||
3. Add these two JARs to your CLASSPATH.
|
||||
2. Run `sbt/sbt assembly` to build Spark and its dependencies into one JAR (`core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar`)
|
||||
3. Run `sbt/sbt package` to build the Bagel JAR (`bagel/target/scala_{{site.SCALA_VERSION}}/spark-bagel_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar`).
|
||||
4. Add these two JARs to your CLASSPATH.
|
||||
|
||||
## Programming Model
|
||||
|
||||
|
|
|
@ -101,13 +101,9 @@ res9: Long = 15
|
|||
It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark).
|
||||
|
||||
# A Standalone Job in Scala
|
||||
Now say we wanted to write a standalone job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you using other build systems, please reference the Spark assembly JAR in the developer guide. The first step is to publish Spark to our local Ivy/Maven repositories. From the Spark directory:
|
||||
Now say we wanted to write a standalone job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
|
||||
|
||||
{% highlight bash %}
|
||||
$ sbt/sbt publish-local
|
||||
{% endhighlight %}
|
||||
|
||||
Next, we'll create a very simple Spark job in Scala. So simple, in fact, that it's named `SimpleJob.scala`:
|
||||
We'll create a very simple Spark job in Scala. So simple, in fact, that it's named `SimpleJob.scala`:
|
||||
|
||||
{% highlight scala %}
|
||||
/*** SimpleJob.scala ***/
|
||||
|
@ -159,12 +155,9 @@ Lines with a: 8422, Lines with b: 1836
|
|||
This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS.
|
||||
|
||||
# A Standalone Job In Java
|
||||
Now say we wanted to write a standalone job using the Java API. We will walk through doing this with Maven. If you using other build systems, please reference the Spark assembly JAR in the developer guide. The first step is to publish Spark to our local Ivy/Maven repositories. From the Spark directory:
|
||||
Now say we wanted to write a standalone job using the Java API. We will walk through doing this with Maven. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
|
||||
|
||||
{% highlight bash %}
|
||||
$ sbt/sbt publish-local
|
||||
{% endhighlight %}
|
||||
Next, we'll create a very simple Spark job, `SimpleJob.java`:
|
||||
We'll create a very simple Spark job, `SimpleJob.java`:
|
||||
|
||||
{% highlight java %}
|
||||
/*** SimpleJob.java ***/
|
||||
|
|
|
@ -19,7 +19,7 @@ branch of Spark, called `yarn`, which you can do as follows:
|
|||
- In order to distribute Spark within the cluster, it must be packaged into a single JAR file. This can be done by running `sbt/sbt assembly`
|
||||
- Your application code must be packaged into a separate JAR file.
|
||||
|
||||
If you want to test out the YARN deployment mode, you can use the current Spark examples. A `spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}-SNAPSHOT.jar` file can be generated by running `sbt/sbt package`. NOTE: since the documentation you're reading is for Spark version {{site.SPARK_VERSION}}, we are assuming here that you have downloaded Spark {{site.SPARK_VERSION}} or checked it out of source control. If you are using a different version of Spark, the version numbers in the jar generated by the sbt package command will obviously be different.
|
||||
If you want to test out the YARN deployment mode, you can use the current Spark examples. A `spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar` file can be generated by running `sbt/sbt package`. NOTE: since the documentation you're reading is for Spark version {{site.SPARK_VERSION}}, we are assuming here that you have downloaded Spark {{site.SPARK_VERSION}} or checked it out of source control. If you are using a different version of Spark, the version numbers in the jar generated by the sbt package command will obviously be different.
|
||||
|
||||
# Launching Spark on YARN
|
||||
|
||||
|
@ -35,8 +35,8 @@ The command to launch the YARN Client is as follows:
|
|||
|
||||
For example:
|
||||
|
||||
SPARK_JAR=./core/target/spark-core-assembly-{{site.SPARK_VERSION}}-SNAPSHOT.jar ./run spark.deploy.yarn.Client \
|
||||
--jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}-SNAPSHOT.jar \
|
||||
SPARK_JAR=./core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \
|
||||
--jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \
|
||||
--class spark.examples.SparkPi \
|
||||
--args standalone \
|
||||
--num-workers 3 \
|
||||
|
|
|
@ -17,7 +17,13 @@ This guide shows each of these features and walks through some samples. It assum
|
|||
|
||||
# Linking with Spark
|
||||
|
||||
To write a Spark application, you will need to add both Spark and its dependencies to your CLASSPATH. The easiest way to do this is to run `sbt/sbt assembly` to build both Spark and its dependencies into one JAR (`core/target/spark-core-assembly-0.6.0.jar`), then add this to your CLASSPATH. Alternatively, you can publish Spark to the Maven cache on your machine using `sbt/sbt publish-local`. It will be an artifact called `spark-core` under the organization `org.spark-project`.
|
||||
To write a Spark application, you will need to add both Spark and its dependencies to your CLASSPATH. If you use sbt or Maven, Spark is available through Maven Central at:
|
||||
|
||||
groupId = org.spark-project
|
||||
artifactId = spark-core_{{site.SCALA_VERSION}}
|
||||
version = {{site.SPARK_VERSION}}
|
||||
|
||||
For other build systems or environments, you can run `sbt/sbt assembly` to build both Spark and its dependencies into one JAR (`core/target/spark-core-assembly-0.6.0.jar`), then add this to your CLASSPATH.
|
||||
|
||||
In addition, you'll need to import some Spark classes and implicit conversions. Add the following lines at the top of your program:
|
||||
|
||||
|
|
|
@ -4,13 +4,15 @@ import Keys._
|
|||
import sbtassembly.Plugin._
|
||||
import AssemblyKeys._
|
||||
import twirl.sbt.TwirlPlugin._
|
||||
// For Sonatype publishing
|
||||
// import com.jsuereth.pgp.sbtplugin.PgpKeys._
|
||||
|
||||
object SparkBuild extends Build {
|
||||
// Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
|
||||
// "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
|
||||
val HADOOP_VERSION = "0.20.205.0"
|
||||
|
||||
lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel)
|
||||
lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel)
|
||||
|
||||
lazy val core = Project("core", file("core"), settings = coreSettings)
|
||||
|
||||
|
@ -33,7 +35,51 @@ object SparkBuild extends Build {
|
|||
retrieveManaged := true,
|
||||
transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
|
||||
testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
|
||||
publishTo <<= baseDirectory { base => Some(Resolver.file("Local", base / "target" / "maven" asFile)(Patterns(true, Resolver.mavenStyleBasePattern))) },
|
||||
|
||||
/* For Sonatype publishing
|
||||
resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
|
||||
"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
|
||||
|
||||
publishMavenStyle := true,
|
||||
|
||||
useGpg in Global := true,
|
||||
|
||||
pomExtra := (
|
||||
<url>http://spark-project.org/</url>
|
||||
<licenses>
|
||||
<license>
|
||||
<name>BSD License</name>
|
||||
<url>https://github.com/mesos/spark/blob/master/LICENSE</url>
|
||||
<distribution>repo</distribution>
|
||||
</license>
|
||||
</licenses>
|
||||
<scm>
|
||||
<connection>scm:git:git@github.com:mesos/spark.git</connection>
|
||||
<url>scm:git:git@github.com:mesos/spark.git</url>
|
||||
</scm>
|
||||
<developers>
|
||||
<developer>
|
||||
<id>matei</id>
|
||||
<name>Matei Zaharia</name>
|
||||
<email>matei.zaharia@gmail.com</email>
|
||||
<url>http://www.cs.berkeley.edu/~matei</url>
|
||||
<organization>U.C. Berkeley Computer Science</organization>
|
||||
<organizationUrl>http://www.cs.berkeley.edu/</organizationUrl>
|
||||
</developer>
|
||||
</developers>
|
||||
),
|
||||
|
||||
publishTo <<= version { (v: String) =>
|
||||
val nexus = "https://oss.sonatype.org/"
|
||||
if (v.trim.endsWith("SNAPSHOT"))
|
||||
Some("sonatype-snapshots" at nexus + "content/repositories/snapshots")
|
||||
else
|
||||
Some("sonatype-staging" at nexus + "service/local/staging/deploy/maven2")
|
||||
},
|
||||
|
||||
credentials += Credentials(Path.userHome / ".sbt" / "sonatype.credentials"),
|
||||
*/
|
||||
|
||||
libraryDependencies ++= Seq(
|
||||
"org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
|
||||
"org.scalatest" %% "scalatest" % "1.6.1" % "test",
|
||||
|
@ -64,6 +110,7 @@ object SparkBuild extends Build {
|
|||
"Spray Repository" at "http://repo.spray.cc/",
|
||||
"Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
|
||||
),
|
||||
|
||||
libraryDependencies ++= Seq(
|
||||
"com.google.guava" % "guava" % "11.0.1",
|
||||
"log4j" % "log4j" % "1.2.16",
|
||||
|
@ -85,6 +132,10 @@ object SparkBuild extends Build {
|
|||
)
|
||||
) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
|
||||
|
||||
def rootSettings = sharedSettings ++ Seq(
|
||||
publish := {}
|
||||
)
|
||||
|
||||
def replSettings = sharedSettings ++ Seq(
|
||||
name := "spark-repl",
|
||||
libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _)
|
||||
|
@ -103,5 +154,4 @@ object SparkBuild extends Build {
|
|||
case _ => MergeStrategy.first
|
||||
}
|
||||
)
|
||||
|
||||
}
|
||||
|
|
|
@ -11,3 +11,7 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1")
|
|||
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0")
|
||||
|
||||
addSbtPlugin("cc.spray" %% "sbt-twirl" % "0.5.2")
|
||||
|
||||
// For Sonatype publishing
|
||||
// resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
|
||||
// addSbtPlugin("com.jsuereth" % "xsbt-gpg-plugin" % "0.6")
|
||||
|
|
Loading…
Reference in a new issue