add metadatacleaner for persistentRdd map

commit 49c05608f5
parent a1d9d1767d
Author: Imran Rashid
Date:   2013-01-25 17:04:16 -08:00


@@ -44,7 +44,7 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import util.TimeStampedHashMap
+import util.{MetadataCleaner, TimeStampedHashMap}

 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -113,6 +113,9 @@ class SparkContext(

   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
+
+  private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
+
   // Add each JAR given through the constructor
   jars.foreach { addJar(_) }
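
The diff references util.MetadataCleaner without showing it. As a rough sketch of the pattern: a cleaner of this kind can be a daemon java.util.Timer that periodically invokes the supplied cleanup function with a cutoff timestamp, so anything older than the configured TTL gets dropped. The spark.cleaner.delay property name and the period heuristic below are assumptions for illustration, not the class's actual contents.

    import java.util.{Timer, TimerTask}

    // Sketch only: periodically calls cleanupFunc with a cutoff timestamp so
    // entries older than delaySeconds are dropped from whatever map it guards.
    class MetadataCleaner(name: String, cleanupFunc: Long => Unit) {
      // Assumption: the TTL comes from a system property; -1 means "never clean".
      private val delaySeconds = System.getProperty("spark.cleaner.delay", "-1").toInt
      private val periodSeconds = math.max(10, delaySeconds / 10)
      private val timer = new Timer(name + " cleanup timer", true) // daemon thread

      if (delaySeconds > 0) {
        timer.schedule(new TimerTask {
          override def run(): Unit =
            cleanupFunc(System.currentTimeMillis() - delaySeconds * 1000L)
        }, periodSeconds * 1000L, periodSeconds * 1000L)
      }

      def cancel(): Unit = timer.cancel()
    }

Under that reading, the constructor call in the hunk above arms a timer that drives SparkContext.cleanup (added in the last hunk) at a fixed period, and does nothing if no TTL is configured.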
@@ -512,6 +515,7 @@ class SparkContext(

   /** Shut down the SparkContext. */
   def stop() {
     if (dagScheduler != null) {
+      metadataCleaner.cancel()
       dagScheduler.stop()
       dagScheduler = null
       taskScheduler = null
@@ -654,6 +658,12 @@ class SparkContext(

   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
+
+  private[spark] def cleanup(cleanupTime: Long) {
+    val sizeBefore = persistentRdds.size
+    persistentRdds.clearOldValues(cleanupTime)
+    logInfo("persistentRdds " + sizeBefore + " --> " + persistentRdds.size)
+  }
 }

 /**
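
clearOldValues is what makes the TimeStampedHashMap swap-in pay off: a plain map of RDD IDs would grow without bound on a long-running context. A minimal sketch of the idea follows; everything beyond the clearOldValues behavior is assumed for illustration rather than taken from spark.util.

    import java.util.concurrent.ConcurrentHashMap

    // Sketch only: a map that stamps each entry with its insertion time so
    // clearOldValues(threshTime) can drop everything inserted before threshTime.
    class TimeStampedHashMap[A, B] {
      private val internalMap = new ConcurrentHashMap[A, (B, Long)]()

      def update(key: A, value: B): Unit =
        internalMap.put(key, (value, System.currentTimeMillis()))

      def get(key: A): Option[B] = Option(internalMap.get(key)).map(_._1)

      def size: Int = internalMap.size()

      // Remove entries whose insertion timestamp is older than threshTime.
      def clearOldValues(threshTime: Long): Unit = {
        val it = internalMap.entrySet().iterator()
        while (it.hasNext) {
          if (it.next().getValue._2 < threshTime) it.remove()
        }
      }
    }

With cleanupTime computed as "now minus the TTL", a persisted RDD that has not been touched for longer than the TTL simply drops out of the tracking map on the next timer tick.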