From d0bfac3eeda09c18c7af521584cfe9db9a5a10c3 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 Feb 2013 22:27:36 -0800 Subject: [PATCH] taskInfo tracks if a task is run on a preferred host --- core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala | 3 ++- .../main/scala/spark/scheduler/cluster/TaskSetManager.scala | 2 +- core/src/main/scala/spark/scheduler/local/LocalScheduler.scala | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala index a7e14094fb..53a3c5dc4d 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala @@ -11,7 +11,8 @@ class TaskInfo( val index: Int, val launchTime: Long, val executorId: String, - val host: String) { + val host: String, + val preferred: Boolean) { var finishTime: Long = 0 var failed = false diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 48876ffd79..236f81bb9f 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -208,7 +208,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe taskSet.id, index, taskId, execId, host, prefStr)) // Do various bookkeeping copiesRunning(index) += 1 - val info = new TaskInfo(taskId, index, time, execId, host) + val info = new TaskInfo(taskId, index, time, execId, host, preferred) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) if (preferred) { diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index 6b91728b74..a76253ea14 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -53,7 +53,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon def runTask(task: Task[_], idInJob: Int, attemptId: Int) { logInfo("Running " + task) - val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local") + val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local", true) // Set the Spark execution environment for the worker thread SparkEnv.set(env) try {