Reimplement Graph.mask using innerJoin
This commit is contained in:
parent
9193a8f788
commit
0f137e8b75
|
@ -60,6 +60,27 @@ class EdgeRDD[@specialized ED: ClassManifest](
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Pair up the partitions of this EdgeRDD with those of `other` and combine each pair with `f`.
 *
 * Only the first element of each partition iterator is read, and the partition id of the
 * result is taken from this RDD's element. The underlying `zipPartitions` call is made with
 * `preservesPartitioning = true`.
 *
 * NOTE(review): calling `next()` unconditionally presumes every partition holds at least one
 * (pid, EdgePartition) pair -- confirm against how `partitionsRDD` is built.
 *
 * @param other the EdgeRDD whose partitions are zipped with this one's
 * @param f function combining the two co-located edge partitions into a new one
 * @return a new EdgeRDD wrapping the combined partitions
 */
def zipEdgePartitions[ED2: ClassManifest, ED3: ClassManifest]
    (other: EdgeRDD[ED2])
    (f: (EdgePartition[ED], EdgePartition[ED2]) => EdgePartition[ED3]): EdgeRDD[ED3] = {
  new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, preservesPartitioning = true) {
    (leftIter, rightIter) =>
      val (partitionId, leftPart) = leftIter.next()
      val (_, rightPart) = rightIter.next()
      val combined = f(leftPart, rightPart)
      Iterator((partitionId, combined))
  })
}
|
||||||
|
|
||||||
|
/**
 * Join the edges of this EdgeRDD with those of `other`, partition by partition, by delegating
 * to `EdgePartition.innerJoin` on each zipped pair of partitions.
 *
 * @param other the EdgeRDD to join against
 * @param f function applied to (srcId, dstId, this edge's attribute, other edge's attribute)
 * @return a new EdgeRDD holding the joined partitions
 */
def innerJoin[ED2: ClassManifest, ED3: ClassManifest]
    (other: EdgeRDD[ED2])
    (f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = {
  // Resolve the manifests up front and pass them explicitly inside the closure.
  // NOTE(review): presumably done so the closure captures plain local values rather than the
  // method's implicit evidence -- confirm.
  val otherAttrManifest = classManifest[ED2]
  val resultAttrManifest = classManifest[ED3]
  zipEdgePartitions(other) { (leftPart, rightPart) =>
    leftPart.innerJoin(rightPart)(f)(otherAttrManifest, resultAttrManifest)
  }
}
|
||||||
|
|
||||||
/**
 * Produce an RDD of vertex ids by concatenating, for every edge partition, its array of
 * source ids with its array of destination ids. No de-duplication is performed here.
 */
def collectVids(): RDD[Vid] = {
  partitionsRDD.flatMap { case (_, edgePartition) =>
    Array.concat(edgePartition.srcIds, edgePartition.dstIds)
  }
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
||||||
* along with their vertex data.
|
* along with their vertex data.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
val edges: RDD[Edge[ED]]
|
val edges: EdgeRDD[ED]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the edges with the vertex data associated with the adjacent
|
* Get the edges with the vertex data associated with the adjacent
|
||||||
|
|
|
@ -98,6 +98,40 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
|
||||||
builder.toEdgePartition
|
builder.toEdgePartition
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Build a new EdgePartition from the edges that occur in both `this` and `other`, using `f`
 * to compute the attribute of each resulting edge.
 *
 * Duplicate (src, dst) pairs in `this` each trigger one call to `f`, but the edge of `other`
 * they are paired with is unspecified among the matching ones. Duplicate (src, dst) pairs in
 * `other` do not cause additional calls to `f`.
 *
 * NOTE(review): the scan below only ever moves forward through both partitions, so it appears
 * to rely on both being ordered by (srcId, dstId) -- confirm against EdgePartitionBuilder.
 *
 * @param other the partition to join against
 * @param f function applied to (srcId, dstId, this edge's attribute, other edge's attribute)
 * @return the partition containing one edge per matched edge of `this`
 */
def innerJoin[ED2: ClassManifest, ED3: ClassManifest]
    (other: EdgePartition[ED2])
    (f: (Vid, Vid, ED, ED2) => ED3): EdgePartition[ED3] = {
  val joined = new EdgePartitionBuilder[ED3]
  var thisIdx = 0
  var otherIdx = 0
  // Walk over every edge of `this` while `other` still has unvisited edges.
  while (thisIdx < size && otherIdx < other.size) {
    val src = srcIds(thisIdx)
    val dst = dstIds(thisIdx)
    // First skip the edges of `other` whose source id is smaller than `src`.
    while (otherIdx < other.size && other.srcIds(otherIdx) < src) {
      otherIdx += 1
    }
    if (otherIdx < other.size && other.srcIds(otherIdx) == src) {
      // Within the run of equal source ids, skip destinations smaller than `dst`.
      while (otherIdx < other.size && other.srcIds(otherIdx) == src &&
          other.dstIds(otherIdx) < dst) {
        otherIdx += 1
      }
      val isMatch =
        otherIdx < other.size && other.srcIds(otherIdx) == src && other.dstIds(otherIdx) == dst
      if (isMatch) {
        // `otherIdx` is deliberately left in place so a duplicate edge in `this` can match it too.
        joined.add(src, dst, f(src, dst, data(thisIdx), other.data(otherIdx)))
      }
    }
    thisIdx += 1
  }
  joined.toEdgePartition
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The number of edges in this partition
|
* The number of edges in this partition
|
||||||
*
|
*
|
||||||
|
|
|
@ -216,7 +216,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
} // end of subgraph
|
} // end of subgraph
|
||||||
|
|
||||||
/**
 * Restrict this graph to the vertices and edges that are also present in `other`.
 *
 * Both joins return the value from this graph, so the attributes of `other` are ignored.
 *
 * @param other the graph used as the mask
 * @return a new GraphImpl built from the joined vertices and edges
 */
override def mask[VD2: ClassManifest, ED2: ClassManifest] (
    other: Graph[VD2, ED2]): Graph[VD, ED] = {
  val maskedVertices = vertices.innerJoin(other.vertices) { (vid, thisAttr, otherAttr) => thisAttr }
  val maskedEdges = edges.innerJoin(other.edges) { (src, dst, thisAttr, otherAttr) => thisAttr }
  new GraphImpl(maskedVertices, maskedEdges)
}
|
||||||
|
|
||||||
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
|
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
|
||||||
ClosureCleaner.clean(merge)
|
ClosureCleaner.clean(merge)
|
||||||
|
@ -379,38 +384,6 @@ object GraphImpl {
|
||||||
new EdgeRDD(edges)
|
new EdgeRDD(edges)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
 * Restrict `thisGraph` to the vertices and edges that also occur in `otherGraph`, keeping the
 * attributes of `thisGraph`.
 *
 * Both arguments are cast to GraphImpl; any other Graph implementation makes the cast fail.
 *
 * @param thisGraph the graph whose vertices, edges and attributes are retained
 * @param otherGraph the graph acting as the mask
 * @return a GraphImpl built from the surviving vertices and edges
 */
def mask[VD: ClassManifest, ED: ClassManifest, VD2: ClassManifest, ED2: ClassManifest] (
    thisGraph: Graph[VD, ED], otherGraph: Graph[VD2, ED2]) : Graph[VD, ED] = {
  // Effectively vertices.join(other.vertices); phrased as leftJoin + filter + map to take
  // advantage of the fast join in VertexSetRDDs.
  val keptVertices = thisGraph.vertices
    .leftJoin(otherGraph.vertices)((vid, v, w) => if (w.isEmpty) None else Some(v))
    .filter { case (_, maybeAttr) => maybeAttr.isDefined }
    .map { case (vid, maybeAttr) => (vid, maybeAttr.get) }
  val newVTable = VertexSetRDD(keptVertices)

  // TODO(amatsukawa): safer way to downcast? case matching perhaps?
  val thisImpl = thisGraph.asInstanceOf[GraphImpl[VD, ED]]
  val otherImpl = otherGraph.asInstanceOf[GraphImpl[VD2, ED2]]
  val newETable = thisImpl.eTable.zipPartitions(otherImpl.eTable) {
    // Take the single edge partition from each side and keep every edge of this partition
    // that is also present in the other partition.
    (thisIter, otherIter) =>
      val (_, otherPart) = otherIter.next()
      val otherEndpoints = otherPart.iterator.map(edge => (edge.srcId, edge.dstId)).toSet
      val (pid, thisPart) = thisIter.next()
      val kept = new EdgePartitionBuilder[ED]
      thisPart.foreach { edge =>
        val endpoints = (edge.srcId, edge.dstId)
        if (otherEndpoints.contains(endpoints)) {
          kept.add(edge.srcId, edge.dstId, edge.attr)
        }
      }
      Iterator((pid, kept.toEdgePartition))
  }.partitionBy(thisImpl.eTable.partitioner.get)

  val newVertexPlacement = new VertexPlacement(newETable, newVTable)
  new GraphImpl(newVTable, newETable, newVertexPlacement, thisImpl.partitioner)
}
|
|
||||||
|
|
||||||
private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
|
private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
|
||||||
edges: EdgeRDD[ED],
|
edges: EdgeRDD[ED],
|
||||||
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
|
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
|
||||||
|
|
|
@ -5,7 +5,9 @@ import scala.util.Random
|
||||||
import org.scalatest.FunSuite
|
import org.scalatest.FunSuite
|
||||||
|
|
||||||
import org.apache.spark.SparkContext
|
import org.apache.spark.SparkContext
|
||||||
|
import org.apache.spark.graph.Graph._
|
||||||
import org.apache.spark.graph.LocalSparkContext._
|
import org.apache.spark.graph.LocalSparkContext._
|
||||||
|
import org.apache.spark.graph.impl.EdgePartition
|
||||||
import org.apache.spark.graph.impl.EdgePartitionBuilder
|
import org.apache.spark.graph.impl.EdgePartitionBuilder
|
||||||
import org.apache.spark.rdd._
|
import org.apache.spark.rdd._
|
||||||
|
|
||||||
|
@ -183,7 +185,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test("projectGraph") {
|
test("mask") {
|
||||||
withSpark(new SparkContext("local", "test")) { sc =>
|
withSpark(new SparkContext("local", "test")) { sc =>
|
||||||
val n = 5
|
val n = 5
|
||||||
val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
|
val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
|
||||||
|
@ -207,7 +209,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test ("filterGraph") {
|
test ("filter") {
|
||||||
withSpark(new SparkContext("local", "test")) { sc =>
|
withSpark(new SparkContext("local", "test")) { sc =>
|
||||||
val n = 5
|
val n = 5
|
||||||
val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
|
val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
|
||||||
|
@ -215,7 +217,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
|
||||||
val graph: Graph[Int, Int] = Graph(vertices, edges)
|
val graph: Graph[Int, Int] = Graph(vertices, edges)
|
||||||
val filteredGraph = graph.filter(
|
val filteredGraph = graph.filter(
|
||||||
graph => {
|
graph => {
|
||||||
val degrees: VertexSetRDD[Int] = graph.outDegrees
|
val degrees: VertexRDD[Int] = graph.outDegrees
|
||||||
graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
|
graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
|
||||||
},
|
},
|
||||||
vpred = (vid: Vid, deg:Int) => deg > 0
|
vpred = (vid: Vid, deg:Int) => deg > 0
|
||||||
|
@ -278,4 +280,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
|
||||||
assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
|
assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
|
||||||
assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
|
assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("EdgePartition.innerJoin") {
  // Build an EdgePartition from (src, dst, attr) triples, widening the Int ids to Vid.
  def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
    val builder = new EdgePartitionBuilder[A]
    xs.foreach { case (src, dst, attr) => builder.add(src: Vid, dst: Vid, attr) }
    builder.toEdgePartition
  }
  val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
  val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
  val a = makeEdgePartition(aList)
  val b = makeEdgePartition(bList)

  // Only (0, 1), (1, 0) and (5, 5) appear in both lists; each edge is copied before being
  // collected into the list.
  val joinedEdges =
    a.innerJoin(b) { (src, dst, attrA, attrB) => attrA }.iterator.map(_.copy()).toList
  assert(joinedEdges ===
    List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue