[SPARK-16345][DOCUMENTATION][EXAMPLES][GRAPHX] Extract graphx programming guide example snippets from source files instead of hard code them
## What changes were proposed in this pull request?

I extracted 6 example programs from the GraphX programming guide and replaced them with `include_example` tags. The 6 example programs are:

- AggregateMessagesExample.scala
- SSSPExample.scala
- TriangleCountingExample.scala
- ConnectedComponentsExample.scala
- ComprehensiveExample.scala
- PageRankExample.scala

All of the example code can be run using `bin/run-example graphx.EXAMPLE_NAME`.

## How was this patch tested?

Manual.

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #14015 from WeichenXu123/graphx_example_plugin.
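For context on the docs mechanism this PR switches to: the `include_example` Jekyll tag splices into the rendered guide only the code between `// $example on$` and `// $example off$` comment markers in a source file under `examples/src/main/`, so license headers and `SparkSession` boilerplate stay out of the page. Below is a minimal sketch of the layout each extracted example follows (the object name and graph logic are illustrative placeholders, not one of the six files in this PR):

```scala
// scalastyle:off println
package org.apache.spark.examples.graphx

// $example on$
import org.apache.spark.graphx.GraphLoader
// $example off$
import org.apache.spark.sql.SparkSession

// Hypothetical skeleton illustrating the snippet-marker convention.
object SnippetMarkerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("SnippetMarkerSketch").getOrCreate()
    val sc = spark.sparkContext

    // $example on$
    // Only the lines between the markers appear in the programming guide.
    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
    println(s"The graph has ${graph.vertices.count()} vertices")
    // $example off$

    spark.stop()
  }
}
// scalastyle:on println
```

Each extracted example also runs end to end, e.g. `bin/run-example graphx.PageRankExample`.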
This commit is contained in:

parent 192d1f9cf3
commit 0bd7cd18bc
docs/graphx-programming-guide.md

@@ -603,29 +603,7 @@ slightly unreliable and instead opted for more explicit user control.
 In the following example we use the [`aggregateMessages`][Graph.aggregateMessages] operator to
 compute the average age of the more senior followers of each user.
 
-{% highlight scala %}
-// Import random graph generation library
-import org.apache.spark.graphx.util.GraphGenerators
-// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
-val graph: Graph[Double, Int] =
-  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
-// Compute the number of older followers and their total age
-val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
-  triplet => { // Map Function
-    if (triplet.srcAttr > triplet.dstAttr) {
-      // Send message to destination vertex containing counter and age
-      triplet.sendToDst(1, triplet.srcAttr)
-    }
-  },
-  // Add counter and age
-  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
-)
-// Divide total age by number of older followers to get average age of older followers
-val avgAgeOfOlderFollowers: VertexRDD[Double] =
-  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
-// Display the results
-avgAgeOfOlderFollowers.collect.foreach(println(_))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala %}
 
 > The `aggregateMessages` operation performs optimally when the messages (and the sums of
 > messages) are constant sized (e.g., floats and addition instead of lists and concatenation).
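The `>` note retained as context above is easiest to see in code. A small sketch of the contrast (the object and method names are mine; the `Graph[Double, Int]` shape matches the example this hunk replaces):

```scala
import org.apache.spark.graphx.{Graph, VertexRDD}

object MessageSizeContrast {
  // Constant-sized messages: each message is a single Double and the merge
  // function does O(1) work per pair -- the pattern the note recommends.
  def totalAges(graph: Graph[Double, Int]): VertexRDD[Double] =
    graph.aggregateMessages[Double](ctx => ctx.sendToDst(ctx.srcAttr), _ + _)

  // Variable-sized messages: each merged List keeps growing, so merging does
  // O(n) work and allocates heavily -- exactly what the note warns against.
  def allAges(graph: Graph[Double, Int]): VertexRDD[List[Double]] =
    graph.aggregateMessages[List[Double]](ctx => ctx.sendToDst(List(ctx.srcAttr)), _ ++ _)
}
```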
@@ -793,29 +771,7 @@ second argument list contains the user defined functions for receiving messages
 We can use the Pregel operator to express computation such as single source
 shortest path in the following example.
 
-{% highlight scala %}
-import org.apache.spark.graphx._
-// Import random graph generation library
-import org.apache.spark.graphx.util.GraphGenerators
-// A graph with edge attributes containing distances
-val graph: Graph[Long, Double] =
-  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
-val sourceId: VertexId = 42 // The ultimate source
-// Initialize the graph such that all vertices except the root have distance infinity.
-val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
-val sssp = initialGraph.pregel(Double.PositiveInfinity)(
-  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
-  triplet => { // Send Message
-    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
-      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
-    } else {
-      Iterator.empty
-    }
-  },
-  (a,b) => math.min(a,b) // Merge Message
-)
-println(sssp.vertices.collect.mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/SSSPExample.scala %}
 
 <a name="graph_builders"></a>
 
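For readers of the hunk above: the hunk header's mention of the "second argument list" refers to the curried shape of GraphX's Pregel operator. A forwarding wrapper, written out only to make the two argument lists and the three user-defined functions visible (the wrapper and object names are mine; the signature mirrors `GraphOps.pregel` as I understand it):

```scala
import scala.reflect.ClassTag

import org.apache.spark.graphx._

object PregelShape {
  // First argument list: run configuration. Second argument list: the three
  // UDFs the SSSP example supplies (vertex program, send message, merge message).
  def runPregel[VD: ClassTag, ED: ClassTag, A: ClassTag](graph: Graph[VD, ED])(
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Graph[VD, ED] =
    graph.pregel(initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
```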
@@ -1009,64 +965,19 @@ GraphX comes with static and dynamic implementations of PageRank as methods on t
 
 GraphX also includes an example social network dataset that we can run PageRank on. A set of users is given in `data/graphx/users.txt`, and a set of relationships between users is given in `data/graphx/followers.txt`. We compute the PageRank of each user as follows:
 
-{% highlight scala %}
-// Load the edges as a graph
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-// Run PageRank
-val ranks = graph.pageRank(0.0001).vertices
-// Join the ranks with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
-  val fields = line.split(",")
-  (fields(0).toLong, fields(1))
-}
-val ranksByUsername = users.join(ranks).map {
-  case (id, (username, rank)) => (username, rank)
-}
-// Print the result
-println(ranksByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/PageRankExample.scala %}
 
 ## Connected Components
 
 The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a social network, connected components can approximate clusters. GraphX contains an implementation of the algorithm in the [`ConnectedComponents` object][ConnectedComponents], and we compute the connected components of the example social network dataset from the [PageRank section](#pagerank) as follows:
 
-{% highlight scala %}
-// Load the graph as in the PageRank example
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-// Find the connected components
-val cc = graph.connectedComponents().vertices
-// Join the connected components with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
-  val fields = line.split(",")
-  (fields(0).toLong, fields(1))
-}
-val ccByUsername = users.join(cc).map {
-  case (id, (username, cc)) => (username, cc)
-}
-// Print the result
-println(ccByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala %}
 
 ## Triangle Counting
 
 A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].*
 
-{% highlight scala %}
-// Load the edges in canonical order and partition the graph for triangle count
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
-// Find the triangle count for each vertex
-val triCounts = graph.triangleCount().vertices
-// Join the triangle counts with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
-  val fields = line.split(",")
-  (fields(0).toLong, fields(1))
-}
-val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
-  (username, tc)
-}
-// Print the result
-println(triCountByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala %}
 
 
 # Examples
@@ -1076,36 +987,4 @@ to important relationships and users, run page-rank on the sub-graph, and
 then finally return attributes associated with the top users. I can do
 all of this in just a few lines with GraphX:
 
-{% highlight scala %}
-// Connect to the Spark cluster
-val sc = new SparkContext("spark://master.amplab.org", "research")
-
-// Load my user data and parse into tuples of user id and attribute list
-val users = (sc.textFile("data/graphx/users.txt")
-  .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
-
-// Parse the edge data which is already in userId -> userId format
-val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-
-// Attach the user attributes
-val graph = followerGraph.outerJoinVertices(users) {
-  case (uid, deg, Some(attrList)) => attrList
-  // Some users may not have attributes so we set them as empty
-  case (uid, deg, None) => Array.empty[String]
-}
-
-// Restrict the graph to users with usernames and names
-val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
-
-// Compute the PageRank
-val pagerankGraph = subgraph.pageRank(0.001)
-
-// Get the attributes of the top pagerank users
-val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
-  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
-  case (uid, attrList, None) => (0.0, attrList.toList)
-}
-
-println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
-
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala %}
examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala (new file)

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{Graph, VertexRDD}
+import org.apache.spark.graphx.util.GraphGenerators
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An example that uses the [`aggregateMessages`][Graph.aggregateMessages] operator to
+ * compute the average age of the more senior followers of each user.
+ * Run with
+ * {{{
+ * bin/run-example graphx.AggregateMessagesExample
+ * }}}
+ */
+object AggregateMessagesExample {
+
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Create a graph with "age" as the vertex property.
+    // Here we use a random graph for simplicity.
+    val graph: Graph[Double, Int] =
+      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
+    // Compute the number of older followers and their total age
+    val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
+      triplet => { // Map Function
+        if (triplet.srcAttr > triplet.dstAttr) {
+          // Send message to destination vertex containing counter and age
+          triplet.sendToDst(1, triplet.srcAttr)
+        }
+      },
+      // Add counter and age
+      (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
+    )
+    // Divide total age by number of older followers to get average age of older followers
+    val avgAgeOfOlderFollowers: VertexRDD[Double] =
+      olderFollowers.mapValues( (id, value) =>
+        value match { case (count, totalAge) => totalAge / count } )
+    // Display the results
+    avgAgeOfOlderFollowers.collect.foreach(println(_))
+    // $example off$
+
+    spark.stop()
+  }
+}
+// scalastyle:on println
examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala (new file)

@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Suppose I want to build a graph from some text files, restrict the graph
+ * to important relationships and users, run page-rank on the sub-graph, and
+ * then finally return attributes associated with the top users.
+ * This example does all of this in just a few lines with GraphX.
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.ComprehensiveExample
+ * }}}
+ */
+object ComprehensiveExample {
+
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Load my user data and parse into tuples of user id and attribute list
+    val users = (sc.textFile("data/graphx/users.txt")
+      .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
+
+    // Parse the edge data which is already in userId -> userId format
+    val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+
+    // Attach the user attributes
+    val graph = followerGraph.outerJoinVertices(users) {
+      case (uid, deg, Some(attrList)) => attrList
+      // Some users may not have attributes so we set them as empty
+      case (uid, deg, None) => Array.empty[String]
+    }
+
+    // Restrict the graph to users with usernames and names
+    val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
+
+    // Compute the PageRank
+    val pagerankGraph = subgraph.pageRank(0.001)
+
+    // Get the attributes of the top pagerank users
+    val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
+      case (uid, attrList, Some(pr)) => (pr, attrList.toList)
+      case (uid, attrList, None) => (0.0, attrList.toList)
+    }
+
+    println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
+    // $example off$
+
+    spark.stop()
+  }
+}
+// scalastyle:on println
examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala (new file)

@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A connected components algorithm example.
+ * The connected components algorithm labels each connected component of the graph
+ * with the ID of its lowest-numbered vertex.
+ * For example, in a social network, connected components can approximate clusters.
+ * GraphX contains an implementation of the algorithm in the
+ * [`ConnectedComponents` object][ConnectedComponents],
+ * and we compute the connected components of the example social network dataset.
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.ConnectedComponentsExample
+ * }}}
+ */
+object ConnectedComponentsExample {
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Load the graph as in the PageRank example
+    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+    // Find the connected components
+    val cc = graph.connectedComponents().vertices
+    // Join the connected components with the usernames
+    val users = sc.textFile("data/graphx/users.txt").map { line =>
+      val fields = line.split(",")
+      (fields(0).toLong, fields(1))
+    }
+    val ccByUsername = users.join(cc).map {
+      case (id, (username, cc)) => (username, cc)
+    }
+    // Print the result
+    println(ccByUsername.collect().mkString("\n"))
+    // $example off$
+    spark.stop()
+  }
+}
+// scalastyle:on println
examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala (new file)

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A PageRank example on a social network dataset.
+ * Run with
+ * {{{
+ * bin/run-example graphx.PageRankExample
+ * }}}
+ */
+object PageRankExample {
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Load the edges as a graph
+    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+    // Run PageRank
+    val ranks = graph.pageRank(0.0001).vertices
+    // Join the ranks with the usernames
+    val users = sc.textFile("data/graphx/users.txt").map { line =>
+      val fields = line.split(",")
+      (fields(0).toLong, fields(1))
+    }
+    val ranksByUsername = users.join(ranks).map {
+      case (id, (username, rank)) => (username, rank)
+    }
+    // Print the result
+    println(ranksByUsername.collect().mkString("\n"))
+    // $example off$
+    spark.stop()
+  }
+}
+// scalastyle:on println
examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala (new file)

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{Graph, VertexId}
+import org.apache.spark.graphx.util.GraphGenerators
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An example that uses the Pregel operator to express computation
+ * such as single source shortest path.
+ * Run with
+ * {{{
+ * bin/run-example graphx.SSSPExample
+ * }}}
+ */
+object SSSPExample {
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // A graph with edge attributes containing distances
+    val graph: Graph[Long, Double] =
+      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
+    val sourceId: VertexId = 42 // The ultimate source
+    // Initialize the graph such that all vertices except the root have distance infinity.
+    val initialGraph = graph.mapVertices((id, _) =>
+      if (id == sourceId) 0.0 else Double.PositiveInfinity)
+    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
+      (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
+      triplet => { // Send Message
+        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
+          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
+        } else {
+          Iterator.empty
+        }
+      },
+      (a, b) => math.min(a, b) // Merge Message
+    )
+    println(sssp.vertices.collect.mkString("\n"))
+    // $example off$
+
+    spark.stop()
+  }
+}
+// scalastyle:on println
examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala (new file)

@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A vertex is part of a triangle when it has two adjacent vertices with an edge between them.
+ * GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount]
+ * that determines the number of triangles passing through each vertex,
+ * providing a measure of clustering.
+ * We compute the triangle count of the social network dataset.
+ *
+ * Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`)
+ * and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.TriangleCountingExample
+ * }}}
+ */
+object TriangleCountingExample {
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Load the edges in canonical order and partition the graph for triangle count
+    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
+      .partitionBy(PartitionStrategy.RandomVertexCut)
+    // Find the triangle count for each vertex
+    val triCounts = graph.triangleCount().vertices
+    // Join the triangle counts with the usernames
+    val users = sc.textFile("data/graphx/users.txt").map { line =>
+      val fields = line.split(",")
+      (fields(0).toLong, fields(1))
+    }
+    val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
+      (username, tc)
+    }
+    // Print the result
+    println(triCountByUsername.collect().mkString("\n"))
+    // $example off$
+    spark.stop()
+  }
+}
+// scalastyle:on println