From d5e7aad039603a8a02d11f9ebda001422ca4c341 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 28 Nov 2012 08:36:55 +0000 Subject: [PATCH] Bug fixes --- .../scala/spark/scheduler/DAGScheduler.scala | 11 ++++++++++- .../src/main/scala/spark/util/CleanupTask.scala | 17 +++++++++-------- .../spark/streaming/StreamingContext.scala | 2 +- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 3af877b817..affacb43ca 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -78,7 +78,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done val running = new HashSet[Stage] // Stages we are running right now val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures - val pendingTasks = new HashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage + val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits val activeJobs = new HashSet[ActiveJob] @@ -595,8 +595,17 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with } def cleanup(cleanupTime: Long) { + var sizeBefore = idToStage.size idToStage.cleanup(cleanupTime) + logInfo("idToStage " + sizeBefore + " --> " + idToStage.size) + + sizeBefore = shuffleToMapStage.size shuffleToMapStage.cleanup(cleanupTime) + logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size) + + sizeBefore = pendingTasks.size + pendingTasks.cleanup(cleanupTime) + logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size) } def stop() { diff --git a/core/src/main/scala/spark/util/CleanupTask.scala b/core/src/main/scala/spark/util/CleanupTask.scala index ccc28803e0..a4357c62c6 100644 --- a/core/src/main/scala/spark/util/CleanupTask.scala +++ b/core/src/main/scala/spark/util/CleanupTask.scala @@ -5,24 +5,25 @@ import java.util.{TimerTask, Timer} import spark.Logging class CleanupTask(name: String, cleanupFunc: (Long) => Unit) extends Logging { - val delayMins = System.getProperty("spark.cleanup.delay", "-100").toInt - val periodMins = System.getProperty("spark.cleanup.period", (delayMins / 10).toString).toInt + val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt + val periodSeconds = math.max(10, delaySeconds / 10) val timer = new Timer(name + " cleanup timer", true) val task = new TimerTask { def run() { try { - if (delayMins > 0) { - - cleanupFunc(System.currentTimeMillis() - (delayMins * 60 * 1000)) + if (delaySeconds > 0) { + cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) logInfo("Ran cleanup task for " + name) - } + } } catch { case e: Exception => logError("Error running cleanup task for " + name, e) } } } - if (periodMins > 0) { - timer.schedule(task, periodMins * 60 * 1000, periodMins * 60 * 1000) + if (periodSeconds > 0) { + logInfo("Starting cleanup task for " + name + " with delay of " + delaySeconds + " seconds and " + + "period of " + periodSeconds + " secs") + timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000) } def cancel() { diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 58123dc82c..90dd560752 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -264,7 +264,7 @@ class StreamingContext private ( object StreamingContext { def createNewSparkContext(master: String, frameworkName: String): SparkContext = { - if (System.getProperty("spark.cleanup.delay", "-1").toInt < 0) { + if (System.getProperty("spark.cleanup.delay", "-1").toDouble < 0) { System.setProperty("spark.cleanup.delay", "60") } new SparkContext(master, frameworkName)