taskInfo tracks if a task is run on a preferred host
This commit is contained in:
parent
6f62a57858
commit
d0bfac3eed
|
@ -11,7 +11,8 @@ class TaskInfo(
|
||||||
val index: Int,
|
val index: Int,
|
||||||
val launchTime: Long,
|
val launchTime: Long,
|
||||||
val executorId: String,
|
val executorId: String,
|
||||||
val host: String) {
|
val host: String,
|
||||||
|
val preferred: Boolean) {
|
||||||
var finishTime: Long = 0
|
var finishTime: Long = 0
|
||||||
var failed = false
|
var failed = false
|
||||||
|
|
||||||
|
|
|
@ -208,7 +208,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
|
||||||
taskSet.id, index, taskId, execId, host, prefStr))
|
taskSet.id, index, taskId, execId, host, prefStr))
|
||||||
// Do various bookkeeping
|
// Do various bookkeeping
|
||||||
copiesRunning(index) += 1
|
copiesRunning(index) += 1
|
||||||
val info = new TaskInfo(taskId, index, time, execId, host)
|
val info = new TaskInfo(taskId, index, time, execId, host, preferred)
|
||||||
taskInfos(taskId) = info
|
taskInfos(taskId) = info
|
||||||
taskAttempts(index) = info :: taskAttempts(index)
|
taskAttempts(index) = info :: taskAttempts(index)
|
||||||
if (preferred) {
|
if (preferred) {
|
||||||
|
|
|
@ -53,7 +53,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
|
||||||
|
|
||||||
def runTask(task: Task[_], idInJob: Int, attemptId: Int) {
|
def runTask(task: Task[_], idInJob: Int, attemptId: Int) {
|
||||||
logInfo("Running " + task)
|
logInfo("Running " + task)
|
||||||
val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local")
|
val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local", true)
|
||||||
// Set the Spark execution environment for the worker thread
|
// Set the Spark execution environment for the worker thread
|
||||||
SparkEnv.set(env)
|
SparkEnv.set(env)
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in a new issue