Merge pull request #436 from ankurdave/VertexId-case

Rename VertexID -> VertexId in GraphX
commit 3d9e66d92a
Reynold Xin, 2014-01-14 23:17:05 -08:00
33 changed files with 244 additions and 244 deletions


@@ -186,7 +186,7 @@ code constructs a graph from a collection of RDDs:
 // Assume the SparkContext has already been constructed
 val sc: SparkContext
 // Create an RDD for the vertices
-val users: RDD[(VertexID, (String, String))] =
+val users: RDD[(VertexId, (String, String))] =
   sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
 // Create an RDD for edges
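For orientation, a minimal sketch of how the guide's example continues after this rename, assuming the `sc` and `users` values from the hunk above (the edge list and `defaultUser` are illustrative):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create an RDD for edges; the String attribute is a relationship label
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Default attribute for vertices that appear in edges but not in `users`
val defaultUser = ("John Doe", "Missing")
// VertexId is an alias for Long, so the 3L, 5L, ... keys type-check unchanged
val graph: Graph[(String, String), String] = Graph(users, relationships, defaultUser)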
@@ -360,7 +360,7 @@ graph contains the following:
 {% highlight scala %}
 class Graph[VD, ED] {
-  def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
   def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
   def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
 }
@@ -382,7 +382,7 @@ val newGraph = Graph(newVertices, graph.edges)
 val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
 {% endhighlight %}
-[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexID,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
+[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
 These operators are often used to initialize the graph for a particular computation or project away
 unnecessary properties. For example, given a graph with the out-degrees as the vertex properties
@@ -408,7 +408,7 @@ add more in the future. The following is a list of the basic structural operato
 class Graph[VD, ED] {
   def reverse: Graph[VD, ED]
   def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
-               vpred: (VertexID, VD) => Boolean): Graph[VD, ED]
+               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
   def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
   def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
 }
@@ -427,11 +427,11 @@ satisfy the edge predicate *and connect vertices that satisfy the vertex predica
 operator can be used in number of situations to restrict the graph to the vertices and edges of
 interest or eliminate broken links. For example in the following code we remove broken links:
-[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexID,VD)⇒Boolean):Graph[VD,ED]
+[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED]
 {% highlight scala %}
 // Create an RDD for the vertices
-val users: RDD[(VertexID, (String, String))] =
+val users: RDD[(VertexId, (String, String))] =
   sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                        (4L, ("peter", "student"))))
@@ -494,9 +494,9 @@ using the *join* operators. Below we list the key join operators:
 {% highlight scala %}
 class Graph[VD, ED] {
-  def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD)
+  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
     : Graph[VD, ED]
-  def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2)
+  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
     : Graph[VD2, ED]
 }
 {% endhighlight %}
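To make the join signatures concrete, a small sketch (assuming the `graph` built earlier; names are illustrative) that attaches out-degrees to vertex attributes with `outerJoinVertices`:

// Attach each vertex's out-degree, defaulting to 0 for vertices with no outgoing edges
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
  outDegOpt.getOrElse(0) // vertices absent from outDegrees get 0
}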
@@ -506,7 +506,7 @@ returns a new graph with the vertex properties obtained by applying the user def
 to the result of the joined vertices. Vertices without a matching value in the RDD retain their
 original value.
-[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexID,U)])((VertexID,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]
+[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]
 > Note that if the RDD contains more than one value for a given vertex only one will be used. It
 > is therefore recommended that the input RDD be first made unique using the following which will
@@ -525,7 +525,7 @@ property type. Because not all vertices may have a matching value in the input
 function takes an `Option` type. For example, we can setup a graph for PageRank by initializing
 vertex properties with their `outDegree`.
-[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexID,U)])((VertexID,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
+[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
 {% highlight scala %}
@@ -559,7 +559,7 @@ PageRank Value, shortest path to the source, and smallest reachable vertex id).
 ### Map Reduce Triplets (mapReduceTriplets)
 <a name="mrTriplets"></a>
-[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
+[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexId,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
 The core (heavily optimized) aggregation primitive in GraphX is the
 [`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
@@ -567,7 +567,7 @@ The core (heavily optimized) aggregation primitive in GraphX is the
 {% highlight scala %}
 class Graph[VD, ED] {
   def mapReduceTriplets[A](
-      map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       reduce: (A, A) => A)
     : VertexRDD[A]
 }
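As a rough illustration of this signature (a sketch assuming the `graph` from earlier), in-degrees can be computed by sending a count of 1 to each destination vertex and summing the messages:

// One message per triplet to the destination vertex, then per-vertex sums
val inDegrees: VertexRDD[Int] = graph.mapReduceTriplets[Int](
  triplet => Iterator((triplet.dstId, 1)), // map: message to the destination
  (a, b) => a + b)                         // reduce: combine messages per vertex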
@@ -649,13 +649,13 @@ compute the max in, out, and total degrees:
 {% highlight scala %}
 // Define a reduce operation to compute the highest degree vertex
-def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = {
+def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
   if (a._2 > b._2) a else b
 }
 // Compute the max degrees
-val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max)
-val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max)
-val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max)
+val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
+val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
+val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
 {% endhighlight %}
 ### Collecting Neighbors
@@ -665,14 +665,14 @@ attributes at each vertex. This can be easily accomplished using the
 [`collectNeighborIds`][GraphOps.collectNeighborIds] and the
 [`collectNeighbors`][GraphOps.collectNeighbors] operators.
-[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexID]]
-[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexID,VD)]]
+[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]]
+[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]
 {% highlight scala %}
 class GraphOps[VD, ED] {
-  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
-  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
+  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
+  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
 }
 {% endhighlight %}
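A quick sketch of calling the neighbor-collection operators (assuming the `graph` above; the output handling is illustrative):

// Ids of each vertex's out-neighbors; vertices with no out-edges get an empty array
val neighborIds: VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.Out)
neighborIds.take(5).foreach { case (id, nbrs) =>
  println(id + " -> " + nbrs.mkString(", "))
}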
@@ -716,7 +716,7 @@ messages remaining.
 The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
 of its implementation (note calls to graph.cache have been removed):
-[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexID,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexID,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
+[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
 {% highlight scala %}
 class GraphOps[VD, ED] {
@@ -724,8 +724,8 @@ class GraphOps[VD, ED] {
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
-     (vprog: (VertexID, VD, A) => VD,
-      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+     (vprog: (VertexId, VD, A) => VD,
+      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
     // Receive the initial message at each vertex
@@ -770,7 +770,7 @@ import org.apache.spark.graphx.util.GraphGenerators
 // A graph with edge attributes containing distances
 val graph: Graph[Int, Double] =
   GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
-val sourceId: VertexID = 42 // The ultimate source
+val sourceId: VertexId = 42 // The ultimate source
 // Initialize the graph such that all vertices except the root have distance infinity.
 val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
 val sssp = initialGraph.pregel(Double.PositiveInfinity)(
@@ -817,7 +817,7 @@ It creates a `Graph` from the specified edges, automatically creating any vertic
 {% highlight scala %}
 object Graph {
   def apply[VD, ED](
-      vertices: RDD[(VertexID, VD)],
+      vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD = null)
     : Graph[VD, ED]
@@ -827,7 +827,7 @@ object Graph {
       defaultValue: VD): Graph[VD, ED]
   def fromEdgeTuples[VD](
-      rawEdges: RDD[(VertexID, VertexID)],
+      rawEdges: RDD[(VertexId, VertexId)],
       defaultValue: VD,
       uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
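For illustration, a minimal sketch of the `fromEdgeTuples` builder under the renamed alias (the values are hypothetical; `sc` is assumed as above):

// Build a small cycle from raw (srcId, dstId) pairs; every vertex gets the default attribute 1
val rawEdges: RDD[(VertexId, VertexId)] = sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 1L)))
val cycleGraph: Graph[Int, Int] = Graph.fromEdgeTuples(rawEdges, defaultValue = 1)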
@@ -843,8 +843,8 @@ object Graph {
 [PartitionStrategy]: api/graphx/index.html#org.apache.spark.graphx.PartitionStrategy$
 [GraphLoader.edgeListFile]: api/graphx/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]
-[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexID,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
-[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexID,VertexID)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
+[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
+[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
 [Graph.fromEdges]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
 # Vertex and Edge RDDs
@@ -868,17 +868,17 @@ additional functionality:
 {% highlight scala %}
 class VertexRDD[VD] extends RDD[(VertexID, VD)] {
   // Filter the vertex set but preserves the internal index
-  def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
+  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
   // Transform the values without changing the ids (preserves the internal index)
   def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
-  def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
+  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
   // Remove vertices from this set that appear in the other set
   def diff(other: VertexRDD[VD]): VertexRDD[VD]
   // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
-  def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
-  def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
+  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
+  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
   // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
-  def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
 }
 {% endhighlight %}
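A short sketch of the index-aware joins (assuming the `graph` from earlier; the `ranks` values are made up):

// Left-join a plain pair RDD against the indexed vertex set; missing vertices fall back to 0.0
val ranks: RDD[(VertexId, Double)] = sc.parallelize(Array((3L, 0.15), (7L, 0.32)))
val withRanks: VertexRDD[Double] =
  graph.vertices.leftJoin(ranks) { (id, attr, rankOpt) => rankOpt.getOrElse(0.0) }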
@@ -896,7 +896,7 @@ both aggregate and then subsequently index the `RDD[(VertexID, A)]`. For exampl
 {% highlight scala %}
 val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
-val rddB: RDD[(VertexID, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
+val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
 // There should be 200 entries in rddB
 rddB.count
 val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
@@ -922,7 +922,7 @@ def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
 // Revere the edges reusing both attributes and structure
 def reverse: EdgeRDD[ED]
 // Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3]
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
 {% endhighlight %}
 In most applications we have found that operations on the `EdgeRDD` are accomplished through the


@@ -28,8 +28,8 @@ package org.apache.spark.graphx
  * @param attr The attribute associated with the edge
  */
 case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
-    var srcId: VertexID = 0,
-    var dstId: VertexID = 0,
+    var srcId: VertexId = 0,
+    var dstId: VertexId = 0,
     var attr: ED = null.asInstanceOf[ED])
   extends Serializable {
@@ -39,7 +39,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
    * @param vid the id one of the two vertices on the edge.
    * @return the id of the other vertex on the edge.
    */
-  def otherVertexId(vid: VertexID): VertexID =
+  def otherVertexId(vid: VertexId): VertexId =
     if (srcId == vid) dstId else { assert(dstId == vid); srcId }
   /**
@@ -50,7 +50,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
    * @return the relative direction of the edge to the corresponding
    * vertex.
    */
-  def relativeDirection(vid: VertexID): EdgeDirection =
+  def relativeDirection(vid: VertexId): EdgeDirection =
     if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }
 }
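A tiny usage sketch of this class under the renamed alias (the edge values are illustrative):

val e = Edge(3L, 7L, "collab")
val other: VertexId = e.otherVertexId(3L)        // 7L, the opposite endpoint
val dir: EdgeDirection = e.relativeDirection(3L) // EdgeDirection.Out, since 3L is the source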


@@ -102,7 +102,7 @@ class EdgeRDD[@specialized ED: ClassTag](
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
       (other: EdgeRDD[ED2])
-      (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = {
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
     new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
@@ -113,7 +113,7 @@ class EdgeRDD[@specialized ED: ClassTag](
     })
   }
-  private[graphx] def collectVertexIDs(): RDD[VertexID] = {
+  private[graphx] def collectVertexIds(): RDD[VertexId] = {
     partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
   }
 }


@@ -50,7 +50,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
    * @param vid the id one of the two vertices on the edge
    * @return the attribute for the other vertex on the edge
    */
-  def otherVertexAttr(vid: VertexID): VD =
+  def otherVertexAttr(vid: VertexId): VD =
     if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr }
   /**
@@ -59,7 +59,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
    * @param vid the id of one of the two vertices on the edge
    * @return the attr for the vertex with that id
    */
-  def vertexAttr(vid: VertexID): VD =
+  def vertexAttr(vid: VertexId): VD =
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
   override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()


@@ -126,7 +126,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * }}}
    *
    */
-  def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
   /**
    * Transforms each edge attribute in the graph using the map function. The map function is not
@@ -242,7 +242,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    */
   def subgraph(
       epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
-      vpred: (VertexID, VD) => Boolean = ((v, d) => true))
+      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
     : Graph[VD, ED]
   /**
@@ -292,7 +292,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * vertex
    * {{{
    * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
-   * val inDeg: RDD[(VertexID, Int)] =
+   * val inDeg: RDD[(VertexId, Int)] =
    *   mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
    * }}}
    *
@@ -304,7 +304,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    *
    */
   def mapReduceTriplets[A: ClassTag](
-      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       reduceFunc: (A, A) => A,
       activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
     : VertexRDD[A]
@@ -328,14 +328,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    *
    * {{{
    * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
-   * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees()
+   * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees()
    * val graph = rawGraph.outerJoinVertices(outDeg) {
    *   (vid, data, optDeg) => optDeg.getOrElse(0)
    * }
    * }}}
    */
-  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
-      (mapFunc: (VertexID, VD, Option[U]) => VD2)
+  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+      (mapFunc: (VertexId, VD, Option[U]) => VD2)
     : Graph[VD2, ED]
   /**
@@ -364,7 +364,7 @@ object Graph {
    * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
    */
   def fromEdgeTuples[VD: ClassTag](
-      rawEdges: RDD[(VertexID, VertexID)],
+      rawEdges: RDD[(VertexId, VertexId)],
       defaultValue: VD,
       uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
   {
@@ -405,7 +405,7 @@ object Graph {
    * mentioned in edges but not in vertices
    */
   def apply[VD: ClassTag, ED: ClassTag](
-      vertices: RDD[(VertexID, VD)],
+      vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
     GraphImpl(vertices, edges, defaultVertexAttr)


@@ -33,7 +33,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[Edge[Object]])
     kryo.register(classOf[MessageToPartition[Object]])
     kryo.register(classOf[VertexBroadcastMsg[Object]])
-    kryo.register(classOf[(VertexID, Object)])
+    kryo.register(classOf[(VertexId, Object)])
     kryo.register(classOf[EdgePartition[Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])


@@ -80,19 +80,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    *
    * @return the set of neighboring ids for each vertex
    */
-  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] = {
+  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
     val nbrs =
       if (edgeDirection == EdgeDirection.Either) {
-        graph.mapReduceTriplets[Array[VertexID]](
+        graph.mapReduceTriplets[Array[VertexId]](
           mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
           reduceFunc = _ ++ _
         )
       } else if (edgeDirection == EdgeDirection.Out) {
-        graph.mapReduceTriplets[Array[VertexID]](
+        graph.mapReduceTriplets[Array[VertexId]](
           mapFunc = et => Iterator((et.srcId, Array(et.dstId))),
           reduceFunc = _ ++ _)
       } else if (edgeDirection == EdgeDirection.In) {
-        graph.mapReduceTriplets[Array[VertexID]](
+        graph.mapReduceTriplets[Array[VertexId]](
           mapFunc = et => Iterator((et.dstId, Array(et.srcId))),
           reduceFunc = _ ++ _)
       } else {
@@ -100,7 +100,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
         "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
       }
     graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
-      nbrsOpt.getOrElse(Array.empty[VertexID])
+      nbrsOpt.getOrElse(Array.empty[VertexId])
     }
   } // end of collectNeighborIds
@@ -116,8 +116,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    *
    * @return the vertex set of neighboring vertex attributes for each vertex
    */
-  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = {
-    val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]](
+  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
+    val nbrs = graph.mapReduceTriplets[Array[(VertexId,VD)]](
       edge => {
         val msgToSrc = (edge.srcId, Array((edge.dstId, edge.dstAttr)))
         val msgToDst = (edge.dstId, Array((edge.srcId, edge.srcAttr)))
@@ -133,7 +133,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
       (a, b) => a ++ b)
     graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
-      nbrsOpt.getOrElse(Array.empty[(VertexID, VD)])
+      nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
     }
   } // end of collectNeighbor
@@ -164,9 +164,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    * }}}
    *
    */
-  def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD)
+  def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
     : Graph[VD, ED] = {
-    val uf = (id: VertexID, data: VD, o: Option[U]) => {
+    val uf = (id: VertexId, data: VD, o: Option[U]) => {
       o match {
         case Some(u) => mapFunc(id, data, u)
         case None => data
@@ -197,7 +197,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    * val degrees: VertexRDD[Int] = graph.outDegrees
    * graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
    * },
-   * vpred = (vid: VertexID, deg:Int) => deg > 0
+   * vpred = (vid: VertexId, deg:Int) => deg > 0
    * )
    * }}}
    *
@@ -205,7 +205,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   def filter[VD2: ClassTag, ED2: ClassTag](
       preprocess: Graph[VD, ED] => Graph[VD2, ED2],
       epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
-      vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = {
+      vpred: (VertexId, VD2) => Boolean = (v:VertexId, d:VD2) => true): Graph[VD, ED] = {
     graph.mask(preprocess(graph).subgraph(epred, vpred))
   }
@@ -260,8 +260,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
       initialMsg: A,
       maxIterations: Int = Int.MaxValue,
       activeDirection: EdgeDirection = EdgeDirection.Either)(
-      vprog: (VertexID, VD, A) => VD,
-      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
+      vprog: (VertexId, VD, A) => VD,
+      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
     Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
@@ -293,7 +293,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    *
    * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
    */
-  def connectedComponents(): Graph[VertexID, ED] = {
+  def connectedComponents(): Graph[VertexId, ED] = {
     ConnectedComponents.run(graph)
   }
@@ -312,7 +312,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    *
    * @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents$#run]]
    */
-  def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] = {
+  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = {
     StronglyConnectedComponents.run(graph, numIter)
   }
 } // end of GraphOps


@@ -23,7 +23,7 @@ package org.apache.spark.graphx
  */
 trait PartitionStrategy extends Serializable {
   /** Returns the partition number for a given edge. */
-  def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
+  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
 }
 /**
@@ -73,9 +73,9 @@ object PartitionStrategy {
    * is used.
    */
  case object EdgePartition2D extends PartitionStrategy {
-    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
-      val mixingPrime: VertexID = 1125899906842597L
+      val mixingPrime: VertexId = 1125899906842597L
       val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
       val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
       (col * ceilSqrtNumParts + row) % numParts
@@ -87,8 +87,8 @@ object PartitionStrategy {
    * source.
    */
  case object EdgePartition1D extends PartitionStrategy {
-    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
-      val mixingPrime: VertexID = 1125899906842597L
+    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
+      val mixingPrime: VertexId = 1125899906842597L
       (math.abs(src) * mixingPrime).toInt % numParts
     }
   }
@@ -99,7 +99,7 @@ object PartitionStrategy {
    * random vertex cut that colocates all same-direction edges between two vertices.
    */
  case object RandomVertexCut extends PartitionStrategy {
-    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       math.abs((src, dst).hashCode()) % numParts
     }
   }
@@ -111,7 +111,7 @@ object PartitionStrategy {
    * regardless of direction.
    */
  case object CanonicalRandomVertexCut extends PartitionStrategy {
-    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       val lower = math.min(src, dst)
       val higher = math.max(src, dst)
       math.abs((lower, higher).hashCode()) % numParts
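A quick sketch of exercising a strategy after the rename (assuming `org.apache.spark.graphx._` is imported; the edge and partition count are arbitrary):

// Where would edge 3L -> 7L land if the edge RDD had 9 partitions?
val part: PartitionID = PartitionStrategy.EdgePartition2D.getPartition(3L, 7L, 9)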


@@ -40,9 +40,9 @@ import scala.reflect.ClassTag
  * // Set the vertex attributes to the initial pagerank values
  *   .mapVertices((id, attr) => 1.0)
  *
- * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
+ * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  *   resetProb + (1.0 - resetProb) * msgSum
- * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
+ * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  *   Iterator((edge.dstId, edge.srcAttr * edge.attr))
  * def messageCombiner(a: Double, b: Double): Double = a + b
  * val initialMessage = 0.0
@@ -113,8 +113,8 @@ object Pregel {
       initialMsg: A,
       maxIterations: Int = Int.MaxValue,
       activeDirection: EdgeDirection = EdgeDirection.Either)
-     (vprog: (VertexID, VD, A) => VD,
-      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+     (vprog: (VertexId, VD, A) => VD,
+      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] =
   {


@@ -28,7 +28,7 @@ import org.apache.spark.graphx.impl.MsgRDDFunctions
 import org.apache.spark.graphx.impl.VertexPartition
 /**
- * Extends `RDD[(VertexID, VD)]` by ensuring that there is only one entry for each vertex and by
+ * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
  * pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be
  * joined efficiently. All operations except [[reindex]] preserve the index. To construct a
  * `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]].
@@ -36,12 +36,12 @@ import org.apache.spark.graphx.impl.VertexPartition
  * @example Construct a `VertexRDD` from a plain RDD:
  * {{{
  * // Construct an initial vertex set
- * val someData: RDD[(VertexID, SomeType)] = loadData(someFile)
+ * val someData: RDD[(VertexId, SomeType)] = loadData(someFile)
  * val vset = VertexRDD(someData)
  * // If there were redundant values in someData we would use a reduceFunc
  * val vset2 = VertexRDD(someData, reduceFunc)
  * // Finally we can use the VertexRDD to index another dataset
- * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile)
+ * val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile)
  * val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
  * // Now we can construct very fast joins between the two sets
  * val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
@@ -51,7 +51,7 @@ import org.apache.spark.graphx.impl.VertexPartition
  */
 class VertexRDD[@specialized VD: ClassTag](
     val partitionsRDD: RDD[VertexPartition[VD]])
-  extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+  extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
   require(partitionsRDD.partitioner.isDefined)
@@ -92,9 +92,9 @@ class VertexRDD[@specialized VD: ClassTag](
   }
   /**
-   * Provides the `RDD[(VertexID, VD)]` equivalent output.
+   * Provides the `RDD[(VertexId, VD)]` equivalent output.
    */
-  override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = {
+  override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
     firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
   }
@@ -114,9 +114,9 @@ class VertexRDD[@specialized VD: ClassTag](
    * rather than allocating new memory.
    *
    * @param pred the user defined predicate, which takes a tuple to conform to the
-   * `RDD[(VertexID, VD)]` interface
+   * `RDD[(VertexId, VD)]` interface
    */
-  override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] =
+  override def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] =
     this.mapVertexPartitions(_.filter(Function.untupled(pred)))
   /**
@@ -140,7 +140,7 @@ class VertexRDD[@specialized VD: ClassTag](
    * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
    * original VertexRDD. The resulting VertexRDD retains the same index.
    */
-  def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] =
+  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
     this.mapVertexPartitions(_.map(f))
   /**
@@ -172,7 +172,7 @@ class VertexRDD[@specialized VD: ClassTag](
    * @return a VertexRDD containing the results of `f`
    */
   def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
-      (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
+      (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
     val newPartitionsRDD = partitionsRDD.zipPartitions(
       other.partitionsRDD, preservesPartitioning = true
     ) { (thisIter, otherIter) =>
@@ -200,8 +200,8 @@ class VertexRDD[@specialized VD: ClassTag](
    * by `f`.
    */
   def leftJoin[VD2: ClassTag, VD3: ClassTag]
-      (other: RDD[(VertexID, VD2)])
-      (f: (VertexID, VD, Option[VD2]) => VD3)
+      (other: RDD[(VertexId, VD2)])
+      (f: (VertexId, VD, Option[VD2]) => VD3)
     : VertexRDD[VD3] = {
     // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
     // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
@@ -225,7 +225,7 @@ class VertexRDD[@specialized VD: ClassTag](
    * [[innerJoin]] for the behavior of the join.
    */
   def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
-      (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
+      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
     val newPartitionsRDD = partitionsRDD.zipPartitions(
       other.partitionsRDD, preservesPartitioning = true
     ) { (thisIter, otherIter) =>
@@ -247,8 +247,8 @@ class VertexRDD[@specialized VD: ClassTag](
    * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this`
    * and `other`, with values supplied by `f`
    */
-  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
-      (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
+  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
     // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
     // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
     other match {
@@ -278,7 +278,7 @@ class VertexRDD[@specialized VD: ClassTag](
    * messages.
    */
   def aggregateUsingIndex[VD2: ClassTag](
-      messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
+      messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
     val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
       val vertexPartition: VertexPartition[VD] = thisIter.next()
@@ -303,8 +303,8 @@ object VertexRDD {
    *
    * @param rdd the collection of vertex-attribute pairs
    */
-  def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = {
-    val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
+  def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+    val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
       case Some(p) => rdd
       case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
     }
@@ -323,8 +323,8 @@ object VertexRDD {
    * @param rdd the collection of vertex-attribute pairs
    * @param mergeFunc the associative, commutative merge function.
    */
-  def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
-    val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
+  def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
+    val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
      case Some(p) => rdd
      case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
    }
@@ -338,7 +338,7 @@ object VertexRDD {
   * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
   * `defaultVal` otherwise.
   */
-  def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD)
+  def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD)
     : VertexRDD[VD] = {
     VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
       value.getOrElse(default)


@@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
  */
 private[graphx]
 class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
-    val srcIds: Array[VertexID],
-    val dstIds: Array[VertexID],
+    val srcIds: Array[VertexId],
+    val dstIds: Array[VertexId],
     val data: Array[ED],
-    val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable {
+    val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
   /**
    * Reverse all the edges in this partition.
@@ -118,8 +118,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    */
   def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
     val builder = new EdgePartitionBuilder[ED]
-    var currSrcId: VertexID = null.asInstanceOf[VertexID]
-    var currDstId: VertexID = null.asInstanceOf[VertexID]
+    var currSrcId: VertexId = null.asInstanceOf[VertexId]
+    var currDstId: VertexId = null.asInstanceOf[VertexId]
     var currAttr: ED = null.asInstanceOf[ED]
     var i = 0
     while (i < size) {
@@ -153,7 +153,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
       (other: EdgePartition[ED2])
-      (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = {
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = {
     val builder = new EdgePartitionBuilder[ED3]
     var i = 0
     var j = 0
@@ -210,14 +210,14 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * iterator is generated using an index scan, so it is efficient at skipping edges that don't
    * match srcIdPred.
    */
-  def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] =
+  def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
     index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
   /**
    * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
    * cluster must start at position `index`.
    */
-  private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] {
+  private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
     private[this] val edge = new Edge[ED]
     private[this] var pos = index


@@ -29,22 +29,22 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
   var edges = new PrimitiveVector[Edge[ED]](size)
   /** Add a new edge to the partition. */
-  def add(src: VertexID, dst: VertexID, d: ED) {
+  def add(src: VertexId, dst: VertexId, d: ED) {
     edges += Edge(src, dst, d)
   }
   def toEdgePartition: EdgePartition[ED] = {
     val edgeArray = edges.trim().array
     Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
-    val srcIds = new Array[VertexID](edgeArray.size)
-    val dstIds = new Array[VertexID](edgeArray.size)
+    val srcIds = new Array[VertexId](edgeArray.size)
+    val dstIds = new Array[VertexId](edgeArray.size)
     val data = new Array[ED](edgeArray.size)
-    val index = new PrimitiveKeyOpenHashMap[VertexID, Int]
+    val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
     // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
     // adding them to the index
     if (edgeArray.length > 0) {
       index.update(srcIds(0), 0)
-      var currSrcId: VertexID = srcIds(0)
+      var currSrcId: VertexId = srcIds(0)
       var i = 0
       while (i < edgeArray.size) {
         srcIds(i) = edgeArray(i).srcId


@@ -41,7 +41,7 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
   // allocating too many temporary Java objects.
   private val triplet = new EdgeTriplet[VD, ED]
-  private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray)
+  private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
   override def hasNext: Boolean = pos < edgePartition.size


@@ -105,7 +105,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
   }
-  override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = {
+  override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
     if (classTag[VD] equals classTag[VD2]) {
       // The map preserves type, so we can use incremental replication
       val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
@@ -153,7 +153,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   override def subgraph(
       epred: EdgeTriplet[VD, ED] => Boolean = x => true,
-      vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+      vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
     // Filter the vertices, reusing the partitioner and the index from this graph
     val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
@@ -195,7 +195,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   //////////////////////////////////////////////////////////////////////////////////////////////////
   override def mapReduceTriplets[A: ClassTag](
-      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       reduceFunc: (A, A) => A,
       activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
@@ -225,7 +225,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       val edgeIter = activeDirectionOpt match {
         case Some(EdgeDirection.Both) =>
           if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
+            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
               .filter(e => vPart.isActive(e.dstId))
           } else {
             edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
@@ -236,7 +236,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
           edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
         case Some(EdgeDirection.Out) =>
           if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
+            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
           } else {
             edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
           }
@@ -267,8 +267,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   } // end of mapReduceTriplets
   override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
-      (other: RDD[(VertexID, U)])
-      (updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] =
+      (other: RDD[(VertexId, U)])
+      (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] =
   {
     if (classTag[VD] equals classTag[VD2]) {
       // updateF preserves type, so we can use incremental replication
@@ -312,7 +312,7 @@ object GraphImpl {
   }
   def apply[VD: ClassTag, ED: ClassTag](
-      vertices: RDD[(VertexID, VD)],
+      vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD): GraphImpl[VD, ED] =
   {
@@ -321,7 +321,7 @@ object GraphImpl {
     // Get the set of all vids
     val partitioner = Partitioner.defaultPartitioner(vertices)
     val vPartitioned = vertices.partitionBy(partitioner)
-    val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner)
+    val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner)
     val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
       vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
     }
@@ -355,7 +355,7 @@ object GraphImpl {
   /**
    * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
-   * data structure (RDD[(VertexID, VertexID, ED)]).
+   * data structure (RDD[(VertexId, VertexId, ED)]).
    *
    * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
    * pair: the key is the partition id, and the value is an EdgePartition object containing all the
@ -378,19 +378,19 @@ object GraphImpl {
defaultVertexAttr: VD): GraphImpl[VD, ED] = { defaultVertexAttr: VD): GraphImpl[VD, ED] = {
edges.cache() edges.cache()
// Get the set of all vids // Get the set of all vids
val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size)) val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size))
// Create the VertexRDD. // Create the VertexRDD.
val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr)) val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
GraphImpl(vertices, edges) GraphImpl(vertices, edges)
} }
/** Collects all vids mentioned in edges and partitions them by partitioner. */ /** Collects all vids mentioned in edges and partitions them by partitioner. */
private def collectVertexIDsFromEdges( private def collectVertexIdsFromEdges(
edges: EdgeRDD[_], edges: EdgeRDD[_],
partitioner: Partitioner): RDD[(VertexID, Int)] = { partitioner: Partitioner): RDD[(VertexId, Int)] = {
// TODO: Consider doing map side distinct before shuffle. // TODO: Consider doing map side distinct before shuffle.
new ShuffledRDD[VertexID, Int, (VertexID, Int)]( new ShuffledRDD[VertexId, Int, (VertexId, Int)](
edges.collectVertexIDs.map(vid => (vid, 0)), partitioner) edges.collectVertexIds.map(vid => (vid, 0)), partitioner)
.setSerializer(classOf[VertexIDMsgSerializer].getName) .setSerializer(classOf[VertexIdMsgSerializer].getName)
} }
} // end of object GraphImpl } // end of object GraphImpl
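One effect of collectVertexIdsFromEdges is that vertices referenced only by edges still receive the default attribute. A hedged sketch of that behaviour through the public Graph API (the method and value names below are illustrative, not part of this change):

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

def defaultAttrExample(sc: SparkContext): Unit = {
  val verts: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 10), (2L, 20)))
  // Edge 2L -> 4L mentions vertex 4L, which is absent from `verts`.
  val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 4L, 1)))
  val graph = Graph(verts, edges, defaultVertexAttr = 0)
  // graph.vertices should contain (1L, 10), (2L, 20) and (4L, 0).
}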


@ -20,16 +20,16 @@ package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag} import scala.reflect.{classTag, ClassTag}
import org.apache.spark.Partitioner import org.apache.spark.Partitioner
import org.apache.spark.graphx.{PartitionID, VertexID} import org.apache.spark.graphx.{PartitionID, VertexId}
import org.apache.spark.rdd.{ShuffledRDD, RDD} import org.apache.spark.rdd.{ShuffledRDD, RDD}
private[graphx] private[graphx]
class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T]( class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
@transient var partition: PartitionID, @transient var partition: PartitionID,
var vid: VertexID, var vid: VertexId,
var data: T) var data: T)
extends Product2[PartitionID, (VertexID, T)] with Serializable { extends Product2[PartitionID, (VertexId, T)] with Serializable {
override def _1 = partition override def _1 = partition
@ -61,7 +61,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef
private[graphx] private[graphx]
class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) { class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = { def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner) val rdd = new ShuffledRDD[PartitionID, (VertexId, T), VertexBroadcastMsg[T]](self, partitioner)
// Set a custom serializer if the data is of int or double type. // Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) { if (classTag[T] == ClassTag.Int) {
@ -99,8 +99,8 @@ object MsgRDDFunctions {
new VertexBroadcastMsgRDDFunctions(rdd) new VertexBroadcastMsgRDDFunctions(rdd)
} }
def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = { def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = {
val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner) val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner)
// Set a custom serializer if the data is of int or double type. // Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) { if (classTag[T] == ClassTag.Int) {


@ -50,9 +50,9 @@ class ReplicatedVertexView[VD: ClassTag](
* vids from both the source and destination of edges. It must always include both source and * vids from both the source and destination of edges. It must always include both source and
* destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this. * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
*/ */
private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
case Some(prevView) => case Some(prevView) =>
prevView.localVertexIDMap prevView.localVertexIdMap
case None => case None =>
edges.partitionsRDD.mapPartitions(_.map { edges.partitionsRDD.mapPartitions(_.map {
case (pid, epart) => case (pid, epart) =>
@ -62,7 +62,7 @@ class ReplicatedVertexView[VD: ClassTag](
vidToIndex.add(e.dstId) vidToIndex.add(e.dstId)
} }
(pid, vidToIndex) (pid, vidToIndex)
}, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap") }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap")
} }
private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true) private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
@ -75,7 +75,7 @@ class ReplicatedVertexView[VD: ClassTag](
srcAttrOnly.unpersist(blocking) srcAttrOnly.unpersist(blocking)
dstAttrOnly.unpersist(blocking) dstAttrOnly.unpersist(blocking)
noAttrs.unpersist(blocking) noAttrs.unpersist(blocking)
// Don't unpersist localVertexIDMap because a future ReplicatedVertexView may be using it // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it
// without modification // without modification
this this
} }
@ -133,8 +133,8 @@ class ReplicatedVertexView[VD: ClassTag](
case None => case None =>
// Within each edge partition, place the shipped vertex attributes into the correct // Within each edge partition, place the shipped vertex attributes into the correct
// locations specified in localVertexIDMap // locations specified in localVertexIdMap
localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
val (pid, vidToIndex) = mapIter.next() val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext) assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map // Populate the vertex array using the vidToIndex map
@ -157,15 +157,15 @@ class ReplicatedVertexView[VD: ClassTag](
private object ReplicatedVertexView { private object ReplicatedVertexView {
protected def buildBuffer[VD: ClassTag]( protected def buildBuffer[VD: ClassTag](
pid2vidIter: Iterator[Array[Array[VertexID]]], pid2vidIter: Iterator[Array[Array[VertexId]]],
vertexPartIter: Iterator[VertexPartition[VD]]) = { vertexPartIter: Iterator[VertexPartition[VD]]) = {
val pid2vid: Array[Array[VertexID]] = pid2vidIter.next() val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
val vertexPart: VertexPartition[VD] = vertexPartIter.next() val vertexPart: VertexPartition[VD] = vertexPartIter.next()
Iterator.tabulate(pid2vid.size) { pid => Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid) val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length val size = vidsCandidate.length
val vids = new PrimitiveVector[VertexID](pid2vid(pid).size) val vids = new PrimitiveVector[VertexId](pid2vid(pid).size)
val attrs = new PrimitiveVector[VD](pid2vid(pid).size) val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
var i = 0 var i = 0
while (i < size) { while (i < size) {
@ -181,16 +181,16 @@ private object ReplicatedVertexView {
} }
protected def buildActiveBuffer( protected def buildActiveBuffer(
pid2vidIter: Iterator[Array[Array[VertexID]]], pid2vidIter: Iterator[Array[Array[VertexId]]],
activePartIter: Iterator[VertexPartition[_]]) activePartIter: Iterator[VertexPartition[_]])
: Iterator[(Int, Array[VertexID])] = { : Iterator[(Int, Array[VertexId])] = {
val pid2vid: Array[Array[VertexID]] = pid2vidIter.next() val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
val activePart: VertexPartition[_] = activePartIter.next() val activePart: VertexPartition[_] = activePartIter.next()
Iterator.tabulate(pid2vid.size) { pid => Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid) val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length val size = vidsCandidate.length
val actives = new PrimitiveVector[VertexID](vidsCandidate.size) val actives = new PrimitiveVector[VertexId](vidsCandidate.size)
var i = 0 var i = 0
while (i < size) { while (i < size) {
val vid = vidsCandidate(i) val vid = vidsCandidate(i)
@ -205,8 +205,8 @@ private object ReplicatedVertexView {
} }
private[graphx] private[graphx]
class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD]) class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
extends Serializable { extends Serializable {
def iterator: Iterator[(VertexID, VD)] = def iterator: Iterator[(VertexId, VD)] =
(0 until vids.size).iterator.map { i => (vids(i), attrs(i)) } (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
} }


@ -32,12 +32,12 @@ import org.apache.spark.util.collection.PrimitiveVector
private[impl] private[impl]
class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true) val bothAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(true, true)
val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false) val srcAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(true, false)
val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true) val dstAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(false, true)
val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false) val noAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(false, false)
def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] =
(includeSrcAttr, includeDstAttr) match { (includeSrcAttr, includeDstAttr) match {
case (true, true) => bothAttrs case (true, true) => bothAttrs
case (true, false) => srcAttrOnly case (true, false) => srcAttrOnly
@ -46,9 +46,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
} }
private def createPid2Vid( private def createPid2Vid(
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = { includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid. // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter => val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next() val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size val numEdges = edgePartition.size
val vSet = new VertexSet val vSet = new VertexSet
@ -71,7 +71,7 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
val numPartitions = vertices.partitions.size val numPartitions = vertices.partitions.size
vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter => vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID]) val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId])
for ((vid, pid) <- iter) { for ((vid, pid) <- iter) {
pid2vid(pid) += vid pid2vid(pid) += vid
} }

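Conceptually, the routing table built above inverts the (vid, pid) pairs so that each edge partition knows which vertex ids must be shipped to it. A plain-Scala sketch of that inversion (not the Spark internals):

// Group vertex ids by the edge partition that references them.
def invertToRoutingTable(
    vidToPid: Seq[(Long, Int)], numEdgeParts: Int): Array[Vector[Long]] = {
  val table = Array.fill(numEdgeParts)(Vector.empty[Long])
  for ((vid, pid) <- vidToPid) table(pid) = table(pid) :+ vid
  table
}

// invertToRoutingTable(Seq((1L, 0), (2L, 1), (3L, 0)), 2) yields Array(Vector(1L, 3L), Vector(2L))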

@ -25,12 +25,12 @@ import org.apache.spark.graphx._
import org.apache.spark.serializer._ import org.apache.spark.serializer._
private[graphx] private[graphx]
class VertexIDMsgSerializer(conf: SparkConf) extends Serializer { class VertexIdMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = { def writeObject[T](t: T) = {
val msg = t.asInstanceOf[(VertexID, _)] val msg = t.asInstanceOf[(VertexId, _)]
writeVarLong(msg._1, optimizePositive = false) writeVarLong(msg._1, optimizePositive = false)
this this
} }
@ -123,7 +123,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = { def writeObject[T](t: T) = {
val msg = t.asInstanceOf[(VertexID, Int)] val msg = t.asInstanceOf[(VertexId, Int)]
writeVarLong(msg._1, optimizePositive = false) writeVarLong(msg._1, optimizePositive = false)
writeUnsignedVarInt(msg._2) writeUnsignedVarInt(msg._2)
this this
@ -147,7 +147,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = { def writeObject[T](t: T) = {
val msg = t.asInstanceOf[(VertexID, Long)] val msg = t.asInstanceOf[(VertexId, Long)]
writeVarLong(msg._1, optimizePositive = false) writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg._2, optimizePositive = true) writeVarLong(msg._2, optimizePositive = true)
this this
@ -171,7 +171,7 @@ class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = { def writeObject[T](t: T) = {
val msg = t.asInstanceOf[(VertexID, Double)] val msg = t.asInstanceOf[(VertexId, Double)]
writeVarLong(msg._1, optimizePositive = false) writeVarLong(msg._1, optimizePositive = false)
writeDouble(msg._2) writeDouble(msg._2)
this this


@ -26,18 +26,18 @@ import org.apache.spark.util.collection.BitSet
private[graphx] object VertexPartition { private[graphx] object VertexPartition {
def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = { def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): VertexPartition[VD] = {
val map = new PrimitiveKeyOpenHashMap[VertexID, VD] val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { case (k, v) => iter.foreach { case (k, v) =>
map(k) = v map(k) = v
} }
new VertexPartition(map.keySet, map._values, map.keySet.getBitSet) new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
} }
def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD) def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
: VertexPartition[VD] = : VertexPartition[VD] =
{ {
val map = new PrimitiveKeyOpenHashMap[VertexID, VD] val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { case (k, v) => iter.foreach { case (k, v) =>
map.setMerge(k, v, mergeFunc) map.setMerge(k, v, mergeFunc)
} }
@ -60,15 +60,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
def size: Int = mask.cardinality() def size: Int = mask.cardinality()
/** Return the vertex attribute for the given vertex ID. */ /** Return the vertex attribute for the given vertex ID. */
def apply(vid: VertexID): VD = values(index.getPos(vid)) def apply(vid: VertexId): VD = values(index.getPos(vid))
def isDefined(vid: VertexID): Boolean = { def isDefined(vid: VertexId): Boolean = {
val pos = index.getPos(vid) val pos = index.getPos(vid)
pos >= 0 && mask.get(pos) pos >= 0 && mask.get(pos)
} }
/** Look up vid in activeSet, throwing an exception if it is None. */ /** Look up vid in activeSet, throwing an exception if it is None. */
def isActive(vid: VertexID): Boolean = { def isActive(vid: VertexId): Boolean = {
activeSet.get.contains(vid) activeSet.get.contains(vid)
} }
@ -88,7 +88,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* each of the entries in the original VertexRDD. The resulting * each of the entries in the original VertexRDD. The resulting
* VertexPartition retains the same index. * VertexPartition retains the same index.
*/ */
def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = { def map[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexPartition[VD2] = {
// Construct a view of the map transformation // Construct a view of the map transformation
val newValues = new Array[VD2](capacity) val newValues = new Array[VD2](capacity)
var i = mask.nextSetBit(0) var i = mask.nextSetBit(0)
@ -108,7 +108,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* RDD can be easily joined with the original vertex-set. Furthermore, the filter only * RDD can be easily joined with the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated. * modifies the bitmap index and so no new values are allocated.
*/ */
def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = { def filter(pred: (VertexId, VD) => Boolean): VertexPartition[VD] = {
// Allocate the array to store the results into // Allocate the array to store the results into
val newMask = new BitSet(capacity) val newMask = new BitSet(capacity)
// Iterate over the active bits in the old mask and evaluate the predicate // Iterate over the active bits in the old mask and evaluate the predicate
@ -146,7 +146,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
/** Left outer join another VertexPartition. */ /** Left outer join another VertexPartition. */
def leftJoin[VD2: ClassTag, VD3: ClassTag] def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexPartition[VD2]) (other: VertexPartition[VD2])
(f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
if (index != other.index) { if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.") logWarning("Joining two VertexPartitions with different indexes is slow.")
leftJoin(createUsingIndex(other.iterator))(f) leftJoin(createUsingIndex(other.iterator))(f)
@ -165,14 +165,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
/** Left outer join another iterator of messages. */ /** Left outer join another iterator of messages. */
def leftJoin[VD2: ClassTag, VD3: ClassTag] def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: Iterator[(VertexID, VD2)]) (other: Iterator[(VertexId, VD2)])
(f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
leftJoin(createUsingIndex(other))(f) leftJoin(createUsingIndex(other))(f)
} }
/** Inner join another VertexPartition. */ /** Inner join another VertexPartition. */
def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U]) def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
(f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = { (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
if (index != other.index) { if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.") logWarning("Joining two VertexPartitions with different indexes is slow.")
innerJoin(createUsingIndex(other.iterator))(f) innerJoin(createUsingIndex(other.iterator))(f)
@ -192,15 +192,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Inner join an iterator of messages. * Inner join an iterator of messages.
*/ */
def innerJoin[U: ClassTag, VD2: ClassTag] def innerJoin[U: ClassTag, VD2: ClassTag]
(iter: Iterator[Product2[VertexID, U]]) (iter: Iterator[Product2[VertexId, U]])
(f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = { (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
innerJoin(createUsingIndex(iter))(f) innerJoin(createUsingIndex(iter))(f)
} }
/** /**
* Similar effect as aggregateUsingIndex((a, b) => a) * Similar effect as aggregateUsingIndex((a, b) => a)
*/ */
def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]]) def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
: VertexPartition[VD2] = { : VertexPartition[VD2] = {
val newMask = new BitSet(capacity) val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity) val newValues = new Array[VD2](capacity)
@ -218,7 +218,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
* the partition, hidden by the bitmask. * the partition, hidden by the bitmask.
*/ */
def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = { def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = {
val newMask = new BitSet(capacity) val newMask = new BitSet(capacity)
val newValues = new Array[VD](capacity) val newValues = new Array[VD](capacity)
System.arraycopy(values, 0, newValues, 0, newValues.length) System.arraycopy(values, 0, newValues, 0, newValues.length)
@ -233,7 +233,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
} }
def aggregateUsingIndex[VD2: ClassTag]( def aggregateUsingIndex[VD2: ClassTag](
iter: Iterator[Product2[VertexID, VD2]], iter: Iterator[Product2[VertexId, VD2]],
reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = { reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
val newMask = new BitSet(capacity) val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity) val newValues = new Array[VD2](capacity)
@ -253,7 +253,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition[VD2](index, newValues, newMask) new VertexPartition[VD2](index, newValues, newMask)
} }
def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = { def replaceActives(iter: Iterator[VertexId]): VertexPartition[VD] = {
val newActiveSet = new VertexSet val newActiveSet = new VertexSet
iter.foreach(newActiveSet.add(_)) iter.foreach(newActiveSet.add(_))
new VertexPartition(index, values, mask, Some(newActiveSet)) new VertexPartition(index, values, mask, Some(newActiveSet))
@ -263,7 +263,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Construct a new VertexPartition whose index contains only the vertices in the mask. * Construct a new VertexPartition whose index contains only the vertices in the mask.
*/ */
def reindex(): VertexPartition[VD] = { def reindex(): VertexPartition[VD] = {
val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD] val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
val arbitraryMerge = (a: VD, b: VD) => a val arbitraryMerge = (a: VD, b: VD) => a
for ((k, v) <- this.iterator) { for ((k, v) <- this.iterator) {
hashMap.setMerge(k, v, arbitraryMerge) hashMap.setMerge(k, v, arbitraryMerge)
@ -271,8 +271,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet) new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
} }
def iterator: Iterator[(VertexID, VD)] = def iterator: Iterator[(VertexId, VD)] =
mask.iterator.map(ind => (index.getValue(ind), values(ind))) mask.iterator.map(ind => (index.getValue(ind), values(ind)))
def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind)) def vidIterator: Iterator[VertexId] = mask.iterator.map(ind => index.getValue(ind))
} }
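As the comments above note, VertexPartition.filter only clears bits in the mask and never copies the value array. A self-contained sketch of that idea in plain Scala (not the Spark internals):

import scala.collection.mutable.BitSet

// Values stay in one shared array; visibility is controlled entirely by the mask.
class MaskedValues[A](values: Array[A], mask: BitSet) {
  def filter(pred: (Int, A) => Boolean): MaskedValues[A] = {
    val newMask = new BitSet(values.length)
    for (i <- mask if pred(i, values(i))) newMask.add(i)
    new MaskedValues(values, newMask) // same values array, new mask
  }
  def iterator: Iterator[(Int, A)] = mask.iterator.map(i => (i, values(i)))
}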


@ -20,5 +20,5 @@ package org.apache.spark.graphx
import org.apache.spark.util.collection.OpenHashSet import org.apache.spark.util.collection.OpenHashSet
package object impl { package object impl {
private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexID] private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexId]
} }


@ -35,9 +35,9 @@ object ConnectedComponents {
* @return a graph with vertex attributes containing the smallest vertex in each * @return a graph with vertex attributes containing the smallest vertex in each
* connected component * connected component
*/ */
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = { def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid } val ccGraph = graph.mapVertices { case (vid, _) => vid }
def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { def sendMessage(edge: EdgeTriplet[VertexId, ED]) = {
if (edge.srcAttr < edge.dstAttr) { if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr)) Iterator((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) { } else if (edge.srcAttr > edge.dstAttr) {

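The algorithm above repeatedly propagates the smaller vertex id across every edge until each vertex holds the minimum id of its component. A hedged usage sketch (the graph here is illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.ConnectedComponents

def ccExample(sc: SparkContext): Unit = {
  // Two components: {1, 2, 3} and {4, 5}.
  val edges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (4L, 5L)))
  val graph = Graph.fromEdgeTuples(edges, 0)
  val cc: Graph[VertexId, Int] = ConnectedComponents.run(graph)
  // cc.vertices should collect to (1L,1L), (2L,1L), (3L,1L), (4L,4L), (5L,4L).
}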

@ -92,7 +92,7 @@ object PageRank extends Logging {
// Define the three functions needed to implement PageRank in the GraphX // Define the three functions needed to implement PageRank in the GraphX
// version of Pregel // version of Pregel
def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum resetProb + (1.0 - resetProb) * msgSum
def sendMessage(edge: EdgeTriplet[Double, Double]) = def sendMessage(edge: EdgeTriplet[Double, Double]) =
Iterator((edge.dstId, edge.srcAttr * edge.attr)) Iterator((edge.dstId, edge.srcAttr * edge.attr))
@ -137,7 +137,7 @@ object PageRank extends Logging {
// Define the three functions needed to implement PageRank in the GraphX // Define the three functions needed to implement PageRank in the GraphX
// version of Pregel // version of Pregel
def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = { def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR) (newPR, newPR - oldPR)

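Restating the static-PageRank vertex update shown above as a standalone function, with one worked value (the names follow the code above):

// newRank = resetProb + (1 - resetProb) * (sum of incoming weighted ranks)
def vertexProgram(id: Long, attr: Double, msgSum: Double, resetProb: Double = 0.15): Double =
  resetProb + (1.0 - resetProb) * msgSum

// With resetProb = 0.15 and msgSum = 2.0: 0.15 + 0.85 * 2.0 == 1.85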

@ -79,13 +79,13 @@ object SVDPlusPlus {
(g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2)) (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
g = g.outerJoinVertices(t0) { g = g.outerJoinVertices(t0) {
(vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) => (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
} }
def mapTrainF(conf: Conf, u: Double) def mapTrainF(conf: Conf, u: Double)
(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]) (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
: Iterator[(VertexID, (RealVector, RealVector, Double))] = { : Iterator[(VertexId, (RealVector, RealVector, Double))] = {
val (usr, itm) = (et.srcAttr, et.dstAttr) val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1) val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
@ -112,7 +112,7 @@ object SVDPlusPlus {
et => Iterator((et.srcId, et.dstAttr._2)), et => Iterator((et.srcId, et.dstAttr._2)),
(g1: RealVector, g2: RealVector) => g1.add(g2)) (g1: RealVector, g2: RealVector) => g1.add(g2))
g = g.outerJoinVertices(t1) { g = g.outerJoinVertices(t1) {
(vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) => (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
} }
@ -123,7 +123,7 @@ object SVDPlusPlus {
(g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) => (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
(g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3)) (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
g = g.outerJoinVertices(t2) { g = g.outerJoinVertices(t2) {
(vid: VertexID, (vid: VertexId,
vd: (RealVector, RealVector, Double, Double), vd: (RealVector, RealVector, Double, Double),
msg: Option[(RealVector, RealVector, Double)]) => msg: Option[(RealVector, RealVector, Double)]) =>
(vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4) (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
@ -133,7 +133,7 @@ object SVDPlusPlus {
// calculate error on training set // calculate error on training set
def mapTestF(conf: Conf, u: Double) def mapTestF(conf: Conf, u: Double)
(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]) (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
: Iterator[(VertexID, Double)] = : Iterator[(VertexId, Double)] =
{ {
val (usr, itm) = (et.srcAttr, et.dstAttr) val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1) val (p, q) = (usr._1, itm._1)
@ -146,7 +146,7 @@ object SVDPlusPlus {
g.cache() g.cache()
val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2) val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
g = g.outerJoinVertices(t3) { g = g.outerJoinVertices(t3) {
(vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) => (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
} }


@ -35,7 +35,7 @@ object StronglyConnectedComponents {
* *
* @return a graph with vertex attributes containing the smallest vertex id in each SCC * @return a graph with vertex attributes containing the smallest vertex id in each SCC
*/ */
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = { def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED] = {
// the graph we update with final SCC ids, and the graph we return at the end // the graph we update with final SCC ids, and the graph we return at the end
var sccGraph = graph.mapVertices { case (vid, _) => vid } var sccGraph = graph.mapVertices { case (vid, _) => vid }
@ -71,7 +71,7 @@ object StronglyConnectedComponents {
// collect min of all my neighbors' scc values, update if it's smaller than mine // collect min of all my neighbors' scc values, update if it's smaller than mine
// then notify any neighbors with scc values larger than mine // then notify any neighbors with scc values larger than mine
sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID]( sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId](
sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)( sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)(
(vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2), (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
e => { e => {
@ -85,7 +85,7 @@ object StronglyConnectedComponents {
// start at root of SCCs. Traverse values in reverse, notify all my neighbors // start at root of SCCs. Traverse values in reverse, notify all my neighbors
// do not propagate if colors do not match! // do not propagate if colors do not match!
sccWorkGraph = Pregel[(VertexID, Boolean), ED, Boolean]( sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean](
sccWorkGraph, false, activeDirection = EdgeDirection.In)( sccWorkGraph, false, activeDirection = EdgeDirection.In)(
// vertex is final if it is the root of a color // vertex is final if it is the root of a color
// or it has the same color as a neighbor that is final // or it has the same color as a neighbor that is final

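A hedged usage sketch of the two Pregel phases above via the public entry point (the graph is illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.StronglyConnectedComponents

def sccExample(sc: SparkContext): Unit = {
  // A directed 3-cycle (one SCC) plus a dangling edge to vertex 4.
  val edges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L), (3L, 4L)))
  val graph = Graph.fromEdgeTuples(edges, 0)
  val scc = StronglyConnectedComponents.run(graph, numIter = 5)
  // Expected: vertices 1, 2 and 3 map to 1L; vertex 4 maps to 4L.
}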

@ -61,7 +61,7 @@ object TriangleCount {
(vid, _, optSet) => optSet.getOrElse(null) (vid, _, optSet) => optSet.getOrElse(null)
} }
// Edge function computes the intersection of the smaller vertex's set with the larger vertex's set // Edge function computes the intersection of the smaller vertex's set with the larger vertex's set
def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = { def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexId, Int)] = {
assert(et.srcAttr != null) assert(et.srcAttr != null)
assert(et.dstAttr != null) assert(et.dstAttr != null)
val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) { val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {

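The edge function above counts triangles by probing the larger neighbour set with the members of the smaller one. The same idea in isolation, on plain Scala sets:

// Count the common neighbours of an edge's two endpoints.
def commonNeighbours(a: Set[Long], b: Set[Long]): Int = {
  val (small, large) = if (a.size < b.size) (a, b) else (b, a)
  small.count(large.contains)
}

// commonNeighbours(Set(1L, 2L, 3L), Set(2L, 3L, 4L, 5L)) == 2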

@ -25,11 +25,11 @@ package object graphx {
* A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need * A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
* to follow any ordering or any constraints other than uniqueness. * to follow any ordering or any constraints other than uniqueness.
*/ */
type VertexID = Long type VertexId = Long
/** Integer identifier of a graph partition. */ /** Integer identifier of a graph partition. */
// TODO: Consider using Char. // TODO: Consider using Char.
type PartitionID = Int type PartitionID = Int
private[graphx] type VertexSet = OpenHashSet[VertexID] private[graphx] type VertexSet = OpenHashSet[VertexId]
} }
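Because VertexId is only an alias for Long, any 64-bit value can serve as a vertex id, for example in the Spark shell:

import org.apache.spark.graphx._

val vid: VertexId = 42L // just a Long
val e: Edge[String] = Edge(1L, 2L, "follows") // srcId and dstId are VertexId = Long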


@ -50,7 +50,7 @@ object GraphGenerators {
val mu = 4 val mu = 4
val sigma = 1.3 val sigma = 1.3
val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{ val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
src => (src, sampleLogNormal(mu, sigma, numVertices)) src => (src, sampleLogNormal(mu, sigma, numVertices))
} }
val edges = vertices.flatMap { v => val edges = vertices.flatMap { v =>
@ -59,9 +59,9 @@ object GraphGenerators {
Graph(vertices, edges, 0) Graph(vertices, edges, 0)
} }
def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = { def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = {
val rand = new Random() val rand = new Random()
Array.fill(maxVertexID) { Edge[Int](src, rand.nextInt(maxVertexID), 1) } Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
} }
/** /**
@ -206,9 +206,9 @@ object GraphGenerators {
*/ */
def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = { def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
// Convert row column address into vertex ids (row major order) // Convert row column address into vertex ids (row major order)
def sub2ind(r: Int, c: Int): VertexID = r * cols + c def sub2ind(r: Int, c: Int): VertexId = r * cols + c
val vertices: RDD[(VertexID, (Int,Int))] = val vertices: RDD[(VertexId, (Int,Int))] =
sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) ) sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
val edges: RDD[Edge[Double]] = val edges: RDD[Edge[Double]] =
vertices.flatMap{ case (vid, (r,c)) => vertices.flatMap{ case (vid, (r,c)) =>
@ -228,7 +228,7 @@ object GraphGenerators {
* being the center vertex. * being the center vertex.
*/ */
def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = { def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0)) val edges: RDD[(VertexId, VertexId)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
Graph.fromEdgeTuples(edges, 1) Graph.fromEdgeTuples(edges, 1)
} // end of starGraph } // end of starGraph
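The row-major mapping used by gridGraph above, pulled out as a standalone sketch with one worked value:

// Vertex (r, c) of a rows x cols grid gets id r * cols + c.
def sub2ind(r: Int, c: Int, cols: Int): Long = r.toLong * cols + c

// In a 3 x 4 grid, vertex (1, 2) gets id 1 * 4 + 2 == 6.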


@ -28,12 +28,12 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
test("joinVertices") { test("joinVertices") {
withSpark { sc => withSpark { sc =>
val vertices = val vertices =
sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2) sc.parallelize(Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three")), 2)
val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo")))) val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
val g: Graph[String, String] = Graph(vertices, edges) val g: Graph[String, String] = Graph(vertices, edges)
val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20))) val tbl = sc.parallelize(Seq[(VertexId, Int)]((1, 10), (2, 20)))
val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u } val g1 = g.joinVertices(tbl) { (vid: VertexId, attr: String, u: Int) => attr + u }
val v = g1.vertices.collect().toSet val v = g1.vertices.collect().toSet
assert(v === Set((1, "one10"), (2, "two20"), (3, "three"))) assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
@ -60,7 +60,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
test ("filter") { test ("filter") {
withSpark { sc => withSpark { sc =>
val n = 5 val n = 5
val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x))) val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
val graph: Graph[Int, Int] = Graph(vertices, edges).cache() val graph: Graph[Int, Int] = Graph(vertices, edges).cache()
val filteredGraph = graph.filter( val filteredGraph = graph.filter(
@ -68,7 +68,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
val degrees: VertexRDD[Int] = graph.outDegrees val degrees: VertexRDD[Int] = graph.outDegrees
graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
}, },
vpred = (vid: VertexID, deg:Int) => deg > 0 vpred = (vid: VertexId, deg:Int) => deg > 0
).cache() ).cache()
val v = filteredGraph.vertices.collect().toSet val v = filteredGraph.vertices.collect().toSet


@ -27,7 +27,7 @@ import org.apache.spark.rdd._
class GraphSuite extends FunSuite with LocalSparkContext { class GraphSuite extends FunSuite with LocalSparkContext {
def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = { def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = {
Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v") Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexId, x: VertexId)), 3), "v")
} }
test("Graph.fromEdgeTuples") { test("Graph.fromEdgeTuples") {
@ -57,7 +57,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc => withSpark { sc =>
val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L) val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) } val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) }
val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true))) val vertices: RDD[(VertexId, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
val graph = Graph(vertices, edges, false) val graph = Graph(vertices, edges, false)
assert( graph.edges.count() === rawEdges.size ) assert( graph.edges.count() === rawEdges.size )
// Vertices not explicitly provided but referenced by edges should be created automatically // Vertices not explicitly provided but referenced by edges should be created automatically
@ -74,7 +74,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val n = 5 val n = 5
val star = starGraph(sc, n) val star = starGraph(sc, n)
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet === assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
(1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet) (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
} }
} }
@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val p = 100 val p = 100
val verts = 1 to n val verts = 1 to n
val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x => val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0) verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
assert(graph.edges.partitions.length === p) assert(graph.edges.partitions.length === p)
val partitionedGraph = graph.partitionBy(EdgePartition2D) val partitionedGraph = graph.partitionBy(EdgePartition2D)
assert(graph.edges.partitions.length === p) assert(graph.edges.partitions.length === p)
@ -136,10 +136,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n) val star = starGraph(sc, n)
// mapVertices preserving type // mapVertices preserving type
val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2") val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet) assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
// mapVertices changing type // mapVertices changing type
val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length) val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet) assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
} }
} }
@ -168,7 +168,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc => withSpark { sc =>
val n = 5 val n = 5
val star = starGraph(sc, n) val star = starGraph(sc, n)
assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet) assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
} }
} }
@ -191,7 +191,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mask") { test("mask") {
withSpark { sc => withSpark { sc =>
val n = 5 val n = 5
val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x))) val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
val graph: Graph[Int, Int] = Graph(vertices, edges).cache() val graph: Graph[Int, Int] = Graph(vertices, edges).cache()
@ -218,7 +218,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n) val star = starGraph(sc, n)
val doubleStar = Graph.fromEdgeTuples( val doubleStar = Graph.fromEdgeTuples(
sc.parallelize((1 to n).flatMap(x => sc.parallelize((1 to n).flatMap(x =>
List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v") List((0: VertexId, x: VertexId), (0: VertexId, x: VertexId))), 1), "v")
val star2 = doubleStar.groupEdges { (a, b) => a} val star2 = doubleStar.groupEdges { (a, b) => a}
assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) === assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int])) star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
@ -237,7 +237,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet) assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
// activeSetOpt // activeSetOpt
val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID) val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId)
val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0) val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
val vids = complete.mapVertices((vid, attr) => vid).cache() val vids = complete.mapVertices((vid, attr) => vid).cache()
val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 } val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
@ -248,10 +248,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
} }
Iterator((et.srcId, 1)) Iterator((et.srcId, 1))
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet) assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)
// outerJoinVertices followed by mapReduceTriplets(activeSetOpt) // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3) val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x+1) % n: VertexId)), 3)
val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache() val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache() val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) } val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
@ -262,7 +262,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
} }
Iterator((et.dstId, 1)) Iterator((et.dstId, 1))
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet) assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)
} }
} }
@ -277,7 +277,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets( val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)), et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect.toSet (a: Int, b: Int) => a + b).collect.toSet
assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0))) assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
// outerJoinVertices preserving type // outerJoinVertices preserving type
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString } val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
val newReverseStar = val newReverseStar =


@ -27,7 +27,7 @@ class PregelSuite extends FunSuite with LocalSparkContext {
test("1 iteration") { test("1 iteration") {
withSpark { sc => withSpark { sc =>
val n = 5 val n = 5
val starEdges = (1 to n).map(x => (0: VertexID, x: VertexID)) val starEdges = (1 to n).map(x => (0: VertexId, x: VertexId))
val star = Graph.fromEdgeTuples(sc.parallelize(starEdges, 3), "v").cache() val star = Graph.fromEdgeTuples(sc.parallelize(starEdges, 3), "v").cache()
val result = Pregel(star, 0)( val result = Pregel(star, 0)(
(vid, attr, msg) => attr, (vid, attr, msg) => attr,
@ -41,12 +41,12 @@ class PregelSuite extends FunSuite with LocalSparkContext {
withSpark { sc => withSpark { sc =>
val n = 5 val n = 5
val chain = Graph.fromEdgeTuples( val chain = Graph.fromEdgeTuples(
sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3), sc.parallelize((1 until n).map(x => (x: VertexId, x + 1: VertexId)), 3),
0).cache() 0).cache()
assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet) assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexId, 0)).toSet)
val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }.cache() val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }.cache()
assert(chainWithSeed.vertices.collect.toSet === assert(chainWithSeed.vertices.collect.toSet ===
Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet) Set((1: VertexId, 1)) ++ (2 to n).map(x => (x: VertexId, 0)).toSet)
val result = Pregel(chainWithSeed, 0)( val result = Pregel(chainWithSeed, 0)(
(vid, attr, msg) => math.max(msg, attr), (vid, attr, msg) => math.max(msg, attr),
et => if (et.dstAttr != et.srcAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty, et => if (et.dstAttr != et.srcAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty,


@ -99,7 +99,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("IntAggMsgSerializer") { test("IntAggMsgSerializer") {
val conf = new SparkConf(false) val conf = new SparkConf(false)
val outMsg = (4: VertexID, 5) val outMsg = (4: VertexId, 5)
val bout = new ByteArrayOutputStream val bout = new ByteArrayOutputStream
val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout) val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg) outStrm.writeObject(outMsg)
@ -107,8 +107,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush() bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray) val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin) val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
val inMsg1: (VertexID, Int) = inStrm.readObject() val inMsg1: (VertexId, Int) = inStrm.readObject()
val inMsg2: (VertexID, Int) = inStrm.readObject() val inMsg2: (VertexId, Int) = inStrm.readObject()
assert(outMsg === inMsg1) assert(outMsg === inMsg1)
assert(outMsg === inMsg2) assert(outMsg === inMsg2)
@ -119,7 +119,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("LongAggMsgSerializer") { test("LongAggMsgSerializer") {
val conf = new SparkConf(false) val conf = new SparkConf(false)
val outMsg = (4: VertexID, 1L << 32) val outMsg = (4: VertexId, 1L << 32)
val bout = new ByteArrayOutputStream val bout = new ByteArrayOutputStream
val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout) val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg) outStrm.writeObject(outMsg)
@ -127,8 +127,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush() bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray) val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin) val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
val inMsg1: (VertexID, Long) = inStrm.readObject() val inMsg1: (VertexId, Long) = inStrm.readObject()
val inMsg2: (VertexID, Long) = inStrm.readObject() val inMsg2: (VertexId, Long) = inStrm.readObject()
assert(outMsg === inMsg1) assert(outMsg === inMsg1)
assert(outMsg === inMsg2) assert(outMsg === inMsg2)
@ -139,7 +139,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("DoubleAggMsgSerializer") { test("DoubleAggMsgSerializer") {
val conf = new SparkConf(false) val conf = new SparkConf(false)
val outMsg = (4: VertexID, 5.0) val outMsg = (4: VertexId, 5.0)
val bout = new ByteArrayOutputStream val bout = new ByteArrayOutputStream
val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout) val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg) outStrm.writeObject(outMsg)
@ -147,8 +147,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush() bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray) val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin) val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
val inMsg1: (VertexID, Double) = inStrm.readObject() val inMsg1: (VertexId, Double) = inStrm.readObject()
val inMsg2: (VertexID, Double) = inStrm.readObject() val inMsg2: (VertexId, Double) = inStrm.readObject()
assert(outMsg === inMsg1) assert(outMsg === inMsg1)
assert(outMsg === inMsg2) assert(outMsg === inMsg2)


@ -79,7 +79,7 @@ class EdgePartitionSuite extends FunSuite {
test("innerJoin") { test("innerJoin") {
def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = { def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
val builder = new EdgePartitionBuilder[A] val builder = new EdgePartitionBuilder[A]
for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) } for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
builder.toEdgePartition builder.toEdgePartition
} }
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0)) val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))


@ -100,7 +100,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
test("Connected Components on a Toy Connected Graph") { test("Connected Components on a Toy Connected Graph") {
withSpark { sc => withSpark { sc =>
// Create an RDD for the vertices // Create an RDD for the vertices
val users: RDD[(VertexID, (String, String))] = val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student")))) (4L, ("peter", "student"))))