[SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference
Adds a `Graph#minus` method which will return only unique `VertexId`'s from the calling `VertexRDD`. To demonstrate a basic example with pseudocode: ``` Set((0L,0),(1L,1)).minus(Set((1L,1),(2L,2))) > Set((0L,0)) ``` Author: Brennon York <brennon.york@capitalone.com> Closes #5175 from brennonyork/SPARK-6510 and squashes the following commits: 248d5c8 [Brennon York] added minus(VertexRDD[VD]) method to avoid createUsingIndex and updated the mask operations to simplify with andNot call 3fb7cce [Brennon York] updated graphx doc to reflect the addition of minus method 6575d92 [Brennon York] updated mima exclude aaa030b [Brennon York] completed graph#minus functionality 7227c0f [Brennon York] beginning work on minus functionality
This commit is contained in:
parent
aad0032276
commit
39fb579683
|
@ -899,6 +899,8 @@ class VertexRDD[VD] extends RDD[(VertexID, VD)] {
|
|||
// Transform the values without changing the ids (preserves the internal index)
|
||||
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
|
||||
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
|
||||
// Show only vertices unique to this set based on their VertexId's
|
||||
def minus(other: RDD[(VertexId, VD)])
|
||||
// Remove vertices from this set that appear in the other set
|
||||
def diff(other: VertexRDD[VD]): VertexRDD[VD]
|
||||
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
|
||||
|
|
|
@ -121,6 +121,22 @@ abstract class VertexRDD[VD](
|
|||
*/
|
||||
def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
|
||||
|
||||
/**
|
||||
* For each VertexId present in both `this` and `other`, minus will act as a set difference
|
||||
* operation returning only those unique VertexId's present in `this`.
|
||||
*
|
||||
* @param other an RDD to run the set operation against
|
||||
*/
|
||||
def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
|
||||
|
||||
/**
|
||||
* For each VertexId present in both `this` and `other`, minus will act as a set difference
|
||||
* operation returning only those unique VertexId's present in `this`.
|
||||
*
|
||||
* @param other a VertexRDD to run the set operation against
|
||||
*/
|
||||
def minus(other: VertexRDD[VD]): VertexRDD[VD]
|
||||
|
||||
/**
|
||||
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
|
||||
* differing values; for values that are different, keeps the values from `other`. This is
|
||||
|
|
|
@ -88,6 +88,21 @@ private[graphx] abstract class VertexPartitionBaseOps
|
|||
this.withMask(newMask)
|
||||
}
|
||||
|
||||
/** Hides the VertexId's that are the same between `this` and `other`. */
|
||||
def minus(other: Self[VD]): Self[VD] = {
|
||||
if (self.index != other.index) {
|
||||
logWarning("Minus operations on two VertexPartitions with different indexes is slow.")
|
||||
minus(createUsingIndex(other.iterator))
|
||||
} else {
|
||||
self.withMask(self.mask.andNot(other.mask))
|
||||
}
|
||||
}
|
||||
|
||||
/** Hides the VertexId's that are the same between `this` and `other`. */
|
||||
def minus(other: Iterator[(VertexId, VD)]): Self[VD] = {
|
||||
minus(createUsingIndex(other))
|
||||
}
|
||||
|
||||
/**
|
||||
* Hides vertices that are the same between this and other. For vertices that are different, keeps
|
||||
* the values from `other`. The indices of `this` and `other` must be the same.
|
||||
|
|
|
@ -103,6 +103,31 @@ class VertexRDDImpl[VD] private[graphx] (
|
|||
override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
|
||||
this.mapVertexPartitions(_.map(f))
|
||||
|
||||
override def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
|
||||
minus(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
|
||||
}
|
||||
|
||||
override def minus (other: VertexRDD[VD]): VertexRDD[VD] = {
|
||||
other match {
|
||||
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
|
||||
this.withPartitionsRDD[VD](
|
||||
partitionsRDD.zipPartitions(
|
||||
other.partitionsRDD, preservesPartitioning = true) {
|
||||
(thisIter, otherIter) =>
|
||||
val thisPart = thisIter.next()
|
||||
val otherPart = otherIter.next()
|
||||
Iterator(thisPart.minus(otherPart))
|
||||
})
|
||||
case _ =>
|
||||
this.withPartitionsRDD[VD](
|
||||
partitionsRDD.zipPartitions(
|
||||
other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
|
||||
(partIter, msgs) => partIter.map(_.minus(msgs))
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
|
||||
diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
|
||||
}
|
||||
|
|
|
@ -47,6 +47,35 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
|
|||
}
|
||||
}
|
||||
|
||||
test("minus") {
|
||||
withSpark { sc =>
|
||||
val vertexA = VertexRDD(sc.parallelize(0 until 75, 2).map(i => (i.toLong, 0))).cache()
|
||||
val vertexB = VertexRDD(sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1))).cache()
|
||||
val vertexC = vertexA.minus(vertexB)
|
||||
assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
|
||||
}
|
||||
}
|
||||
|
||||
test("minus with RDD[(VertexId, VD)]") {
|
||||
withSpark { sc =>
|
||||
val vertexA = VertexRDD(sc.parallelize(0 until 75, 2).map(i => (i.toLong, 0))).cache()
|
||||
val vertexB: RDD[(VertexId, Int)] =
|
||||
sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1)).cache()
|
||||
val vertexC = vertexA.minus(vertexB)
|
||||
assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
|
||||
}
|
||||
}
|
||||
|
||||
test("minus with non-equal number of partitions") {
|
||||
withSpark { sc =>
|
||||
val vertexA = VertexRDD(sc.parallelize(0 until 75, 5).map(i => (i.toLong, 0)))
|
||||
val vertexB = VertexRDD(sc.parallelize(50 until 100, 2).map(i => (i.toLong, 1)))
|
||||
assert(vertexA.partitions.size != vertexB.partitions.size)
|
||||
val vertexC = vertexA.minus(vertexB)
|
||||
assert(vertexC.map(_._1).collect.toSet === (0 until 50).toSet)
|
||||
}
|
||||
}
|
||||
|
||||
test("diff") {
|
||||
withSpark { sc =>
|
||||
val n = 100
|
||||
|
@ -71,7 +100,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
|
|||
}
|
||||
}
|
||||
|
||||
test("diff vertices with the non-equal number of partitions") {
|
||||
test("diff vertices with non-equal number of partitions") {
|
||||
withSpark { sc =>
|
||||
val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0)))
|
||||
val vertexB = VertexRDD(sc.parallelize(8 until 16, 2).map(i => (i.toLong, 1)))
|
||||
|
@ -96,7 +125,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
|
|||
}
|
||||
}
|
||||
|
||||
test("leftJoin vertices with the non-equal number of partitions") {
|
||||
test("leftJoin vertices with non-equal number of partitions") {
|
||||
withSpark { sc =>
|
||||
val vertexA = VertexRDD(sc.parallelize(0 until 100, 2).map(i => (i.toLong, 1)))
|
||||
val vertexB = VertexRDD(
|
||||
|
|
|
@ -51,6 +51,9 @@ object MimaExcludes {
|
|||
"org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem](
|
||||
"org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast")
|
||||
) ++ Seq(
|
||||
// SPARK-6510 Add a Graph#minus method acting as Set#difference
|
||||
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
|
||||
)
|
||||
|
||||
case v if v.startsWith("1.3") =>
|
||||
|
|
Loading…
Reference in a new issue