Minor cleanup.

Reynold Xin 2013-04-05 14:21:24 +08:00
parent 7134856351
commit 9764e579b8
4 changed files with 23 additions and 60 deletions

View file

@@ -2,25 +2,9 @@ package spark.graph
 import spark._
 import spark.SparkContext._
-// import com.esotericsoftware.kryo._
-// import breeze.linalg._
-// class AnalyticsKryoRegistrator extends KryoRegistrator {
-//   def registerClasses(kryo: Kryo) {
-//     println("registering kryo")
-//     kryo.register(classOf[(Int,Float,Float)])
-//     Graph.kryoRegister[(Int,Float,Float), Float](kryo)
-//     Graph.kryoRegister[(Int,Float), Float](kryo)
-//     Graph.kryoRegister[Int, Float](kryo)
-//     Graph.kryoRegister[Float, Float](kryo)
-//     kryo.setReferences(false);
-//   }
-// }
 object Analytics {
@@ -111,14 +95,6 @@ object Analytics {
       gatherDirection = EdgeDirection.In)
   }
-  // /**
-  //  * Compute the connected component membership of each vertex
-  //  * and return an RDD with the vertex value containing the
@@ -250,9 +226,6 @@ object Analytics {
-    // //System.setProperty("spark.shuffle.compress", "false")
-    // System.setProperty("spark.kryo.registrator", "spark.graphlab.AnalyticsKryoRegistrator")
     taskType match {
       case "pagerank" => {

View file

@@ -43,12 +43,6 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Double
 }
-private[graph]
-case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) U,
-                         @specialized(Char, Int, Boolean, Byte, Long, Float, Double) V](
-  var _1: U, var _2: V)
 /**
  * A Graph RDD that supports computation on graphs.
  */
@@ -61,26 +55,23 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
     _rawETable: RDD[(Pid, EdgePartition[ED])]) {
   def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = {
-    this(
-      Graph.DEFAULT_NUM_VERTEX_PARTITIONS, Graph.DEFAULT_NUM_EDGE_PARTITIONS,
-      vertices, edges,
-      null, null)
+    this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null)
   }
   def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
     if (_cached) {
-      val newgraph = new Graph(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)
-      newgraph.cache()
+      (new Graph(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable))
+        .cache()
     } else {
       new Graph(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
     }
   }
-  def withVertexPartitioner(numVertexPartitions: Int = Graph.DEFAULT_NUM_VERTEX_PARTITIONS) = {
+  def withVertexPartitioner(numVertexPartitions: Int) = {
     withPartitioner(numVertexPartitions, numEdgePartitions)
   }
-  def withEdgePartitioner(numEdgePartitions: Int = Graph.DEFAULT_NUM_EDGE_PARTITIONS) = {
+  def withEdgePartitioner(numEdgePartitions: Int) = {
     withPartitioner(numVertexPartitions, numEdgePartitions)
   }
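
Side note: with the DEFAULT_NUM_*_PARTITIONS constants deleted, partition counts default to those of the input RDDs and must otherwise be passed explicitly. A minimal usage sketch (the graph and the counts below are hypothetical, not from this commit):

    // Hypothetical sketch: vertexRDD and edgeRDD are assumed inputs.
    val g: Graph[Int, Float] = new Graph(vertexRDD, edgeRDD)
    // Repartition; withPartitioner re-caches the result if the graph was cached.
    val repartitioned = g.withVertexPartitioner(16).withEdgePartitioner(32)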
@@ -139,11 +130,11 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
       edgeDirection)
   }
-  def mapVertices[VD2: ClassManifest](f: (Vertex[VD]) => Vertex[VD2]): Graph[VD2, ED] = {
+  def mapVertices[VD2: ClassManifest](f: Vertex[VD] => Vertex[VD2]): Graph[VD2, ED] = {
     newGraph(vertices.map(f), edges)
   }
-  def mapEdges[ED2: ClassManifest](f: (Edge[ED]) => Edge[ED2]): Graph[VD, ED2] = {
+  def mapEdges[ED2: ClassManifest](f: Edge[ED] => Edge[ED2]): Graph[VD, ED2] = {
     newGraph(vertices, edges.map(f))
   }
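
The map methods now take plain function types rather than parenthesized ones. A short sketch of mapVertices (this assumes the prototype's Vertex exposes an id field and an (id, data) constructor, which this diff does not show):

    // Hypothetical: reset every vertex value to 1.0f, leaving edge data untouched.
    val initialized: Graph[Float, Float] = g.mapVertices(v => Vertex(v.id, 1.0f))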
@@ -237,7 +228,7 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
         }
       }
     }
-    vmap.int2ObjectEntrySet().fastIterator().filter{!_.getValue()._2.isEmpty}.map{ entry =>
+    vmap.int2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry =>
       (entry.getIntKey(), entry.getValue()._2)
     }
   }
@@ -316,14 +307,10 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
 object Graph {
-  val DEFAULT_NUM_VERTEX_PARTITIONS = 5
-  val DEFAULT_NUM_EDGE_PARTITIONS = 5
   /**
    * Load an edge list from file initializing the Graph RDD
    */
-  def textFile[ED: ClassManifest](sc: SparkContext,
-    fname: String, edgeParser: Array[String] => ED ) = {
+  def textFile[ED: ClassManifest](sc: SparkContext, fname: String, edgeParser: Array[String] => ED) = {
     // Parse the edge data table
     val edges = sc.textFile(fname).map { line =>
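
A hedged sketch of calling the reflowed textFile (the path is made up, and the diff does not show how each line is tokenized before edgeParser receives it, so the field choice is an assumption):

    // Hypothetical: treat the last field of each edge line as a Float weight.
    val weighted = Graph.textFile(sc, "hdfs:///tmp/edges.tsv", fields => fields.last.toFloat)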

View file

@@ -8,21 +8,17 @@ import spark.KryoRegistrator
 class GraphKryoRegistrator extends KryoRegistrator {
   def registerClasses(kryo: Kryo) {
-    kryo.register(classOf[(Int, Float, Float)])
-    registerClass[(Int, Float, Float), Float](kryo)
-    registerClass[(Int, Float), Float](kryo)
-    registerClass[Int, Float](kryo)
-    registerClass[Float, Float](kryo)
+    //kryo.register(classOf[(Int, Float, Float)])
+    registerClass[Int, Int, Int](kryo)
     // This avoids a large number of hash table lookups.
     kryo.setReferences(false)
   }
-  private def registerClass[VD: Manifest, ED: Manifest](kryo: Kryo) {
-    //kryo.register(classManifest[VD].erasure)
-    // kryo.register(classManifest[ED].erasure)
-    kryo.register(classOf[(Vid, Vid, ED)])
-    kryo.register(classOf[(Vid, ED)])
-    //kryo.register(classOf[EdgeBlockRecord[ED]])
+  private def registerClass[VD: Manifest, ED: Manifest, VD2: Manifest](kryo: Kryo) {
+    kryo.register(classOf[Vertex[VD]])
+    kryo.register(classOf[Edge[ED]])
+    kryo.register(classOf[MutableTuple2[VD, VD2]])
+    kryo.register(classOf[(Vid, VD2)])
   }
 }
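
For context, a registrator like this was wired up in Spark of this era through system properties; a sketch, assuming the class lives in the spark.graph package like the other files in this commit:

    // Sketch: enable Kryo serialization with the graph registrator (Spark 0.7-style config).
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
    val sc = new SparkContext("local[4]", "graph")

Turning off setReferences skips Kryo's per-object reference tracking, which is what the inline comment means by avoiding hash table lookups.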

View file

@@ -11,4 +11,11 @@ package object graph {
    * Return the default null-like value for a data type T.
    */
   def nullValue[T] = null.asInstanceOf[T]
+  private[graph]
+  case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) U,
+                           @specialized(Char, Int, Boolean, Byte, Long, Float, Double) V](
+    var _1: U, var _2: V)
 }
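
What the package object now provides, in a short sketch (the results in the comments follow directly from Scala semantics: null.asInstanceOf unboxes to a primitive's default value):

    val zero = nullValue[Int]        // 0
    val none = nullValue[String]     // null
    val t = MutableTuple2(1, 2.0f)   // specialized: primitive fields stay unboxed
    t._1 = 42                        // _1 and _2 are vars, mutable in place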