Various minor changes.

This commit is contained in:
Reynold Xin 2013-06-29 21:28:31 -07:00
parent ce7e270bb4
commit 6acc2a7b3d
7 changed files with 26 additions and 25 deletions

View file

@@ -7,6 +7,10 @@ import spark.SparkContext._
object Analytics extends Logging {
def main(args: Array[String]) {
//pregelPagerank()
}
// /**
// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
// */

View file

@@ -26,7 +26,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
*
* @todo should vertices return tuples instead of vertex objects?
*/
def vertices(): RDD[Vertex[VD]]
def vertices: RDD[Vertex[VD]]
/**
* Get the Edges and their data as an RDD. The entries in the RDD contain
@@ -41,7 +41,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* @todo Should edges return 3 tuples instead of Edge objects? In this case
* we could rename EdgeTriplet to Edge?
*/
def edges(): RDD[Edge[ED]]
def edges: RDD[Edge[ED]]
/**
* Get the edges with the vertex data associated with the adjacent pair of
@@ -61,7 +61,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* @see edges() If only the edge data and adjacent vertex ids are required.
*
*/
def triplets(): RDD[EdgeTriplet[VD, ED]]
def triplets: RDD[EdgeTriplet[VD, ED]]
/**
* Return a graph that is cached when first created. This is used to pin a

View file

@@ -1,6 +1,5 @@
package spark.graph
import scala.collection.JavaConversions._
import spark.RDD
@@ -11,9 +10,11 @@ object Pregel {
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A,
initialMsg: A,
numIter: Int) : Graph[VD, ED] = {
numIter: Int)
: Graph[VD, ED] = {
var g = graph.cache
var g = graph
//var g = graph.cache()
var i = 0
def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge)

View file

@@ -9,7 +9,7 @@ case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD
var id: Vid = 0,
var data: VD = nullValue[VD]) {
def this(tuple: Tuple2[Vid, VD]) = this(tuple._1, tuple._2)
def this(tuple: (Vid, VD)) = this(tuple._1, tuple._2)
def tuple = (id, data)
}

View file

@@ -11,8 +11,8 @@ import spark.graph._
* A partition of edges in 3 large columnar arrays.
*/
private[graph]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]
{
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] {
val srcIds: IntArrayList = new IntArrayList
val dstIds: IntArrayList = new IntArrayList
// TODO: Specialize data.
@@ -33,7 +33,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
def size: Int = srcIds.size
def iterator = new Iterator[Edge[ED]] {
private var edge = new Edge[ED]
private val edge = new Edge[ED]
private var pos = 0
override def hasNext: Boolean = pos < EdgePartition.this.size

View file

@@ -2,11 +2,7 @@ package spark.graph.impl
import scala.collection.JavaConversions._
import spark.ClosureCleaner
import spark.HashPartitioner
import spark.Partitioner
import spark.RDD
import spark.SparkContext
import spark.{ClosureCleaner, HashPartitioner, RDD}
import spark.SparkContext._
import spark.graph._
@@ -31,7 +27,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
if (_cached) {
(new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable))
new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)
.cache()
} else {
new GraphImpl(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
@@ -73,13 +69,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
if (!_cached && _rawEdges != null) {
_rawEdges
} else {
eTable.mapPartitions { iter => iter.next._2.iterator }
eTable.mapPartitions { iter => iter.next()._2.iterator }
}
}
/** Return a RDD that brings edges with its source and destination vertices together. */
override def triplets: RDD[EdgeTriplet[VD, ED]] = {
(new EdgeTripletRDD(vTableReplicated, eTable)).mapPartitions { part => part.next._2 }
new EdgeTripletRDD(vTableReplicated, eTable).mapPartitions { part => part.next()._2 }
}
override def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED] = {