merged with upstream
This commit is contained in:
commit
413b0c1526
|
@ -52,9 +52,7 @@ object Analytics extends Logging {
|
|||
(me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather
|
||||
(a: Double, b: Double) => a + b, // merge
|
||||
1.0,
|
||||
numIter).mapVertices {
|
||||
case Vertex(id, (outDeg, r)) => Vertex(id, r)
|
||||
}
|
||||
numIter).mapVertices{ case Vertex(id, (outDeg, r)) => r }
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -76,9 +74,7 @@ object Analytics extends Logging {
|
|||
(vertex, a: Option[Double]) =>
|
||||
(vertex.data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), vertex.data._2), // apply
|
||||
(me_id, edge) => math.abs(edge.src.data._3 - edge.src.data._2) > tol, // scatter
|
||||
maxIter).mapVertices {
|
||||
case Vertex(vid, data) => Vertex(vid, data._2)
|
||||
}
|
||||
maxIter).mapVertices { case Vertex(vid, data) => data._2 }
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -141,6 +141,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
def mapTriplets[ED2: ClassManifest](
|
||||
map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
|
||||
|
||||
def correctEdges(): Graph[VD, ED]
|
||||
|
||||
/**
|
||||
* Construct a new graph with all the edges reversed. If this graph contains
|
||||
|
@ -252,6 +253,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
*
|
||||
* @tparam U the type of entry in the table of updates
|
||||
* @tparam VD2 the new vertex value type
|
||||
*
|
||||
* @param table the table to join with the vertices in the graph. The table
|
||||
* should contain at most one entry for each vertex.
|
||||
* @param mapFunc the function used to compute the new vertex values. The
|
||||
|
@ -329,24 +331,26 @@ object Graph {
|
|||
import spark.graph.impl._
|
||||
import spark.SparkContext._
|
||||
|
||||
def apply(rawEdges: RDD[(Int,Int)], uniqueEdges: Boolean = true): Graph[Int, Int] = {
|
||||
// Reduce to unique edges
|
||||
val edges =
|
||||
if(uniqueEdges) rawEdges.map{ case (s,t) => ((s,t),1) }.reduceByKey( _ + _ )
|
||||
.map{ case ((s,t), cnt) => Edge(s,t,cnt) }
|
||||
else rawEdges.map{ case (s,t) => Edge(s,t,1) }
|
||||
def apply(rawEdges: RDD[(Int, Int)], uniqueEdges: Boolean = true): Graph[Int, Int] = {
|
||||
// Reduce to unique edges.
|
||||
val edges: RDD[Edge[Int]] =
|
||||
if (uniqueEdges) {
|
||||
rawEdges.map((_, 1)).reduceByKey(_ + _).map { case ((s, t), cnt) => Edge(s, t, cnt) }
|
||||
} else {
|
||||
rawEdges.map { case (s, t) => Edge(s, t, 1) }
|
||||
}
|
||||
// Determine unique vertices
|
||||
val vertices = edges.flatMap{ case Edge(s, t, cnt) => Array((s,1), (t,1)) }.reduceByKey( _ + _ )
|
||||
val vertices: RDD[Vertex[Int]] = edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }
|
||||
.reduceByKey(_ + _)
|
||||
.map{ case (id, deg) => Vertex(id, deg) }
|
||||
// Return graph
|
||||
new GraphImpl(vertices, edges)
|
||||
}
|
||||
|
||||
def apply[VD: ClassManifest, ED: ClassManifest](vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]): Graph[VD, ED] = {
|
||||
def apply[VD: ClassManifest, ED: ClassManifest](
|
||||
vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]): Graph[VD, ED] = {
|
||||
new GraphImpl(vertices, edges)
|
||||
|
||||
}
|
||||
|
||||
|
||||
implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package spark.graph.impl
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.collection.mutable.ArrayBuilder
|
||||
|
||||
import it.unimi.dsi.fastutil.ints.IntArrayList
|
||||
|
||||
|
@ -13,21 +13,25 @@ import spark.graph._
|
|||
private[graph]
|
||||
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] {
|
||||
|
||||
private var _data: Array[ED] = _
|
||||
private var _dataBuilder = ArrayBuilder.make[ED]
|
||||
|
||||
val srcIds: IntArrayList = new IntArrayList
|
||||
val dstIds: IntArrayList = new IntArrayList
|
||||
// TODO: Specialize data.
|
||||
val data: ArrayBuffer[ED] = new ArrayBuffer[ED]
|
||||
|
||||
def data: Array[ED] = _data
|
||||
|
||||
/** Add a new edge to the partition. */
|
||||
def add(src: Vid, dst: Vid, d: ED) {
|
||||
srcIds.add(src)
|
||||
dstIds.add(dst)
|
||||
data += d
|
||||
_dataBuilder += d
|
||||
}
|
||||
|
||||
/**
 * Finalizes this partition once all edges have been added.
 *
 * Trims both id lists and assigns `_data` from the result of `_dataBuilder`.
 * In the code visible here, `_data` is assigned only by this method, and both
 * the `data` accessor and the edge iterator read from it.
 * NOTE(review): presumably callers must invoke `trim()` before reading edge
 * data — confirm that no other code path assigns `_data`.
 */
def trim() {
|
||||
srcIds.trim()
|
||||
dstIds.trim()
|
||||
// Snapshot the edge attributes accumulated by add() into the array returned by `data`.
_data = _dataBuilder.result()
|
||||
}
|
||||
|
||||
def size: Int = srcIds.size
|
||||
|
@ -41,7 +45,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
|
|||
override def next(): Edge[ED] = {
|
||||
edge.src = srcIds.get(pos)
|
||||
edge.dst = dstIds.get(pos)
|
||||
edge.data = data(pos)
|
||||
edge.data = _data(pos)
|
||||
pos += 1
|
||||
edge
|
||||
}
|
||||
|
|
|
@ -91,6 +91,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
newGraph(vertices, triplets.map(e => Edge(e.src.id, e.dst.id, f(e))))
|
||||
}
|
||||
|
||||
/**
 * Builds a graph with the same vertices, keeping only the edges whose source
 * and destination ids both appear among the vertex ids.
 *
 * @return a graph constructed from the original vertices and the filtered edges
 */
override def correctEdges(): Graph[VD, ED] = {
|
||||
val sc = vertices.context
|
||||
// Collect every vertex id into a local Set and broadcast it for use in the edge filter.
// NOTE(review): this presumably requires the full id set to fit in memory — confirm for large graphs.
val vset = sc.broadcast(vertices.map(_.id).collect().toSet)
|
||||
// Drop any edge that has an endpoint missing from the vertex id set.
val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst))
|
||||
Graph(vertices, newEdges)
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Lower level transformation methods
|
||||
|
@ -110,7 +116,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
|
||||
}, preservesPartitioning = true)
|
||||
|
||||
(new EdgeTripletRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
|
||||
new EdgeTripletRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable)
|
||||
.mapPartitions { part =>
|
||||
val (vmap, edges) = part.next()
|
||||
val edgeSansAcc = new EdgeTriplet[VD, ED]()
|
||||
|
@ -150,7 +156,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
}
|
||||
|
||||
/**
|
||||
* Same as mapReduceNeighborhood but map function can return none and there is no default value.
|
||||
* Same as aggregateNeighbors but map function can return none and there is no default value.
|
||||
* As a consequence, the resulting table may be much smaller than the set of vertices.
|
||||
*/
|
||||
override def aggregateNeighbors[VD2: ClassManifest](
|
||||
|
@ -165,7 +171,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
|
||||
}, preservesPartitioning = true)
|
||||
|
||||
(new EdgeTripletRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
|
||||
new EdgeTripletRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable)
|
||||
.mapPartitions { part =>
|
||||
val (vmap, edges) = part.next()
|
||||
val edgeSansAcc = new EdgeTriplet[VD, ED]()
|
||||
|
@ -187,8 +193,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
}
|
||||
}
|
||||
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
|
||||
e.dst.data._2 =
|
||||
if (e.dst.data._2.isEmpty) {
|
||||
e.src.data._2 =
|
||||
if (e.src.data._2.isEmpty) {
|
||||
mapFunc(edgeSansAcc.src.id, edgeSansAcc)
|
||||
} else {
|
||||
val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
|
||||
|
@ -218,7 +224,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
}
|
||||
}, preservesPartitioning = true).cache()
|
||||
|
||||
new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable)
|
||||
new GraphImpl(newVTable.partitions.length, eTable.partitions.length, null, null, newVTable, eTable)
|
||||
}
|
||||
|
||||
override def joinVertices[U: ClassManifest](
|
||||
|
@ -239,7 +245,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
}
|
||||
}, preservesPartitioning = true).cache()
|
||||
|
||||
new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable)
|
||||
new GraphImpl(newVTable.partitions.length, eTable.partitions.length, null, null, newVTable, eTable)
|
||||
}
|
||||
|
||||
|
||||
|
@ -307,6 +313,7 @@ object GraphImpl {
|
|||
.mapPartitionsWithIndex({ (pid, iter) =>
|
||||
val edgePartition = new EdgePartition[ED]
|
||||
iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
|
||||
edgePartition.trim()
|
||||
Iterator((pid, edgePartition))
|
||||
}, preservesPartitioning = true)
|
||||
}
|
||||
|
|
|
@ -3,10 +3,33 @@ package spark.graph
|
|||
import org.scalatest.FunSuite
|
||||
|
||||
import spark.SparkContext
|
||||
import spark.graph.impl.GraphImpl
|
||||
|
||||
|
||||
class GraphSuite extends FunSuite with LocalSparkContext {
|
||||
|
||||
// TODO: placeholder — this test has an empty body and makes no assertions about aggregateNeighbors yet.
test("aggregateNeighbors") {
|
||||
|
||||
}
|
||||
|
||||
// Checks joinVertices on a three-vertex, one-edge graph: vertices that have an
// entry in the update table get the mapped value, the vertex without an entry
// keeps its data, and the edge data is unchanged.
test("joinVertices") {
|
||||
sc = new SparkContext("local", "test")
|
||||
// Three string-valued vertices parallelized into 2 slices.
val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two"), Vertex(3, "three")), 2)
|
||||
val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
|
||||
val g: Graph[String, String] = new GraphImpl(vertices, edges)
|
||||
|
||||
// The update table has entries for vertices 1 and 2 only; vertex 3 has none.
val tbl = sc.parallelize(Seq((1, 10), (2, 20)))
|
||||
// The map function appends the table value to the vertex's existing string.
val g1 = g.joinVertices(tbl, (v: Vertex[String], u: Int) => v.data + u)
|
||||
|
||||
// Sort by id so the positional assertions below do not depend on collect() order.
val v = g1.vertices.collect().sortBy(_.id)
|
||||
assert(v(0).data === "one10")
|
||||
assert(v(1).data === "two20")
|
||||
// Vertex 3 had no table entry, so its original data is expected.
assert(v(2).data === "three")
|
||||
|
||||
// The single edge's data should be the same as before the join.
val e = g1.edges.collect()
|
||||
assert(e(0).data === "onetwo")
|
||||
}
|
||||
|
||||
// test("graph partitioner") {
|
||||
// sc = new SparkContext("local", "test")
|
||||
// val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two")))
|
||||
|
|
Loading…
Reference in a new issue