diff --git a/conf/core-site.xml b/conf/core-site.xml
new file mode 100644
index 0000000000..eefd875fc8
--- /dev/null
+++ b/conf/core-site.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+  <property>
+    <name>hadoop.tmp.dir</name>
+    <value>/mnt/ephemeral-hdfs</value>
+  </property>
+
+  <property>
+    <name>fs.default.name</name>
+    <value>hdfs://ec2-50-17-7-68.compute-1.amazonaws.com:9000</value>
+  </property>
+
+  <property>
+    <name>io.file.buffer.size</name>
+    <value>65536</value>
+  </property>
+
+  <property>
+    <name>dfs.client.read.shortcircuit</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>dfs.client.read.shortcircuit.skip.checksum</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>dfs.domain.socket.path</name>
+    <value>/var/run/hadoop-hdfs/dn._PORT</value>
+  </property>
+
+  <property>
+    <name>dfs.client.file-block-storage-locations.timeout</name>
+    <value>3000</value>
+  </property>
+
+</configuration>
diff --git a/conf/slaves b/conf/slaves
index da0a01343d..728d22ac2e 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1,2 +1,10 @@
-# A Spark Worker will be started on each of the machines listed below.
-localhost
\ No newline at end of file
+ec2-23-20-12-62.compute-1.amazonaws.com
+ec2-54-205-173-19.compute-1.amazonaws.com
+ec2-54-225-4-124.compute-1.amazonaws.com
+ec2-23-22-209-112.compute-1.amazonaws.com
+ec2-50-16-69-88.compute-1.amazonaws.com
+ec2-54-205-163-126.compute-1.amazonaws.com
+ec2-54-242-235-95.compute-1.amazonaws.com
+ec2-54-211-169-232.compute-1.amazonaws.com
+ec2-54-237-31-30.compute-1.amazonaws.com
+ec2-54-235-15-124.compute-1.amazonaws.com
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index 0a35ee7c79..b8936314ec 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -1,21 +1,19 @@
 #!/usr/bin/env bash
 
 # This file contains environment variables required to run Spark. Copy it as
-# spark-env.sh and edit that to configure Spark for your site.
-#
-# The following variables can be set in this file:
-# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
+# spark-env.sh and edit that to configure Spark for your site. At a minimum,
+# the following two variables should be set:
+# - SCALA_HOME, to point to your Scala installation, or SCALA_LIBRARY_PATH to
+#   point to the directory for Scala library JARs (if you install Scala as a
+#   Debian or RPM package, these are in a separate path, often /usr/share/java)
 # - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
-# - SPARK_JAVA_OPTS, to set node-specific JVM options for Spark. Note that
-#   we recommend setting app-wide options in the application's driver program.
-#   Examples of node-specific options : -Dspark.local.dir, GC options
-#   Examples of app-wide options : -Dspark.serializer
 #
-# If using the standalone deploy mode, you can also set variables for it here:
-# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
+# If using the standalone deploy mode, you can also set variables for it:
+# - SPARK_MASTER_IP, to bind the master to a different IP address
 # - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
 # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
 # - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
 # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
-# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
+# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes
+#   to be spawned on every slave machine
 
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 29968c273c..9b8384bcbb 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,6 +156,7 @@ object SparkEnv extends Logging {
 
     val serializer = serializerManager.setDefault(
       System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
+    logInfo("spark.serializer is " + System.getProperty("spark.serializer"))
 
     val closureSerializer = serializerManager.get(
       System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 24ef204aa1..3feafde8b6 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -61,14 +61,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
     kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
 
     // Allow the user to register their own classes by setting spark.kryo.registrator
-    try {
-      Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
-        logDebug("Running user registrator: " + regCls)
-        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
-        reg.registerClasses(kryo)
-      }
-    } catch {
-      case _: Exception => println("Failed to register spark.kryo.registrator")
+    Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+      logDebug("Running user registrator: " + regCls)
+      val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+      reg.registerClasses(kryo)
     }
 
     kryo.setClassLoader(classLoader)
@@ -116,7 +112,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
+private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance with Logging {
   val kryo = ks.newKryo()
   val output = ks.newKryoOutput()
   val input = ks.newKryoInput()
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 2955986fec..5082730ae3 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
  * instance of the serializer object has been created, the get method returns that instead of
  * creating a new one.
*/ -private[spark] class SerializerManager { +private[spark] class SerializerManager extends org.apache.spark.Logging { private val serializers = new ConcurrentHashMap[String, Serializer] private var _default: Serializer = _ diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala index 49498fbcd4..9e2b54541b 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -6,37 +6,6 @@ import org.apache.spark._ object Analytics extends Logging { -// def main(args: Array[String]) { -// //pregelPagerank() -// } - - // /** - // * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD - // */ - // // def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { - // // // Compute the out degree of each vertex - // // val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, - // // (vertex, deg) => (deg.getOrElse(0), 1.0F) - // // ) - // // GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)( - // // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather - // // (a: Float, b: Float) => a + b, // merge - // // (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply - // // numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } - // // } - // def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { - // // Compute the out degree of each vertex - // val pagerankGraph = graph.updateVertices[Int, (Int, Double)](graph.outDegrees, - // (vertex, deg) => (deg.getOrElse(0), 1.0) - // ) - // GraphLab.iterateGA2[(Int, Double), ED, Double](pagerankGraph)( - // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather - // (a: Double, b: Double) => a + b, // merge - // 0.0, // default - // (vertex, a: Double) => (vertex.data._1, (0.15 + 0.85 * a)), // apply - // numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } - // } - /** * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD */ @@ -96,346 +65,239 @@ object Analytics extends Logging { * lowest vertex id in the connected component containing * that vertex. 
*/ - def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]) = { + def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { val ccGraph = graph.mapVertices { case (vid, _) => vid } - GraphLab.iterate(ccGraph)( (me_id, edge) => edge.otherVertex(me_id).data, // gather (a: Vid, b: Vid) => math.min(a, b), // merge (id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply (me_id, edge) => (edge.vertex(me_id).data < edge.otherVertex(me_id).data), // scatter + numIter, gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both ) } + + def main(args: Array[String]) = { + val host = args(0) + val taskType = args(1) + val fname = args(2) + val options = args.drop(3).map { arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + + def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = { + loggers.map{ + loggerName => + val logger = org.apache.log4j.Logger.getLogger(loggerName) + val prevLevel = logger.getLevel() + logger.setLevel(level) + loggerName -> prevLevel + }.toMap + } +// setLogLevels(org.apache.log4j.Level.DEBUG, Seq("org.apache.spark")) - // /** - // * Compute the shortest path to a set of markers - // */ - // def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = { - // val sourceSet = sources.toSet - // val spGraph = graph.mapVertices { - // case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue)) - // } - // GraphLab.iterateGA[Float, Float, Float](spGraph)( - // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather - // (a: Float, b: Float) => math.min(a, b), // merge - // (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply - // numIter, - // gatherDirection = EdgeDirection.In) - // } + val serializer = "org.apache.spark.serializer.KryoSerializer" + System.setProperty("spark.serializer", serializer) + //System.setProperty("spark.shuffle.compress", "false") + System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator") - // // /** - // // * Compute the connected component membership of each vertex - // // * and return an RDD with the vertex value containing the - // // * lowest vertex id in the connected component containing - // // * that vertex. 
- // // */ - // // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], - // // numIter: Int = Int.MaxValue) = { + taskType match { + case "pagerank" => { - // // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) }) - // // val edges = graph.edges // .mapValues(v => None) - // // val ccGraph = new Graph(vertices, edges) + var numIter = Int.MaxValue + var isDynamic = false + var tol:Float = 0.001F + var outFname = "" + var numVPart = 4 + var numEPart = 4 - // // ccGraph.iterateDynamic( - // // (me_id, edge) => edge.otherVertex(me_id).data, // gather - // // (a: Int, b: Int) => math.min(a, b), // merge - // // Integer.MAX_VALUE, - // // (v, a: Int) => math.min(v.data, a), // apply - // // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter - // // numIter, - // // gatherEdges = EdgeDirection.Both, - // // scatterEdges = EdgeDirection.Both).vertices - // // // - // // // graph_ret.vertices.collect.foreach(println) - // // // graph_ret.edges.take(10).foreach(println) - // // } + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case ("tol", v) => tol = v.toFloat + case ("output", v) => outFname = v + case ("numVPart", v) => numVPart = v.toInt + case ("numEPart", v) => numEPart = v.toInt + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + if(!isDynamic && numIter == Int.MaxValue) { + println("Set number of iterations!") + sys.exit(1) + } + println("======================================") + println("| PageRank |") + println("--------------------------------------") + println(" Using parameters:") + println(" \tDynamic: " + isDynamic) + if(isDynamic) println(" \t |-> Tolerance: " + tol) + println(" \tNumIter: " + numIter) + println("======================================") + + val sc = new SparkContext(host, "PageRank(" + fname + ")") + + val graph = GraphLoader.textFile(sc, fname, a => 1.0F, + minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache() + + val startTime = System.currentTimeMillis + logInfo("GRAPHX: starting tasks") + logInfo("GRAPHX: Number of vertices " + graph.vertices.count) + logInfo("GRAPHX: Number of edges " + graph.edges.count) + + val pr = Analytics.pagerank(graph, numIter) + // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) + // else Analytics.pagerank(graph, numIter) + logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) ) + if (!outFname.isEmpty) { + println("Saving pageranks of pages to " + outFname) + pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) + } + logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") + sc.stop() + } + + case "cc" => { + + var numIter = Int.MaxValue + var numVPart = 4 + var numEPart = 4 + var isDynamic = false + + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case ("numEPart", v) => numEPart = v.toInt + case ("numVPart", v) => numVPart = v.toInt + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + if(!isDynamic && numIter == Int.MaxValue) { + println("Set number of iterations!") + sys.exit(1) + } + println("======================================") + println("| Connected Components |") + println("--------------------------------------") + println(" Using parameters:") + println(" \tDynamic: " + isDynamic) + println(" \tNumIter: " 
+ numIter) + println("======================================") + + val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") + //val graph = GraphLoader.textFile(sc, fname, a => 1.0F) + val graph = GraphLoader.textFile(sc, fname, a => 1.0F, + minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache() + val cc = Analytics.connectedComponents(graph, numIter) + //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) + // else Analytics.connectedComponents(graph, numIter) + println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) + + sc.stop() + } +// +// case "shortestpath" => { +// +// var numIter = Int.MaxValue +// var isDynamic = true +// var sources: List[Int] = List.empty +// +// options.foreach{ +// case ("numIter", v) => numIter = v.toInt +// case ("dynamic", v) => isDynamic = v.toBoolean +// case ("source", v) => sources ++= List(v.toInt) +// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// } +// +// +// if(!isDynamic && numIter == Int.MaxValue) { +// println("Set number of iterations!") +// sys.exit(1) +// } +// +// if(sources.isEmpty) { +// println("No sources provided!") +// sys.exit(1) +// } +// +// println("======================================") +// println("| Shortest Path |") +// println("--------------------------------------") +// println(" Using parameters:") +// println(" \tDynamic: " + isDynamic) +// println(" \tNumIter: " + numIter) +// println(" \tSources: [" + sources.mkString(", ") + "]") +// println("======================================") +// +// val sc = new SparkContext(host, "ShortestPath(" + fname + ")") +// val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) ) +// //val sp = Analytics.shortestPath(graph, sources, numIter) +// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) +// // else Analytics.shortestPath(graph, sources, numIter) +// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_))) +// +// sc.stop() +// } - // // /** - // // * Compute the shortest path to a set of markers - // // */ - // // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float], - // // sources: List[Int], numIter: Int) = { - // // val sourceSet = sources.toSet - // // val vertices = graph.vertices.mapPartitions( - // // iter => iter.map { - // // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) ) - // // }); + // case "als" => { - // // val edges = graph.edges // .mapValues(v => None) - // // val spGraph = new Graph(vertices, edges) + // var numIter = 5 + // var lambda = 0.01 + // var latentK = 10 + // var usersFname = "usersFactors.tsv" + // var moviesFname = "moviesFname.tsv" + // var numVPart = 4 + // var numEPart = 4 - // // val niterations = Int.MaxValue - // // spGraph.iterateDynamic( - // // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather - // // (a: Float, b: Float) => math.min(a, b), // merge - // // Float.MaxValue, - // // (v, a: Float) => math.min(v.data, a), // apply - // // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter - // // numIter, - // // gatherEdges = EdgeDirection.In, - // // scatterEdges = EdgeDirection.Out).vertices - // // } + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("lambda", v) => lambda = v.toDouble + // case ("latentK", v) => latentK = v.toInt + // case ("usersFname", v) => usersFname = v + // case ("moviesFname", v) 
=> moviesFname = v + // case ("numVPart", v) => numVPart = v.toInt + // case ("numEPart", v) => numEPart = v.toInt + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + // println("======================================") + // println("| Alternating Least Squares |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tNumIter: " + numIter) + // println(" \tLambda: " + lambda) + // println(" \tLatentK: " + latentK) + // println(" \tusersFname: " + usersFname) + // println(" \tmoviesFname: " + moviesFname) + // println("======================================") + + // val sc = new SparkContext(host, "ALS(" + fname + ")") + // val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble ) + // graph.numVPart = numVPart + // graph.numEPart = numEPart + + // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) + // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_)) + // assert(maxUser < minMovie) + + // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache + // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t")) + // .saveAsTextFile(usersFname) + // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t")) + // .saveAsTextFile(moviesFname) + + // sc.stop() + // } - // // /** - // // * - // // */ - // // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double], - // // latentK: Int, lambda: Double, numIter: Int) = { - // // val vertices = graph.vertices.mapPartitions( _.map { - // // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } ) - // // }).cache - // // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) - // // val edges = graph.edges // .mapValues(v => None) - // // val alsGraph = new Graph(vertices, edges) - // // alsGraph.numVPart = graph.numVPart - // // alsGraph.numEPart = graph.numEPart - - // // val niterations = Int.MaxValue - // // alsGraph.iterateDynamic[(Array[Double], Array[Double])]( - // // (me_id, edge) => { // gather - // // val X = edge.otherVertex(me_id).data - // // val y = edge.data - // // val Xy = X.map(_ * y) - // // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray - // // (Xy, XtX) - // // }, - // // (a, b) => { - // // // The difference between the while loop and the zip is a FACTOR OF TWO in overall - // // // runtime - // // var i = 0 - // // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 } - // // i = 0 - // // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 } - // // a - // // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r }) - // // }, - // // (Array.empty[Double], Array.empty[Double]), // default value is empty - // // (vertex, accum) => { // apply - // // val XyArray = accum._1 - // // val XtXArray = accum._2 - // // if(XyArray.isEmpty) vertex.data // no neighbors - // // else { - // // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) => - // // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) + - // // (if(i == j) lambda else 1.0F) //regularization - // // } - // // val Xy = DenseMatrix.create(latentK,1,XyArray) - // // val w = XtX \ Xy - // // w.data - // // } - // // }, - // // (me_id, edge) => true, - // // numIter, - // // gatherEdges = EdgeDirection.Both, - // // scatterEdges = EdgeDirection.Both, - // // vertex => vertex.id < maxUser).vertices - // // } - - // def main(args: Array[String]) = { - // 
val host = args(0) - // val taskType = args(1) - // val fname = args(2) - // val options = args.drop(3).map { arg => - // arg.dropWhile(_ == '-').split('=') match { - // case Array(opt, v) => (opt -> v) - // case _ => throw new IllegalArgumentException("Invalid argument: " + arg) - // } - // } - - // System.setProperty("spark.serializer", "spark.KryoSerializer") - // //System.setProperty("spark.shuffle.compress", "false") - // System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator") - - // taskType match { - // case "pagerank" => { - - // var numIter = Int.MaxValue - // var isDynamic = false - // var tol:Float = 0.001F - // var outFname = "" - // var numVPart = 4 - // var numEPart = 4 - - // options.foreach{ - // case ("numIter", v) => numIter = v.toInt - // case ("dynamic", v) => isDynamic = v.toBoolean - // case ("tol", v) => tol = v.toFloat - // case ("output", v) => outFname = v - // case ("numVPart", v) => numVPart = v.toInt - // case ("numEPart", v) => numEPart = v.toInt - // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - // } - - // if(!isDynamic && numIter == Int.MaxValue) { - // println("Set number of iterations!") - // sys.exit(1) - // } - // println("======================================") - // println("| PageRank |") - // println("--------------------------------------") - // println(" Using parameters:") - // println(" \tDynamic: " + isDynamic) - // if(isDynamic) println(" \t |-> Tolerance: " + tol) - // println(" \tNumIter: " + numIter) - // println("======================================") - - // val sc = new SparkContext(host, "PageRank(" + fname + ")") - - // val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() - - // val startTime = System.currentTimeMillis - // logInfo("GRAPHX: starting tasks") - // logInfo("GRAPHX: Number of vertices " + graph.vertices.count) - // logInfo("GRAPHX: Number of edges " + graph.edges.count) - - // val pr = Analytics.pagerank(graph, numIter) - // // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) - // // else Analytics.pagerank(graph, numIter) - // logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) ) - // if (!outFname.isEmpty) { - // println("Saving pageranks of pages to " + outFname) - // pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname) - // } - // logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") - // sc.stop() - // } - - // case "cc" => { - - // var numIter = Int.MaxValue - // var isDynamic = false - - // options.foreach{ - // case ("numIter", v) => numIter = v.toInt - // case ("dynamic", v) => isDynamic = v.toBoolean - // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - // } - - // if(!isDynamic && numIter == Int.MaxValue) { - // println("Set number of iterations!") - // sys.exit(1) - // } - // println("======================================") - // println("| Connected Components |") - // println("--------------------------------------") - // println(" Using parameters:") - // println(" \tDynamic: " + isDynamic) - // println(" \tNumIter: " + numIter) - // println("======================================") - - // val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") - // val graph = Graph.textFile(sc, fname, a => 1.0F) - // val cc = Analytics.connectedComponents(graph, numIter) - // // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) - // // 
else Analytics.connectedComponents(graph, numIter) - // println("Components: " + cc.vertices.map(_.data).distinct()) - - // sc.stop() - // } - - // case "shortestpath" => { - - // var numIter = Int.MaxValue - // var isDynamic = true - // var sources: List[Int] = List.empty - - // options.foreach{ - // case ("numIter", v) => numIter = v.toInt - // case ("dynamic", v) => isDynamic = v.toBoolean - // case ("source", v) => sources ++= List(v.toInt) - // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - // } - - - // if(!isDynamic && numIter == Int.MaxValue) { - // println("Set number of iterations!") - // sys.exit(1) - // } - - // if(sources.isEmpty) { - // println("No sources provided!") - // sys.exit(1) - // } - - // println("======================================") - // println("| Shortest Path |") - // println("--------------------------------------") - // println(" Using parameters:") - // println(" \tDynamic: " + isDynamic) - // println(" \tNumIter: " + numIter) - // println(" \tSources: [" + sources.mkString(", ") + "]") - // println("======================================") - - // val sc = new SparkContext(host, "ShortestPath(" + fname + ")") - // val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) ) - // val sp = Analytics.shortestPath(graph, sources, numIter) - // // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) - // // else Analytics.shortestPath(graph, sources, numIter) - // println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_))) - - // sc.stop() - // } - - - // // case "als" => { - - // // var numIter = 5 - // // var lambda = 0.01 - // // var latentK = 10 - // // var usersFname = "usersFactors.tsv" - // // var moviesFname = "moviesFname.tsv" - // // var numVPart = 4 - // // var numEPart = 4 - - // // options.foreach{ - // // case ("numIter", v) => numIter = v.toInt - // // case ("lambda", v) => lambda = v.toDouble - // // case ("latentK", v) => latentK = v.toInt - // // case ("usersFname", v) => usersFname = v - // // case ("moviesFname", v) => moviesFname = v - // // case ("numVPart", v) => numVPart = v.toInt - // // case ("numEPart", v) => numEPart = v.toInt - // // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - // // } - - // // println("======================================") - // // println("| Alternating Least Squares |") - // // println("--------------------------------------") - // // println(" Using parameters:") - // // println(" \tNumIter: " + numIter) - // // println(" \tLambda: " + lambda) - // // println(" \tLatentK: " + latentK) - // // println(" \tusersFname: " + usersFname) - // // println(" \tmoviesFname: " + moviesFname) - // // println("======================================") - - // // val sc = new SparkContext(host, "ALS(" + fname + ")") - // // val graph = Graph.textFile(sc, fname, a => a(0).toDouble ) - // // graph.numVPart = numVPart - // // graph.numEPart = numEPart - - // // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) - // // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_)) - // // assert(maxUser < minMovie) - - // // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache - // // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t")) - // // .saveAsTextFile(usersFname) - // // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t")) - // // .saveAsTextFile(moviesFname) - - // // sc.stop() - // // } - - - // case _ => 
{ - // println("Invalid task type.") - // } - // } - // } + case _ => { + println("Invalid task type.") + } + } + } // /** // * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD @@ -649,7 +511,7 @@ object Analytics extends Logging { // val sc = new SparkContext(host, "PageRank(" + fname + ")") - // val graph = Graph.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart).cache() + // val graph = GraphLoader.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart).cache() // val startTime = System.currentTimeMillis // logInfo("GRAPHX: starting tasks") @@ -692,7 +554,7 @@ object Analytics extends Logging { // println("======================================") // val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") - // val graph = Graph.textFile(sc, fname, a => 1.0) + // val graph = GraphLoader.textFile(sc, fname, a => 1.0) // val cc = Analytics.connectedComponents(graph, numIter) // // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) // // else Analytics.connectedComponents(graph, numIter) @@ -735,7 +597,7 @@ object Analytics extends Logging { // println("======================================") // val sc = new SparkContext(host, "ShortestPath(" + fname + ")") - // val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) ) + // val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) ) // val sp = Analytics.shortestPath(graph, sources, numIter) // // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) // // else Analytics.shortestPath(graph, sources, numIter) @@ -778,7 +640,7 @@ object Analytics extends Logging { // println("======================================") // val sc = new SparkContext(host, "ALS(" + fname + ")") - // val graph = Graph.textFile(sc, fname, a => a(0).toDouble ) + // val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble ) // graph.numVPart = numVPart // graph.numEPart = numEPart diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala index 297506e1e3..2d72789878 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala @@ -4,7 +4,7 @@ import com.esotericsoftware.kryo.Kryo import org.apache.spark.graph.impl.MessageToPartition import org.apache.spark.serializer.KryoRegistrator - +import org.apache.spark.graph.impl._ class GraphKryoRegistrator extends KryoRegistrator { @@ -13,6 +13,8 @@ class GraphKryoRegistrator extends KryoRegistrator { kryo.register(classOf[Edge[Object]]) kryo.register(classOf[MutableTuple2[Object, Object]]) kryo.register(classOf[MessageToPartition[Object]]) + kryo.register(classOf[(Vid, Object)]) + kryo.register(classOf[EdgePartition[Object]]) // This avoids a large number of hash table lookups. 
     kryo.setReferences(false)
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 903e407b2d..53e7d0a7bb 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -20,7 +20,7 @@
     : GraphImpl[Int, ED] = {
 
     // Parse the edge data table
-    val edges = sc.textFile(path).flatMap { line =>
+    val edges = sc.textFile(path, minEdgePartitions).flatMap { line =>
       if (!line.isEmpty && line(0) != '#') {
         val lineArray = line.split("\\s+")
         if(lineArray.length < 2) {
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 45dc863a6b..71f360bf04 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -526,7 +526,8 @@ def apply[VD: ClassManifest, ED: ClassManifest](
       .map { e =>
         // Random partitioning based on the source vertex id.
         // val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions)
-        val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
+        // val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
+        val part: Pid = randomVertexCut(e.src, e.dst, numPartitions)
         //val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
 
         // Should we be using 3-tuple or an optimized class
@@ -642,7 +643,6 @@ def apply[VD: ClassManifest, ED: ClassManifest](
 
 
 
-
 //  protected def createVTable[VD: ClassManifest, ED: ClassManifest](
 //     eTable: IndexedRDD[Pid, EdgePartition[ED]],
 //     vid2pid: Index
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index 01a04e9c39..aba3a7d942 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -63,9 +63,6 @@ object GraphGenerators {
   }
 
 
-  // For now just writes graph to a file. Eventually
-  // it will return a spark.graph.Graph
-
   // Right now it just generates a bunch of edges where
   // the edge data is the weight (default 1)
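
The changes above route the GraphX jobs through Kryo: the patched Analytics.main sets spark.serializer and spark.kryo.registrator before building its SparkContext, KryoSerializer no longer swallows a failing registrator, and GraphKryoRegistrator additionally registers (Vid, Object) tuples and EdgePartition. The sketch below shows how a standalone driver might reuse that wiring outside Analytics.main. It is a minimal illustration only; the local master URL, edge-list path, partition counts, and iteration count are placeholder values, not part of this patch.

    // Hypothetical driver, mirroring the setup the patched Analytics.main performs.
    // The master URL, input path, and numeric values below are placeholders.
    import org.apache.spark.SparkContext
    import org.apache.spark.graph.{Analytics, GraphLoader}

    object PageRankSketch {
      def main(args: Array[String]) {
        // Both properties must be set before the SparkContext is created,
        // exactly as Analytics.main does.
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")

        val sc = new SparkContext("local[4]", "PageRankSketch")
        // Load an edge list with unit edge weights, partitioned as in the patched loader.
        val graph = GraphLoader.textFile(sc, "hdfs:///edges.tsv", a => 1.0F,
          minEdgePartitions = 4, minVertexPartitions = 4).cache()

        val pr = Analytics.pagerank(graph, 20)
        println("Total rank: " + pr.vertices.map { case (id, r) => r }.reduce(_ + _))
        sc.stop()
      }
    }

With Kryo as the default serializer, a registrator that fails to load now surfaces as an exception at job setup instead of silently falling back to unregistered classes, which is what the removed try/catch in KryoSerializer used to hide.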