Bug fixes

This commit is contained in:
Tathagata Das 2012-11-28 08:36:55 +00:00
parent b18d70870a
commit d5e7aad039
3 changed files with 20 additions and 10 deletions

View file

@@ -78,7 +78,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
val running = new HashSet[Stage] // Stages we are running right now
val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
val pendingTasks = new HashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage
val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage
var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits
val activeJobs = new HashSet[ActiveJob]
@@ -595,8 +595,17 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
// Periodic TTL cleanup: evicts entries older than cleanupTime from each
// time-stamped metadata map, logging the size before and after each pass.
// NOTE(review): assumes idToStage and shuffleToMapStage are also
// TimeStampedHashMaps exposing cleanup(time), like pendingTasks — the
// enclosing class is outside this view, so confirm against DAGScheduler.
def cleanup(cleanupTime: Long) {
var sizeBefore = idToStage.size
idToStage.cleanup(cleanupTime)
logInfo("idToStage " + sizeBefore + " --> " + idToStage.size)
// sizeBefore is reused for each map purely for before/after logging
sizeBefore = shuffleToMapStage.size
shuffleToMapStage.cleanup(cleanupTime)
logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size)
sizeBefore = pendingTasks.size
pendingTasks.cleanup(cleanupTime)
logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)
}
def stop() {

View file

@@ -5,24 +5,25 @@ import java.util.{TimerTask, Timer}
import spark.Logging
class CleanupTask(name: String, cleanupFunc: (Long) => Unit) extends Logging {
val delayMins = System.getProperty("spark.cleanup.delay", "-100").toInt
val periodMins = System.getProperty("spark.cleanup.period", (delayMins / 10).toString).toInt
val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt
val periodSeconds = math.max(10, delaySeconds / 10)
val timer = new Timer(name + " cleanup timer", true)
val task = new TimerTask {
def run() {
try {
if (delayMins > 0) {
cleanupFunc(System.currentTimeMillis() - (delayMins * 60 * 1000))
if (delaySeconds > 0) {
cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
logInfo("Ran cleanup task for " + name)
}
}
} catch {
case e: Exception => logError("Error running cleanup task for " + name, e)
}
}
}
if (periodMins > 0) {
timer.schedule(task, periodMins * 60 * 1000, periodMins * 60 * 1000)
if (periodSeconds > 0) {
logInfo("Starting cleanup task for " + name + " with delay of " + delaySeconds + " seconds and "
+ "period of " + periodSeconds + " secs")
timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
}
def cancel() {

View file

@@ -264,7 +264,7 @@ class StreamingContext private (
object StreamingContext {
def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
if (System.getProperty("spark.cleanup.delay", "-1").toInt < 0) {
if (System.getProperty("spark.cleanup.delay", "-1").toDouble < 0) {
System.setProperty("spark.cleanup.delay", "60")
}
new SparkContext(master, frameworkName)