VertexID -> VertexId

parent 1210ec2945
commit f4d9019aa8
@@ -186,7 +186,7 @@ code constructs a graph from a collection of RDDs:
 // Assume the SparkContext has already been constructed
 val sc: SparkContext
 // Create an RDD for the vertices
-val users: RDD[(VertexID, (String, String))] =
+val users: RDD[(VertexId, (String, String))] =
   sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
 // Create an RDD for edges
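
For context, the guide's example continues by building an edge RDD and assembling the property graph. The following is a sketch of that continuation with the renamed `VertexId`-keyed vertices; the `relationships` edges and `defaultUser` value follow the guide's running example:

{% highlight scala %}
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
{% endhighlight %}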
@@ -291,7 +291,7 @@ graph contains the following:

 {% highlight scala %}
 class Graph[VD, ED] {
-  def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
   def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
   def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
 }
@@ -313,7 +313,7 @@ val newGraph = Graph(newVertices, graph.edges)
 val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
 {% endhighlight %}

-[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexID,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
+[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]

 These operators are often used to initialize the graph for a particular computation or project away
 unnecessary properties. For example, given a graph with the out-degrees as the vertex properties
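
The example the guide builds toward here can be sketched as follows, assuming `graph` has `(String, String)` vertex properties as in the running example:

{% highlight scala %}
// Keep only each vertex's out-degree as its property ...
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// ... then derive initial PageRank edge and vertex attributes from it.
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
{% endhighlight %}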
@@ -339,7 +339,7 @@ add more in the future. The following is a list of the basic structural operato
 class Graph[VD, ED] {
   def reverse: Graph[VD, ED]
   def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
-               vpred: (VertexID, VD) => Boolean): Graph[VD, ED]
+               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
   def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
   def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
 }
@@ -358,11 +358,11 @@ satisfy the edge predicate *and connect vertices that satisfy the vertex predica
 operator can be used in a number of situations to restrict the graph to the vertices and edges of
 interest or eliminate broken links. For example, in the following code we remove broken links:

-[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexID,VD)⇒Boolean):Graph[VD,ED]
+[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED]

 {% highlight scala %}
 // Create an RDD for the vertices
-val users: RDD[(VertexID, (String, String))] =
+val users: RDD[(VertexId, (String, String))] =
   sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                        (4L, ("peter", "student"))))
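
The snippet this hunk is drawn from continues roughly as follows (a sketch; `relationships` and `defaultUser` are the running example's values, with an edge mentioning a nonexistent user 0):

{% highlight scala %}
// Build a graph that mentions a user 0 for which we have no information
val graph = Graph(users, relationships, defaultUser)
// Remove the missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
{% endhighlight %}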
@@ -425,9 +425,9 @@ using the *join* operators. Below we list the key join operators:

 {% highlight scala %}
 class Graph[VD, ED] {
-  def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD)
+  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
     : Graph[VD, ED]
-  def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2)
+  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
     : Graph[VD2, ED]
 }
 {% endhighlight %}
@@ -437,7 +437,7 @@ returns a new graph with the vertex properties obtained by applying the user def
 to the result of the joined vertices. Vertices without a matching value in the RDD retain their
 original value.

-[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexID,U)])((VertexID,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]
+[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]

 > Note that if the RDD contains more than one value for a given vertex only one will be used. It
 > is therefore recommended that the input RDD be first made unique using the following which will
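
The deduplication the note recommends looks roughly like this (a sketch; `nonUniqueCosts` is an assumed input of type `RDD[(VertexId, Double)]`):

{% highlight scala %}
// Pre-aggregate duplicate values using the index of this graph's vertices ...
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUniqueCosts, (a, b) => a + b)
// ... then join the now-unique values into the graph.
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)
{% endhighlight %}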
@@ -456,7 +456,7 @@ property type. Because not all vertices may have a matching value in the input
 function takes an `Option` type. For example, we can set up a graph for PageRank by initializing
 vertex properties with their `outDegree`.

-[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexID,U)])((VertexID,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
+[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]


 {% highlight scala %}
@@ -490,7 +490,7 @@ PageRank Value, shortest path to the source, and smallest reachable vertex id).
 ### Map Reduce Triplets (mapReduceTriplets)
 <a name="mrTriplets"></a>

-[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=>Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=>A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
+[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=>Iterator[(org.apache.spark.graphx.VertexId,A)],reduceFunc:(A,A)=>A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]

 The core (heavily optimized) aggregation primitive in GraphX is the
 [`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
@@ -498,7 +498,7 @@ The core (heavily optimized) aggregation primitive in GraphX is the
 {% highlight scala %}
 class Graph[VD, ED] {
   def mapReduceTriplets[A](
-      map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       reduce: (A, A) => A)
     : VertexRDD[A]
 }
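
As a usage sketch (assuming vertex properties hold ages as `Double`, per the guide's follower example), `mapReduceTriplets` can count the older followers of each user and sum their ages:

{% highlight scala %}
val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
  triplet => { // Map function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send a message to the destination vertex containing the counter and age
      Iterator((triplet.dstId, (1, triplet.srcAttr)))
    } else {
      Iterator.empty // Don't send a message for this triplet
    }
  },
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce function: add counter and age
)
{% endhighlight %}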
@@ -580,13 +580,13 @@ compute the max in, out, and total degrees:

 {% highlight scala %}
 // Define a reduce operation to compute the highest degree vertex
-def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = {
+def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
   if (a._2 > b._2) a else b
 }
 // Compute the max degrees
-val maxInDegree: (VertexID, Int)  = graph.inDegrees.reduce(max)
-val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max)
-val maxDegrees: (VertexID, Int)   = graph.degrees.reduce(max)
+val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
+val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
+val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)
 {% endhighlight %}

 ### Collecting Neighbors
@@ -596,14 +596,14 @@ attributes at each vertex. This can be easily accomplished using the
 [`collectNeighborIds`][GraphOps.collectNeighborIds] and the
 [`collectNeighbors`][GraphOps.collectNeighbors] operators.

-[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexID]]
-[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexID,VD)]]
+[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]]
+[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]


 {% highlight scala %}
 class GraphOps[VD, ED] {
-  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
-  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
+  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
+  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
 }
 {% endhighlight %}

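A brief usage sketch of these operators (`graph` as in the earlier examples):

{% highlight scala %}
// The ids of the in-neighbors of each vertex
val inNbrIds: VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.In)
// The (id, attribute) pairs of all neighbors, regardless of edge direction
val nbrs = graph.collectNeighbors(EdgeDirection.Either)
{% endhighlight %}

Note that these operators can be quite costly, as they duplicate vertex attributes onto neighboring vertices.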
@@ -647,7 +647,7 @@ messages remaining.
 The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
 of its implementation (note calls to graph.cache have been removed):

-[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexID,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexID,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
+[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]

 {% highlight scala %}
 class GraphOps[VD, ED] {
@@ -655,8 +655,8 @@ class GraphOps[VD, ED] {
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
-     (vprog: (VertexID, VD, A) => VD,
-      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+     (vprog: (VertexId, VD, A) => VD,
+      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
     // Receive the initial message at each vertex
@@ -701,7 +701,7 @@ import org.apache.spark.graphx.util.GraphGenerators
 // A graph with edge attributes containing distances
 val graph: Graph[Int, Double] =
   GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
-val sourceId: VertexID = 42 // The ultimate source
+val sourceId: VertexId = 42 // The ultimate source
 // Initialize the graph such that all vertices except the root have distance infinity.
 val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
 val sssp = initialGraph.pregel(Double.PositiveInfinity)(
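
The remainder of this single-source shortest path example falls outside the hunk; for readability, a sketch of how it continues in the guide:

{% highlight scala %}
  (id, dist, newDist) => math.min(dist, newDist), // Vertex program
  triplet => { // Send message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge message
)
println(sssp.vertices.collect.mkString("\n"))
{% endhighlight %}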
@@ -748,7 +748,7 @@ It creates a `Graph` from the specified edges, automatically creating any vertic
 {% highlight scala %}
 object Graph {
   def apply[VD, ED](
-      vertices: RDD[(VertexID, VD)],
+      vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD = null)
     : Graph[VD, ED]
@@ -758,7 +758,7 @@ object Graph {
       defaultValue: VD): Graph[VD, ED]

   def fromEdgeTuples[VD](
-      rawEdges: RDD[(VertexID, VertexID)],
+      rawEdges: RDD[(VertexId, VertexId)],
       defaultValue: VD,
       uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]

@@ -774,8 +774,8 @@ object Graph {
 [PartitionStrategy]: api/graphx/index.html#org.apache.spark.graphx.PartitionStrategy$

 [GraphLoader.edgeListFile]: api/graphx/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]
-[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexID,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
-[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexID,VertexID)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
+[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
+[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
 [Graph.fromEdges]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]

 # Vertex and Edge RDDs
@@ -799,17 +799,17 @@ following additional functionality:
 {% highlight scala %}
 class VertexRDD[VD] {
   // Filter the vertex set, preserving the internal index
-  def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
+  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
   // Transform the values without changing the ids (preserves the internal index)
   def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
-  def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
+  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
   // Remove vertices from this set that appear in the other set
   def diff(other: VertexRDD[VD]): VertexRDD[VD]
   // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
-  def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
-  def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
+  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
+  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
   // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
-  def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
 }
 {% endhighlight %}

@@ -828,7 +828,7 @@ RDD. For example:

 {% highlight scala %}
 val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
-val rddB: RDD[(VertexID, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
+val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
 // There should be 200 entries in rddB
 rddB.count
 val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
@@ -854,7 +854,7 @@ def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
 // Reverse the edges reusing both attributes and structure
 def reverse: EdgeRDD[ED]
 // Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3]
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
 {% endhighlight %}

 In most applications we have found that operations on the `EdgeRDD` are accomplished through the

@@ -28,8 +28,8 @@ package org.apache.spark.graphx
  * @param attr The attribute associated with the edge
  */
 case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
-    var srcId: VertexID = 0,
-    var dstId: VertexID = 0,
+    var srcId: VertexId = 0,
+    var dstId: VertexId = 0,
     var attr: ED = null.asInstanceOf[ED])
   extends Serializable {

@@ -39,7 +39,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
   * @param vid the id of one of the two vertices on the edge.
   * @return the id of the other vertex on the edge.
   */
-  def otherVertexId(vid: VertexID): VertexID =
+  def otherVertexId(vid: VertexId): VertexId =
     if (srcId == vid) dstId else { assert(dstId == vid); srcId }

   /**
@@ -50,7 +50,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
   * @return the relative direction of the edge to the corresponding
   *         vertex.
   */
-  def relativeDirection(vid: VertexID): EdgeDirection =
+  def relativeDirection(vid: VertexId): EdgeDirection =
     if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }
 }

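For reference, a minimal usage sketch of the `Edge` helpers renamed above (values are illustrative):

{% highlight scala %}
val e = Edge(3L, 7L, "collab")
e.otherVertexId(3L)     // 7L: the opposite endpoint
e.relativeDirection(3L) // EdgeDirection.Out: 3L is the source
e.relativeDirection(7L) // EdgeDirection.In: 7L is the destination
{% endhighlight %}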
@@ -102,7 +102,7 @@ class EdgeRDD[@specialized ED: ClassTag](
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
       (other: EdgeRDD[ED2])
-      (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = {
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
     new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
@@ -113,7 +113,7 @@ class EdgeRDD[@specialized ED: ClassTag](
     })
   }

-  private[graphx] def collectVertexIDs(): RDD[VertexID] = {
+  private[graphx] def collectVertexIds(): RDD[VertexId] = {
     partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
   }
 }

@@ -50,7 +50,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
   * @param vid the id of one of the two vertices on the edge
   * @return the attribute for the other vertex on the edge
   */
-  def otherVertexAttr(vid: VertexID): VD =
+  def otherVertexAttr(vid: VertexId): VD =
     if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr }

   /**
@@ -59,7 +59,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
   * @param vid the id of one of the two vertices on the edge
   * @return the attr for the vertex with that id
   */
-  def vertexAttr(vid: VertexID): VD =
+  def vertexAttr(vid: VertexId): VD =
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }

   override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()

@@ -126,7 +126,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   * }}}
   *
   */
-  def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

   /**
   * Transforms each edge attribute in the graph using the map function. The map function is not
@@ -242,7 +242,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   */
   def subgraph(
       epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
-      vpred: (VertexID, VD) => Boolean = ((v, d) => true))
+      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
     : Graph[VD, ED]

   /**
@@ -292,7 +292,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   * vertex
   * {{{
   * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
-  * val inDeg: RDD[(VertexID, Int)] =
+  * val inDeg: RDD[(VertexId, Int)] =
   *   mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
   * }}}
   *
@@ -304,7 +304,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   *
   */
   def mapReduceTriplets[A: ClassTag](
-      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       reduceFunc: (A, A) => A,
       activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
     : VertexRDD[A]
@@ -328,14 +328,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   *
   * {{{
   * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
-  * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees()
+  * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees()
   * val graph = rawGraph.outerJoinVertices(outDeg) {
   *   (vid, data, optDeg) => optDeg.getOrElse(0)
   * }
   * }}}
   */
-  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
-      (mapFunc: (VertexID, VD, Option[U]) => VD2)
+  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+      (mapFunc: (VertexId, VD, Option[U]) => VD2)
     : Graph[VD2, ED]

   /**
@@ -364,7 +364,7 @@ object Graph {
   * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
   */
   def fromEdgeTuples[VD: ClassTag](
-      rawEdges: RDD[(VertexID, VertexID)],
+      rawEdges: RDD[(VertexId, VertexId)],
       defaultValue: VD,
       uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
   {
@@ -405,7 +405,7 @@ object Graph {
   * mentioned in edges but not in vertices
   */
   def apply[VD: ClassTag, ED: ClassTag](
-      vertices: RDD[(VertexID, VD)],
+      vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
     GraphImpl(vertices, edges, defaultVertexAttr)

@@ -33,7 +33,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[Edge[Object]])
     kryo.register(classOf[MessageToPartition[Object]])
     kryo.register(classOf[VertexBroadcastMsg[Object]])
-    kryo.register(classOf[(VertexID, Object)])
+    kryo.register(classOf[(VertexId, Object)])
     kryo.register(classOf[EdgePartition[Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])

@@ -80,19 +80,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   *
   * @return the set of neighboring ids for each vertex
   */
-  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] = {
+  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
     val nbrs =
       if (edgeDirection == EdgeDirection.Either) {
-        graph.mapReduceTriplets[Array[VertexID]](
+        graph.mapReduceTriplets[Array[VertexId]](
           mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
           reduceFunc = _ ++ _
         )
       } else if (edgeDirection == EdgeDirection.Out) {
-        graph.mapReduceTriplets[Array[VertexID]](
+        graph.mapReduceTriplets[Array[VertexId]](
           mapFunc = et => Iterator((et.srcId, Array(et.dstId))),
           reduceFunc = _ ++ _)
       } else if (edgeDirection == EdgeDirection.In) {
-        graph.mapReduceTriplets[Array[VertexID]](
+        graph.mapReduceTriplets[Array[VertexId]](
           mapFunc = et => Iterator((et.dstId, Array(et.srcId))),
           reduceFunc = _ ++ _)
       } else {
@@ -100,7 +100,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
         "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
     }
     graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
-      nbrsOpt.getOrElse(Array.empty[VertexID])
+      nbrsOpt.getOrElse(Array.empty[VertexId])
     }
   } // end of collectNeighborIds

@@ -116,8 +116,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   *
   * @return the vertex set of neighboring vertex attributes for each vertex
   */
-  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = {
-    val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]](
+  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
+    val nbrs = graph.mapReduceTriplets[Array[(VertexId,VD)]](
       edge => {
         val msgToSrc = (edge.srcId, Array((edge.dstId, edge.dstAttr)))
         val msgToDst = (edge.dstId, Array((edge.srcId, edge.srcAttr)))
@@ -133,7 +133,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
       (a, b) => a ++ b)

     graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
-      nbrsOpt.getOrElse(Array.empty[(VertexID, VD)])
+      nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
     }
   } // end of collectNeighbor

@@ -164,9 +164,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   * }}}
   *
   */
-  def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD)
+  def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
     : Graph[VD, ED] = {
-    val uf = (id: VertexID, data: VD, o: Option[U]) => {
+    val uf = (id: VertexId, data: VD, o: Option[U]) => {
       o match {
         case Some(u) => mapFunc(id, data, u)
         case None => data
@@ -197,7 +197,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   *   val degrees: VertexRDD[Int] = graph.outDegrees
   *   graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
   * },
-  * vpred = (vid: VertexID, deg:Int) => deg > 0
+  * vpred = (vid: VertexId, deg:Int) => deg > 0
   * )
   * }}}
   *
@@ -205,7 +205,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   def filter[VD2: ClassTag, ED2: ClassTag](
       preprocess: Graph[VD, ED] => Graph[VD2, ED2],
       epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
-      vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = {
+      vpred: (VertexId, VD2) => Boolean = (v:VertexId, d:VD2) => true): Graph[VD, ED] = {
     graph.mask(preprocess(graph).subgraph(epred, vpred))
   }

@@ -260,8 +260,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
       initialMsg: A,
       maxIterations: Int = Int.MaxValue,
       activeDirection: EdgeDirection = EdgeDirection.Either)(
-      vprog: (VertexID, VD, A) => VD,
-      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
+      vprog: (VertexId, VD, A) => VD,
+      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
     Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
@@ -293,7 +293,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   *
   * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
   */
-  def connectedComponents(): Graph[VertexID, ED] = {
+  def connectedComponents(): Graph[VertexId, ED] = {
     ConnectedComponents.run(graph)
   }

@@ -312,7 +312,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   *
   * @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents$#run]]
   */
-  def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] = {
+  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = {
     StronglyConnectedComponents.run(graph, numIter)
   }
 } // end of GraphOps

@@ -23,7 +23,7 @@ package org.apache.spark.graphx
  */
 trait PartitionStrategy extends Serializable {
   /** Returns the partition number for a given edge. */
-  def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
+  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
 }

 /**
@@ -73,9 +73,9 @@ object PartitionStrategy {
   * is used.
   */
   case object EdgePartition2D extends PartitionStrategy {
-    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
-      val mixingPrime: VertexID = 1125899906842597L
+      val mixingPrime: VertexId = 1125899906842597L
       val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
       val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
       (col * ceilSqrtNumParts + row) % numParts
@@ -87,8 +87,8 @@ object PartitionStrategy {
   * source.
   */
   case object EdgePartition1D extends PartitionStrategy {
-    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
-      val mixingPrime: VertexID = 1125899906842597L
+    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
+      val mixingPrime: VertexId = 1125899906842597L
       (math.abs(src) * mixingPrime).toInt % numParts
     }
   }
@@ -99,7 +99,7 @@ object PartitionStrategy {
   * random vertex cut that colocates all same-direction edges between two vertices.
   */
   case object RandomVertexCut extends PartitionStrategy {
-    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       math.abs((src, dst).hashCode()) % numParts
     }
   }
@@ -111,7 +111,7 @@ object PartitionStrategy {
   * regardless of direction.
   */
   case object CanonicalRandomVertexCut extends PartitionStrategy {
-    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       val lower = math.min(src, dst)
       val higher = math.max(src, dst)
       math.abs((lower, higher).hashCode()) % numParts

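A small usage sketch of the strategies renamed above (the vertex ids `3L`/`7L` and the partition count are illustrative):

{% highlight scala %}
val numParts: PartitionID = 8
// Route the edge (3L, 7L) under each strategy
val p2d = PartitionStrategy.EdgePartition2D.getPartition(3L, 7L, numParts)
val p1d = PartitionStrategy.EdgePartition1D.getPartition(3L, 7L, numParts)
val rnd = PartitionStrategy.RandomVertexCut.getPartition(3L, 7L, numParts)
{% endhighlight %}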
@@ -40,9 +40,9 @@ import scala.reflect.ClassTag
 *   // Set the vertex attributes to the initial pagerank values
 *   .mapVertices((id, attr) => 1.0)
 *
- * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
+ * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
 *   resetProb + (1.0 - resetProb) * msgSum
- * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
+ * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
 *   Iterator((edge.dstId, edge.srcAttr * edge.attr))
 * def messageCombiner(a: Double, b: Double): Double = a + b
 * val initialMessage = 0.0
@@ -113,8 +113,8 @@ object Pregel {
       initialMsg: A,
       maxIterations: Int = Int.MaxValue,
       activeDirection: EdgeDirection = EdgeDirection.Either)
-     (vprog: (VertexID, VD, A) => VD,
-      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+     (vprog: (VertexId, VD, A) => VD,
+      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] =
   {

@@ -28,7 +28,7 @@ import org.apache.spark.graphx.impl.MsgRDDFunctions
 import org.apache.spark.graphx.impl.VertexPartition

 /**
- * Extends `RDD[(VertexID, VD)]` by ensuring that there is only one entry for each vertex and by
+ * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
 * pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be
 * joined efficiently. All operations except [[reindex]] preserve the index. To construct a
 * `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]].
@@ -36,12 +36,12 @@ import org.apache.spark.graphx.impl.VertexPartition
 * @example Construct a `VertexRDD` from a plain RDD:
 * {{{
 * // Construct an initial vertex set
- * val someData: RDD[(VertexID, SomeType)] = loadData(someFile)
+ * val someData: RDD[(VertexId, SomeType)] = loadData(someFile)
 * val vset = VertexRDD(someData)
 * // If there were redundant values in someData we would use a reduceFunc
 * val vset2 = VertexRDD(someData, reduceFunc)
 * // Finally we can use the VertexRDD to index another dataset
- * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile)
+ * val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile)
 * val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
 * // Now we can construct very fast joins between the two sets
 * val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
@@ -51,7 +51,7 @@ import org.apache.spark.graphx.impl.VertexPartition
 */
 class VertexRDD[@specialized VD: ClassTag](
     val partitionsRDD: RDD[VertexPartition[VD]])
-  extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+  extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {

   require(partitionsRDD.partitioner.isDefined)

@@ -92,9 +92,9 @@ class VertexRDD[@specialized VD: ClassTag](
   }

   /**
-   * Provides the `RDD[(VertexID, VD)]` equivalent output.
+   * Provides the `RDD[(VertexId, VD)]` equivalent output.
   */
-  override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = {
+  override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
     firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
   }

@@ -114,9 +114,9 @@ class VertexRDD[@specialized VD: ClassTag](
   * rather than allocating new memory.
   *
   * @param pred the user defined predicate, which takes a tuple to conform to the
-   * `RDD[(VertexID, VD)]` interface
+   * `RDD[(VertexId, VD)]` interface
   */
-  override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] =
+  override def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] =
     this.mapVertexPartitions(_.filter(Function.untupled(pred)))

   /**
@@ -140,7 +140,7 @@ class VertexRDD[@specialized VD: ClassTag](
   * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
   * original VertexRDD. The resulting VertexRDD retains the same index.
   */
-  def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] =
+  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
     this.mapVertexPartitions(_.map(f))

   /**
@@ -172,7 +172,7 @@ class VertexRDD[@specialized VD: ClassTag](
   * @return a VertexRDD containing the results of `f`
   */
   def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
-      (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
+      (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
     val newPartitionsRDD = partitionsRDD.zipPartitions(
       other.partitionsRDD, preservesPartitioning = true
     ) { (thisIter, otherIter) =>
@@ -200,8 +200,8 @@ class VertexRDD[@specialized VD: ClassTag](
   * by `f`.
   */
   def leftJoin[VD2: ClassTag, VD3: ClassTag]
-      (other: RDD[(VertexID, VD2)])
-      (f: (VertexID, VD, Option[VD2]) => VD3)
+      (other: RDD[(VertexId, VD2)])
+      (f: (VertexId, VD, Option[VD2]) => VD3)
     : VertexRDD[VD3] = {
     // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
     // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
@@ -225,7 +225,7 @@ class VertexRDD[@specialized VD: ClassTag](
   * [[innerJoin]] for the behavior of the join.
   */
   def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
-      (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
+      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
     val newPartitionsRDD = partitionsRDD.zipPartitions(
       other.partitionsRDD, preservesPartitioning = true
     ) { (thisIter, otherIter) =>
@@ -247,8 +247,8 @@ class VertexRDD[@specialized VD: ClassTag](
   * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this`
   * and `other`, with values supplied by `f`
   */
-  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
-      (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
+  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
     // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
     // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
     other match {
@@ -278,7 +278,7 @@ class VertexRDD[@specialized VD: ClassTag](
   * messages.
   */
   def aggregateUsingIndex[VD2: ClassTag](
-      messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
+      messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
     val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
       val vertexPartition: VertexPartition[VD] = thisIter.next()
@@ -303,8 +303,8 @@ object VertexRDD {
   *
   * @param rdd the collection of vertex-attribute pairs
   */
-  def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = {
-    val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
+  def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+    val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
       case Some(p) => rdd
       case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
     }
@@ -323,8 +323,8 @@ object VertexRDD {
   * @param rdd the collection of vertex-attribute pairs
   * @param mergeFunc the associative, commutative merge function.
   */
-  def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
-    val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
+  def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
+    val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
       case Some(p) => rdd
       case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
     }
@@ -338,7 +338,7 @@ object VertexRDD {
   * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
   * `defaultVal` otherwise.
   */
-  def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD)
+  def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD)
     : VertexRDD[VD] = {
     VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
       value.getOrElse(default)

@@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 */
 private[graphx]
 class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
-    val srcIds: Array[VertexID],
-    val dstIds: Array[VertexID],
+    val srcIds: Array[VertexId],
+    val dstIds: Array[VertexId],
     val data: Array[ED],
-    val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable {
+    val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {

   /**
   * Reverse all the edges in this partition.
@@ -118,8 +118,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   */
   def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
     val builder = new EdgePartitionBuilder[ED]
-    var currSrcId: VertexID = null.asInstanceOf[VertexID]
-    var currDstId: VertexID = null.asInstanceOf[VertexID]
+    var currSrcId: VertexId = null.asInstanceOf[VertexId]
+    var currDstId: VertexId = null.asInstanceOf[VertexId]
     var currAttr: ED = null.asInstanceOf[ED]
     var i = 0
     while (i < size) {
@@ -153,7 +153,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
       (other: EdgePartition[ED2])
-      (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = {
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = {
     val builder = new EdgePartitionBuilder[ED3]
     var i = 0
     var j = 0
@@ -210,14 +210,14 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   * iterator is generated using an index scan, so it is efficient at skipping edges that don't
   * match srcIdPred.
   */
-  def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] =
+  def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
     index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))

   /**
   * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
   * cluster must start at position `index`.
   */
-  private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] {
+  private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
     private[this] val edge = new Edge[ED]
     private[this] var pos = index

@@ -29,22 +29,22 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
   var edges = new PrimitiveVector[Edge[ED]](size)

   /** Add a new edge to the partition. */
-  def add(src: VertexID, dst: VertexID, d: ED) {
+  def add(src: VertexId, dst: VertexId, d: ED) {
     edges += Edge(src, dst, d)
   }

   def toEdgePartition: EdgePartition[ED] = {
     val edgeArray = edges.trim().array
     Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
-    val srcIds = new Array[VertexID](edgeArray.size)
-    val dstIds = new Array[VertexID](edgeArray.size)
+    val srcIds = new Array[VertexId](edgeArray.size)
+    val dstIds = new Array[VertexId](edgeArray.size)
     val data = new Array[ED](edgeArray.size)
-    val index = new PrimitiveKeyOpenHashMap[VertexID, Int]
+    val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
     // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
     // adding them to the index
     if (edgeArray.length > 0) {
       index.update(srcIds(0), 0)
-      var currSrcId: VertexID = srcIds(0)
+      var currSrcId: VertexId = srcIds(0)
       var i = 0
       while (i < edgeArray.size) {
         srcIds(i) = edgeArray(i).srcId

@@ -41,7 +41,7 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
   // allocating too many temporary Java objects.
   private val triplet = new EdgeTriplet[VD, ED]

-  private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray)
+  private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)

   override def hasNext: Boolean = pos < edgePartition.size

@@ -105,7 +105,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
   }

-  override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = {
+  override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
     if (classTag[VD] equals classTag[VD2]) {
       // The map preserves type, so we can use incremental replication
       val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
@@ -153,7 +153,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (

   override def subgraph(
       epred: EdgeTriplet[VD, ED] => Boolean = x => true,
-      vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+      vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
     // Filter the vertices, reusing the partitioner and the index from this graph
     val newVerts = vertices.mapVertexPartitions(_.filter(vpred))

@@ -195,7 +195,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   //////////////////////////////////////////////////////////////////////////////////////////////////

   override def mapReduceTriplets[A: ClassTag](
-      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       reduceFunc: (A, A) => A,
       activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {

@@ -225,7 +225,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
         val edgeIter = activeDirectionOpt match {
           case Some(EdgeDirection.Both) =>
             if (activeFraction < 0.8) {
-              edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
+              edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
                 .filter(e => vPart.isActive(e.dstId))
             } else {
               edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
@@ -236,7 +236,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
             edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
           case Some(EdgeDirection.Out) =>
             if (activeFraction < 0.8) {
-              edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
+              edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
             } else {
               edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
             }
@@ -267,8 +267,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   } // end of mapReduceTriplets

   override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
-      (other: RDD[(VertexID, U)])
-      (updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] =
+      (other: RDD[(VertexId, U)])
+      (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] =
   {
     if (classTag[VD] equals classTag[VD2]) {
       // updateF preserves type, so we can use incremental replication
@@ -312,7 +312,7 @@ object GraphImpl {
   }

   def apply[VD: ClassTag, ED: ClassTag](
-      vertices: RDD[(VertexID, VD)],
+      vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD): GraphImpl[VD, ED] =
   {
@@ -321,7 +321,7 @@ object GraphImpl {
     // Get the set of all vids
     val partitioner = Partitioner.defaultPartitioner(vertices)
     val vPartitioned = vertices.partitionBy(partitioner)
-    val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner)
+    val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner)
     val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
       vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
     }
@@ -355,7 +355,7 @@ object GraphImpl {

   /**
   * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
-   * data structure (RDD[(VertexID, VertexID, ED)]).
+   * data structure (RDD[(VertexId, VertexId, ED)]).
   *
   * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
   * pair: the key is the partition id, and the value is an EdgePartition object containing all the
@@ -378,19 +378,19 @@ object GraphImpl {
       defaultVertexAttr: VD): GraphImpl[VD, ED] = {
     edges.cache()
     // Get the set of all vids
-    val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size))
+    val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size))
     // Create the VertexRDD.
     val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
     GraphImpl(vertices, edges)
   }

   /** Collects all vids mentioned in edges and partitions them by partitioner. */
-  private def collectVertexIDsFromEdges(
+  private def collectVertexIdsFromEdges(
       edges: EdgeRDD[_],
-      partitioner: Partitioner): RDD[(VertexID, Int)] = {
+      partitioner: Partitioner): RDD[(VertexId, Int)] = {
     // TODO: Consider doing map side distinct before shuffle.
-    new ShuffledRDD[VertexID, Int, (VertexID, Int)](
-      edges.collectVertexIDs.map(vid => (vid, 0)), partitioner)
-      .setSerializer(classOf[VertexIDMsgSerializer].getName)
+    new ShuffledRDD[VertexId, Int, (VertexId, Int)](
+      edges.collectVertexIds.map(vid => (vid, 0)), partitioner)
+      .setSerializer(classOf[VertexIdMsgSerializer].getName)
   }
 } // end of object GraphImpl

@@ -20,16 +20,16 @@ package org.apache.spark.graphx.impl
 import scala.reflect.{classTag, ClassTag}

 import org.apache.spark.Partitioner
-import org.apache.spark.graphx.{PartitionID, VertexID}
+import org.apache.spark.graphx.{PartitionID, VertexId}
 import org.apache.spark.rdd.{ShuffledRDD, RDD}


 private[graphx]
 class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
     @transient var partition: PartitionID,
-    var vid: VertexID,
+    var vid: VertexId,
     var data: T)
-  extends Product2[PartitionID, (VertexID, T)] with Serializable {
+  extends Product2[PartitionID, (VertexId, T)] with Serializable {

   override def _1 = partition

@@ -61,7 +61,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef
 private[graphx]
 class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
   def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
-    val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
+    val rdd = new ShuffledRDD[PartitionID, (VertexId, T), VertexBroadcastMsg[T]](self, partitioner)

     // Set a custom serializer if the data is of int or double type.
     if (classTag[T] == ClassTag.Int) {
@@ -99,8 +99,8 @@ object MsgRDDFunctions {
     new VertexBroadcastMsgRDDFunctions(rdd)
   }

-  def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = {
-    val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner)
+  def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = {
+    val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner)

     // Set a custom serializer if the data is of int or double type.
     if (classTag[T] == ClassTag.Int) {

@@ -50,9 +50,9 @@ class ReplicatedVertexView[VD: ClassTag](
   * vids from both the source and destination of edges. It must always include both source and
   * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
   */
-  private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
+  private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
     case Some(prevView) =>
-      prevView.localVertexIDMap
+      prevView.localVertexIdMap
     case None =>
       edges.partitionsRDD.mapPartitions(_.map {
         case (pid, epart) =>
@@ -62,7 +62,7 @@ class ReplicatedVertexView[VD: ClassTag](
             vidToIndex.add(e.dstId)
           }
           (pid, vidToIndex)
-      }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap")
+      }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap")
   }

   private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
@@ -75,7 +75,7 @@ class ReplicatedVertexView[VD: ClassTag](
     srcAttrOnly.unpersist(blocking)
     dstAttrOnly.unpersist(blocking)
     noAttrs.unpersist(blocking)
-    // Don't unpersist localVertexIDMap because a future ReplicatedVertexView may be using it
+    // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it
     // without modification
     this
   }
@@ -133,8 +133,8 @@ class ReplicatedVertexView[VD: ClassTag](

       case None =>
         // Within each edge partition, place the shipped vertex attributes into the correct
-        // locations specified in localVertexIDMap
-        localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
+        // locations specified in localVertexIdMap
+        localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
           val (pid, vidToIndex) = mapIter.next()
           assert(!mapIter.hasNext)
           // Populate the vertex array using the vidToIndex map
@@ -157,15 +157,15 @@ class ReplicatedVertexView[VD: ClassTag](

 private object ReplicatedVertexView {
   protected def buildBuffer[VD: ClassTag](
-      pid2vidIter: Iterator[Array[Array[VertexID]]],
+      pid2vidIter: Iterator[Array[Array[VertexId]]],
       vertexPartIter: Iterator[VertexPartition[VD]]) = {
-    val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
+    val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
     val vertexPart: VertexPartition[VD] = vertexPartIter.next()

     Iterator.tabulate(pid2vid.size) { pid =>
       val vidsCandidate = pid2vid(pid)
       val size = vidsCandidate.length
-      val vids = new PrimitiveVector[VertexID](pid2vid(pid).size)
+      val vids = new PrimitiveVector[VertexId](pid2vid(pid).size)
       val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
       var i = 0
       while (i < size) {
@@ -181,16 +181,16 @@ private object ReplicatedVertexView {
   }

   protected def buildActiveBuffer(
-      pid2vidIter: Iterator[Array[Array[VertexID]]],
+      pid2vidIter: Iterator[Array[Array[VertexId]]],
       activePartIter: Iterator[VertexPartition[_]])
-    : Iterator[(Int, Array[VertexID])] = {
-    val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
+    : Iterator[(Int, Array[VertexId])] = {
+    val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
     val activePart: VertexPartition[_] = activePartIter.next()

     Iterator.tabulate(pid2vid.size) { pid =>
       val vidsCandidate = pid2vid(pid)
       val size = vidsCandidate.length
-      val actives = new PrimitiveVector[VertexID](vidsCandidate.size)
+      val actives = new PrimitiveVector[VertexId](vidsCandidate.size)
       var i = 0
       while (i < size) {
         val vid = vidsCandidate(i)
@@ -205,8 +205,8 @@ private object ReplicatedVertexView {
   }

 private[graphx]
-class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD])
+class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
   extends Serializable {
-  def iterator: Iterator[(VertexID, VD)] =
+  def iterator: Iterator[(VertexId, VD)] =
     (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
 }

|
|||
private[impl]
|
||||
class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
|
||||
|
||||
val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true)
|
||||
val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false)
|
||||
val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true)
|
||||
val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false)
|
||||
val bothAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(true, true)
|
||||
val srcAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(true, false)
|
||||
val dstAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(false, true)
|
||||
val noAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(false, false)
|
||||
|
||||
def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] =
|
||||
def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] =
|
||||
(includeSrcAttr, includeDstAttr) match {
|
||||
case (true, true) => bothAttrs
|
||||
case (true, false) => srcAttrOnly
|
||||
|
@ -46,9 +46,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
|
|||
}
|
||||
|
||||
private def createPid2Vid(
|
||||
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = {
|
||||
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = {
|
||||
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
|
||||
val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
|
||||
val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
|
||||
val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
|
||||
val numEdges = edgePartition.size
|
||||
val vSet = new VertexSet
|
||||
|
@ -71,7 +71,7 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
|
|||
|
||||
val numPartitions = vertices.partitions.size
|
||||
vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
|
||||
val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID])
|
||||
val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId])
|
||||
for ((vid, pid) <- iter) {
|
||||
pid2vid(pid) += vid
|
||||
}
|
||||
|
|
|
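The createPid2Vid step above boils down to inverting a (vid, pid) mapping into, for each partition, the array of vertex ids that partition needs. The following is a minimal single-machine sketch of that inversion using plain Scala collections in place of GraphX's PrimitiveVector; the object and method names are illustrative only:

import scala.collection.mutable.ArrayBuffer

object Pid2VidSketch {
  type VertexId = Long

  // Invert (vid, pid) pairs into, for each partition, the list of
  // vertex ids whose attributes that partition needs.
  def invert(vid2pid: Iterator[(VertexId, Int)], numPartitions: Int): Array[Array[VertexId]] = {
    val pid2vid = Array.fill(numPartitions)(new ArrayBuffer[VertexId])
    for ((vid, pid) <- vid2pid) {
      pid2vid(pid) += vid
    }
    pid2vid.map(_.toArray)
  }

  def main(args: Array[String]): Unit = {
    val pairs = Iterator((1L, 0), (2L, 1), (3L, 0))
    invert(pairs, 2).zipWithIndex.foreach { case (vids, pid) =>
      println(s"partition $pid needs vertices ${vids.mkString(", ")}")
    }
  }
}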
@@ -25,12 +25,12 @@ import org.apache.spark.graphx._
 import org.apache.spark.serializer._

 private[graphx]
-class VertexIDMsgSerializer(conf: SparkConf) extends Serializer {
+class VertexIdMsgSerializer(conf: SparkConf) extends Serializer {
   override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {

     override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
       def writeObject[T](t: T) = {
-        val msg = t.asInstanceOf[(VertexID, _)]
+        val msg = t.asInstanceOf[(VertexId, _)]
         writeVarLong(msg._1, optimizePositive = false)
         this
       }

@@ -123,7 +123,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer {
     override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
       def writeObject[T](t: T) = {
-        val msg = t.asInstanceOf[(VertexID, Int)]
+        val msg = t.asInstanceOf[(VertexId, Int)]
         writeVarLong(msg._1, optimizePositive = false)
         writeUnsignedVarInt(msg._2)
         this

@@ -147,7 +147,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer {
     override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
       def writeObject[T](t: T) = {
-        val msg = t.asInstanceOf[(VertexID, Long)]
+        val msg = t.asInstanceOf[(VertexId, Long)]
         writeVarLong(msg._1, optimizePositive = false)
         writeVarLong(msg._2, optimizePositive = true)
         this

@@ -171,7 +171,7 @@ class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer {
     override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
       def writeObject[T](t: T) = {
-        val msg = t.asInstanceOf[(VertexID, Double)]
+        val msg = t.asInstanceOf[(VertexId, Double)]
         writeVarLong(msg._1, optimizePositive = false)
         writeDouble(msg._2)
         this
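All four serializers above write the vertex id with writeVarLong. As a rough sketch of how a var-long encoder of this general style can work (a generic illustration, not Spark's actual implementation), a signed value may be zig-zag mapped to an unsigned one when optimizePositive is false, then written seven bits per byte with a continuation flag:

object VarLongSketch {
  // Assumed encoding scheme for illustration: zig-zag plus 7-bit groups.
  def encode(value: Long, optimizePositive: Boolean): Array[Byte] = {
    var v = if (optimizePositive) value else (value << 1) ^ (value >> 63)
    val out = scala.collection.mutable.ArrayBuffer[Byte]()
    while ((v & ~0x7FL) != 0L) {
      out += ((v & 0x7F) | 0x80).toByte  // low 7 bits, continuation bit set
      v >>>= 7
    }
    out += (v & 0x7F).toByte             // final byte, continuation bit clear
    out.toArray
  }

  def main(args: Array[String]): Unit = {
    println(encode(300L, optimizePositive = true).length)  // 2 bytes
    println(encode(-1L, optimizePositive = false).length)  // 1 byte via zig-zag
  }
}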
@@ -26,18 +26,18 @@ import org.apache.spark.util.collection.BitSet
 private[graphx] object VertexPartition {

-  def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = {
-    val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
+  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): VertexPartition[VD] = {
+    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
     iter.foreach { case (k, v) =>
       map(k) = v
     }
     new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
   }

-  def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD)
+  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
     : VertexPartition[VD] =
   {
-    val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
+    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
     iter.foreach { case (k, v) =>
       map.setMerge(k, v, mergeFunc)
     }

@@ -60,15 +60,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
   def size: Int = mask.cardinality()

   /** Return the vertex attribute for the given vertex ID. */
-  def apply(vid: VertexID): VD = values(index.getPos(vid))
+  def apply(vid: VertexId): VD = values(index.getPos(vid))

-  def isDefined(vid: VertexID): Boolean = {
+  def isDefined(vid: VertexId): Boolean = {
     val pos = index.getPos(vid)
     pos >= 0 && mask.get(pos)
   }

   /** Look up vid in activeSet, throwing an exception if it is None. */
-  def isActive(vid: VertexID): Boolean = {
+  def isActive(vid: VertexId): Boolean = {
     activeSet.get.contains(vid)
   }

@@ -88,7 +88,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
    * each of the entries in the original VertexRDD. The resulting
    * VertexPartition retains the same index.
    */
-  def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = {
+  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexPartition[VD2] = {
     // Construct a view of the map transformation
     val newValues = new Array[VD2](capacity)
     var i = mask.nextSetBit(0)

@@ -108,7 +108,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
    * RDD can be easily joined with the original vertex-set. Furthermore, the filter only
    * modifies the bitmap index and so no new values are allocated.
    */
-  def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = {
+  def filter(pred: (VertexId, VD) => Boolean): VertexPartition[VD] = {
     // Allocate the array to store the results into
     val newMask = new BitSet(capacity)
     // Iterate over the active bits in the old mask and evaluate the predicate

@@ -146,7 +146,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
   /** Left outer join another VertexPartition. */
   def leftJoin[VD2: ClassTag, VD3: ClassTag]
       (other: VertexPartition[VD2])
-      (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+      (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
     if (index != other.index) {
       logWarning("Joining two VertexPartitions with different indexes is slow.")
       leftJoin(createUsingIndex(other.iterator))(f)

@@ -165,14 +165,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
   /** Left outer join another iterator of messages. */
   def leftJoin[VD2: ClassTag, VD3: ClassTag]
-      (other: Iterator[(VertexID, VD2)])
-      (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+      (other: Iterator[(VertexId, VD2)])
+      (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
     leftJoin(createUsingIndex(other))(f)
   }

   /** Inner join another VertexPartition. */
   def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
-      (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
+      (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
     if (index != other.index) {
       logWarning("Joining two VertexPartitions with different indexes is slow.")
       innerJoin(createUsingIndex(other.iterator))(f)

@@ -192,15 +192,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
    * Inner join an iterator of messages.
    */
   def innerJoin[U: ClassTag, VD2: ClassTag]
-      (iter: Iterator[Product2[VertexID, U]])
-      (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
+      (iter: Iterator[Product2[VertexId, U]])
+      (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
     innerJoin(createUsingIndex(iter))(f)
   }

   /**
    * Similar effect as aggregateUsingIndex((a, b) => a)
    */
-  def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]])
+  def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
     : VertexPartition[VD2] = {
     val newMask = new BitSet(capacity)
     val newValues = new Array[VD2](capacity)

@@ -218,7 +218,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
    * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
    * the partition, hidden by the bitmask.
    */
-  def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = {
+  def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = {
     val newMask = new BitSet(capacity)
     val newValues = new Array[VD](capacity)
     System.arraycopy(values, 0, newValues, 0, newValues.length)

@@ -233,7 +233,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
   }

   def aggregateUsingIndex[VD2: ClassTag](
-      iter: Iterator[Product2[VertexID, VD2]],
+      iter: Iterator[Product2[VertexId, VD2]],
       reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
     val newMask = new BitSet(capacity)
     val newValues = new Array[VD2](capacity)

@@ -253,7 +253,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
     new VertexPartition[VD2](index, newValues, newMask)
   }

-  def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = {
+  def replaceActives(iter: Iterator[VertexId]): VertexPartition[VD] = {
     val newActiveSet = new VertexSet
     iter.foreach(newActiveSet.add(_))
     new VertexPartition(index, values, mask, Some(newActiveSet))

@@ -263,7 +263,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
    * Construct a new VertexPartition whose index contains only the vertices in the mask.
    */
   def reindex(): VertexPartition[VD] = {
-    val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD]
+    val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
     val arbitraryMerge = (a: VD, b: VD) => a
     for ((k, v) <- this.iterator) {
       hashMap.setMerge(k, v, arbitraryMerge)

@@ -271,8 +271,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
     new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
   }

-  def iterator: Iterator[(VertexID, VD)] =
+  def iterator: Iterator[(VertexId, VD)] =
     mask.iterator.map(ind => (index.getValue(ind), values(ind)))

-  def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind))
+  def vidIterator: Iterator[VertexId] = mask.iterator.map(ind => index.getValue(ind))
 }
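The filter doc comment above captures the key design point: filtering a VertexPartition touches only the bitmap index, never the values array. A minimal sketch of that idea, with illustrative stand-in types rather than GraphX's actual classes:

import java.util.BitSet

// Sketch: values stay in place; a new mask records which slots survive.
class MaskedValues[V](values: Array[V], mask: BitSet) {
  def filter(pred: V => Boolean): MaskedValues[V] = {
    val newMask = new BitSet(values.length)
    var i = mask.nextSetBit(0)
    while (i >= 0) {                    // walk only the active slots
      if (pred(values(i))) newMask.set(i)
      i = mask.nextSetBit(i + 1)
    }
    new MaskedValues(values, newMask)   // same values array, new mask
  }

  def iterator: Iterator[V] =
    Iterator.iterate(mask.nextSetBit(0))(i => mask.nextSetBit(i + 1))
      .takeWhile(_ >= 0).map(values(_))
}

Because the values array is shared, a filtered view can still be joined slot-for-slot against the original partition, which is what the fast leftJoin/innerJoin paths above rely on.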
@@ -20,5 +20,5 @@ package org.apache.spark.graphx
 import org.apache.spark.util.collection.OpenHashSet

 package object impl {
-  private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexID]
+  private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexId]
 }
@@ -35,9 +35,9 @@ object ConnectedComponents {
    * @return a graph with vertex attributes containing the smallest vertex in each
    * connected component
    */
-  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = {
+  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
     val ccGraph = graph.mapVertices { case (vid, _) => vid }
-    def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
+    def sendMessage(edge: EdgeTriplet[VertexId, ED]) = {
       if (edge.srcAttr < edge.dstAttr) {
         Iterator((edge.dstId, edge.srcAttr))
       } else if (edge.srcAttr > edge.dstAttr) {
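Invoking the algorithm above is a one-liner once a graph exists. A small usage sketch, assuming an already-constructed SparkContext `sc` and a made-up edge list:

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.ConnectedComponents

// Two components: {1, 2, 3} and {5, 6}.
val edgeList = sc.parallelize(Seq((1L, 2L), (2L, 3L), (5L, 6L)))
val graph = Graph.fromEdgeTuples(edgeList, 1)
// Each vertex ends up labeled with the smallest vertex id in its component.
val cc: Graph[VertexId, Int] = ConnectedComponents.run(graph)
cc.vertices.collect.foreach { case (vid, comp) => println(s"$vid -> $comp") }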
@@ -92,7 +92,7 @@ object PageRank extends Logging {
     // Define the three functions needed to implement PageRank in the GraphX
     // version of Pregel
-    def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
+    def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
       resetProb + (1.0 - resetProb) * msgSum
     def sendMessage(edge: EdgeTriplet[Double, Double]) =
       Iterator((edge.dstId, edge.srcAttr * edge.attr))

@@ -137,7 +137,7 @@ object PageRank extends Logging {
     // Define the three functions needed to implement PageRank in the GraphX
     // version of Pregel
-    def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = {
+    def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
       val (oldPR, lastDelta) = attr
       val newPR = oldPR + (1.0 - resetProb) * msgSum
       (newPR, newPR - oldPR)
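The static vertex program above is simple enough to check by hand. A tiny worked example, with illustrative numbers:

// With resetProb = 0.15 and an incoming message sum of 2.0,
// the updated rank is 0.15 + (1.0 - 0.15) * 2.0 = 1.85.
val resetProb = 0.15
val msgSum = 2.0
val newRank = resetProb + (1.0 - resetProb) * msgSum
assert(math.abs(newRank - 1.85) < 1e-12)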
@@ -79,13 +79,13 @@ object SVDPlusPlus {
       (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))

     g = g.outerJoinVertices(t0) {
-      (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+      (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
         (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
     }

     def mapTrainF(conf: Conf, u: Double)
         (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
-      : Iterator[(VertexID, (RealVector, RealVector, Double))] = {
+      : Iterator[(VertexId, (RealVector, RealVector, Double))] = {
       val (usr, itm) = (et.srcAttr, et.dstAttr)
       val (p, q) = (usr._1, itm._1)
       var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)

@@ -112,7 +112,7 @@ object SVDPlusPlus {
       et => Iterator((et.srcId, et.dstAttr._2)),
       (g1: RealVector, g2: RealVector) => g1.add(g2))
     g = g.outerJoinVertices(t1) {
-      (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+      (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
         if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
     }

@@ -123,7 +123,7 @@ object SVDPlusPlus {
       (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
         (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
     g = g.outerJoinVertices(t2) {
-      (vid: VertexID,
+      (vid: VertexId,
        vd: (RealVector, RealVector, Double, Double),
        msg: Option[(RealVector, RealVector, Double)]) =>
         (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)

@@ -133,7 +133,7 @@ object SVDPlusPlus {
     // calculate error on training set
     def mapTestF(conf: Conf, u: Double)
         (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
-      : Iterator[(VertexID, Double)] =
+      : Iterator[(VertexId, Double)] =
     {
       val (usr, itm) = (et.srcAttr, et.dstAttr)
       val (p, q) = (usr._1, itm._1)

@@ -146,7 +146,7 @@ object SVDPlusPlus {
     g.cache()
     val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
     g = g.outerJoinVertices(t3) {
-      (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+      (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
         if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
     }
@@ -35,7 +35,7 @@ object StronglyConnectedComponents {
    *
    * @return a graph with vertex attributes containing the smallest vertex id in each SCC
    */
-  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = {
+  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED] = {

     // the graph we update with final SCC ids, and the graph we return at the end
     var sccGraph = graph.mapVertices { case (vid, _) => vid }

@@ -71,7 +71,7 @@ object StronglyConnectedComponents {
       // collect min of all my neighbor's scc values, update if it's smaller than mine
       // then notify any neighbors with scc values larger than mine
-      sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID](
+      sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId](
         sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)(
         (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
         e => {

@@ -85,7 +85,7 @@ object StronglyConnectedComponents {
       // start at root of SCCs. Traverse values in reverse, notify all my neighbors
       // do not propagate if colors do not match!
-      sccWorkGraph = Pregel[(VertexID, Boolean), ED, Boolean](
+      sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean](
         sccWorkGraph, false, activeDirection = EdgeDirection.In)(
         // vertex is final if it is the root of a color
         // or it has the same color as a neighbor that is final
@@ -61,7 +61,7 @@ object TriangleCount {
       (vid, _, optSet) => optSet.getOrElse(null)
     }
     // Edge function computes intersection of smaller vertex with larger vertex
-    def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = {
+    def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexId, Int)] = {
       assert(et.srcAttr != null)
       assert(et.dstAttr != null)
       val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
@@ -25,11 +25,11 @@ package object graphx {
    * A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
    * to follow any ordering or any constraints other than uniqueness.
    */
-  type VertexID = Long
+  type VertexId = Long

   /** Integer identifer of a graph partition. */
   // TODO: Consider using Char.
   type PartitionID = Int

-  private[graphx] type VertexSet = OpenHashSet[VertexID]
+  private[graphx] type VertexSet = OpenHashSet[VertexId]
 }
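Since VertexId is a plain type alias for Long, the rename is purely cosmetic at the type level: ordinary Long literals and arithmetic work anywhere a vertex id is expected, with no wrapper type involved. For instance:

import org.apache.spark.graphx._

val vid: VertexId = 42L        // any Long is a valid vertex id
val next: VertexId = vid + 1   // ordinary Long arithmetic applies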
@@ -50,7 +50,7 @@ object GraphGenerators {
     val mu = 4
     val sigma = 1.3

-    val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{
+    val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
       src => (src, sampleLogNormal(mu, sigma, numVertices))
     }
     val edges = vertices.flatMap { v =>

@@ -59,9 +59,9 @@ object GraphGenerators {
     Graph(vertices, edges, 0)
   }

-  def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = {
+  def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = {
     val rand = new Random()
-    Array.fill(maxVertexID) { Edge[Int](src, rand.nextInt(maxVertexID), 1) }
+    Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
   }

   /**

@@ -206,9 +206,9 @@ object GraphGenerators {
    */
   def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
     // Convert row column address into vertex ids (row major order)
-    def sub2ind(r: Int, c: Int): VertexID = r * cols + c
+    def sub2ind(r: Int, c: Int): VertexId = r * cols + c

-    val vertices: RDD[(VertexID, (Int,Int))] =
+    val vertices: RDD[(VertexId, (Int,Int))] =
       sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
     val edges: RDD[Edge[Double]] =
       vertices.flatMap{ case (vid, (r,c)) =>

@@ -228,7 +228,7 @@ object GraphGenerators {
    * being the center vertex.
    */
   def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
-    val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
+    val edges: RDD[(VertexId, VertexId)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
     Graph.fromEdgeTuples(edges, 1)
   } // end of starGraph
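The row-major mapping used by gridGraph above is easy to sanity-check in isolation. A standalone re-statement of sub2ind (not the library code; it widens to Long before multiplying):

val cols = 4
def sub2ind(r: Int, c: Int): Long = r.toLong * cols + c
assert(sub2ind(0, 3) == 3L)  // last cell of the first row
assert(sub2ind(1, 0) == 4L)  // first cell of the second row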
@@ -28,12 +28,12 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
   test("joinVertices") {
     withSpark { sc =>
       val vertices =
-        sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+        sc.parallelize(Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three")), 2)
       val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
       val g: Graph[String, String] = Graph(vertices, edges)

-      val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20)))
-      val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u }
+      val tbl = sc.parallelize(Seq[(VertexId, Int)]((1, 10), (2, 20)))
+      val g1 = g.joinVertices(tbl) { (vid: VertexId, attr: String, u: Int) => attr + u }

       val v = g1.vertices.collect().toSet
       assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))

@@ -60,7 +60,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
   test ("filter") {
     withSpark { sc =>
       val n = 5
-      val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
+      val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x)))
       val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
       val graph: Graph[Int, Int] = Graph(vertices, edges).cache()
       val filteredGraph = graph.filter(

@@ -68,7 +68,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
           val degrees: VertexRDD[Int] = graph.outDegrees
           graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
         },
-        vpred = (vid: VertexID, deg:Int) => deg > 0
+        vpred = (vid: VertexId, deg:Int) => deg > 0
       ).cache()

       val v = filteredGraph.vertices.collect().toSet
@@ -27,7 +27,7 @@ import org.apache.spark.rdd._
 class GraphSuite extends FunSuite with LocalSparkContext {

   def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = {
-    Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v")
+    Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexId, x: VertexId)), 3), "v")
   }

   test("Graph.fromEdgeTuples") {

@@ -57,7 +57,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
       val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) }
-      val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
+      val vertices: RDD[(VertexId, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
       val graph = Graph(vertices, edges, false)
       assert( graph.edges.count() === rawEdges.size )
       // Vertices not explicitly provided but referenced by edges should be created automatically

@@ -74,7 +74,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val n = 5
       val star = starGraph(sc, n)
       assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
-        (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet)
+        (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
     }
   }

@@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val p = 100
       val verts = 1 to n
       val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
-        verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0)
+        verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
       assert(graph.edges.partitions.length === p)
       val partitionedGraph = graph.partitionBy(EdgePartition2D)
       assert(graph.edges.partitions.length === p)

@@ -136,10 +136,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val star = starGraph(sc, n)
       // mapVertices preserving type
       val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
-      assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet)
+      assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
       // mapVertices changing type
       val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
-      assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet)
+      assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
     }
   }

@@ -168,7 +168,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val n = 5
       val star = starGraph(sc, n)
-      assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet)
+      assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
     }
   }

@@ -191,7 +191,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
   test("mask") {
     withSpark { sc =>
       val n = 5
-      val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
+      val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x)))
       val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
       val graph: Graph[Int, Int] = Graph(vertices, edges).cache()

@@ -218,7 +218,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val star = starGraph(sc, n)
       val doubleStar = Graph.fromEdgeTuples(
         sc.parallelize((1 to n).flatMap(x =>
-          List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v")
+          List((0: VertexId, x: VertexId), (0: VertexId, x: VertexId))), 1), "v")
       val star2 = doubleStar.groupEdges { (a, b) => a}
       assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
         star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))

@@ -237,7 +237,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)

       // activeSetOpt
-      val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID)
+      val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId)
       val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
       val vids = complete.mapVertices((vid, attr) => vid).cache()
       val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }

@@ -248,10 +248,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
         }
         Iterator((et.srcId, 1))
       }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
-      assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet)
+      assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)

       // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
-      val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3)
+      val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x+1) % n: VertexId)), 3)
       val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
       val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
       val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }

@@ -262,7 +262,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
         }
         Iterator((et.dstId, 1))
       }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
-      assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet)
+      assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)

     }
   }

@@ -277,7 +277,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
         et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
         (a: Int, b: Int) => a + b).collect.toSet
-      assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0)))
+      assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
       // outerJoinVertices preserving type
       val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
       val newReverseStar =
@@ -27,7 +27,7 @@ class PregelSuite extends FunSuite with LocalSparkContext {
   test("1 iteration") {
     withSpark { sc =>
       val n = 5
-      val starEdges = (1 to n).map(x => (0: VertexID, x: VertexID))
+      val starEdges = (1 to n).map(x => (0: VertexId, x: VertexId))
       val star = Graph.fromEdgeTuples(sc.parallelize(starEdges, 3), "v").cache()
       val result = Pregel(star, 0)(
         (vid, attr, msg) => attr,

@@ -41,12 +41,12 @@ class PregelSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val n = 5
       val chain = Graph.fromEdgeTuples(
-        sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3),
+        sc.parallelize((1 until n).map(x => (x: VertexId, x + 1: VertexId)), 3),
         0).cache()
-      assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet)
+      assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexId, 0)).toSet)
       val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }.cache()
       assert(chainWithSeed.vertices.collect.toSet ===
-        Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet)
+        Set((1: VertexId, 1)) ++ (2 to n).map(x => (x: VertexId, 0)).toSet)
       val result = Pregel(chainWithSeed, 0)(
         (vid, attr, msg) => math.max(msg, attr),
         et => if (et.dstAttr != et.srcAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty,
@@ -99,7 +99,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
   test("IntAggMsgSerializer") {
     val conf = new SparkConf(false)
-    val outMsg = (4: VertexID, 5)
+    val outMsg = (4: VertexId, 5)
     val bout = new ByteArrayOutputStream
     val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
     outStrm.writeObject(outMsg)

@@ -107,8 +107,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
     bout.flush()
     val bin = new ByteArrayInputStream(bout.toByteArray)
     val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
-    val inMsg1: (VertexID, Int) = inStrm.readObject()
-    val inMsg2: (VertexID, Int) = inStrm.readObject()
+    val inMsg1: (VertexId, Int) = inStrm.readObject()
+    val inMsg2: (VertexId, Int) = inStrm.readObject()
     assert(outMsg === inMsg1)
     assert(outMsg === inMsg2)

@@ -119,7 +119,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
   test("LongAggMsgSerializer") {
     val conf = new SparkConf(false)
-    val outMsg = (4: VertexID, 1L << 32)
+    val outMsg = (4: VertexId, 1L << 32)
     val bout = new ByteArrayOutputStream
     val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
     outStrm.writeObject(outMsg)

@@ -127,8 +127,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
     bout.flush()
     val bin = new ByteArrayInputStream(bout.toByteArray)
     val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
-    val inMsg1: (VertexID, Long) = inStrm.readObject()
-    val inMsg2: (VertexID, Long) = inStrm.readObject()
+    val inMsg1: (VertexId, Long) = inStrm.readObject()
+    val inMsg2: (VertexId, Long) = inStrm.readObject()
     assert(outMsg === inMsg1)
     assert(outMsg === inMsg2)

@@ -139,7 +139,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
   test("DoubleAggMsgSerializer") {
     val conf = new SparkConf(false)
-    val outMsg = (4: VertexID, 5.0)
+    val outMsg = (4: VertexId, 5.0)
     val bout = new ByteArrayOutputStream
     val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
     outStrm.writeObject(outMsg)

@@ -147,8 +147,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
     bout.flush()
     val bin = new ByteArrayInputStream(bout.toByteArray)
     val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
-    val inMsg1: (VertexID, Double) = inStrm.readObject()
-    val inMsg2: (VertexID, Double) = inStrm.readObject()
+    val inMsg1: (VertexId, Double) = inStrm.readObject()
+    val inMsg2: (VertexId, Double) = inStrm.readObject()
     assert(outMsg === inMsg1)
     assert(outMsg === inMsg2)
@@ -79,7 +79,7 @@ class EdgePartitionSuite extends FunSuite {
   test("innerJoin") {
     def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
       val builder = new EdgePartitionBuilder[A]
-      for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) }
+      for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
       builder.toEdgePartition
     }
     val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
@@ -100,7 +100,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
   test("Connected Components on a Toy Connected Graph") {
     withSpark { sc =>
       // Create an RDD for the vertices
-      val users: RDD[(VertexID, (String, String))] =
+      val users: RDD[(VertexId, (String, String))] =
         sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
           (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
           (4L, ("peter", "student"))))