Add subtract to JavaRDD, JavaDoubleRDD, and JavaPairRDD.

This commit is contained in:
Stephen Haberman 2013-02-24 00:27:53 -06:00
parent f442e7d83c
commit 37c7a71f9c
3 changed files with 63 additions and 1 deletions

View file

@ -6,8 +6,8 @@ import spark.api.java.function.{Function => JFunction}
import spark.util.StatCounter
import spark.partial.{BoundedDouble, PartialResult}
import spark.storage.StorageLevel
import java.lang.Double
import spark.Partitioner
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
@ -57,6 +57,27 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
*/
def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
def subtract(other: JavaDoubleRDD): JavaDoubleRDD =
fromRDD(srdd.subtract(other))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
def subtract(other: JavaDoubleRDD, numPartitions: Int): JavaDoubleRDD =
fromRDD(srdd.subtract(other, numPartitions))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
def subtract(other: JavaDoubleRDD, p: Partitioner): JavaDoubleRDD =
fromRDD(srdd.subtract(other, p))
/**
* Return a sampled subset of this RDD.
*/

View file

@ -181,6 +181,27 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
def subtract(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
fromRDD(rdd.subtract(other))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
def subtract(other: JavaPairRDD[K, V], numPartitions: Int): JavaPairRDD[K, V] =
fromRDD(rdd.subtract(other, numPartitions))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] =
fromRDD(rdd.subtract(other, p))
/**
* Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
* is true, Spark will group values of the same key together on the map side before the

View file

@ -55,6 +55,26 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
def subtract(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.subtract(other))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
def subtract(other: JavaRDD[T], numPartitions: Int): JavaRDD[T] =
wrapRDD(rdd.subtract(other, numPartitions))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))
}
object JavaRDD {