[SPARK-19793] Use clock.getTimeMillis when marking a task as finished in TaskSetManager.

## What changes were proposed in this pull request?

`TaskSetManager` currently uses `System.currentTimeMillis` when marking a task as finished in `handleSuccessfulTask` and `handleFailedTask`, so unit tests cannot control a task's finish time. In `handleSuccessfulTask`, the task's duration is computed as `System.currentTimeMillis` minus the launch time, and since only the launch time can be set through the injected `clock`, the resulting duration is incorrect under a test clock. This patch passes `clock.getTimeMillis()` explicitly so that both timestamps come from the same clock.
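
For illustration, a minimal sketch of the mismatch, reusing fixtures that appear in `TaskSetManagerSuite` below (`FakeTaskScheduler`, `FakeTask`, `createTaskResult`, `accumUpdates`); the closing comments describe the intended behavior, not assertions taken from the patch:

```scala
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock  // deterministic test clock, starts at time 0
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

// resourceOffer records launchTime from the injected clock, i.e. 0.
assert(manager.resourceOffer("exec1", "host1", ANY).isDefined)
clock.advance(10)
manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))

// Before this patch: finishTime = System.currentTimeMillis, so
// duration = "wall-clock now" - 0, a value no test can predict.
// After this patch: finishTime = clock.getTimeMillis() = 10, so duration = 10.
```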

## How was this patch tested?
Existing unit tests, updated to advance the `ManualClock` before tasks finish.

Author: jinxing <jinxing6042@126.com>

Closes #17133 from jinxing64/SPARK-19793.
jinxing 2017-03-09 10:56:19 -08:00 committed by Marcelo Vanzin
parent b60b9fc10a
commit 3232e54f2f
4 changed files with 17 additions and 7 deletions

core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala

```diff
@@ -70,11 +70,13 @@ class TaskInfo(
   var killed = false
 
-  private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
+  private[spark] def markGettingResult(time: Long) {
     gettingResultTime = time
   }
 
-  private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
+  private[spark] def markFinished(state: TaskState, time: Long) {
+    // finishTime should be set larger than 0, otherwise "finished" below will return false.
+    assert(time > 0)
     finishTime = time
     if (state == TaskState.FAILED) {
       failed = true
```

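The new `assert(time > 0)` protects an invariant worth spelling out: `TaskInfo` derives its `finished` flag from `finishTime`, so a zero timestamp would leave a completed task looking unfinished. A paraphrased sketch of that predicate (the exact body is an assumption; the in-code comment only guarantees that `finishTime == 0` makes `finished` return false):

```scala
// Assumed shape of the predicate the in-code comment refers to:
def finished: Boolean = finishTime != 0
```

This is also why the test changes below advance each fresh `ManualClock` by one millisecond before any task finishes: a `ManualClock` starts at 0, and `markFinished(state, clock.getTimeMillis())` would otherwise trip the assertion.
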
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

```diff
@@ -667,7 +667,7 @@ private[spark] class TaskSetManager(
    */
   def handleTaskGettingResult(tid: Long): Unit = {
     val info = taskInfos(tid)
-    info.markGettingResult()
+    info.markGettingResult(clock.getTimeMillis())
     sched.dagScheduler.taskGettingResult(info)
   }
@@ -695,7 +695,7 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
-    info.markFinished(TaskState.FINISHED)
+    info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
     removeRunningTask(tid)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
@@ -739,7 +739,7 @@ private[spark] class TaskSetManager(
       return
     }
     removeRunningTask(tid)
-    info.markFinished(state)
+    info.markFinished(state, clock.getTimeMillis())
     val index = info.index
     copiesRunning(index) -= 1
     var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
```

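These call sites follow the standard clock-injection pattern: `TaskSetManager` already receives a `Clock` through its constructor, and the fix is simply to consult it everywhere a timestamp is taken. A minimal sketch of the pattern (Spark's real `Clock` trait lives in `org.apache.spark.util` and declares more members than shown here):

```scala
trait Clock {
  def getTimeMillis(): Long
}

// Production: real wall-clock time.
class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

// Tests: time moves only when the test says so.
class ManualClock(private var time: Long = 0L) extends Clock {
  override def getTimeMillis(): Long = time
  def advance(timeToAdd: Long): Unit = { time += timeToAdd }
}
```
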
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

```diff
@@ -192,6 +192,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
     assert(taskOption.isDefined)
 
+    clock.advance(1)
     // Tell it the task has finished
     manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
     assert(sched.endedTasks(0) === Success)
@@ -377,6 +378,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
+    clock.advance(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -394,6 +396,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
+    clock.advance(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
@@ -427,6 +430,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // affinity to exec1 on host1 - which we will fail.
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
     val clock = new ManualClock
+    clock.advance(1)
     // We don't directly use the application blacklist, but its presence triggers blacklisting
     // within the taskset.
     val mockListenerBus = mock(classOf[LiveListenerBus])
@@ -551,7 +555,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(TaskLocation("host1", "execB")),
       Seq(TaskLocation("host2", "execC")),
       Seq())
-    val manager = new TaskSetManager(sched, taskSet, 1, clock = new ManualClock)
+    val clock = new ManualClock()
+    clock.advance(1)
+    val manager = new TaskSetManager(sched, taskSet, 1, clock = clock)
     sched.addExecutor("execA", "host1")
     manager.executorAdded()
     sched.addExecutor("execC", "host2")
@@ -904,6 +910,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       assert(task.executorId === k)
     }
     assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
     // Complete the 3 tasks and leave 1 task in running
     for (id <- Set(0, 1, 2)) {
       manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
@@ -961,6 +968,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       tasks += task
     }
     assert(sched.startedTasks.toSet === (0 until 5).toSet)
+    clock.advance(1)
     // Complete 3 tasks and leave 2 tasks in running
     for (id <- Set(0, 1, 2)) {
       manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
```

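With both timestamps drawn from the same injected clock, the suite could in principle assert on exact durations rather than just completion. A hypothetical fragment (not part of this patch) assuming a task launched at clock time 0:

```scala
clock.advance(1)
manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
// launchTime (0) and finishTime (1) now come from the same ManualClock,
// so the duration is stable across machines and runs.
assert(manager.taskInfos(0).duration === 1)
```
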
core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala

```diff
@@ -77,7 +77,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
       val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
       jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
       jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
-      taskInfo.markFinished(TaskState.FINISHED)
+      taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis())
       val taskMetrics = TaskMetrics.empty
       taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
       jobListener.onTaskEnd(
```