spark-instrumented-optimizer/docs/bagel-programming-guide.md

167 lines
6.6 KiB
Markdown
Raw Normal View History

---
layout: global
title: Bagel Programming Guide
---
**Bagel will soon be superseded by [GraphX](graphx-programming-guide.html); we recommend that new users try GraphX instead.**
Bagel is a Spark implementation of Google's [Pregel](http://portal.acm.org/citation.cfm?id=1807184) graph processing framework. Bagel currently supports basic graph computation, combiners, and aggregators.
In the Pregel programming model, jobs run as a sequence of iterations called _supersteps_. In each superstep, each vertex in the graph runs a user-specified function that can update state associated with the vertex and send messages to other vertices for use in the *next* iteration.
This guide shows the programming model and features of Bagel by walking through an example implementation of PageRank on Bagel.
2013-09-01 02:01:50 -04:00
# Linking with Bagel
2013-09-01 02:01:50 -04:00
To use Bagel in your program, add the following SBT or Maven dependency:
2013-09-01 02:01:50 -04:00
groupId = org.apache.spark
artifactId = spark-bagel_{{site.SCALA_BINARY_VERSION}}
2013-09-01 02:01:50 -04:00
version = {{site.SPARK_VERSION}}
2013-09-01 02:01:50 -04:00
# Programming Model
[SPARK-1566] consolidate programming guide, and general doc updates This is a fairly large PR to clean up and update the docs for 1.0. The major changes are: * A unified programming guide for all languages replaces language-specific ones and shows language-specific info in tabs * New programming guide sections on key-value pairs, unit testing, input formats beyond text, migrating from 0.9, and passing functions to Spark * Spark-submit guide moved to a separate page and expanded slightly * Various cleanups of the menu system, security docs, and others * Updated look of title bar to differentiate the docs from previous Spark versions You can find the updated docs at http://people.apache.org/~matei/1.0-docs/_site/ and in particular http://people.apache.org/~matei/1.0-docs/_site/programming-guide.html. Author: Matei Zaharia <matei@databricks.com> Closes #896 from mateiz/1.0-docs and squashes the following commits: 03e6853 [Matei Zaharia] Some tweaks to configuration and YARN docs 0779508 [Matei Zaharia] tweak ef671d4 [Matei Zaharia] Keep frames in JavaDoc links, and other small tweaks 1bf4112 [Matei Zaharia] Review comments 4414f88 [Matei Zaharia] tweaks d04e979 [Matei Zaharia] Fix some old links to Java guide a34ed33 [Matei Zaharia] tweak 541bb3b [Matei Zaharia] miscellaneous changes fcefdec [Matei Zaharia] Moved submitting apps to separate doc 61d72b4 [Matei Zaharia] stuff 181f217 [Matei Zaharia] migration guide, remove old language guides e11a0da [Matei Zaharia] Add more API functions 6a030a9 [Matei Zaharia] tweaks 8db0ae3 [Matei Zaharia] Added key-value pairs section 318d2c9 [Matei Zaharia] tweaks 1c81477 [Matei Zaharia] New section on basics and function syntax e38f559 [Matei Zaharia] Actually added programming guide to Git a33d6fe [Matei Zaharia] First pass at updating programming guide to support all languages, plus other tweaks throughout 3b6a876 [Matei Zaharia] More CSS tweaks 01ec8bf [Matei Zaharia] More CSS tweaks e6d252e [Matei Zaharia] Change color of doc title bar to differentiate from 0.9.0
2014-05-30 03:34:33 -04:00
Bagel operates on a graph represented as a [distributed dataset](programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.
We first extend the default `Vertex` class to store a `Double`
representing the current PageRank of the vertex, and similarly extend
the `Message` and `Edge` classes. Note that these need to be marked `@serializable` to allow Spark to transfer them across machines. We also import the Bagel types and implicit conversions.
{% highlight scala %}
2013-09-01 01:17:40 -04:00
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._
@serializable class PREdge(val targetId: String) extends Edge
@serializable class PRVertex(
val id: String, val rank: Double, val outEdges: Seq[Edge],
val active: Boolean) extends Vertex
@serializable class PRMessage(
val targetId: String, val rankShare: Double) extends Message
{% endhighlight %}
Next, we load a sample graph from a text file as a distributed dataset and package it into `PRVertex` objects. We also cache the distributed dataset because Bagel will use it multiple times and we'd like to avoid recomputing it.
{% highlight scala %}
val input = sc.textFile("data/mllib/pagerank_data.txt")
val numVerts = input.count()
val verts = input.map(line => {
val fields = line.split('\t')
val (id, linksStr) = (fields(0), fields(1))
val links = linksStr.split(',').map(new PREdge(_))
(id, new PRVertex(id, 1.0 / numVerts, links, true))
}).cache
{% endhighlight %}
We run the Bagel job, passing in `verts`, an empty distributed dataset of messages, and a custom compute function that runs PageRank for 10 iterations.
{% highlight scala %}
val emptyMsgs = sc.parallelize(List[(String, PRMessage)]())
def compute(self: PRVertex, msgs: Option[Seq[PRMessage]], superstep: Int)
: (PRVertex, Iterable[PRMessage]) = {
val msgSum = msgs.getOrElse(List()).map(_.rankShare).sum
val newRank =
if (msgSum != 0)
0.15 / numVerts + 0.85 * msgSum
else
self.rank
val halt = superstep >= 10
val msgsOut =
if (!halt)
self.outEdges.map(edge =>
new PRMessage(edge.targetId, newRank / self.outEdges.size))
else
List()
(new PRVertex(self.id, newRank, self.outEdges, !halt), msgsOut)
}
{% endhighlight %}
val result = Bagel.run(sc, verts, emptyMsgs)()(compute)
Finally, we print the results.
{% highlight scala %}
println(result.map(v => "%s\t%s\n".format(v.id, v.rank)).collect.mkString)
{% endhighlight %}
2013-09-01 02:01:50 -04:00
## Combiners
Sending a message to another vertex generally involves expensive communication over the network. For certain algorithms, it's possible to reduce the amount of communication using _combiners_. For example, if the compute function receives integer messages and only uses their sum, it's possible for Bagel to combine multiple messages to the same vertex by summing them.
For combiner support, Bagel can optionally take a set of combiner functions that convert messages to their combined form.
_Example: PageRank with combiners_
2013-09-01 02:01:50 -04:00
## Aggregators
Aggregators perform a reduce across all vertices after each superstep, and provide the result to each vertex in the next superstep.
For aggregator support, Bagel can optionally take an aggregator function that reduces across each vertex.
_Example_
2013-09-01 02:01:50 -04:00
## Operations
Here are the actions and types in the Bagel API. See [Bagel.scala](https://github.com/apache/spark/blob/master/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala) for details.
2013-09-01 02:01:50 -04:00
### Actions
{% highlight scala %}
/*** Full form ***/
Bagel.run(sc, vertices, messages, combiner, aggregator, partitioner, numSplits)(compute)
// where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int)
// and returns (newVertex: V, outMessages: Array[M])
/*** Abbreviated forms ***/
Bagel.run(sc, vertices, messages, combiner, partitioner, numSplits)(compute)
// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
// and returns (newVertex: V, outMessages: Array[M])
Bagel.run(sc, vertices, messages, combiner, numSplits)(compute)
// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
// and returns (newVertex: V, outMessages: Array[M])
Bagel.run(sc, vertices, messages, numSplits)(compute)
// where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int)
// and returns (newVertex: V, outMessages: Array[M])
{% endhighlight %}
2013-09-01 02:01:50 -04:00
### Types
{% highlight scala %}
trait Combiner[M, C] {
def createCombiner(msg: M): C
def mergeMsg(combiner: C, msg: M): C
def mergeCombiners(a: C, b: C): C
}
trait Aggregator[V, A] {
def createAggregator(vert: V): A
def mergeAggregators(a: A, b: A): A
}
trait Vertex {
def active: Boolean
}
trait Message[K] {
def targetId: K
}
{% endhighlight %}
2013-09-01 02:01:50 -04:00
# Where to Go from Here
2014-01-02 08:11:21 -05:00
Two example jobs, PageRank and shortest path, are included in `examples/src/main/scala/org/apache/spark/examples/bagel`. You can run them by passing the class name to the `bin/run-example` script included in Spark; e.g.:
2013-09-01 01:38:50 -04:00
2014-01-02 08:11:21 -05:00
./bin/run-example org.apache.spark.examples.bagel.WikipediaPageRank
2013-09-01 01:38:50 -04:00
Each example program prints usage help when run without any arguments.