Merge pull request #677 from jerryshao/fix_stage_clean

Clean StageToInfos periodically when spark.cleaner.ttl is enabled
This commit is contained in:
Matei Zaharia 2013-07-04 21:32:29 -07:00
commit 6ad85d0918

View file

@ -312,7 +312,7 @@ class DAGScheduler(
handleExecutorLost(execId) handleExecutorLost(execId)
case completion: CompletionEvent => case completion: CompletionEvent =>
sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task, sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
completion.reason, completion.taskInfo, completion.taskMetrics))) completion.reason, completion.taskInfo, completion.taskMetrics)))
handleTaskCompletion(completion) handleTaskCompletion(completion)
@ -651,7 +651,7 @@ class DAGScheduler(
"(generation " + currentGeneration + ")") "(generation " + currentGeneration + ")")
} }
} }
private def handleExecutorGained(execId: String, hostPort: String) { private def handleExecutorGained(execId: String, hostPort: String) {
// remove from failedGeneration(execId) ? // remove from failedGeneration(execId) ?
if (failedGeneration.contains(execId)) { if (failedGeneration.contains(execId)) {
@ -747,6 +747,10 @@ class DAGScheduler(
sizeBefore = pendingTasks.size sizeBefore = pendingTasks.size
pendingTasks.clearOldValues(cleanupTime) pendingTasks.clearOldValues(cleanupTime)
logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size) logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)
sizeBefore = stageToInfos.size
stageToInfos.clearOldValues(cleanupTime)
logInfo("stageToInfos " + sizeBefore + " --> " + stageToInfos.size)
} }
def stop() { def stop() {