[SPARK-2661][bagel]unpersist old processed rdd
Unpersist useless rdd during bagel iteration to make full use of memory. Author: Daoyuan <daoyuan.wang@intel.com> Closes #1519 from adrian-wang/bagelunpersist and squashes the following commits: 182c9dd [Daoyuan] rename var nextUseless to lastRDD 87fd3a4 [Daoyuan] bagel unpersist old processed rdd
This commit is contained in:
parent
e34922a221
commit
42dfab7d37
|
@ -72,6 +72,7 @@ object Bagel extends Logging {
|
|||
var verts = vertices
|
||||
var msgs = messages
|
||||
var noActivity = false
|
||||
var lastRDD: RDD[(K, (V, Array[M]))] = null
|
||||
do {
|
||||
logInfo("Starting superstep " + superstep + ".")
|
||||
val startTime = System.currentTimeMillis
|
||||
|
@ -83,6 +84,10 @@ object Bagel extends Logging {
|
|||
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
|
||||
val (processed, numMsgs, numActiveVerts) =
|
||||
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
|
||||
if (lastRDD != null) {
|
||||
lastRDD.unpersist(false)
|
||||
}
|
||||
lastRDD = processed
|
||||
|
||||
val timeTaken = System.currentTimeMillis - startTime
|
||||
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
|
||||
|
|
Loading…
Reference in a new issue