Renamed several functions and classes and improved documentation
This commit is contained in:
parent
b0403d3f2b
commit
a80b28a579
|
@ -1,6 +1,12 @@
|
|||
package spark.graph
|
||||
|
||||
|
||||
/**
|
||||
* A single directed edge consisting of a source id, target id,
|
||||
* and the data associated with the edge.
|
||||
*
|
||||
* @tparam ED type of the edge attribute
|
||||
*/
|
||||
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
|
||||
var src: Vid = 0,
|
||||
var dst: Vid = 0,
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
package spark.graph
|
||||
|
||||
|
||||
/**
|
||||
* The direction of a directed edge relative to a vertex, used to select
|
||||
* the set of adjacent neighbors when running a neighborhood query.
|
||||
*/
|
||||
sealed abstract class EdgeDirection {
|
||||
def reverse: EdgeDirection = this match {
|
||||
case EdgeDirection.In => EdgeDirection.In
|
||||
|
@ -11,7 +15,18 @@ sealed abstract class EdgeDirection {
|
|||
|
||||
|
||||
object EdgeDirection {
|
||||
/**
|
||||
* Edges arriving at a vertex.
|
||||
*/
|
||||
case object In extends EdgeDirection
|
||||
|
||||
/**
|
||||
* Edges originating from a vertex.
|
||||
*/
|
||||
case object Out extends EdgeDirection
|
||||
|
||||
/**
|
||||
* All edges adjacent to a vertex.
|
||||
*/
|
||||
case object Both extends EdgeDirection
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package spark.graph
|
||||
|
||||
|
||||
class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD,
|
||||
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] {
|
||||
class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD,
|
||||
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] {
|
||||
var src: Vertex[VD] = _
|
||||
var dst: Vertex[VD] = _
|
||||
var data: ED = _
|
|
@ -20,6 +20,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
/**
|
||||
* Get the vertices and their data.
|
||||
*
|
||||
* @return An RDD containing the vertices in this graph
|
||||
*
|
||||
* @see Vertex for the vertex type.
|
||||
*
|
||||
* @todo should vertices return tuples instead of vertex objects?
|
||||
|
@ -30,13 +32,14 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
* Get the Edges and their data as an RDD. The entries in the RDD contain
|
||||
* just the source id and target id along with the edge data.
|
||||
*
|
||||
* @return An RDD containing the edges in this graph
|
||||
*
|
||||
* @see Edge for the edge type.
|
||||
* @see triplets to get an RDD which contains all the edges along
|
||||
* with their vertex data.
|
||||
*
|
||||
* @todo Should edges return 3 tuples instead of Edge objects? In this case
|
||||
* we could rename EdgeWithVertices to Edge?
|
||||
* we could rename EdgeTriplet to Edge?
|
||||
*/
|
||||
def edges(): RDD[Edge[ED]]
|
||||
|
||||
|
@ -44,6 +47,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
* Get the edges with the vertex data associated with the adjacent pair of
|
||||
* vertices.
|
||||
*
|
||||
* @return An RDD containing edge triplets.
|
||||
*
|
||||
* @example This operation might be used to evaluate a graph coloring where
|
||||
* we would like to check that both vertices are a different color.
|
||||
* {{{
|
||||
|
@ -56,7 +61,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
* @see edges() If only the edge data and adjacent vertex ids are required.
|
||||
*
|
||||
*/
|
||||
def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]]
|
||||
def triplets(): RDD[EdgeTriplet[VD, ED]]
|
||||
|
||||
/**
|
||||
* Return a graph that is cached when first created. This is used to pin a
|
||||
|
@ -133,8 +138,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
* }}}
|
||||
*
|
||||
*/
|
||||
def mapEdgesWithVertices[ED2: ClassManifest](
|
||||
map: EdgeWithVertices[VD, ED] => ED2): Graph[VD, ED2]
|
||||
def mapTriplets[ED2: ClassManifest](
|
||||
map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
|
||||
|
||||
|
||||
/**
|
||||
|
@ -183,7 +188,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
*
|
||||
*/
|
||||
def aggregateNeighbors[VD2: ClassManifest](
|
||||
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
|
||||
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[VD2],
|
||||
mergeFunc: (VD2, VD2) => VD2,
|
||||
direction: EdgeDirection)
|
||||
: RDD[(Vid, VD2)]
|
||||
|
@ -232,7 +237,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
*
|
||||
*/
|
||||
def aggregateNeighbors[VD2: ClassManifest](
|
||||
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
|
||||
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[VD2],
|
||||
reduceFunc: (VD2, VD2) => VD2,
|
||||
default: VD2, // Should this be a function or a value?
|
||||
direction: EdgeDirection)
|
||||
|
|
|
@ -8,7 +8,7 @@ object Pregel {
|
|||
|
||||
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
|
||||
vprog: ( Vertex[VD], A) => VD,
|
||||
sendMsg: (Vid, EdgeWithVertices[VD, ED]) => Option[A],
|
||||
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
|
||||
mergeMsg: (A, A) => A,
|
||||
initialMsg: A,
|
||||
numIter: Int) : Graph[VD, ED] = {
|
||||
|
@ -16,7 +16,7 @@ object Pregel {
|
|||
var g = graph.cache
|
||||
var i = 0
|
||||
|
||||
def mapF(vid: Vid, edge: EdgeWithVertices[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge)
|
||||
def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge)
|
||||
|
||||
def runProg(v: Vertex[VD], msg: Option[A]): VD = {
|
||||
if (msg.isEmpty) v.data else vprog(v, msg.get)
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
package spark.graph
|
||||
|
||||
|
||||
/**
|
||||
* A graph vertex consists of a vertex id and attribute.
|
||||
*
|
||||
* @tparam VD the type of the vertex attribute.
|
||||
*/
|
||||
case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] (
|
||||
var id: Vid = 0,
|
||||
var data: VD = nullValue[VD]) {
|
||||
|
|
|
@ -7,7 +7,7 @@ import spark.graph._
|
|||
|
||||
|
||||
private[graph]
|
||||
class EdgeWithVerticesPartition(idx: Int, val vPart: Partition, val ePart: Partition)
|
||||
class EdgeTripletPartition(idx: Int, val vPart: Partition, val ePart: Partition)
|
||||
extends Partition {
|
||||
override val index: Int = idx
|
||||
override def hashCode(): Int = idx
|
||||
|
@ -18,10 +18,10 @@ class EdgeWithVerticesPartition(idx: Int, val vPart: Partition, val ePart: Parti
|
|||
* An RDD that brings together edge data with its associated vertex data.
|
||||
*/
|
||||
private[graph]
|
||||
class EdgeWithVerticesRDD[VD: ClassManifest, ED: ClassManifest](
|
||||
class EdgeTripletRDD[VD: ClassManifest, ED: ClassManifest](
|
||||
vTableReplicated: RDD[(Vid, VD)],
|
||||
eTable: RDD[(Pid, EdgePartition[ED])])
|
||||
extends RDD[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])](eTable.context, Nil) {
|
||||
extends RDD[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])](eTable.context, Nil) {
|
||||
|
||||
println(vTableReplicated.partitioner.get.numPartitions)
|
||||
println(eTable.partitioner.get.numPartitions)
|
||||
|
@ -33,18 +33,18 @@ class EdgeWithVerticesRDD[VD: ClassManifest, ED: ClassManifest](
|
|||
}
|
||||
|
||||
override def getPartitions = Array.tabulate[Partition](eTable.partitions.size) {
|
||||
i => new EdgeWithVerticesPartition(i, eTable.partitions(i), vTableReplicated.partitions(i))
|
||||
i => new EdgeTripletPartition(i, eTable.partitions(i), vTableReplicated.partitions(i))
|
||||
}
|
||||
|
||||
override val partitioner = eTable.partitioner
|
||||
|
||||
override def getPreferredLocations(s: Partition) =
|
||||
eTable.preferredLocations(s.asInstanceOf[EdgeWithVerticesPartition].ePart)
|
||||
eTable.preferredLocations(s.asInstanceOf[EdgeTripletPartition].ePart)
|
||||
|
||||
override def compute(s: Partition, context: TaskContext)
|
||||
: Iterator[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])] = {
|
||||
: Iterator[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])] = {
|
||||
|
||||
val split = s.asInstanceOf[EdgeWithVerticesPartition]
|
||||
val split = s.asInstanceOf[EdgeTripletPartition]
|
||||
|
||||
// Fetch the vertices and put them in a hashmap.
|
||||
// TODO: use primitive hashmaps for primitive VD types.
|
||||
|
@ -55,9 +55,9 @@ class EdgeWithVerticesRDD[VD: ClassManifest, ED: ClassManifest](
|
|||
.asInstanceOf[(Pid, EdgePartition[ED])]
|
||||
|
||||
// Return an iterator that looks up the hash map to find matching vertices for each edge.
|
||||
val iter = new Iterator[EdgeWithVertices[VD, ED]] {
|
||||
val iter = new Iterator[EdgeTriplet[VD, ED]] {
|
||||
private var pos = 0
|
||||
private val e = new EdgeWithVertices[VD, ED]
|
||||
private val e = new EdgeTriplet[VD, ED]
|
||||
e.src = new Vertex[VD]
|
||||
e.dst = new Vertex[VD]
|
||||
|
|
@ -78,8 +78,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
}
|
||||
|
||||
/** Return an RDD that brings each edge together with its source and destination vertices. */
|
||||
override def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = {
|
||||
(new EdgeWithVerticesRDD(vTableReplicated, eTable)).mapPartitions { part => part.next._2 }
|
||||
override def triplets: RDD[EdgeTriplet[VD, ED]] = {
|
||||
(new EdgeTripletRDD(vTableReplicated, eTable)).mapPartitions { part => part.next._2 }
|
||||
}
|
||||
|
||||
override def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED] = {
|
||||
|
@ -90,9 +90,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
newGraph(vertices, edges.map(e => Edge(e.src, e.dst, f(e))))
|
||||
}
|
||||
|
||||
override def mapEdgesWithVertices[ED2: ClassManifest](f: EdgeWithVertices[VD, ED] => ED2):
|
||||
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2):
|
||||
Graph[VD, ED2] = {
|
||||
newGraph(vertices, edgesWithVertices.map(e => Edge(e.src.id, e.dst.id, f(e))))
|
||||
newGraph(vertices, triplets.map(e => Edge(e.src.id, e.dst.id, f(e))))
|
||||
}
|
||||
|
||||
|
||||
|
@ -101,7 +101,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
//////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
override def aggregateNeighbors[VD2: ClassManifest](
|
||||
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
|
||||
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[VD2],
|
||||
reduceFunc: (VD2, VD2) => VD2,
|
||||
default: VD2,
|
||||
gatherDirection: EdgeDirection)
|
||||
|
@ -114,13 +114,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
|
||||
}, preservesPartitioning = true)
|
||||
|
||||
(new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
|
||||
(new EdgeTripletRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
|
||||
.mapPartitions { part =>
|
||||
val (vmap, edges) = part.next()
|
||||
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
|
||||
val edgeSansAcc = new EdgeTriplet[VD, ED]()
|
||||
edgeSansAcc.src = new Vertex[VD]
|
||||
edgeSansAcc.dst = new Vertex[VD]
|
||||
edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] =>
|
||||
edges.foreach { e: EdgeTriplet[MutableTuple2[VD, Option[VD2]], ED] =>
|
||||
edgeSansAcc.data = e.data
|
||||
edgeSansAcc.src.data = e.src.data._1
|
||||
edgeSansAcc.dst.data = e.dst.data._1
|
||||
|
@ -158,7 +158,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
* As a consequence, the resulting table may be much smaller than the set of vertices.
|
||||
*/
|
||||
override def aggregateNeighbors[VD2: ClassManifest](
|
||||
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
|
||||
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[VD2],
|
||||
reduceFunc: (VD2, VD2) => VD2,
|
||||
gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = {
|
||||
|
||||
|
@ -169,13 +169,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
|
||||
}, preservesPartitioning = true)
|
||||
|
||||
(new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
|
||||
(new EdgeTripletRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
|
||||
.mapPartitions { part =>
|
||||
val (vmap, edges) = part.next()
|
||||
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
|
||||
val edgeSansAcc = new EdgeTriplet[VD, ED]()
|
||||
edgeSansAcc.src = new Vertex[VD]
|
||||
edgeSansAcc.dst = new Vertex[VD]
|
||||
edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] =>
|
||||
edges.foreach { e: EdgeTriplet[MutableTuple2[VD, Option[VD2]], ED] =>
|
||||
edgeSansAcc.data = e.data
|
||||
edgeSansAcc.src.data = e.src.data._1
|
||||
edgeSansAcc.dst.data = e.dst.data._1
|
||||
|
|
Loading…
Reference in a new issue