Merge pull request #473 from stephenh/subtract

Add RDD.subtract.
Matei Zaharia 2013-02-23 23:25:27 -08:00
commit beb79dbbfd
6 changed files with 218 additions and 1 deletion


@@ -30,6 +30,7 @@ import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithIndexRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
import spark.storage.StorageLevel
@@ -393,6 +394,26 @@ abstract class RDD[T: ClassManifest](
    filter(f.isDefinedAt).map(f)
  }

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: RDD[T]): RDD[T] =
    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
    subtract(other, new HashPartitioner(numPartitions))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], p: Partitioner): RDD[T] = new SubtractedRDD[T](this, other, p)

  /**
   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
   */

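A minimal usage sketch of the new overloads, for reference only (not part of the patch; it assumes a live SparkContext named sc and that spark.HashPartitioner is in scope):

val xs = sc.parallelize(Seq(1, 2, 3, 4), 2)
val ys = sc.parallelize(Seq(3, 4, 5, 6), 4)
xs.subtract(ys).collect()                           // elements 1 and 2; result keeps xs's 2 partitions
xs.subtract(ys, 8).collect()                        // same elements, hashed into 8 partitions
xs.subtract(ys, new HashPartitioner(8)).collect()   // explicit Partitioner overload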

@@ -6,8 +6,8 @@ import spark.api.java.function.{Function => JFunction}
import spark.util.StatCounter
import spark.partial.{BoundedDouble, PartialResult}
import spark.storage.StorageLevel
import java.lang.Double
import spark.Partitioner
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
@@ -57,6 +57,27 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
   */
  def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: JavaDoubleRDD): JavaDoubleRDD =
    fromRDD(srdd.subtract(other))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: JavaDoubleRDD, numPartitions: Int): JavaDoubleRDD =
    fromRDD(srdd.subtract(other, numPartitions))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: JavaDoubleRDD, p: Partitioner): JavaDoubleRDD =
    fromRDD(srdd.subtract(other, p))

  /**
   * Return a sampled subset of this RDD.
   */


@@ -181,6 +181,27 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
  def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
    fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
    fromRDD(rdd.subtract(other))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: JavaPairRDD[K, V], numPartitions: Int): JavaPairRDD[K, V] =
    fromRDD(rdd.subtract(other, numPartitions))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] =
    fromRDD(rdd.subtract(other, p))

  /**
   * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
   * is true, Spark will group values of the same key together on the map side before the

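Note that for pair RDDs the new subtract compares whole (key, value) pairs rather than keys alone, as the narrow-dependency test later in this diff also demonstrates. A quick sketch in Scala terms (hypothetical data and sc, not part of the patch):

val left  = sc.parallelize(Seq((1, "a"), (2, "b"), (2, "x")))
val right = sc.parallelize(Seq((2, "b")))
left.subtract(right).collect()   // keeps (1, "a") and (2, "x"); only the exact pair (2, "b") is removed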

@@ -55,6 +55,26 @@ JavaRDDLike[T, JavaRDD[T]] {
   */
  def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.subtract(other))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: JavaRDD[T], numPartitions: Int): JavaRDD[T] =
    wrapRDD(rdd.subtract(other, numPartitions))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
    wrapRDD(rdd.subtract(other, p))
}

object JavaRDD {


@@ -0,0 +1,108 @@
package spark.rdd

import java.util.{HashSet => JHashSet}
import scala.collection.JavaConversions._
import spark.RDD
import spark.Partitioner
import spark.Dependency
import spark.TaskContext
import spark.Partition
import spark.SparkEnv
import spark.ShuffleDependency
import spark.OneToOneDependency

/**
 * An optimized version of cogroup for set difference/subtraction.
 *
 * It is possible to implement this operation with just `cogroup`, but
 * that is less efficient because all of the entries from `rdd2`, for
 * both matching and non-matching values in `rdd1`, are kept in the
 * JHashMap until the end.
 *
 * With this implementation, only the entries from `rdd1` are kept in-memory,
 * and the entries from `rdd2` are essentially streamed, as we only need to
 * touch each once to decide if the value needs to be removed.
 *
 * This is particularly helpful when `rdd1` is much smaller than `rdd2`, as
 * you can use `rdd1`'s partitioner/partition size and not worry about running
 * out of memory because of the size of `rdd2`.
 */
private[spark] class SubtractedRDD[T: ClassManifest](
    @transient var rdd1: RDD[T],
    @transient var rdd2: RDD[T],
    part: Partitioner) extends RDD[T](rdd1.context, Nil) {

  override def getDependencies: Seq[Dependency[_]] = {
    Seq(rdd1, rdd2).map { rdd =>
      if (rdd.partitioner == Some(part)) {
        logInfo("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        logInfo("Adding shuffle dependency with " + rdd)
        val mapSideCombinedRDD = rdd.mapPartitions(i => {
          val set = new JHashSet[T]()
          while (i.hasNext) {
            set.add(i.next)
          }
          set.iterator
        }, true)
        // ShuffleDependency requires a tuple (k, v), which it will partition by k.
        // We need this to partition to map to the same place as the k for
        // OneToOneDependency, which means:
        // - for already-tupled RDD[(A, B)], into getPartition(a)
        // - for non-tupled RDD[C], into getPartition(c)
        val part2 = new Partitioner() {
          def numPartitions = part.numPartitions
          def getPartition(key: Any) = key match {
            case (k, v) => part.getPartition(k)
            case k => part.getPartition(k)
          }
        }
        new ShuffleDependency(mapSideCombinedRDD.map((_, null)), part2)
      }
    }
  }

  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](part.numPartitions)
    for (i <- 0 until array.size) {
      // Each CoGroupPartition will depend on rdd1 and rdd2
      array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
        dependencies(j) match {
          case s: ShuffleDependency[_, _] =>
            new ShuffleCoGroupSplitDep(s.shuffleId)
          case _ =>
            new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
        }
      }.toList)
    }
    array
  }

  override val partitioner = Some(part)

  override def compute(p: Partition, context: TaskContext): Iterator[T] = {
    val partition = p.asInstanceOf[CoGroupPartition]
    val set = new JHashSet[T]
    def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match {
      case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
        for (k <- rdd.iterator(itsSplit, context))
          op(k.asInstanceOf[T])
      case ShuffleCoGroupSplitDep(shuffleId) =>
        for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index))
          op(k.asInstanceOf[T])
    }
    // the first dep is rdd1; add all keys to the set
    integrate(partition.deps(0), set.add)
    // the second dep is rdd2; remove all of its keys from the set
    integrate(partition.deps(1), set.remove)
    set.iterator
  }

  override def clearDependencies() {
    super.clearDependencies()
    rdd1 = null
    rdd2 = null
  }
}
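For contrast with the class comment above, a cogroup-based subtract could look roughly like the sketch below (not part of the patch; cogroupSubtract is a hypothetical name, and it assumes import spark.SparkContext._ for the pair-RDD implicits). Every value from rdd2 would be buffered per key until the cogroup completes, which is exactly the memory cost SubtractedRDD avoids:

// Hypothetical cogroup-based equivalent, shown only to illustrate the trade-off described above:
// key each element by itself, cogroup, then keep keys seen in rdd1 but not in rdd2.
def cogroupSubtract[T: ClassManifest](rdd1: RDD[T], rdd2: RDD[T], part: Partitioner): RDD[T] =
  rdd1.map(t => (t, null)).cogroup(rdd2.map(t => (t, null)), part).flatMap {
    case (t, (vs1, vs2)) => if (!vs1.isEmpty && vs2.isEmpty) Some(t) else None
  }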


@@ -234,6 +234,32 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
    assert(rdd.keys.collect().toList === List(1, 2))
    assert(rdd.values.collect().toList === List("a", "b"))
  }

  test("subtract") {
    sc = new SparkContext("local", "test")
    val a = sc.parallelize(Array(1, 2, 3), 2)
    val b = sc.parallelize(Array(2, 3, 4), 4)
    val c = a.subtract(b)
    assert(c.collect().toSet === Set(1))
    assert(c.partitions.size === a.partitions.size)
  }

  test("subtract with narrow dependency") {
    sc = new SparkContext("local", "test")
    // use a deterministic partitioner
    val p = new Partitioner() {
      def numPartitions = 5
      def getPartition(key: Any) = key.asInstanceOf[Int]
    }
    // partitionBy so we have a narrow dependency
    val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
    println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList)
    // more splits/no partitioner so a shuffle dependency
    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
    val c = a.subtract(b)
    assert(c.collect().toSet === Set((1, "a"), (3, "c")))
    assert(c.partitioner.get === p)
  }
}
object ShuffleSuite {