[SPARK-5922][GraphX]: Add diff(other: RDD[VertexId, VD]) in VertexRDD

Changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]. This change maintains backwards compatibility and better unifies the VertexRDD methods to match each other.

Author: Brennon York <brennon.york@capitalone.com>

Closes #4733 from brennonyork/SPARK-5922 and squashes the following commits:

e800f08 [Brennon York] fixed merge conflicts
b9274af [Brennon York] fixed merge conflicts
f86375c [Brennon York] fixed minor include line
398ddb4 [Brennon York] fixed merge conflicts
aac1810 [Brennon York] updated to aggregateUsingIndex and added test to ensure that method works properly
2af0b88 [Brennon York] removed deprecation line
753c963 [Brennon York] fixed merge conflicts and set preference to use the diff(other: VertexRDD[VD]) method
2c678c6 [Brennon York] added mima exclude to exclude new public diff method from VertexRDD
93186f3 [Brennon York] added back the original diff method to sustain binary compatibility
f18356e [Brennon York] changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]
This commit is contained in:
Brennon York 2015-03-16 01:06:26 -07:00 committed by Ankur Dave
parent aa6536fa3c
commit 45f4c66122
4 changed files with 29 additions and 0 deletions

View file

@ -121,6 +121,15 @@ abstract class VertexRDD[VD](
*/
def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
/**
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
* differing values; for values that are different, keeps the values from `other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.
*
* @param other the other RDD[(VertexId, VD)] with which to diff against.
*/
def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
/**
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
* differing values; for values that are different, keeps the values from `other`. This is

View file

@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] (
override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
}
override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
val otherPartition = other match {
case other: VertexRDD[_] if this.partitioner == other.partitioner =>

View file

@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
class VertexRDDSuite extends FunSuite with LocalSparkContext {
@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
}
}
test("diff with RDD[(VertexId, VD)]") {
withSpark { sc =>
val n = 100
val verts = vertices(sc, n).cache()
val flipEvens: RDD[(VertexId, Int)] =
sc.parallelize(0L to 100L)
.map(id => if (id % 2 == 0) (id, -id.toInt) else (id, id.toInt)).cache()
// diff should keep only the changed vertices
assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
}
}
test("diff vertices with the non-equal number of partitions") {
withSpark { sc =>
val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0)))

View file

@ -181,6 +181,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
) ++ Seq(
// SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff")
)
case v if v.startsWith("1.2") =>