Merge pull request #73 from jegonzal/TriangleCount

Triangle count
Reynold Xin 2013-11-22 17:02:40 -08:00
commit 18ce7e940b
13 changed files with 556 additions and 320 deletions

OpenHashSet.scala (View file)

@ -139,7 +139,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
   def getPos(k: T): Int = {
     var pos = hashcode(hasher.hash(k)) & _mask
     var i = 1
-    while (true) {
+    val maxProbe = _data.size
+    while (i < maxProbe) {
       if (!_bitset.get(pos)) {
         return INVALID_POS
       } else if (k == _data(pos)) {

Analytics.scala (View file)

@ -2,8 +2,6 @@ package org.apache.spark.graph
 import org.apache.spark._
 /**
  * The Analytics object contains a collection of basic graph analytics
  * algorithms that operate largely on the graph structure.
@ -77,7 +75,7 @@ object Analytics extends Logging {
     def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
       resetProb + (1.0 - resetProb) * msgSum
     def sendMessage(edge: EdgeTriplet[Double, Double]) =
-      Array((edge.dstId, edge.srcAttr * edge.attr))
+      Iterator((edge.dstId, edge.srcAttr * edge.attr))
     def messageCombiner(a: Double, b: Double): Double = a + b
     // The initial message received by all vertices in PageRank
     val initialMessage = 0.0
@ -153,8 +151,10 @@ object Analytics extends Logging {
     }
     def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
       if (edge.srcAttr._2 > tol) {
-        Array((edge.dstId, edge.srcAttr._2 * edge.attr))
-      } else { Array.empty[(Vid, Double)] }
+        Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
+      } else {
+        Iterator.empty
+      }
     }
     def messageCombiner(a: Double, b: Double): Double = a + b
     // The initial message received by all vertices in PageRank
@ -188,11 +188,11 @@ object Analytics extends Logging {
     def sendMessage(edge: EdgeTriplet[Vid, ED]) = {
       if (edge.srcAttr < edge.dstAttr) {
-        Array((edge.dstId, edge.srcAttr))
+        Iterator((edge.dstId, edge.srcAttr))
       } else if (edge.srcAttr > edge.dstAttr) {
-        Array((edge.srcId, edge.dstAttr))
+        Iterator((edge.srcId, edge.dstAttr))
       } else {
-        Array.empty[(Vid, Vid)]
+        Iterator.empty
       }
     }
     val initialMessage = Long.MaxValue
@ -204,6 +204,79 @@ object Analytics extends Logging {
   } // end of connectedComponents
/**
* Compute the number of triangles passing through each vertex.
*
* The algorithm is relatively straightforward and can be computed in
* three steps:
*
* 1) Compute the set of neighbors for each vertex
* 2) For each edge compute the intersection of the sets and send the
* count to both vertices.
* 3) Compute the sum at each vertex and divide by two since each
* triangle is counted twice.
*
*
* @param rawGraph a graph with `srcId` less than `dstId` on every edge
* @tparam VD the vertex attribute type of the input graph
* @tparam ED the edge attribute type
* @return a graph whose vertex attribute is the number of triangles
*         passing through that vertex
*/
def triangleCount[VD: ClassManifest, ED: ClassManifest](rawGraph: Graph[VD,ED]):
Graph[Int, ED] = {
// Remove redundant edges
val graph = rawGraph.groupEdges( (a,b) => a ).cache
// Construct set representations of the neighborhoods
val nbrSets: VertexSetRDD[VertexSet] =
graph.collectNeighborIds(EdgeDirection.Both).mapValuesWithKeys { (vid, nbrs) =>
val set = new VertexSet(4)
var i = 0
while (i < nbrs.size) {
// prevent self cycle
if(nbrs(i) != vid) {
set.add(nbrs(i))
}
i += 1
}
set
}
// join the sets with the graph
val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
}
// Edge function computes intersection of smaller vertex with larger vertex
def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(Vid, Int)] = {
assert(et.srcAttr != null)
assert(et.dstAttr != null)
val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
(et.srcAttr, et.dstAttr)
} else {
(et.dstAttr, et.srcAttr)
}
val iter = smallSet.iterator()
var counter: Int = 0
while (iter.hasNext) {
val vid = iter.next
if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { counter += 1 }
}
Iterator((et.srcId, counter), (et.dstId, counter))
}
// compute the intersection along edges
val counters: VertexSetRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _)
// Merge counters with the graph and divide by two since each triangle is counted twice
graph.outerJoinVertices(counters) {
(vid, _, optCounter: Option[Int]) =>
val dblCount = optCounter.getOrElse(0)
// double count should be even (divisible by two)
assert((dblCount & 1) == 0)
dblCount / 2
}
} // end of TriangleCount
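A minimal sketch of how the new operator is meant to be invoked, assuming `graph` is a placeholder Graph whose edges were loaded in canonical orientation (for example with the `GraphLoader.edgeListFile(..., canonicalOrientation = true)` loader added later in this commit):

    val triCounts = Analytics.triangleCount(graph)
    // Each triangle is counted once at each of its three vertices, so the
    // global triangle count is the sum of per-vertex counts divided by 3.
    val totalTriangles =
      triCounts.vertices.map { case (_, count) => count.toLong }.reduce(_ + _) / 3

This mirrors the `triangles` case added to `main` further down.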
   def main(args: Array[String]) = {
     val host = args(0)
@ -277,8 +350,8 @@ object Analytics extends Logging {
         val sc = new SparkContext(host, "PageRank(" + fname + ")")
-        val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
-          minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
+        val graph = GraphLoader.edgeListFile(sc, fname,
+          minEdgePartitions = numEPart).cache()
         val startTime = System.currentTimeMillis
         logInfo("GRAPHX: starting tasks")
@ -326,16 +399,35 @@ object Analytics extends Logging {
         println("======================================")
         val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
-        //val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
-        val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
-          minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
+        val graph = GraphLoader.edgeListFile(sc, fname,
+          minEdgePartitions = numEPart).cache()
         val cc = Analytics.connectedComponents(graph)
-        //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
-        //  else Analytics.connectedComponents(graph, numIter)
         println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
         sc.stop()
       }
case "triangles" => {
var numVPart = 4
var numEPart = 4
options.foreach{
case ("numEPart", v) => numEPart = v.toInt
case ("numVPart", v) => numVPart = v.toInt
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
println("======================================")
println("| Triangle Count |")
println("--------------------------------------")
val sc = new SparkContext(host, "TriangleCount(" + fname + ")")
val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
minEdgePartitions = numEPart).cache()
val triangles = Analytics.triangleCount(graph)
println("Triangles: " + triangles.vertices.map {
case (vid,data) => data.toLong
}.reduce(_+_) / 3)
sc.stop()
}
// //
// case "shortestpath" => { // case "shortestpath" => {
// //

Graph.scala (View file)

@ -61,7 +61,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* type Color = Int * type Color = Int
* val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv") * val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv")
* val numInvalid = graph.edgesWithVertices() * val numInvalid = graph.edgesWithVertices()
* .map(e => if(e.src.data == e.dst.data) 1 else 0).sum * .map(e => if (e.src.data == e.dst.data) 1 else 0).sum
* }}} * }}}
* *
* @see edges() If only the edge data and adjacent vertex ids are * @see edges() If only the edge data and adjacent vertex ids are
@ -74,7 +74,6 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def persist(newLevel: StorageLevel): Graph[VD, ED] def persist(newLevel: StorageLevel): Graph[VD, ED]
/** /**
* Return a graph that is cached when first created. This is used to * Return a graph that is cached when first created. This is used to
* pin a graph in memory enabling multiple queries to reuse the same * pin a graph in memory enabling multiple queries to reuse the same
@ -84,14 +83,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
*/ */
def cache(): Graph[VD, ED] def cache(): Graph[VD, ED]
/** /**
* Compute statistics describing the graph representation. * Compute statistics describing the graph representation.
*/ */
def statistics: Map[String, Any] def statistics: Map[String, Any]
/** /**
* Construct a new graph where each vertex value has been * Construct a new graph where each vertex value has been
* transformed by the map function. * transformed by the map function.
@ -110,7 +106,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file") * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
* val root = 42 * val root = 42
* var bfsGraph = rawGraph * var bfsGraph = rawGraph
* .mapVertices[Int]((vid, data) => if(vid == root) 0 else Math.MaxValue) * .mapVertices[Int]((vid, data) => if (vid == root) 0 else Math.MaxValue)
* }}} * }}}
* *
*/ */
@ -160,9 +156,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* }}} * }}}
* *
*/ */
def mapTriplets[ED2: ClassManifest]( def mapTriplets[ED2: ClassManifest](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
/** /**
* Construct a new graph with all the edges reversed. If this graph * Construct a new graph with all the edges reversed. If this graph
@ -172,7 +166,6 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
*/ */
def reverse: Graph[VD, ED] def reverse: Graph[VD, ED]
/** /**
* This function takes a vertex and edge predicate and constructs * This function takes a vertex and edge predicate and constructs
* the subgraph that consists of vertices and edges that satisfy the * the subgraph that consists of vertices and edges that satisfy the
@ -198,35 +191,6 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED] vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
/**
* groupEdgeTriplets is used to merge multiple edges that have the
* same source and destination vertex into a single edge. The user
* supplied function is applied to each directed pair of vertices
* (u, v) and has access to all EdgeTriplets
*
* {e: for all e in E where e.src = u and e.dst = v}
*
* This function is identical to
* [[org.apache.spark.graph.Graph.groupEdges]] except that this
* function provides the user-supplied function with an iterator
* over EdgeTriplets, which contain the vertex data, whereas
* groupEdges provides the user-supplied function with an iterator
* over Edges, which only contain the vertex IDs.
*
* @tparam ED2 the type of the resulting edge data after grouping
*
* @param f the user supplied function to merge multiple EdgeTriplets
* into a single ED2 object
*
* @return Graph[VD,ED2] The resulting graph with a single Edge for each
* source, dest vertex pair.
*
*/
def groupEdgeTriplets[ED2: ClassManifest](f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2]
 /**
  * This function merges multiple edges between two vertices into a
  * single Edge. See
@ -235,14 +199,13 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
  *
  * @tparam ED2 the type of the resulting edge data after grouping.
  *
- * @param f the user supplied function to merge multiple Edges
- * into a single ED2 object.
+ * @param f the user supplied commutative associative function to merge
+ * edge attributes for duplicate edges.
  *
  * @return Graph[VD,ED2] The resulting graph with a single Edge for
  * each source, dest vertex pair.
  */
-  def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ): Graph[VD,ED2]
+  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
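The reworked `groupEdges` now takes a commutative, associative merge function over edge attributes instead of an iterator-consuming function. A minimal usage sketch, assuming `rawEdges` is a placeholder `RDD[(Vid, Vid)]` that may contain duplicate pairs:

    // Build a graph from raw pairs (each edge starts with attribute 1), then
    // collapse parallel edges, summing attributes so each surviving edge
    // records how many duplicates it replaced.
    val graph: Graph[Int, Int] = Graph(rawEdges, 1)
    val deduped = graph.groupEdges((a, b) => a + b)

This is the same pattern the pair-based `Graph.apply` below uses when `uniqueEdges` is true.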
 /**
  * The mapReduceTriplets function is used to compute statistics
@ -277,11 +240,10 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
  *
  */
   def mapReduceTriplets[A: ClassManifest](
-      mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
+      mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
       reduceFunc: (A, A) => A)
     : VertexSetRDD[A]
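Because `mapFunc` now returns an `Iterator`, the map side can emit zero, one, or two messages per edge without allocating arrays. A minimal sketch, assuming `graph` is any placeholder `Graph[VD, ED]`; this is the same pattern the in-degree computation in GraphOps uses below:

    // Send a 1 to the destination of every edge and sum the messages per vertex.
    val inDegrees: VertexSetRDD[Int] =
      graph.mapReduceTriplets(et => Iterator((et.dstId, 1)), _ + _)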
 /**
  * Join the vertices with an RDD and then apply a function from the
  * the vertex and RDD entry to a new vertex value and type. The
@ -315,7 +277,6 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
     (mapFunc: (Vid, VD, Option[U]) => VD2)
     : Graph[VD2, ED]
   // Save a copy of the GraphOps object so there is always one unique GraphOps object
   // for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
   val ops = new GraphOps(this)
@ -335,20 +296,15 @@ object Graph {
 import org.apache.spark.SparkContext._
 /**
- * Construct a graph from a collection of edges encoded as vertex id
- * pairs. Duplicate directed edges are merged to a single edge with
- * weight equal to the number of duplicate edges. The returned
- * vertex attribute is the number of edges adjacent to that vertex
- * (i.e., the undirected degree).
+ * Construct a graph from a collection of edges encoded as vertex id pairs.
  *
  * @param rawEdges the RDD containing the set of edges in the graph
  *
- * @return a graph with edge attributes containing the count of
- * duplicate edges and vertex attributes containing the total degree
- * of each vertex.
+ * @return a graph with edge attributes containing the count of duplicate edges.
  */
-  def apply(rawEdges: RDD[(Vid, Vid)]): Graph[Int, Int] = { Graph(rawEdges, true) }
+  def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD): Graph[VD, Int] = {
+    Graph(rawEdges, defaultValue, false, RandomVertexCut())
+  }
 /**
  * Construct a graph from a collection of edges encoded as vertex id
@ -364,23 +320,51 @@ object Graph {
  * attributes containing the total degree of each vertex.
  *
  */
-  def apply(rawEdges: RDD[(Vid, Vid)], uniqueEdges: Boolean): Graph[Int, Int] = {
-    // Reduce to unique edges.
-    val edges: RDD[Edge[Int]] =
-      if (uniqueEdges) {
-        rawEdges.map((_, 1)).reduceByKey(_ + _).map { case ((s, t), cnt) => Edge(s, t, cnt) }
-      } else {
-        rawEdges.map { case (s, t) => Edge(s, t, 1) }
-      }
-    // Determine unique vertices
-    /** @todo Should this reduceByKey operation be indexed? */
-    val vertices: RDD[(Vid, Int)] =
-      edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }.reduceByKey(_ + _)
-    // Return graph
-    GraphImpl(vertices, edges, 0)
-  }
+  def apply[VD: ClassManifest](
+      rawEdges: RDD[(Vid, Vid)],
+      defaultValue: VD,
+      uniqueEdges: Boolean,
+      partitionStrategy: PartitionStrategy):
+    Graph[VD, Int] = {
+    val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
+    val graph = GraphImpl(edges, defaultValue, partitionStrategy)
+    if (uniqueEdges) {
+      graph.groupEdges((a,b) => a+b)
+    } else {
+      graph
+    }
+  }
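A minimal sketch of the pair-based constructor, assuming a placeholder SparkContext `sc`; every vertex receives the supplied default attribute and every edge starts with attribute 1:

    // Build a 3-cycle from raw vertex-id pairs.
    val rawEdges: RDD[(Vid, Vid)] = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L))
    val triangle: Graph[Int, Int] = Graph(rawEdges, 1)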
/**
* Construct a graph from a collection of edges.
*
* @param edges the RDD containing the set of edges in the graph
* @param defaultValue the default vertex attribute to use for each vertex
*
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
def apply[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED] = {
Graph(edges, defaultValue, RandomVertexCut())
}
/**
* Construct a graph from a collection of edges.
*
* @param edges the RDD containing the set of edges in the graph
* @param defaultValue the default vertex attribute to use for each vertex
*
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
def apply[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
defaultValue: VD,
partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
GraphImpl(edges, defaultValue, partitionStrategy)
}
/** /**
* Construct a graph from a collection attributed vertices and * Construct a graph from a collection attributed vertices and
@ -400,32 +384,30 @@ object Graph {
vertices: RDD[(Vid,VD)], vertices: RDD[(Vid,VD)],
edges: RDD[Edge[ED]]): Graph[VD, ED] = { edges: RDD[Edge[ED]]): Graph[VD, ED] = {
val defaultAttr: VD = null.asInstanceOf[VD] val defaultAttr: VD = null.asInstanceOf[VD]
Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a) Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a, RandomVertexCut())
} }
/** /**
* Construct a graph from a collection attributed vertices and * Construct a graph from a collection attributed vertices and
* edges. Duplicate vertices are combined using the `mergeFunc` and * edges. Duplicate vertices are combined using the `mergeFunc` and
* vertices found in the edge collection but not in the input * vertices found in the edge collection but not in the input
* vertices are the default attribute `defautVertexAttr`. * vertices are the default attribute `defautVertexAttr`.
* *
* @note Duplicate vertices are removed arbitrarily .
*
* @tparam VD the vertex attribute type * @tparam VD the vertex attribute type
* @tparam ED the edge attribute type * @tparam ED the edge attribute type
* @param vertices the "set" of vertices and their attributes * @param vertices the "set" of vertices and their attributes
* @param edges the collection of edges in the graph * @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for * @param defaultVertexAttr the default vertex attribute to use for
* vertices that are mentioned in `edges` but not in `vertices * vertices that are mentioned in `edges` but not in `vertices`
* @param mergeFunc the function used to merge duplicate vertices
* in the `vertices` collection.
* *
*/ */
def apply[VD: ClassManifest, ED: ClassManifest]( def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid,VD)], vertices: RDD[(Vid,VD)],
edges: RDD[Edge[ED]], edges: RDD[Edge[ED]],
defaultVertexAttr: VD): Graph[VD, ED] = { defaultVertexAttr: VD): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr, (a,b) => a) Graph(vertices, edges, defaultVertexAttr, (a,b) => a, RandomVertexCut())
} }
/** /**
@ -442,14 +424,17 @@ object Graph {
* vertices that are mentioned in `edges` but not in `vertices * vertices that are mentioned in `edges` but not in `vertices
* @param mergeFunc the function used to merge duplicate vertices * @param mergeFunc the function used to merge duplicate vertices
* in the `vertices` collection. * in the `vertices` collection.
* @param partitionStrategy the partition strategy to use when
* partitioning the edges.
* *
*/ */
def apply[VD: ClassManifest, ED: ClassManifest]( def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid,VD)], vertices: RDD[(Vid,VD)],
edges: RDD[Edge[ED]], edges: RDD[Edge[ED]],
defaultVertexAttr: VD, defaultVertexAttr: VD,
mergeFunc: (VD, VD) => VD): Graph[VD, ED] = { mergeFunc: (VD, VD) => VD,
GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc) partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc, partitionStrategy)
} }
/** /**

GraphLoader.scala (View file)

@ -20,43 +20,84 @@ object GraphLoader {
* @param minEdgePartitions the number of partitions for the * @param minEdgePartitions the number of partitions for the
* the Edge RDD * the Edge RDD
* *
* @todo remove minVertexPartitions arg
*/ */
def textFile[ED: ClassManifest]( def textFile[ED: ClassManifest](
sc: SparkContext, sc: SparkContext,
path: String, path: String,
edgeParser: Array[String] => ED, edgeParser: Array[String] => ED,
minEdgePartitions: Int = 1, minEdgePartitions: Int = 1,
minVertexPartitions: Int = 1, partitionStrategy: PartitionStrategy = RandomVertexCut()):
partitionStrategy: PartitionStrategy = RandomVertexCut()): GraphImpl[Int, ED] = { Graph[Int, ED] = {
// Parse the edge data table // Parse the edge data table
val edges = sc.textFile(path, minEdgePartitions).flatMap { line => val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
if (!line.isEmpty && line(0) != '#') { iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
val lineArray = line.split("\\s+") val lineArray = line.split("\\s+")
if(lineArray.length < 2) { if(lineArray.length < 2) {
println("Invalid line: " + line) println("Invalid line: " + line)
assert(false) assert(false)
} }
val source = lineArray(0) val source = lineArray(0).trim.toLong
val target = lineArray(1) val target = lineArray(1).trim.toLong
val tail = lineArray.drop(2) val tail = lineArray.drop(2)
val edata = edgeParser(tail) val edata = edgeParser(tail)
Array(Edge(source.trim.toInt, target.trim.toInt, edata)) Edge(source, target, edata)
} else { })
Array.empty[Edge[ED]] val defaultVertexAttr = 1
} Graph(edges, defaultVertexAttr, partitionStrategy)
}.cache()
val graph = fromEdges(edges, partitionStrategy)
graph
} }
private def fromEdges[ED: ClassManifest]( /**
edges: RDD[Edge[ED]], * Load a graph from an edge list formatted file with each line containing
partitionStrategy: PartitionStrategy): GraphImpl[Int, ED] = { * two integers: a source Id and a target Id.
val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) } *
.reduceByKey(_ + _) * @example A file in the following format:
GraphImpl(vertices, edges, 0, (a: Int, b: Int) => a, partitionStrategy) * {{{
} * # Comment Line
* # Source Id <\t> Target Id
* 1 -5
* 1 2
* 2 7
* 1 8
* }}}
*
* If desired the edges can be automatically oriented in the positive
* direction (source Id < target Id) by setting `canonicalOrientation` to
* true
*
* @param sc
* @param path the path to the file (e.g., /Home/data/file or hdfs://file)
* @param canonicalOrientation whether to orient edges in the positive
* direction.
* @param minEdgePartitions the number of partitions for the
* the Edge RDD
* @tparam ED
* @return
*/
def edgeListFile[ED: ClassManifest](
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1,
partitionStrategy: PartitionStrategy = RandomVertexCut()):
Graph[Int, Int] = {
// Parse the edge data table
val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
val lineArray = line.split("\\s+")
if(lineArray.length < 2) {
println("Invalid line: " + line)
assert(false)
}
val source = lineArray(0).trim.toLong
val target = lineArray(1).trim.toLong
if (canonicalOrientation && target > source) {
Edge(target, source, 1)
} else {
Edge(source, target, 1)
}
})
val defaultVertexAttr = 1
Graph(edges, defaultVertexAttr, partitionStrategy)
} // end of edgeListFile
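A minimal usage sketch for the new loader, with a hypothetical HDFS path; the result is a `Graph[Int, Int]` in which every vertex attribute is the default 1 and every edge attribute is 1:

    // Load an edge list, orienting edges canonically as described above
    // (the triangle count code expects srcId < dstId).
    val graph = GraphLoader.edgeListFile(sc, "hdfs://edge-list.txt",
      canonicalOrientation = true, minEdgePartitions = 4).cache()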
} }

GraphOps.scala (View file)

@ -3,7 +3,7 @@ package org.apache.spark.graph
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.apache.spark.util.ClosureCleaner
+import org.apache.spark.SparkException
 /**
@ -61,11 +61,11 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
    */
   private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = {
     if (edgeDirection == EdgeDirection.In) {
-      graph.mapReduceTriplets(et => Array((et.dstId,1)), _+_)
+      graph.mapReduceTriplets(et => Iterator((et.dstId,1)), _ + _)
     } else if (edgeDirection == EdgeDirection.Out) {
-      graph.mapReduceTriplets(et => Array((et.srcId,1)), _+_)
+      graph.mapReduceTriplets(et => Iterator((et.srcId,1)), _ + _)
     } else { // EdgeDirection.both
-      graph.mapReduceTriplets(et => Array((et.srcId,1), (et.dstId,1)), _+_)
+      graph.mapReduceTriplets(et => Iterator((et.srcId,1), (et.dstId,1)), _ + _)
     }
   }
@ -133,11 +133,10 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
       } else { Option.empty[A] }
       // construct the return array
       (src, dst) match {
-        case (None, None) => Array.empty[(Vid, A)]
-        case (Some(srcA),None) => Array((et.srcId, srcA))
-        case (None, Some(dstA)) => Array((et.dstId, dstA))
-        case (Some(srcA), Some(dstA)) =>
-          Array((et.srcId, srcA), (et.dstId, dstA))
+        case (None, None) => Iterator.empty
+        case (Some(srcA),None) => Iterator((et.srcId, srcA))
+        case (None, Some(dstA)) => Iterator((et.dstId, dstA))
+        case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA))
       }
     }
@ -156,10 +155,23 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
    */
   def collectNeighborIds(edgeDirection: EdgeDirection) :
     VertexSetRDD[Array[Vid]] = {
-    val nbrs = graph.aggregateNeighbors[Array[Vid]](
-      (vid, edge) => Some(Array(edge.otherVertexId(vid))),
-      (a, b) => a ++ b,
-      edgeDirection)
+    val nbrs =
+      if (edgeDirection == EdgeDirection.Both) {
+        graph.mapReduceTriplets[Array[Vid]](
+          mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
+          reduceFunc = _ ++ _
+        )
+      } else if (edgeDirection == EdgeDirection.Out) {
+        graph.mapReduceTriplets[Array[Vid]](
+          mapFunc = et => Iterator((et.srcId, Array(et.dstId))),
+          reduceFunc = _ ++ _)
+      } else if (edgeDirection == EdgeDirection.In) {
+        graph.mapReduceTriplets[Array[Vid]](
+          mapFunc = et => Iterator((et.dstId, Array(et.srcId))),
+          reduceFunc = _ ++ _)
+      } else {
+        throw new SparkException("It doesn't make sense to collect neighbor ids without a direction.")
+      }
     graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[Vid]) }
   } // end of collectNeighborIds

Pregel.scala (View file)

@ -93,7 +93,7 @@ object Pregel {
   def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
     (graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
     vprog: (Vid, VD, A) => VD,
-    sendMsg: EdgeTriplet[VD, ED] => Array[(Vid,A)],
+    sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
     mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
@ -163,7 +163,7 @@ object Pregel {
   def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
     (graph: Graph[VD, ED], initialMsg: A)(
     vprog: (Vid, VD, A) => VD,
-    sendMsg: EdgeTriplet[VD, ED] => Array[(Vid,A)],
+    sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
     mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
@ -174,7 +174,7 @@ object Pregel {
       }
     }
-    def sendMsgFun(edge: EdgeTriplet[(VD,Boolean), ED]): Array[(Vid, A)] = {
+    def sendMsgFun(edge: EdgeTriplet[(VD,Boolean), ED]): Iterator[(Vid, A)] = {
       if(edge.srcAttr._2) {
         val et = new EdgeTriplet[VD, ED]
         et.srcId = edge.srcId
@ -183,7 +183,9 @@ object Pregel {
         et.dstAttr = edge.dstAttr._1
         et.attr = edge.attr
         sendMsg(et)
-      } else { Array.empty[(Vid,A)] }
+      } else {
+        Iterator.empty
+      }
     }
     var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )

VertexSetRDD.scala (View file)

@ -171,7 +171,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
       var ind = bs.nextSetBit(0)
       while(ind >= 0) {
         val k = index.getValueSafe(ind)
-        if( cleanPred( (k, oldValues(ind)) ) ) {
+        if (cleanPred((k, oldValues(ind)))) {
           newBS.set(ind)
         }
         ind = bs.nextSetBit(ind+1)
@ -278,7 +278,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
   def zipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, W) => Z):
     VertexSetRDD[Z] = {
     val cleanF = index.rdd.context.clean(f)
-    if(index != other.index) {
+    if (index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
     val newValuesRDD: RDD[ (Array[Z], BitSet) ] =
@ -315,7 +315,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
   def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]):
     RDD[Z] = {
     val cleanF = index.rdd.context.clean(f)
-    if(index != other.index) {
+    if (index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
     index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
@ -351,7 +351,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
    */
   def leftZipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, Option[W]) => Z):
     VertexSetRDD[Z] = {
-    if(index != other.index) {
+    if (index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
     val cleanF = index.rdd.context.clean(f)
@ -644,6 +644,23 @@ object VertexSetRDD {
new VertexSetIndex(index) new VertexSetIndex(index)
} }
/**
* Create a VertexSetRDD with all vertices initialized to the default value.
*
* @param index an index over the set of vertices
* @param defaultValue the default value to use when initializing the vertices
* @tparam V the type of the vertex attribute
* @return
*/
def apply[V: ClassManifest](index: VertexSetIndex, defaultValue: V): VertexSetRDD[V] = {
// Use the index to build the new values table
val values: RDD[ (Array[V], BitSet) ] = index.rdd.mapPartitions(_.map { index =>
val values = Array.fill(index.capacity)(defaultValue)
val bs = index.getBitSet
(values, bs)
}, preservesPartitioning = true)
new VertexSetRDD(index, values)
} // end of apply
} // end of object VertexSetRDD } // end of object VertexSetRDD

EdgePartition.scala (View file)

@ -1,19 +1,39 @@
 package org.apache.spark.graph.impl
 import org.apache.spark.graph._
+import org.apache.spark.util.collection.OpenHashMap
 /**
- * A partition of edges in 3 large columnar arrays.
+ * A collection of edges stored in 3 large columnar arrays (src, dst, attribute).
+ *
+ * @param srcIds the source vertex id of each edge
+ * @param dstIds the destination vertex id of each edge
+ * @param data the attribute associated with each edge
+ * @tparam ED the edge attribute type.
  */
 class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest](
     val srcIds: Array[Vid],
     val dstIds: Array[Vid],
-    val data: Array[ED])
-{
+    val data: Array[ED]) {
+  /**
+   * Reverse all the edges in this partition.
+   *
+   * @note No new data structures are created.
+   *
+   * @return a new edge partition with all edges reversed.
+   */
   def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data)
/**
* Construct a new edge partition by applying the function f to all
* edges in this partition.
*
* @param f a function from an edge to a new attribute
* @tparam ED2 the type of the new attribute
* @return a new edge partition with the result of the function `f`
* applied to each edge
*/
def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = { def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = {
val newData = new Array[ED2](data.size) val newData = new Array[ED2](data.size)
val edge = new Edge[ED]() val edge = new Edge[ED]()
@ -29,6 +49,11 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
new EdgePartition(srcIds, dstIds, newData) new EdgePartition(srcIds, dstIds, newData)
} }
/**
* Apply the function f to all edges in this partition.
*
* @param f an external state mutating user defined function.
*/
def foreach(f: Edge[ED] => Unit) { def foreach(f: Edge[ED] => Unit) {
val edge = new Edge[ED] val edge = new Edge[ED]
val size = data.size val size = data.size
@ -42,8 +67,43 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
} }
} }
/**
* Merge all the edges with the same src and dest id into a single
* edge using the `merge` function
*
* @param merge a commutative associative merge operation
* @return a new edge partition without duplicate edges
*/
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
// Aggregate all matching edges in a hashmap
val agg = new OpenHashMap[(Vid,Vid), ED]
foreach { e => agg.setMerge((e.srcId, e.dstId), e.attr, merge) }
// Populate new srcId, dstId, and data, arrays
val newSrcIds = new Array[Vid](agg.size)
val newDstIds = new Array[Vid](agg.size)
val newData = new Array[ED](agg.size)
var i = 0
agg.foreach { kv =>
newSrcIds(i) = kv._1._1
newDstIds(i) = kv._1._2
newData(i) = kv._2
i += 1
}
new EdgePartition(newSrcIds, newDstIds, newData)
}
/**
* The number of edges in this partition
*
* @return size of the partition
*/
def size: Int = srcIds.size def size: Int = srcIds.size
/**
* Get an iterator over the edges in this partition.
*
* @return an iterator over edges in the partition
*/
def iterator = new Iterator[Edge[ED]] { def iterator = new Iterator[Edge[ED]] {
private[this] val edge = new Edge[ED] private[this] val edge = new Edge[ED]
private[this] var pos = 0 private[this] var pos = 0

GraphImpl.scala (View file)

@ -135,8 +135,10 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
* Display the lineage information for this graph. * Display the lineage information for this graph.
*/ */
def printLineage() = { def printLineage() = {
def traverseLineage(
def traverseLineage(rdd: RDD[_], indent: String = "", visited: Map[Int, String] = Map.empty[Int, String]) { rdd: RDD[_],
indent: String = "",
visited: Map[Int, String] = Map.empty[Int, String]) {
if(visited.contains(rdd.id)) { if(visited.contains(rdd.id)) {
println(indent + visited(rdd.id)) println(indent + visited(rdd.id))
println(indent) println(indent)
@ -147,42 +149,35 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val deps = rdd.dependencies val deps = rdd.dependencies
val partitioner = rdd.partitioner val partitioner = rdd.partitioner
val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0} val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0}
println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner + ", " + numparts +")") println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner +
", " + numparts +")")
println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString) println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString)
println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", ")) println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", "))
deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited)) deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
} }
} }
println("eTable ------------------------------------------") println("eTable ------------------------------------------")
traverseLineage(eTable, " ") traverseLineage(eTable, " ")
var visited = Map(eTable.id -> "eTable") var visited = Map(eTable.id -> "eTable")
println("\n\nvTable.index ------------------------------------") println("\n\nvTable.index ------------------------------------")
traverseLineage(vTable.index.rdd, " ", visited) traverseLineage(vTable.index.rdd, " ", visited)
visited += (vTable.index.rdd.id -> "vTable.index") visited += (vTable.index.rdd.id -> "vTable.index")
println("\n\nvTable.values ------------------------------------") println("\n\nvTable.values ------------------------------------")
traverseLineage(vTable.valuesRDD, " ", visited) traverseLineage(vTable.valuesRDD, " ", visited)
visited += (vTable.valuesRDD.id -> "vTable.values") visited += (vTable.valuesRDD.id -> "vTable.values")
println("\n\nvTable ------------------------------------------") println("\n\nvTable ------------------------------------------")
traverseLineage(vTable, " ", visited) traverseLineage(vTable, " ", visited)
visited += (vTable.id -> "vTable") visited += (vTable.id -> "vTable")
println("\n\nvid2pid.bothAttrs -------------------------------") println("\n\nvid2pid.bothAttrs -------------------------------")
traverseLineage(vid2pid.bothAttrs, " ", visited) traverseLineage(vid2pid.bothAttrs, " ", visited)
visited += (vid2pid.bothAttrs.id -> "vid2pid") visited += (vid2pid.bothAttrs.id -> "vid2pid")
visited += (vid2pid.bothAttrs.valuesRDD.id -> "vid2pid.bothAttrs") visited += (vid2pid.bothAttrs.valuesRDD.id -> "vid2pid.bothAttrs")
println("\n\nlocalVidMap -------------------------------------") println("\n\nlocalVidMap -------------------------------------")
traverseLineage(localVidMap, " ", visited) traverseLineage(localVidMap, " ", visited)
visited += (localVidMap.id -> "localVidMap") visited += (localVidMap.id -> "localVidMap")
println("\n\nvTableReplicatedValues.bothAttrs ----------------") println("\n\nvTableReplicatedValues.bothAttrs ----------------")
traverseLineage(vTableReplicatedValues.bothAttrs, " ", visited) traverseLineage(vTableReplicatedValues.bothAttrs, " ", visited)
visited += (vTableReplicatedValues.bothAttrs.id -> "vTableReplicatedValues.bothAttrs") visited += (vTableReplicatedValues.bothAttrs.id -> "vTableReplicatedValues.bothAttrs")
println("\n\ntriplets ----------------------------------------") println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited) traverseLineage(triplets, " ", visited)
println(visited) println(visited)
@ -210,94 +205,27 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = { vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = {
/** @todo The following code behaves deterministically on each
* vertex predicate but uses additional space. Should we swithc to
* this version
*/
// val predGraph = mapVertices(v => (v.data, vpred(v)))
// val newETable = predGraph.triplets.filter(t =>
// if(v.src.data._2 && v.dst.data._2) {
// val src = Vertex(t.src.id, t.src.data._1)
// val dst = Vertex(t.dst.id, t.dst.data._1)
// epred(new EdgeTriplet[VD, ED](src, dst, t.data))
// } else { false })
// val newVTable = predGraph.vertices.filter(v => v.data._1)
// .map(v => (v.id, v.data._1)).indexed()
// Reuse the partitioner (but not the index) from this graph // Reuse the partitioner (but not the index) from this graph
val newVTable = val newVTable =
VertexSetRDD(vertices.filter(v => vpred(v._1, v._2)).partitionBy(vTable.index.partitioner)) VertexSetRDD(vertices.filter(v => vpred(v._1, v._2)).partitionBy(vTable.index.partitioner))
// Restrict the set of edges to those that satisfy the vertex and the edge predicate. // Restrict the set of edges to those that satisfy the vertex and the edge predicate.
val newETable = createETable( val newETable = createETable(
triplets.filter( triplets.filter(
t => vpred( t.srcId, t.srcAttr ) && vpred( t.dstId, t.dstAttr ) && epred(t) t => vpred( t.srcId, t.srcAttr ) && vpred( t.dstId, t.dstAttr ) && epred(t)
) ).map( t => Edge(t.srcId, t.dstId, t.attr) ), partitioner)
.map( t => Edge(t.srcId, t.dstId, t.attr) ), partitioner)
// Construct the Vid2Pid map. Here we assume that the filter operation // Construct the Vid2Pid map. Here we assume that the filter operation
// behaves deterministically. // behaves deterministically.
// @todo reindex the vertex and edge tables // @todo reindex the vertex and edge tables
val newVid2Pid = new Vid2Pid(newETable, newVTable.index) val newVid2Pid = new Vid2Pid(newETable, newVTable.index)
val newVidMap = createLocalVidMap(newETable) val newVidMap = createLocalVidMap(newETable)
new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable, partitioner) new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable, partitioner)
} } // end of subgraph
override def groupEdgeTriplets[ED2: ClassManifest]( override def groupEdges(merge: (ED, ED) => ED ): Graph[VD,ED] = {
f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] = { ClosureCleaner.clean(merge)
partitioner match { val newETable =
case _: CanonicalRandomVertexCut => { eTable.mapPartitions { _.map(p => (p._1, p._2.groupEdges(merge))) }
val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIter => new GraphImpl(vTable, vid2pid, localVidMap, newETable, partitioner)
partIter
// TODO(crankshaw) toList requires that the entire edge partition
// can fit in memory right now.
.toList
// groups all ETs in this partition that have the same src and dst
// Because all ETs with the same src and dst will live on the same
// partition due to the canonicalRandomVertexCut partitioner, this
// guarantees that these ET groups will be complete.
.groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
.mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
.toList
.toIterator
.map { case ((src, dst), data) => Edge(src, dst, data) }
.toIterator
}
//TODO(crankshaw) eliminate the need to call createETable
val newETable = createETable(newEdges, partitioner)
new GraphImpl(vTable, vid2pid, localVidMap, newETable, partitioner)
}
case _ => throw new SparkException(partitioner.getClass.getName
+ " is incompatible with groupEdgeTriplets")
}
}
override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ):
Graph[VD,ED2] = {
partitioner match {
case _: CanonicalRandomVertexCut => {
val newEdges: RDD[Edge[ED2]] = edges.mapPartitions { partIter =>
partIter.toList
.groupBy { e: Edge[ED] => (e.srcId, e.dstId) }
.mapValues { ts => f(ts.toIterator) }
.toList
.toIterator
.map { case ((src, dst), data) => Edge(src, dst, data) }
}
// TODO(crankshaw) eliminate the need to call createETable
val newETable = createETable(newEdges, partitioner)
new GraphImpl(vTable, vid2pid, localVidMap, newETable, partitioner)
}
case _ => throw new SparkException(partitioner.getClass.getName
+ " is incompatible with groupEdges")
}
} }
////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////
@ -305,7 +233,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////
override def mapReduceTriplets[A: ClassManifest]( override def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)], mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
reduceFunc: (A, A) => A) reduceFunc: (A, A) => A)
: VertexSetRDD[A] = : VertexSetRDD[A] =
GraphImpl.mapReduceTriplets(this, mapFunc, reduceFunc) GraphImpl.mapReduceTriplets(this, mapFunc, reduceFunc)
@ -323,27 +251,48 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
object GraphImpl { object GraphImpl {
def apply[VD: ClassManifest, ED: ClassManifest]( def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]],
edges: RDD[Edge[ED]], defaultValue: VD,
defaultVertexAttr: VD): GraphImpl[VD,ED] = { partitionStrategy: PartitionStrategy):
apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, RandomVertexCut()) GraphImpl[VD, ED] = {
val etable = createETable(edges, partitionStrategy).cache
// Get the set of all vids
val vids = etable.mapPartitions( iter => {
val (pid, epart) = iter.next()
assert(!iter.hasNext)
epart.iterator.flatMap(e => Iterator(e.srcId, e.dstId))
}, true)
// Index the set of all vids
val index = VertexSetRDD.makeIndex(vids)
// Index the vertices and fill in missing attributes with the default
val vtable = VertexSetRDD(index, defaultValue)
val vid2pid = new Vid2Pid(etable, vtable.index)
val localVidMap = createLocalVidMap(etable)
new GraphImpl(vtable, vid2pid, localVidMap, etable, partitionStrategy)
} }
def apply[VD: ClassManifest, ED: ClassManifest]( // def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)], // vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]], // edges: RDD[Edge[ED]],
defaultVertexAttr: VD, // defaultVertexAttr: VD): GraphImpl[VD,ED] = {
partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = { // apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, RandomVertexCut())
apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, partitionStrategy) // }
}
def apply[VD: ClassManifest, ED: ClassManifest]( // def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)], // vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]], // edges: RDD[Edge[ED]],
defaultVertexAttr: VD, // defaultVertexAttr: VD,
mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = { // partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
apply(vertices, edges, defaultVertexAttr, mergeFunc, RandomVertexCut()) // apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, partitionStrategy)
} // }
// def apply[VD: ClassManifest, ED: ClassManifest](
// vertices: RDD[(Vid, VD)],
// edges: RDD[Edge[ED]],
// defaultVertexAttr: VD,
// mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
// apply(vertices, edges, defaultVertexAttr, mergeFunc, RandomVertexCut())
// }
def apply[VD: ClassManifest, ED: ClassManifest]( def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)], vertices: RDD[(Vid, VD)],
@ -372,7 +321,6 @@ object GraphImpl {
new GraphImpl(vtable, vid2pid, localVidMap, etable, partitionStrategy) new GraphImpl(vtable, vid2pid, localVidMap, etable, partitionStrategy)
} }
/** /**
* Create the edge table RDD, which is much more efficient for Java heap storage than the * Create the edge table RDD, which is much more efficient for Java heap storage than the
* normal edges data structure (RDD[(Vid, Vid, ED)]). * normal edges data structure (RDD[(Vid, Vid, ED)]).
@ -419,9 +367,9 @@ object GraphImpl {
} }
def makeTriplets[VD: ClassManifest, ED: ClassManifest]( def makeTriplets[VD: ClassManifest, ED: ClassManifest](
localVidMap: RDD[(Pid, VertexIdToIndexMap)], localVidMap: RDD[(Pid, VertexIdToIndexMap)],
vTableReplicatedValues: RDD[(Pid, Array[VD]) ], vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = { eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = {
eTable.zipPartitions(localVidMap, vTableReplicatedValues) { eTable.zipPartitions(localVidMap, vTableReplicatedValues) {
(eTableIter, vidMapIter, replicatedValuesIter) => (eTableIter, vidMapIter, replicatedValuesIter) =>
val (_, vidToIndex) = vidMapIter.next() val (_, vidToIndex) = vidMapIter.next()
@ -432,16 +380,16 @@ object GraphImpl {
} }
def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest]( def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
g: GraphImpl[VD, ED], g: GraphImpl[VD, ED],
f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues.bothAttrs){ val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues.bothAttrs) {
(edgePartitionIter, vidToIndexIter, vertexArrayIter) => (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (pid, edgePartition) = edgePartitionIter.next() val (pid, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next() val (_, vidToIndex) = vidToIndexIter.next()
val (_, vertexArray) = vertexArrayIter.next() val (_, vertexArray) = vertexArrayIter.next()
val et = new EdgeTriplet[VD, ED] val et = new EdgeTriplet[VD, ED]
val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray) val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
val newEdgePartition = edgePartition.map{e => val newEdgePartition = edgePartition.map { e =>
et.set(e) et.set(e)
et.srcAttr = vmap(e.srcId) et.srcAttr = vmap(e.srcId)
et.dstAttr = vmap(e.dstId) et.dstAttr = vmap(e.dstId)
@ -453,23 +401,20 @@ object GraphImpl {
} }
def mapReduceTriplets[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( def mapReduceTriplets[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
g: GraphImpl[VD, ED], g: GraphImpl[VD, ED],
mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)], mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
reduceFunc: (A, A) => A): VertexSetRDD[A] = { reduceFunc: (A, A) => A): VertexSetRDD[A] = {
ClosureCleaner.clean(mapFunc) ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc) ClosureCleaner.clean(reduceFunc)
// For each vertex, replicate its attribute only to partitions where it is // For each vertex, replicate its attribute only to partitions where it is
// in the relevant position in an edge. // in the relevant position in an edge.
val mapFuncUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr") val mapFuncUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
val mapFuncUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr") val mapFuncUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
// Map and preaggregate // Map and preaggregate
val preAgg = g.eTable.zipPartitions( val preAgg = g.eTable.zipPartitions(
g.localVidMap, g.localVidMap,
g.vTableReplicatedValues.get(mapFuncUsesSrcAttr, mapFuncUsesDstAttr) g.vTableReplicatedValues.get(mapFuncUsesSrcAttr, mapFuncUsesDstAttr)
){ ) {
(edgePartitionIter, vidToIndexIter, vertexArrayIter) => (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (_, edgePartition) = edgePartitionIter.next() val (_, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next() val (_, vidToIndex) = vidToIndexIter.next()

GraphGenerators.scala (View file)

@ -65,7 +65,7 @@ object GraphGenerators {
   // Right now it just generates a bunch of edges where
   // the edge data is the weight (default 1)
-  def logNormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = {
+  def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
     // based on Pregel settings
     val mu = 4
     val sigma = 1.3
@ -79,7 +79,7 @@ object GraphGenerators {
       v => generateRandomEdges(v._1.toInt, v._2, numVertices)
     }
-    GraphImpl(vertices, edges, 0)
+    Graph(vertices, edges, 0)
     //println("Vertices:")
     //for (v <- vertices) {
     //  println(v.id)
@ -137,7 +137,7 @@ object GraphGenerators {
-  def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): GraphImpl[Int, Int] = {
+  def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): Graph[Int, Int] = {
     // let N = requestedNumVertices
     // the number of vertices is 2^n where n=ceil(log2[N])
     // This ensures that the 4 quadrants are the same size at all recursion levels
@ -155,12 +155,12 @@ object GraphGenerators {
   }
-  def outDegreeFromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
+  def outDegreeFromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): Graph[Int, ED] = {
     val vertices = edges.flatMap { edge => List((edge.srcId, 1)) }
       .reduceByKey(_ + _)
       .map{ case (vid, degree) => (vid, degree) }
-    GraphImpl(vertices, edges, 0)
+    Graph(vertices, edges, 0)
   }
   /**
@ -275,7 +275,7 @@ object GraphGenerators {
    */
   def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
     val edges: RDD[(Vid, Vid)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
-    Graph(edges, false)
+    Graph(edges, 1)
   } // end of starGraph

AnalyticsSuite.scala (View file)

@ -18,23 +18,23 @@ object GridPageRank {
// Convert row column address into vertex ids (row major order) // Convert row column address into vertex ids (row major order)
def sub2ind(r: Int, c: Int): Int = r * nCols + c def sub2ind(r: Int, c: Int): Int = r * nCols + c
// Make the grid graph // Make the grid graph
for(r <- 0 until nRows; c <- 0 until nCols){ for (r <- 0 until nRows; c <- 0 until nCols) {
val ind = sub2ind(r,c) val ind = sub2ind(r,c)
if(r+1 < nRows) { if (r+1 < nRows) {
outDegree(ind) += 1 outDegree(ind) += 1
inNbrs(sub2ind(r+1,c)) += ind inNbrs(sub2ind(r+1,c)) += ind
} }
if(c+1 < nCols) { if (c+1 < nCols) {
outDegree(ind) += 1 outDegree(ind) += 1
inNbrs(sub2ind(r,c+1)) += ind inNbrs(sub2ind(r,c+1)) += ind
} }
} }
// compute the pagerank // compute the pagerank
var pr = Array.fill(nRows * nCols)(resetProb) var pr = Array.fill(nRows * nCols)(resetProb)
for(iter <- 0 until nIter) { for (iter <- 0 until nIter) {
val oldPr = pr val oldPr = pr
pr = new Array[Double](nRows * nCols) pr = new Array[Double](nRows * nCols)
for(ind <- 0 until (nRows * nCols)) { for (ind <- 0 until (nRows * nCols)) {
pr(ind) = resetProb + (1.0 - resetProb) * pr(ind) = resetProb + (1.0 - resetProb) *
inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
} }
@ -64,7 +64,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
}.map { case (vid, test) => test }.sum }.map { case (vid, test) => test }.sum
assert(notMatching === 0) assert(notMatching === 0)
prGraph2.vertices.foreach(println(_)) prGraph2.vertices.foreach(println(_))
val errors = prGraph2.vertices.map{ case (vid, pr) => val errors = prGraph2.vertices.map { case (vid, pr) =>
val correct = (vid > 0 && pr == resetProb) || val correct = (vid > 0 && pr == resetProb) ||
(vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5) (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
if ( !correct ) { 1 } else { 0 } if ( !correct ) { 1 } else { 0 }
@ -132,7 +132,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val chain1 = (0 until 9).map(x => (x, x+1) ) val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) ) val chain2 = (10 until 20).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) } val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
val twoChains = Graph(rawEdges) val twoChains = Graph(rawEdges, 1.0)
val ccGraph = Analytics.connectedComponents(twoChains).cache() val ccGraph = Analytics.connectedComponents(twoChains).cache()
val vertices = ccGraph.vertices.collect val vertices = ccGraph.vertices.collect
for ( (id, cc) <- vertices ) { for ( (id, cc) <- vertices ) {
@ -141,9 +141,12 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
} }
val ccMap = vertices.toMap val ccMap = vertices.toMap
println(ccMap) println(ccMap)
for( id <- 0 until 20 ) { for (id <- 0 until 20) {
if(id < 10) { assert(ccMap(id) === 0) } if (id < 10) {
else { assert(ccMap(id) === 10) } assert(ccMap(id) === 0)
} else {
assert(ccMap(id) === 10)
}
} }
} }
} // end of chain connected components } // end of chain connected components
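The expected labels in the test above (0 for the first chain, 10 for the second) follow because connectedComponents labels each component with its smallest vertex id. A small illustrative check of that invariant on the collected vertices (assuming `vertices` as in the test; not part of the patch):

// Each component's label should equal the smallest vertex id it contains (illustrative only).
vertices.groupBy { case (_, cc) => cc }.foreach { case (cc, members) =>
  assert(cc == members.map(_._1).min)
}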
@ -153,22 +156,84 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val chain1 = (0 until 9).map(x => (x, x+1) ) val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) ) val chain2 = (10 until 20).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) } val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
val twoChains = Graph(rawEdges).reverse val twoChains = Graph(rawEdges, true).reverse
val ccGraph = Analytics.connectedComponents(twoChains).cache() val ccGraph = Analytics.connectedComponents(twoChains).cache()
val vertices = ccGraph.vertices.collect val vertices = ccGraph.vertices.collect
for ( (id, cc) <- vertices ) { for ( (id, cc) <- vertices ) {
if(id < 10) { assert(cc === 0) } if (id < 10) {
else { assert(cc === 10) } assert(cc === 0)
} else {
assert(cc === 10)
}
} }
val ccMap = vertices.toMap val ccMap = vertices.toMap
println(ccMap) println(ccMap)
for( id <- 0 until 20 ) { for ( id <- 0 until 20 ) {
if(id < 10) { assert(ccMap(id) === 0) } if (id < 10) {
else { assert(ccMap(id) === 10) } assert(ccMap(id) === 0)
} else {
assert(ccMap(id) === 10)
}
} }
} }
} // end of chain connected components } // end of reverse chain connected components
test("Count a single triangle") {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
val graph = Graph(rawEdges, true).cache
val triangleCount = Analytics.triangleCount(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) => assert(count === 1) }
}
}
test("Count two triangles") {
withSpark(new SparkContext("local", "test")) { sc =>
val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
val rawEdges = sc.parallelize(triangles, 2)
val graph = Graph(rawEdges, true).cache
val triangleCount = Analytics.triangleCount(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) =>
if (vid == 0) {
assert(count === 2)
} else {
assert(count === 1)
}
}
}
}
test("Count two triangles with bi-directed edges") {
withSpark(new SparkContext("local", "test")) { sc =>
val triangles =
Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
val revTriangles = triangles.map { case (a,b) => (b,a) }
val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
val graph = Graph(rawEdges, true).cache
val triangleCount = Analytics.triangleCount(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) =>
if (vid == 0) {
assert(count === 4)
} else {
assert(count === 2)
}
}
}
}
test("Count a single triangle with duplicate edges") {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
val graph = Graph(rawEdges, true).cache
val triangleCount = Analytics.triangleCount(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) => assert(count === 1) }
}
}
} // end of AnalyticsSuite } // end of AnalyticsSuite
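For readers cross-checking the expected values in the new triangle tests, a brute-force reference count over an undirected, de-duplicated edge list can be sketched as follows (purely illustrative; not part of the patch — note the bi-directed test above intentionally expects doubled counts when both edge directions are present):

// Brute-force per-vertex triangle count over an undirected edge list (illustrative only).
def bruteForceTriangles(edges: Seq[(Long, Long)]): Map[Long, Int] = {
  // Treat edges as undirected, dropping duplicates and self-loops.
  val undirected = edges.collect { case (a, b) if a != b => (math.min(a, b), math.max(a, b)) }.toSet
  val verts = undirected.toSeq.flatMap { case (a, b) => Seq(a, b) }.distinct.sorted
  val counts = scala.collection.mutable.Map[Long, Int]().withDefaultValue(0)
  for {
    Seq(a, b, c) <- verts.combinations(3)
    if undirected((a, b)) && undirected((b, c)) && undirected((a, c))
  } {
    counts(a) += 1; counts(b) += 1; counts(c) += 1
  }
  counts.toMap
}

// Example: bruteForceTriangles(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L)) yields
// Map(0L -> 1, 1L -> 1, 2L -> 1), matching the "Count a single triangle" test.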
View file
@ -15,8 +15,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark(new SparkContext("local", "test")) { sc => withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L) val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L)
val edges = sc.parallelize(rawEdges) val edges = sc.parallelize(rawEdges)
val graph = Graph(edges) val graph = Graph(edges, 1.0F)
assert( graph.edges.count() === rawEdges.size ) assert(graph.edges.count() === rawEdges.size)
} }
} }
@ -29,8 +29,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert( graph.edges.count() === rawEdges.size ) assert( graph.edges.count() === rawEdges.size )
assert( graph.vertices.count() === 100) assert( graph.vertices.count() === 100)
graph.triplets.map { et => graph.triplets.map { et =>
assert( (et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr) ) assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
assert( (et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr) ) assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
} }
} }
} }
@ -38,7 +38,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mapEdges") { test("mapEdges") {
withSpark(new SparkContext("local", "test")) { sc => withSpark(new SparkContext("local", "test")) { sc =>
val n = 3 val n = 3
val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)))) val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "defaultValue")
val starWithEdgeAttrs = star.mapEdges(e => e.dstId) val starWithEdgeAttrs = star.mapEdges(e => e.dstId)
// map(_.copy()) is a workaround for https://github.com/amplab/graphx/issues/25 // map(_.copy()) is a workaround for https://github.com/amplab/graphx/issues/25
@ -51,10 +51,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mapReduceTriplets") { test("mapReduceTriplets") {
withSpark(new SparkContext("local", "test")) { sc => withSpark(new SparkContext("local", "test")) { sc =>
val n = 3 val n = 3
val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)))) val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0)
val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
val neighborDegreeSums = star.mapReduceTriplets( val neighborDegreeSums = starDeg.mapReduceTriplets(
edge => Array((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)), edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
(a: Int, b: Int) => a + b) (a: Int, b: Int) => a + b)
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet) assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
} }
@ -63,7 +63,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("aggregateNeighbors") { test("aggregateNeighbors") {
withSpark(new SparkContext("local", "test")) { sc => withSpark(new SparkContext("local", "test")) { sc =>
val n = 3 val n = 3
val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)))) val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
val indegrees = star.aggregateNeighbors( val indegrees = star.aggregateNeighbors(
(vid, edge) => Some(1), (vid, edge) => Some(1),
@ -99,6 +99,22 @@ class GraphSuite extends FunSuite with LocalSparkContext {
} }
} }
test("collectNeighborIds") {
withSpark(new SparkContext("local", "test")) { sc =>
val chain = (0 until 100).map(x => (x, (x+1)%100) )
val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
val graph = Graph(rawEdges, 1.0)
val nbrs = graph.collectNeighborIds(EdgeDirection.Both)
assert(nbrs.count === chain.size)
assert(graph.numVertices === nbrs.count)
nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) }
nbrs.collect.foreach { case (vid, nbrs) =>
val s = nbrs.toSet
assert(s.contains((vid + 1) % 100))
assert(s.contains(if (vid > 0) vid - 1 else 99 ))
}
}
}
test("VertexSetRDD") { test("VertexSetRDD") {
withSpark(new SparkContext("local", "test")) { sc => withSpark(new SparkContext("local", "test")) { sc =>