From 6acc2a7b3d3c2a1b449c2c50ece7ad34b0863ed1 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sat, 29 Jun 2013 21:28:31 -0700
Subject: [PATCH] Various minor changes.

---
 graph/src/main/scala/spark/graph/Analytics.scala  |  4 ++++
 graph/src/main/scala/spark/graph/Graph.scala      |  6 +++---
 graph/src/main/scala/spark/graph/GraphOps.scala   |  4 ++--
 graph/src/main/scala/spark/graph/Pregel.scala     | 15 ++++++++-------
 graph/src/main/scala/spark/graph/Vertex.scala     |  2 +-
 .../scala/spark/graph/impl/EdgePartition.scala    |  8 ++++----
 .../main/scala/spark/graph/impl/GraphImpl.scala   | 12 ++++--------
 7 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala
index 05275bec68..8f76622db0 100644
--- a/graph/src/main/scala/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/spark/graph/Analytics.scala
@@ -7,6 +7,10 @@ import spark.SparkContext._
 
 object Analytics extends Logging {
 
+  def main(args: Array[String]) {
+    //pregelPagerank()
+  }
+
   // /**
   //  * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
   //  */
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index fb5a86c41f..506e53df08 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -26,7 +26,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    *
    * @todo should vertices return tuples instead of vertex objects?
    */
-  def vertices(): RDD[Vertex[VD]]
+  def vertices: RDD[Vertex[VD]]
 
   /**
    * Get the Edges and their data as an RDD. The entries in the RDD contain
@@ -41,7 +41,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    * @todo Should edges return 3 tuples instead of Edge objects? In this case
    * we could rename EdgeTriplet to Edge?
    */
-  def edges(): RDD[Edge[ED]]
+  def edges: RDD[Edge[ED]]
 
   /**
    * Get the edges with the vertex data associated with the adjacent pair of
@@ -61,7 +61,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    * @see edges() If only the edge data and adjacent vertex ids are required.
    *
    */
-  def triplets(): RDD[EdgeTriplet[VD, ED]]
+  def triplets: RDD[EdgeTriplet[VD, ED]]
 
   /**
    * Return a graph that is cached when first created. This is used to pin a
diff --git a/graph/src/main/scala/spark/graph/GraphOps.scala b/graph/src/main/scala/spark/graph/GraphOps.scala
index 4fba8d1976..d98cd8d44c 100644
--- a/graph/src/main/scala/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/spark/graph/GraphOps.scala
@@ -14,11 +14,11 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) {
   }
 
   lazy val outDegrees: RDD[(Vid, Int)] = {
-    g.aggregateNeighbors((vid,edge) => Some(1), _+_, EdgeDirection.Out)
+    g.aggregateNeighbors((vid, edge) => Some(1), _+_, EdgeDirection.Out)
   }
 
   lazy val degrees: RDD[(Vid, Int)] = {
-    g.aggregateNeighbors((vid,edge) => Some(1), _+_, EdgeDirection.Both)
+    g.aggregateNeighbors((vid, edge) => Some(1), _+_, EdgeDirection.Both)
   }
 
   def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
diff --git a/graph/src/main/scala/spark/graph/Pregel.scala b/graph/src/main/scala/spark/graph/Pregel.scala
index 699affba81..0a564b8041 100644
--- a/graph/src/main/scala/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/spark/graph/Pregel.scala
@@ -1,19 +1,20 @@
 package spark.graph
 
-import scala.collection.JavaConversions._
 import spark.RDD
 
 
 object Pregel {
 
   def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
-      vprog: ( Vertex[VD], A) => VD,
-      sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
-      mergeMsg: (A, A) => A,
-      initialMsg: A,
-      numIter: Int) : Graph[VD, ED] = {
+      vprog: (Vertex[VD], A) => VD,
+      sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+      mergeMsg: (A, A) => A,
+      initialMsg: A,
+      numIter: Int)
+    : Graph[VD, ED] = {
 
-    var g = graph.cache
+    var g = graph
+    //var g = graph.cache()
     var i = 0
 
     def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge)
diff --git a/graph/src/main/scala/spark/graph/Vertex.scala b/graph/src/main/scala/spark/graph/Vertex.scala
index 5323643bec..32653571f7 100644
--- a/graph/src/main/scala/spark/graph/Vertex.scala
+++ b/graph/src/main/scala/spark/graph/Vertex.scala
@@ -9,7 +9,7 @@ case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD
   var id: Vid = 0,
   var data: VD = nullValue[VD]) {
 
-  def this(tuple: Tuple2[Vid, VD]) = this(tuple._1, tuple._2)
+  def this(tuple: (Vid, VD)) = this(tuple._1, tuple._2)
 
   def tuple = (id, data)
 }
diff --git a/graph/src/main/scala/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/spark/graph/impl/EdgePartition.scala
index e5ed2db0f2..0008534c0f 100644
--- a/graph/src/main/scala/spark/graph/impl/EdgePartition.scala
+++ b/graph/src/main/scala/spark/graph/impl/EdgePartition.scala
@@ -11,8 +11,8 @@ import spark.graph._
  * A partition of edges in 3 large columnar arrays.
  */
 private[graph]
-class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]
-{
+class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] {
+
   val srcIds: IntArrayList = new IntArrayList
   val dstIds: IntArrayList = new IntArrayList
   // TODO: Specialize data.
@@ -33,7 +33,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   def size: Int = srcIds.size
 
   def iterator = new Iterator[Edge[ED]] {
-    private var edge = new Edge[ED]
+    private val edge = new Edge[ED]
    private var pos = 0
 
     override def hasNext: Boolean = pos < EdgePartition.this.size
@@ -46,4 +46,4 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       edge
     }
   }
-}
\ No newline at end of file
+}
diff --git a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
index 6ee7b8a062..e18a2f6b09 100644
--- a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
@@ -2,11 +2,7 @@ package spark.graph.impl
 
 import scala.collection.JavaConversions._
 
-import spark.ClosureCleaner
-import spark.HashPartitioner
-import spark.Partitioner
-import spark.RDD
-import spark.SparkContext
+import spark.{ClosureCleaner, HashPartitioner, RDD}
 import spark.SparkContext._
 
 import spark.graph._
@@ -31,7 +27,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
   def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
     if (_cached) {
-      (new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable))
+      new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)
         .cache()
     } else {
       new GraphImpl(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
@@ -73,13 +69,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     if (!_cached && _rawEdges != null) {
       _rawEdges
     } else {
-      eTable.mapPartitions { iter => iter.next._2.iterator }
+      eTable.mapPartitions { iter => iter.next()._2.iterator }
     }
   }
 
   /** Return a RDD that brings edges with its source and destination vertices together. */
   override def triplets: RDD[EdgeTriplet[VD, ED]] = {
-    (new EdgeTripletRDD(vTableReplicated, eTable)).mapPartitions { part => part.next._2 }
+    new EdgeTripletRDD(vTableReplicated, eTable).mapPartitions { part => part.next()._2 }
   }
 
   override def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED] = {
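
Note for reviewers: the reshaped Pregel.iterate signature is easiest to judge at a call site. The sketch below is illustrative only and not part of this patch: the pagerankSketch name, the damping constants, and the assumption that EdgeTriplet exposes the edge attribute as a field named data are all hypothetical (its otherVertex helper does appear in the patch, in mapF).

    import spark.graph._

    // Hypothetical driver, assuming a pre-built Graph[Double, Double] whose
    // vertex data is a rank and whose edge data is a normalized weight.
    def pagerankSketch(graph: Graph[Double, Double], numIter: Int): Graph[Double, Double] =
      Pregel.iterate[Double, Double, Double](graph)(
        // vprog: fold the merged message sum into a new rank for the vertex.
        (vertex, msgSum) => 0.15 + 0.85 * msgSum,
        // sendMsg: the message carried across an edge is the opposite
        // endpoint's rank scaled by the edge weight; otherVertex is the same
        // helper the patch's mapF uses to pick the far endpoint.
        (vid, triplet) => Some(triplet.otherVertex(vid).data * triplet.data),
        // mergeMsg: messages headed for the same vertex are summed.
        _ + _,
        // initialMsg: the seed delivered to every vertex before round one.
        1.0,
        numIter)

The same call site also shows the payoff of the accessor change in Graph.scala: with vertices, edges, and triplets now parameterless, a result reads as pagerankSketch(g, 10).vertices.map(_.tuple) rather than g.vertices().map(_.tuple), matching Scala's uniform access convention for side-effect-free accessors.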