diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala index 5b0e5221ba..5c16ba8175 100644 --- a/graph/src/main/scala/spark/graph/Analytics.scala +++ b/graph/src/main/scala/spark/graph/Analytics.scala @@ -2,25 +2,9 @@ package spark.graph import spark._ import spark.SparkContext._ -// import com.esotericsoftware.kryo._ // import breeze.linalg._ - - -// class AnalyticsKryoRegistrator extends KryoRegistrator { -// def registerClasses(kryo: Kryo) { -// println("registering kryo") -// kryo.register(classOf[(Int,Float,Float)]) -// Graph.kryoRegister[(Int,Float,Float), Float](kryo) -// Graph.kryoRegister[(Int,Float), Float](kryo) -// Graph.kryoRegister[Int, Float](kryo) -// Graph.kryoRegister[Float, Float](kryo) -// kryo.setReferences(false); -// } -// } - - object Analytics { @@ -111,14 +95,6 @@ object Analytics { gatherDirection = EdgeDirection.In) } - - - - - - - - // /** // * Compute the connected component membership of each vertex // * and return an RDD with the vertex value containing the @@ -250,9 +226,6 @@ object Analytics { // //System.setProperty("spark.shuffle.compress", "false") // System.setProperty("spark.kryo.registrator", "spark.graphlab.AnalyticsKryoRegistrator") - - - taskType match { case "pagerank" => { diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala index 1b1929cc26..7b1111ae77 100644 --- a/graph/src/main/scala/spark/graph/Graph.scala +++ b/graph/src/main/scala/spark/graph/Graph.scala @@ -43,12 +43,6 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl } -private[graph] -case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) U, - @specialized(Char, Int, Boolean, Byte, Long, Float, Double) V]( - var _1: U, var _2: V) - - /** * A Graph RDD that supports computation on graphs. */ @@ -61,26 +55,23 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected ( _rawETable: RDD[(Pid, EdgePartition[ED])]) { def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = { - this( - Graph.DEFAULT_NUM_VERTEX_PARTITIONS, Graph.DEFAULT_NUM_EDGE_PARTITIONS, - vertices, edges, - null, null) + this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null) } def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = { if (_cached) { - val newgraph = new Graph(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable) - newgraph.cache() + (new Graph(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)) + .cache() } else { new Graph(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null) } } - def withVertexPartitioner(numVertexPartitions: Int = Graph.DEFAULT_NUM_VERTEX_PARTITIONS) = { + def withVertexPartitioner(numVertexPartitions: Int) = { withPartitioner(numVertexPartitions, numEdgePartitions) } - def withEdgePartitioner(numEdgePartitions: Int = Graph.DEFAULT_NUM_EDGE_PARTITIONS) = { + def withEdgePartitioner(numEdgePartitions: Int) = { withPartitioner(numVertexPartitions, numEdgePartitions) } @@ -139,11 +130,11 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected ( edgeDirection) } - def mapVertices[VD2: ClassManifest](f: (Vertex[VD]) => Vertex[VD2]): Graph[VD2, ED] = { + def mapVertices[VD2: ClassManifest](f: Vertex[VD] => Vertex[VD2]): Graph[VD2, ED] = { newGraph(vertices.map(f), edges) } - def mapEdges[ED2: ClassManifest](f: (Edge[ED]) => Edge[ED2]): Graph[VD, ED2] = { + def mapEdges[ED2: ClassManifest](f: Edge[ED] => Edge[ED2]): Graph[VD, ED2] = { newGraph(vertices, edges.map(f)) } @@ -237,7 +228,7 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected ( } } } - vmap.int2ObjectEntrySet().fastIterator().filter{!_.getValue()._2.isEmpty}.map{ entry => + vmap.int2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry => (entry.getIntKey(), entry.getValue()._2) } } @@ -316,14 +307,10 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected ( object Graph { - val DEFAULT_NUM_VERTEX_PARTITIONS = 5 - val DEFAULT_NUM_EDGE_PARTITIONS = 5 - /** * Load an edge list from file initializing the Graph RDD */ - def textFile[ED: ClassManifest](sc: SparkContext, - fname: String, edgeParser: Array[String] => ED ) = { + def textFile[ED: ClassManifest](sc: SparkContext, fname: String, edgeParser: Array[String] => ED) = { // Parse the edge data table val edges = sc.textFile(fname).map { line => diff --git a/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala index e72e500fb8..e1cb77f114 100644 --- a/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala +++ b/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala @@ -8,21 +8,17 @@ import spark.KryoRegistrator class GraphKryoRegistrator extends KryoRegistrator { def registerClasses(kryo: Kryo) { - kryo.register(classOf[(Int, Float, Float)]) - registerClass[(Int, Float, Float), Float](kryo) - registerClass[(Int, Float), Float](kryo) - registerClass[Int, Float](kryo) - registerClass[Float, Float](kryo) + //kryo.register(classOf[(Int, Float, Float)]) + registerClass[Int, Int, Int](kryo) // This avoids a large number of hash table lookups. kryo.setReferences(false) } - private def registerClass[VD: Manifest, ED: Manifest](kryo: Kryo) { - //kryo.register(classManifest[VD].erasure) - // kryo.register(classManifest[ED].erasure) - kryo.register(classOf[(Vid, Vid, ED)]) - kryo.register(classOf[(Vid, ED)]) - //kryo.register(classOf[EdgeBlockRecord[ED]]) + private def registerClass[VD: Manifest, ED: Manifest, VD2: Manifest](kryo: Kryo) { + kryo.register(classOf[Vertex[VD]]) + kryo.register(classOf[Edge[ED]]) + kryo.register(classOf[MutableTuple2[VD, VD2]]) + kryo.register(classOf[(Vid, VD2)]) } } diff --git a/graph/src/main/scala/spark/graph/package.scala b/graph/src/main/scala/spark/graph/package.scala index e7ec3f6e86..cf1b23ca5d 100644 --- a/graph/src/main/scala/spark/graph/package.scala +++ b/graph/src/main/scala/spark/graph/package.scala @@ -11,4 +11,11 @@ package object graph { * Return the default null-like value for a data type T. */ def nullValue[T] = null.asInstanceOf[T] + + + private[graph] + case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) U, + @specialized(Char, Int, Boolean, Byte, Long, Float, Double) V]( + var _1: U, var _2: V) + }