Merge pull request #513 from MLnick/bagel-caching
Adds choice of persistence level to Bagel.
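For orientation, this is roughly how a caller selects a non-default persistence level once this change is in. It mirrors the new test at the bottom of the diff; `DemoVertex` and `DemoMessage` are hypothetical stand-ins for user-defined `Vertex`/`Message` subclasses, not part of this commit:

```scala
import spark.SparkContext
import spark.bagel.{Bagel, Vertex, Message}
import spark.storage.StorageLevel

// Hypothetical vertex and message types (not from this commit).
class DemoVertex(val active: Boolean, val rank: Double) extends Vertex with Serializable
class DemoMessage(val targetId: String, val rank: Double) extends Message[String] with Serializable

object Demo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "bagel-storage-demo")
    val verts = sc.parallelize(Seq(("a", new DemoVertex(true, 1.0)), ("b", new DemoVertex(true, 1.0))))
    val msgs = sc.parallelize(Array[(String, DemoMessage)]())

    // New in this PR: a trailing StorageLevel argument. DISK_ONLY keeps each
    // superstep's intermediate RDD on disk rather than caching it in memory.
    val result = Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
      (self: DemoVertex, in: Option[Array[DemoMessage]], superstep: Int) =>
        (new DemoVertex(superstep < 10, self.rank), Array[DemoMessage]())
    }
    println("vertices after run: " + result.count())
  }
}
```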
commit cbf8f0d4dd
```diff
@@ -4,8 +4,38 @@ import spark._
 import spark.SparkContext._
 
 import scala.collection.mutable.ArrayBuffer
+import storage.StorageLevel
 
 object Bagel extends Logging {
+  val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY
+
+  /**
+   * Runs a Bagel program.
+   * @param sc [[spark.SparkContext]] to use for the program.
+   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
+   *                 the vertex id.
+   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
+   *                 empty array, i.e. sc.parallelize(Array[K, Message]()).
+   * @param combiner [[spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
+   *                 message before sending (which often involves network I/O).
+   * @param aggregator [[spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
+   *                   and provides the result to each vertex in the next superstep.
+   * @param partitioner [[spark.Partitioner]] partitions values by key
+   * @param numPartitions number of partitions across which to split the graph.
+   *                      Default is the default parallelism of the SparkContext
+   * @param storageLevel [[spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
+   *                     Defaults to caching in memory.
+   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
+   *                optional Aggregator and the current superstep,
+   *                and returns a set of (Vertex, outgoing Messages) pairs
+   * @tparam K key
+   * @tparam V vertex type
+   * @tparam M message type
+   * @tparam C combiner
+   * @tparam A aggregator
+   * @return an RDD of (K, V) pairs representing the graph after completion of the program
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
           C: Manifest, A: Manifest](
     sc: SparkContext,
```
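The scaladoc added above documents the full-arity entry point, including the `combiner` parameter. As a hedged illustration of what a user-supplied combiner looks like, here is a sketch; `MinMessage` and `MinCombiner` are invented names, with method signatures taken from the `Combiner` usage visible later in this diff (`createCombiner`, `mergeMsg`, `mergeCombiners`):

```scala
import spark.bagel.{Combiner, Message}

// Hypothetical message type carrying a Double payload.
class MinMessage(val targetId: String, val value: Double)
  extends Message[String] with Serializable

// Collapses every message bound for a vertex into the single smallest value,
// so at most one combined value per vertex crosses the shuffle.
class MinCombiner extends Combiner[MinMessage, Double] with Serializable {
  def createCombiner(msg: MinMessage): Double = msg.value
  def mergeMsg(combined: Double, msg: MinMessage): Double = math.min(combined, msg.value)
  def mergeCombiners(a: Double, b: Double): Double = math.min(a, b)
}
```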
```diff
@@ -14,7 +44,8 @@ object Bagel extends Logging {
     combiner: Combiner[M, C],
     aggregator: Option[Aggregator[V, A]],
     partitioner: Partitioner,
-    numPartitions: Int
+    numPartitions: Int,
+    storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
   )(
     compute: (V, Option[C], Option[A], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
```
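Because `storageLevel` is given a default here, existing callers of this primary overload compile and behave exactly as before. A sketch of the equivalence, borrowing the suite's `TestVertex`/`TestMessage` and assuming `sc`, `verts` and `msgs` are bound as in the new test below:

```scala
import spark.HashPartitioner
import spark.bagel.{Bagel, DefaultCombiner}
import spark.storage.StorageLevel

val part = new HashPartitioner(4)
val compute = (self: TestVertex, msgs: Option[Array[TestMessage]],
               agg: Option[Nothing], superstep: Int) =>
  (new TestVertex(superstep < 10, self.age + 1), Array[TestMessage]())

// Omitting the new trailing parameter falls back to DEFAULT_STORAGE_LEVEL...
Bagel.run[String, TestVertex, TestMessage, Array[TestMessage], Nothing](
  sc, verts, msgs, new DefaultCombiner[TestMessage](), None, part, 4)(compute)

// ...which is the same as passing MEMORY_ONLY explicitly.
Bagel.run[String, TestVertex, TestMessage, Array[TestMessage], Nothing](
  sc, verts, msgs, new DefaultCombiner[TestMessage](), None, part, 4,
  StorageLevel.MEMORY_ONLY)(compute)
```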
```diff
@@ -33,7 +64,7 @@ object Bagel extends Logging {
       combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
     val grouped = combinedMsgs.groupWith(verts)
     val (processed, numMsgs, numActiveVerts) =
-      comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep))
+      comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel)
 
     val timeTaken = System.currentTimeMillis - startTime
     logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
```
```diff
@@ -50,6 +81,7 @@ object Bagel extends Logging {
     verts
   }
 
+  /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default storage level */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
```
```diff
@@ -59,12 +91,29 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
+  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /** Runs a Bagel program with no [[spark.bagel.Aggregator]] */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    partitioner: Partitioner,
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[C], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
     run[K, V, M, C, Nothing](
-      sc, vertices, messages, combiner, None, partitioner, numPartitions)(
+      sc, vertices, messages, combiner, None, partitioner, numPartitions, storageLevel)(
       addAggregatorArg[K, V, M, C](compute))
   }
 
+  /**
+   * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]]
+   * and default storage level
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
```
```diff
@@ -73,13 +122,29 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
+  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default [[spark.HashPartitioner]]*/
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[C], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
     val part = new HashPartitioner(numPartitions)
     run[K, V, M, C, Nothing](
-      sc, vertices, messages, combiner, None, part, numPartitions)(
+      sc, vertices, messages, combiner, None, part, numPartitions, storageLevel)(
       addAggregatorArg[K, V, M, C](compute))
   }
 
+  /**
+   * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]],
+   * [[spark.bagel.DefaultCombiner]] and the default storage level
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
```
```diff
@@ -87,10 +152,24 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[Array[M]], Int) => (V, Array[M])
-  ): RDD[(K, V)] = {
+  ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /**
+   * Runs a Bagel program with no [[spark.bagel.Aggregator]], the default [[spark.HashPartitioner]]
+   * and [[spark.bagel.DefaultCombiner]]
+   */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[Array[M]], Int) => (V, Array[M])
+  ): RDD[(K, V)] = {
     val part = new HashPartitioner(numPartitions)
     run[K, V, M, Array[M], Nothing](
-      sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions)(
+      sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions, storageLevel)(
       addAggregatorArg[K, V, M, Array[M]](compute))
   }
 
```
```diff
@@ -117,7 +196,8 @@ object Bagel extends Logging {
   private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
     sc: SparkContext,
     grouped: RDD[(K, (Seq[C], Seq[V]))],
-    compute: (V, Option[C]) => (V, Array[M])
+    compute: (V, Option[C]) => (V, Array[M]),
+    storageLevel: StorageLevel
   ): (RDD[(K, (V, Array[M]))], Int, Int) = {
     var numMsgs = sc.accumulator(0)
     var numActiveVerts = sc.accumulator(0)
```
```diff
@@ -135,7 +215,7 @@ object Bagel extends Logging {
         numActiveVerts += 1
 
       Some((newVert, newMsgs))
-    }.cache
+    }.persist(storageLevel)
 
     // Force evaluation of processed RDD for accurate performance measurements
     processed.foreach(x => {})
```
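This swap is the heart of the change. On an RDD, `cache` is shorthand for `persist(StorageLevel.MEMORY_ONLY)`, so routing the caller's `storageLevel` into `persist` generalizes the old behavior without changing the default. A standalone sketch of the distinction:

```scala
import spark.SparkContext
import spark.storage.StorageLevel

val sc = new SparkContext("local", "persist-demo")

// These two requests are equivalent, which is why defaulting
// storageLevel to MEMORY_ONLY is backwards compatible:
val a = sc.parallelize(1 to 1000).cache
val b = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)

// Whereas a graph whose per-superstep intermediates overflow the heap
// can now be kept on disk instead:
val c = sc.parallelize(1 to 1000).persist(StorageLevel.DISK_ONLY)
```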
```diff
@@ -166,6 +246,7 @@ trait Aggregator[V, A] {
   def mergeAggregators(a: A, b: A): A
 }
 
+/** Default combiner that simply appends messages together (i.e. performs no aggregation) */
 class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
   def createCombiner(msg: M): Array[M] =
     Array(msg)
```
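The hunk cuts off after `createCombiner`. For completeness, the two remaining `Combiner` methods of `DefaultCombiner` would be the append-only pair sketched below; this is reconstructed from the `Combiner` contract, not shown in the diff itself:

```scala
// Sketch of the truncated methods: appending keeps every message, i.e. no
// real combining happens, which matches the doc comment added above.
def mergeMsg(combiner: Array[M], msg: M): Array[M] = combiner :+ msg
def mergeCombiners(a: Array[M], b: Array[M]): Array[M] = a ++ b
```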
```diff
@@ -7,6 +7,7 @@ import org.scalatest.time.SpanSugar._
 import scala.collection.mutable.ArrayBuffer
 
 import spark._
+import storage.StorageLevel
 
 class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
 class TestMessage(val targetId: String) extends Message[String] with Serializable
```
```diff
@@ -79,4 +80,21 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
       }
     }
   }
+
+  test("using non-default persistence level") {
+    failAfter(10 seconds) {
+      sc = new SparkContext("local", "test")
+      val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
+      val msgs = sc.parallelize(Array[(String, TestMessage)]())
+      val numSupersteps = 50
+      val result =
+        Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
+          (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
+            (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
+        }
+      for ((id, vert) <- result.collect) {
+        assert(vert.age === numSupersteps)
+      }
+    }
+  }
 }
```