merging changes from upstream benchmarking branch

commit 637b67da56
Author: Joseph E. Gonzalez
Date:   2013-10-13 19:54:09 -07:00
11 changed files with 293 additions and 386 deletions

conf/core-site.xml (new file)

@@ -0,0 +1,43 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/mnt/ephemeral-hdfs</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://ec2-50-17-7-68.compute-1.amazonaws.com:9000</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>65536</value>
</property>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>false</value>
</property>
<property>
<name>dfs.client.read.shortcircuit.skip.checksum</name>
<value>false</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/run/hadoop-hdfs/dn._PORT</value>
</property>
<property>
<name>dfs.client.file-block-storage-locations.timeout</name>
<value>3000</value>
</property>
</configuration>
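For reference, a minimal sketch of how these Hadoop site properties could be read programmatically; the resource path and the use of addResource are assumptions about deployment, not part of this commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Load the site overrides above and inspect two of the values they set.
val hadoopConf = new Configuration()
hadoopConf.addResource(new Path("conf/core-site.xml"))
println(hadoopConf.get("fs.default.name"))               // hdfs://ec2-50-17-7-68.compute-1.amazonaws.com:9000
println(hadoopConf.getInt("io.file.buffer.size", 4096))  // 65536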

conf/slaves

@@ -1,2 +1,10 @@
# A Spark Worker will be started on each of the machines listed below.
localhost
ec2-23-20-12-62.compute-1.amazonaws.com
ec2-54-205-173-19.compute-1.amazonaws.com
ec2-54-225-4-124.compute-1.amazonaws.com
ec2-23-22-209-112.compute-1.amazonaws.com
ec2-50-16-69-88.compute-1.amazonaws.com
ec2-54-205-163-126.compute-1.amazonaws.com
ec2-54-242-235-95.compute-1.amazonaws.com
ec2-54-211-169-232.compute-1.amazonaws.com
ec2-54-237-31-30.compute-1.amazonaws.com
ec2-54-235-15-124.compute-1.amazonaws.com

conf/spark-env.sh.template

@@ -1,21 +1,19 @@
#!/usr/bin/env bash
# This file contains environment variables required to run Spark. Copy it as
# spark-env.sh and edit that to configure Spark for your site.
#
# The following variables can be set in this file:
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# spark-env.sh and edit that to configure Spark for your site. At a minimum,
# the following two variables should be set:
# - SCALA_HOME, to point to your Scala installation, or SCALA_LIBRARY_PATH to
# point to the directory for Scala library JARs (if you install Scala as a
# Debian or RPM package, these are in a separate path, often /usr/share/java)
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
# - SPARK_JAVA_OPTS, to set node-specific JVM options for Spark. Note that
# we recommend setting app-wide options in the application's driver program.
# Examples of node-specific options : -Dspark.local.dir, GC options
# Examples of app-wide options : -Dspark.serializer
#
# If using the standalone deploy mode, you can also set variables for it here:
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
# If using the standalone deploy mode, you can also set variables for it:
# - SPARK_MASTER_IP, to bind the master to a different IP address
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes
# to be spawned on every slave machine
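As a rough illustration of the worker variables described above, a standalone process could resolve them from the environment as follows; this is a sketch only, and the fallback defaults are assumptions rather than Spark's actual defaults:

// Resolve worker settings from the environment, falling back to assumed defaults.
val workerCores     = sys.env.get("SPARK_WORKER_CORES").map(_.toInt)
                         .getOrElse(Runtime.getRuntime.availableProcessors)
val workerMemory    = sys.env.getOrElse("SPARK_WORKER_MEMORY", "1g")   // e.g. "1000m" or "2g"
val workerInstances = sys.env.get("SPARK_WORKER_INSTANCES").map(_.toInt).getOrElse(1)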

SparkEnv.scala

@@ -156,6 +156,7 @@ object SparkEnv extends Logging {
val serializer = serializerManager.setDefault(
System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
logInfo("spark.serializer is " + System.getProperty("spark.serializer"))
val closureSerializer = serializerManager.get(
System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))

KryoSerializer.scala

@@ -61,14 +61,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
// Allow the user to register their own classes by setting spark.kryo.registrator
try {
Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
logDebug("Running user registrator: " + regCls)
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
} catch {
case _: Exception => println("Failed to register spark.kryo.registrator")
Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
logDebug("Running user registrator: " + regCls)
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
kryo.setClassLoader(classLoader)
@@ -116,7 +112,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
}
}
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance with Logging {
val kryo = ks.newKryo()
val output = ks.newKryoOutput()
val input = ks.newKryoInput()
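For context, user code plugs into the registrator path above by implementing KryoRegistrator and naming the class in spark.kryo.registrator. A hedged sketch; the registrator and the registered type are illustrative, not part of this commit:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical application class and registrator.
case class MyVertexData(rank: Double)
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyVertexData])
  }
}

// Enable Kryo and the registrator (fully-qualified class name) before creating the SparkContext.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "MyRegistrator")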

SerializerManager.scala

@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
* instance of the serializer object has been created, the get method returns that instead of
* creating a new one.
*/
private[spark] class SerializerManager {
private[spark] class SerializerManager extends org.apache.spark.Logging {
private val serializers = new ConcurrentHashMap[String, Serializer]
private var _default: Serializer = _
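The per-name caching described in the comment above boils down to a putIfAbsent-style lookup on the ConcurrentHashMap. A minimal sketch of that pattern; the names are illustrative, not the actual SerializerManager internals:

import java.util.concurrent.ConcurrentHashMap

// Return the cached instance for a name, creating it at most once per name.
class InstanceCache[T] {
  private val cache = new ConcurrentHashMap[String, T]
  def getOrCreate(name: String, create: => T): T = {
    val existing = cache.get(name)
    if (existing != null) existing
    else {
      cache.putIfAbsent(name, create)
      cache.get(name)
    }
  }
}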

Analytics.scala

@@ -6,37 +6,6 @@ import org.apache.spark._
object Analytics extends Logging {
// def main(args: Array[String]) {
// //pregelPagerank()
// }
// /**
// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
// */
// // def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// // // Compute the out degree of each vertex
// // val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
// // (vertex, deg) => (deg.getOrElse(0), 1.0F)
// // )
// // GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)(
// // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
// // (a: Float, b: Float) => a + b, // merge
// // (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply
// // numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
// // }
// def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// // Compute the out degree of each vertex
// val pagerankGraph = graph.updateVertices[Int, (Int, Double)](graph.outDegrees,
// (vertex, deg) => (deg.getOrElse(0), 1.0)
// )
// GraphLab.iterateGA2[(Int, Double), ED, Double](pagerankGraph)(
// (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
// (a: Double, b: Double) => a + b, // merge
// 0.0, // default
// (vertex, a: Double) => (vertex.data._1, (0.15 + 0.85 * a)), // apply
// numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
// }
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
@@ -96,346 +65,239 @@ object Analytics extends Logging {
* lowest vertex id in the connected component containing
* that vertex.
*/
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]) = {
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
GraphLab.iterate(ccGraph)(
(me_id, edge) => edge.otherVertex(me_id).data, // gather
(a: Vid, b: Vid) => math.min(a, b), // merge
(id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply
(me_id, edge) => (edge.vertex(me_id).data < edge.otherVertex(me_id).data), // scatter
numIter,
gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both
)
}
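A hedged usage sketch of the new signature (the master string and edge-list path are placeholders, and it assumes this file's imports; it mirrors the "cc" task below):

val sc = new SparkContext("local[4]", "ConnectedComponents")
val graph = GraphLoader.textFile(sc, "edges.tsv", a => 1.0F).cache()
val cc = Analytics.connectedComponents(graph, 20)
println("Components: " + cc.vertices.map { case (vid, data) => data }.distinct().count)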
// /**
// * Compute the shortest path to a set of markers
// */
// def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = {
// val sourceSet = sources.toSet
// val spGraph = graph.mapVertices {
// case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue))
// }
// GraphLab.iterateGA[Float, Float, Float](spGraph)(
// (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
// (a: Float, b: Float) => math.min(a, b), // merge
// (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply
// numIter,
// gatherDirection = EdgeDirection.In)
// }
def main(args: Array[String]) = {
val host = args(0)
val taskType = args(1)
val fname = args(2)
val options = args.drop(3).map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}
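// e.g. an argument written as "--numIter=10" has its leading dashes dropped and is split on '='
// into the pair ("numIter", "10"); anything without an '=' fails with IllegalArgumentException.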
// // /**
// // * Compute the connected component membership of each vertex
// // * and return an RDD with the vertex value containing the
// // * lowest vertex id in the connected component containing
// // * that vertex.
// // */
// // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
// // numIter: Int = Int.MaxValue) = {
def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = {
loggers.map{
loggerName =>
val logger = org.apache.log4j.Logger.getLogger(loggerName)
val prevLevel = logger.getLevel()
logger.setLevel(level)
loggerName -> prevLevel
}.toMap
}
// setLogLevels(org.apache.log4j.Level.DEBUG, Seq("org.apache.spark"))
// // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) })
// // val edges = graph.edges // .mapValues(v => None)
// // val ccGraph = new Graph(vertices, edges)
val serializer = "org.apache.spark.serializer.KryoSerializer"
System.setProperty("spark.serializer", serializer)
//System.setProperty("spark.shuffle.compress", "false")
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
// // ccGraph.iterateDynamic(
// // (me_id, edge) => edge.otherVertex(me_id).data, // gather
// // (a: Int, b: Int) => math.min(a, b), // merge
// // Integer.MAX_VALUE,
// // (v, a: Int) => math.min(v.data, a), // apply
// // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter
// // numIter,
// // gatherEdges = EdgeDirection.Both,
// // scatterEdges = EdgeDirection.Both).vertices
// // //
// // // graph_ret.vertices.collect.foreach(println)
// // // graph_ret.edges.take(10).foreach(println)
// // }
taskType match {
case "pagerank" => {
var numIter = Int.MaxValue
var isDynamic = false
var tol:Float = 0.001F
var outFname = ""
var numVPart = 4
var numEPart = 4
options.foreach{
case ("numIter", v) => numIter = v.toInt
case ("dynamic", v) => isDynamic = v.toBoolean
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
case ("numVPart", v) => numVPart = v.toInt
case ("numEPart", v) => numEPart = v.toInt
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
if(!isDynamic && numIter == Int.MaxValue) {
println("Set number of iterations!")
sys.exit(1)
}
println("======================================")
println("| PageRank |")
println("--------------------------------------")
println(" Using parameters:")
println(" \tDynamic: " + isDynamic)
if(isDynamic) println(" \t |-> Tolerance: " + tol)
println(" \tNumIter: " + numIter)
println("======================================")
val sc = new SparkContext(host, "PageRank(" + fname + ")")
val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
val startTime = System.currentTimeMillis
logInfo("GRAPHX: starting tasks")
logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
logInfo("GRAPHX: Number of edges " + graph.edges.count)
val pr = Analytics.pagerank(graph, numIter)
// val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
// else Analytics.pagerank(graph, numIter)
logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) )
if (!outFname.isEmpty) {
println("Saving pageranks of pages to " + outFname)
pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
}
logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
sc.stop()
}
case "cc" => {
var numIter = Int.MaxValue
var numVPart = 4
var numEPart = 4
var isDynamic = false
options.foreach{
case ("numIter", v) => numIter = v.toInt
case ("dynamic", v) => isDynamic = v.toBoolean
case ("numEPart", v) => numEPart = v.toInt
case ("numVPart", v) => numVPart = v.toInt
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
if(!isDynamic && numIter == Int.MaxValue) {
println("Set number of iterations!")
sys.exit(1)
}
println("======================================")
println("| Connected Components |")
println("--------------------------------------")
println(" Using parameters:")
println(" \tDynamic: " + isDynamic)
println(" \tNumIter: " + numIter)
println("======================================")
val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
//val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
val cc = Analytics.connectedComponents(graph, numIter)
//val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// else Analytics.connectedComponents(graph, numIter)
println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
sc.stop()
}
//
// case "shortestpath" => {
//
// var numIter = Int.MaxValue
// var isDynamic = true
// var sources: List[Int] = List.empty
//
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case ("source", v) => sources ++= List(v.toInt)
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
//
//
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
//
// if(sources.isEmpty) {
// println("No sources provided!")
// sys.exit(1)
// }
//
// println("======================================")
// println("| Shortest Path |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// println(" \tNumIter: " + numIter)
// println(" \tSources: [" + sources.mkString(", ") + "]")
// println("======================================")
//
// val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
// val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
// //val sp = Analytics.shortestPath(graph, sources, numIter)
// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
// // else Analytics.shortestPath(graph, sources, numIter)
// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
//
// sc.stop()
// }
// // /**
// // * Compute the shortest path to a set of markers
// // */
// // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float],
// // sources: List[Int], numIter: Int) = {
// // val sourceSet = sources.toSet
// // val vertices = graph.vertices.mapPartitions(
// // iter => iter.map {
// // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) )
// // });
// case "als" => {
// // val edges = graph.edges // .mapValues(v => None)
// // val spGraph = new Graph(vertices, edges)
// var numIter = 5
// var lambda = 0.01
// var latentK = 10
// var usersFname = "usersFactors.tsv"
// var moviesFname = "moviesFname.tsv"
// var numVPart = 4
// var numEPart = 4
// // val niterations = Int.MaxValue
// // spGraph.iterateDynamic(
// // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
// // (a: Float, b: Float) => math.min(a, b), // merge
// // Float.MaxValue,
// // (v, a: Float) => math.min(v.data, a), // apply
// // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter
// // numIter,
// // gatherEdges = EdgeDirection.In,
// // scatterEdges = EdgeDirection.Out).vertices
// // }
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("lambda", v) => lambda = v.toDouble
// case ("latentK", v) => latentK = v.toInt
// case ("usersFname", v) => usersFname = v
// case ("moviesFname", v) => moviesFname = v
// case ("numVPart", v) => numVPart = v.toInt
// case ("numEPart", v) => numEPart = v.toInt
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// println("======================================")
// println("| Alternating Least Squares |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tNumIter: " + numIter)
// println(" \tLambda: " + lambda)
// println(" \tLatentK: " + latentK)
// println(" \tusersFname: " + usersFname)
// println(" \tmoviesFname: " + moviesFname)
// println("======================================")
// val sc = new SparkContext(host, "ALS(" + fname + ")")
// val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble )
// graph.numVPart = numVPart
// graph.numEPart = numEPart
// val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
// assert(maxUser < minMovie)
// val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
// factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
// .saveAsTextFile(usersFname)
// factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
// .saveAsTextFile(moviesFname)
// sc.stop()
// }
// // /**
// // *
// // */
// // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double],
// // latentK: Int, lambda: Double, numIter: Int) = {
// // val vertices = graph.vertices.mapPartitions( _.map {
// // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } )
// // }).cache
// // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// // val edges = graph.edges // .mapValues(v => None)
// // val alsGraph = new Graph(vertices, edges)
// // alsGraph.numVPart = graph.numVPart
// // alsGraph.numEPart = graph.numEPart
// // val niterations = Int.MaxValue
// // alsGraph.iterateDynamic[(Array[Double], Array[Double])](
// // (me_id, edge) => { // gather
// // val X = edge.otherVertex(me_id).data
// // val y = edge.data
// // val Xy = X.map(_ * y)
// // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray
// // (Xy, XtX)
// // },
// // (a, b) => {
// // // The difference between the while loop and the zip is a FACTOR OF TWO in overall
// // // runtime
// // var i = 0
// // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 }
// // i = 0
// // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 }
// // a
// // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r })
// // },
// // (Array.empty[Double], Array.empty[Double]), // default value is empty
// // (vertex, accum) => { // apply
// // val XyArray = accum._1
// // val XtXArray = accum._2
// // if(XyArray.isEmpty) vertex.data // no neighbors
// // else {
// // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) =>
// // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) +
// // (if(i == j) lambda else 1.0F) //regularization
// // }
// // val Xy = DenseMatrix.create(latentK,1,XyArray)
// // val w = XtX \ Xy
// // w.data
// // }
// // },
// // (me_id, edge) => true,
// // numIter,
// // gatherEdges = EdgeDirection.Both,
// // scatterEdges = EdgeDirection.Both,
// // vertex => vertex.id < maxUser).vertices
// // }
// def main(args: Array[String]) = {
// val host = args(0)
// val taskType = args(1)
// val fname = args(2)
// val options = args.drop(3).map { arg =>
// arg.dropWhile(_ == '-').split('=') match {
// case Array(opt, v) => (opt -> v)
// case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
// }
// }
// System.setProperty("spark.serializer", "spark.KryoSerializer")
// //System.setProperty("spark.shuffle.compress", "false")
// System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
// taskType match {
// case "pagerank" => {
// var numIter = Int.MaxValue
// var isDynamic = false
// var tol:Float = 0.001F
// var outFname = ""
// var numVPart = 4
// var numEPart = 4
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case ("tol", v) => tol = v.toFloat
// case ("output", v) => outFname = v
// case ("numVPart", v) => numVPart = v.toInt
// case ("numEPart", v) => numEPart = v.toInt
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
// println("======================================")
// println("| PageRank |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// if(isDynamic) println(" \t |-> Tolerance: " + tol)
// println(" \tNumIter: " + numIter)
// println("======================================")
// val sc = new SparkContext(host, "PageRank(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
// val startTime = System.currentTimeMillis
// logInfo("GRAPHX: starting tasks")
// logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
// logInfo("GRAPHX: Number of edges " + graph.edges.count)
// val pr = Analytics.pagerank(graph, numIter)
// // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
// // else Analytics.pagerank(graph, numIter)
// logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
// if (!outFname.isEmpty) {
// println("Saving pageranks of pages to " + outFname)
// pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
// }
// logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
// sc.stop()
// }
// case "cc" => {
// var numIter = Int.MaxValue
// var isDynamic = false
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
// println("======================================")
// println("| Connected Components |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// println(" \tNumIter: " + numIter)
// println("======================================")
// val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => 1.0F)
// val cc = Analytics.connectedComponents(graph, numIter)
// // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// // else Analytics.connectedComponents(graph, numIter)
// println("Components: " + cc.vertices.map(_.data).distinct())
// sc.stop()
// }
// case "shortestpath" => {
// var numIter = Int.MaxValue
// var isDynamic = true
// var sources: List[Int] = List.empty
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("dynamic", v) => isDynamic = v.toBoolean
// case ("source", v) => sources ++= List(v.toInt)
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// if(!isDynamic && numIter == Int.MaxValue) {
// println("Set number of iterations!")
// sys.exit(1)
// }
// if(sources.isEmpty) {
// println("No sources provided!")
// sys.exit(1)
// }
// println("======================================")
// println("| Shortest Path |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tDynamic: " + isDynamic)
// println(" \tNumIter: " + numIter)
// println(" \tSources: [" + sources.mkString(", ") + "]")
// println("======================================")
// val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
// val sp = Analytics.shortestPath(graph, sources, numIter)
// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
// // else Analytics.shortestPath(graph, sources, numIter)
// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
// sc.stop()
// }
// // case "als" => {
// // var numIter = 5
// // var lambda = 0.01
// // var latentK = 10
// // var usersFname = "usersFactors.tsv"
// // var moviesFname = "moviesFname.tsv"
// // var numVPart = 4
// // var numEPart = 4
// // options.foreach{
// // case ("numIter", v) => numIter = v.toInt
// // case ("lambda", v) => lambda = v.toDouble
// // case ("latentK", v) => latentK = v.toInt
// // case ("usersFname", v) => usersFname = v
// // case ("moviesFname", v) => moviesFname = v
// // case ("numVPart", v) => numVPart = v.toInt
// // case ("numEPart", v) => numEPart = v.toInt
// // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// // }
// // println("======================================")
// // println("| Alternating Least Squares |")
// // println("--------------------------------------")
// // println(" Using parameters:")
// // println(" \tNumIter: " + numIter)
// // println(" \tLambda: " + lambda)
// // println(" \tLatentK: " + latentK)
// // println(" \tusersFname: " + usersFname)
// // println(" \tmoviesFname: " + moviesFname)
// // println("======================================")
// // val sc = new SparkContext(host, "ALS(" + fname + ")")
// // val graph = Graph.textFile(sc, fname, a => a(0).toDouble )
// // graph.numVPart = numVPart
// // graph.numEPart = numEPart
// // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
// // assert(maxUser < minMovie)
// // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
// // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
// // .saveAsTextFile(usersFname)
// // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
// // .saveAsTextFile(moviesFname)
// // sc.stop()
// // }
// case _ => {
// println("Invalid task type.")
// }
// }
// }
case _ => {
println("Invalid task type.")
}
}
}
// /**
// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
@@ -649,7 +511,7 @@ object Analytics extends Logging {
// val sc = new SparkContext(host, "PageRank(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart).cache()
// val graph = GraphLoader.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart).cache()
// val startTime = System.currentTimeMillis
// logInfo("GRAPHX: starting tasks")
@@ -692,7 +554,7 @@ object Analytics extends Logging {
// println("======================================")
// val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => 1.0)
// val graph = GraphLoader.textFile(sc, fname, a => 1.0)
// val cc = Analytics.connectedComponents(graph, numIter)
// // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// // else Analytics.connectedComponents(graph, numIter)
@@ -735,7 +597,7 @@ object Analytics extends Logging {
// println("======================================")
// val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) )
// val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) )
// val sp = Analytics.shortestPath(graph, sources, numIter)
// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
// // else Analytics.shortestPath(graph, sources, numIter)
@@ -778,7 +640,7 @@ object Analytics extends Logging {
// println("======================================")
// val sc = new SparkContext(host, "ALS(" + fname + ")")
// val graph = Graph.textFile(sc, fname, a => a(0).toDouble )
// val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble )
// graph.numVPart = numVPart
// graph.numEPart = numEPart

GraphKryoRegistrator.scala

@@ -4,7 +4,7 @@ import com.esotericsoftware.kryo.Kryo
import org.apache.spark.graph.impl.MessageToPartition
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.graph.impl._
class GraphKryoRegistrator extends KryoRegistrator {
@@ -13,6 +13,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MutableTuple2[Object, Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[(Vid, Object)])
kryo.register(classOf[EdgePartition[Object]])
// This avoids a large number of hash table lookups.
kryo.setReferences(false)

GraphLoader.scala

@@ -20,7 +20,7 @@ object GraphLoader {
: GraphImpl[Int, ED] = {
// Parse the edge data table
val edges = sc.textFile(path).flatMap { line =>
val edges = sc.textFile(path, minEdgePartitions).flatMap { line =>
if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split("\\s+")
if(lineArray.length < 2) {
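With the added minEdgePartitions argument, callers control how many partitions the raw edge list is read into. A hedged example; the path, context, and partition counts are illustrative, mirroring the pagerank task in Analytics.scala:

// Assumes an existing SparkContext `sc`.
val graph = GraphLoader.textFile(sc, "hdfs://namenode:9000/edges.tsv", a => 1.0F,
  minEdgePartitions = 128, minVertexPartitions = 128).cache()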

GraphImpl.scala

@@ -526,7 +526,8 @@ def apply[VD: ClassManifest, ED: ClassManifest](
.map { e =>
// Random partitioning based on the source vertex id.
// val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions)
val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
// val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
val part: Pid = randomVertexCut(e.src, e.dst, numPartitions)
//val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
// Should we be using 3-tuple or an optimized class
@@ -642,7 +643,6 @@ def apply[VD: ClassManifest, ED: ClassManifest](
// protected def createVTable[VD: ClassManifest, ED: ClassManifest](
// eTable: IndexedRDD[Pid, EdgePartition[ED]],
// vid2pid: Index
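The randomVertexCut strategy selected above assigns each edge to a partition by hashing its endpoint pair, rather than using the 2D grid scheme. A plausible sketch of such a helper; the actual function in this file may differ:

// Hash the (src, dst) pair onto one of numParts partitions.
def randomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid =
  math.abs((src, dst).hashCode()) % numParts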

GraphGenerators.scala

@@ -63,9 +63,6 @@ object GraphGenerators {
}
// For now just writes graph to a file. Eventually
// it will return a spark.graph.Graph
// Right now it just generates a bunch of edges where
// the edge data is the weight (default 1)
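A minimal sketch of the behaviour the comment describes, generating random (src, dst, weight) edges with a default weight of 1; illustrative only, not the actual generator code:

import scala.util.Random

// Emit numEdges random edges over numVertices vertices, each carrying the default weight.
def randomWeightedEdges(numVertices: Int, numEdges: Int, weight: Float = 1.0F): Seq[(Int, Int, Float)] =
  Seq.fill(numEdges)((Random.nextInt(numVertices), Random.nextInt(numVertices), weight))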