package spark
import org.apache.hadoop.fs.Path
import rdd.{CheckpointRDD, CoalescedRDD}
import scheduler.{ResultTask, ShuffleMapTask}
/**
 * Tracks the lifecycle of an RDD as it moves through the checkpointing process.
 *
 * Legal transitions:
 * Initialized --> MarkedForCheckpoint --> CheckpointingInProgress --> Checkpointed
 */
private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value

  // Declaration order matters: Enumeration assigns ids in order of definition.
  val Initialized = Value
  val MarkedForCheckpoint = Value
  val CheckpointingInProgress = Value
  val Checkpointed = Value
}

/**
 * Bookkeeping for the checkpointing of a single RDD: holds the checkpoint
 * state machine, the path of the checkpoint file once written, and the RDD
 * reloaded from that file. All state transitions are guarded by the
 * RDDCheckpointData companion-object lock.
 */
private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
  extends Logging with Serializable {

  import CheckpointState._

  // Current position in the checkpoint state machine (see CheckpointState).
  var cpState = Initialized

  // Path of the checkpoint file, set once the RDD has been materialized.
  @transient var cpFile: Option[String] = None

  // The RDD reloaded from the checkpoint file, set once checkpointing is done.
  @transient var cpRDD: Option[RDD[T]] = None

  /** Mark the RDD for checkpointing; a no-op unless it is still Initialized. */
  def markForCheckpoint() {
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) cpState = MarkedForCheckpoint
    }
  }

  /** Whether the RDD has already been fully checkpointed. */
  def isCheckpointed(): Boolean = {
    RDDCheckpointData.synchronized { cpState == Checkpointed }
  }

  /** The file this RDD was checkpointed to, if any. */
  def getCheckpointFile(): Option[String] = {
    RDDCheckpointData.synchronized { cpFile }
  }

  /**
   * Perform the actual checkpointing of the RDD. Called after the first job
   * using that RDD is over.
   */
  def doCheckpoint() {
    // Atomically claim the right to checkpoint: only the caller that advances
    // the state from MarkedForCheckpoint to CheckpointingInProgress proceeds;
    // everyone else (wrong state, or someone already in progress) backs off.
    val claimed = RDDCheckpointData.synchronized {
      val eligible = cpState == MarkedForCheckpoint
      if (eligible) {
        cpState = CheckpointingInProgress
      }
      eligible
    }

    if (claimed) {
      // Save to file, and reload it as an RDD.
      val path = new Path(rdd.context.checkpointDir, "rdd-" + rdd.id).toString
      rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _)
      val newRDD = new CheckpointRDD[T](rdd.context, path)

      // Swap in the checkpointed data: change the dependencies and splits of
      // the RDD so its lineage now starts at the checkpoint file.
      RDDCheckpointData.synchronized {
        cpFile = Some(path)
        cpRDD = Some(newRDD)
        rdd.changeDependencies(newRDD)
        cpState = Checkpointed
        RDDCheckpointData.checkpointCompleted()
        logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
      }
    }
  }

  // Get preferred location of a split after checkpointing.
  // NOTE(review): assumes checkpointing has completed (cpRDD is set) — callers
  // must not invoke this before doCheckpoint() has finished.
  def getPreferredLocations(split: Split) = {
    RDDCheckpointData.synchronized {
      cpRDD.get.preferredLocations(split)
    }
  }

  // Splits of the checkpointed RDD; same precondition as getPreferredLocations.
  def getSplits: Array[Split] = {
    RDDCheckpointData.synchronized {
      cpRDD.get.splits
    }
  }

  // Get iterator over a split's data. This is called at the worker nodes.
  def iterator(split: Split, context: TaskContext): Iterator[T] = {
    rdd.firstParent[T].iterator(split, context)
  }
}

private[spark] object RDDCheckpointData {

  /**
   * Called (under this object's lock) once an RDD has finished checkpointing;
   * clears the task caches so subsequent tasks pick up the new lineage.
   * NOTE(review): the exact effect of clearCache() is defined in
   * ShuffleMapTask/ResultTask — confirm there.
   */
  def checkpointCompleted() {
    ShuffleMapTask.clearCache()
    ResultTask.clearCache()
  }
}