switching from floats to doubles in pagerank and sssp

This commit is contained in:
Joseph E. Gonzalez 2013-04-16 11:27:56 -07:00
parent 9eec317835
commit 2635416cee

View file

@ -12,13 +12,13 @@ object Analytics {
*/
def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
(vertex, deg) => (deg.getOrElse(0), 1.0F)
val pagerankGraph = graph.updateVertices[Int, (Int, Double)](graph.outDegrees,
(vertex, deg) => (deg.getOrElse(0), 1.0)
)
GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)(
GraphLab.iterateGA[(Int, Double), ED, Double](pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
(a: Float, b: Float) => a + b, // merge
(vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply
(a: Double, b: Double) => a + b, // merge
(vertex, a: Option[Double]) => (vertex.data._1, (0.15 + 0.85 * a.getOrElse(0.0))), // apply
numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
}
@ -27,14 +27,14 @@ object Analytics {
*/
def pregelPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
(vertex, deg) => (deg.getOrElse(0), 1.0F)
val pagerankGraph = graph.updateVertices[Int, (Int, Double)](graph.outDegrees,
(vertex, deg) => (deg.getOrElse(0), 1.0)
)
Pregel.iterate[(Int, Float), ED, Float](pagerankGraph)(
(vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply
Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(
(vertex, a: Double) => (vertex.data._1, (0.15 + 0.85 * a)), // apply
(me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather
(a: Float, b: Float) => a + b, // merge
1.0F,
(a: Double, b: Double) => a + b, // merge
1.0,
numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
}
@ -42,18 +42,18 @@ object Analytics {
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
tol: Float, maxIter: Int = 10) = {
tol: Double, maxIter: Int = 10) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.updateVertices[Int, (Int, Float, Float)](graph.outDegrees,
(vertex, degIter) => (degIter.sum, 1.0F, 1.0F)
val pagerankGraph = graph.updateVertices[Int, (Int, Double, Double)](graph.outDegrees,
(vertex, degIter) => (degIter.sum, 1.0, 1.0)
)
// Run PageRank
GraphLab.iterateGAS(pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
(a: Float, b: Float) => a + b,
(vertex, a: Option[Float]) =>
(vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F)), vertex.data._2), // apply
(a: Double, b: Double) => a + b,
(vertex, a: Option[Double]) =>
(vertex.data._1, (0.15 + 0.85 * a.getOrElse(0.0)), vertex.data._2), // apply
(me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter
maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) }
}
@ -77,15 +77,15 @@ object Analytics {
/**
* Compute the shortest path to a set of markers
*/
def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = {
def shortestPath[VD: Manifest](graph: Graph[VD, Double], sources: List[Int], numIter: Int) = {
val sourceSet = sources.toSet
val spGraph = graph.mapVertices {
case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue))
case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0 else Double.MaxValue))
}
GraphLab.iterateGA[Float, Float, Float](spGraph)(
GraphLab.iterateGA[Double, Double, Double](spGraph)(
(me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
(a: Float, b: Float) => math.min(a, b), // merge
(v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply
(a: Double, b: Double) => math.min(a, b), // merge
(v, a: Option[Double]) => math.min(v.data, a.getOrElse(Double.MaxValue)), // apply
numIter,
gatherDirection = EdgeDirection.In)
}
@ -123,12 +123,12 @@ object Analytics {
// /**
// * Compute the shortest path to a set of markers
// */
// def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float],
// def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Double],
// sources: List[Int], numIter: Int) = {
// val sourceSet = sources.toSet
// val vertices = graph.vertices.mapPartitions(
// iter => iter.map {
// case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) )
// case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Double.MaxValue) )
// });
// val edges = graph.edges // .mapValues(v => None)
@ -137,9 +137,9 @@ object Analytics {
// val niterations = Int.MaxValue
// spGraph.iterateDynamic(
// (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
// (a: Float, b: Float) => math.min(a, b), // merge
// Float.MaxValue,
// (v, a: Float) => math.min(v.data, a), // apply
// (a: Double, b: Double) => math.min(a, b), // merge
// Double.MaxValue,
// (v, a: Double) => math.min(v.data, a), // apply
// (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter
// numIter,
// gatherEdges = EdgeDirection.In,
@ -224,7 +224,7 @@ object Analytics {
var numIter = Int.MaxValue
var isDynamic = false
var tol:Float = 0.001F
var tol:Double = 0.001
var outFname = ""
var numVPart = 4
var numEPart = 4
@ -232,7 +232,7 @@ object Analytics {
options.foreach{
case ("numIter", v) => numIter = v.toInt
case ("dynamic", v) => isDynamic = v.toBoolean
case ("tol", v) => tol = v.toFloat
case ("tol", v) => tol = v.toDouble
case ("output", v) => outFname = v
case ("numVPart", v) => numVPart = v.toInt
case ("numEPart", v) => numEPart = v.toInt
@ -253,7 +253,7 @@ object Analytics {
println("======================================")
val sc = new SparkContext(host, "PageRank(" + fname + ")")
val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart)
val graph = Graph.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart)
val startTime = System.currentTimeMillis
val pr = Analytics.pagerank(graph, numIter)
@ -292,7 +292,7 @@ object Analytics {
println("======================================")
val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
val graph = Graph.textFile(sc, fname, a => 1.0F)
val graph = Graph.textFile(sc, fname, a => 1.0)
val cc = Analytics.connectedComponents(graph, numIter)
// val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// else Analytics.connectedComponents(graph, numIter)
@ -335,7 +335,7 @@ object Analytics {
println("======================================")
val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) )
val sp = Analytics.shortestPath(graph, sources, numIter)
// val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
// else Analytics.shortestPath(graph, sources, numIter)