Revert [SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions

## What changes were proposed in this pull request?

Our customer has a very complicated job. Sometimes it successes and sometimes it fails with
```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4  has failed the maximum allowable number of times: 4.
Most recent failure reason: org.apache.spark.shuffle.FetchFailedException
```

However, with the patch https://github.com/apache/spark/pull/23871 , the job hangs forever.

When I investigated it, I found that `DAGScheduler` and `TaskSchedulerImpl` define stage completion differently. `DAGScheduler` thinks a stage is completed if all its partitions are marked as completed ([result stage](https://github.com/apache/spark/blob/v2.4.1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1362-L1368) and [shuffle stage](https://github.com/apache/spark/blob/v2.4.1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1400)). `TaskSchedulerImpl` thinks a stage's task set is completed when all tasks finish (see the [code](https://github.com/apache/spark/blob/v2.4.1/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L779-L784)).

Ideally this two definition should be consistent, but #23871 breaks it. In our customer's Spark log, I found that, a stage's task set completes, but the stage never completes. More specifically, `DAGScheduler` submits a task set for stage 4.1 with 1000 tasks, but the `TaskSetManager` skips to run the first 100 tasks. Later on, `TaskSetManager` finishes 900 tasks and marks the task set as completed. However, `DAGScheduler` doesn't agree with it and hangs forever, waiting for more task completion events of stage 4.1.

With hindsight, I think `TaskSchedulerIImpl.stageIdToFinishedPartitions` is fragile. We need to pay more effort to make sure this is consistent with `DAGScheduler`'s knowledge. When `DAGScheduler` marks some partitions from finished to unfinished, `TaskSchedulerIImpl.stageIdToFinishedPartitions` should be updated as well.

This PR reverts #23871, let's think of a more robust idea later.

## How was this patch tested?

N/A

Closes #24359 from cloud-fan/revert.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Wenchen Fan 2019-04-14 16:57:41 +08:00
parent eea3f55a31
commit 0bb716bac3
3 changed files with 19 additions and 78 deletions

View file

@ -22,7 +22,7 @@ import java.util.{Locale, Timer, TimerTask}
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random
import org.apache.spark._
@ -101,9 +101,6 @@ private[spark] class TaskSchedulerImpl(
// Protected by `this`
val taskIdToExecutorId = new HashMap[Long, String]
// Protected by `this`
private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet]
@volatile private var hasReceivedTask = false
@volatile private var hasLaunchedTask = false
private val starvationTimer = new Timer(true)
@ -252,20 +249,7 @@ private[spark] class TaskSchedulerImpl(
private[scheduler] def createTaskSetManager(
taskSet: TaskSet,
maxTaskFailures: Int): TaskSetManager = {
// only create a BitSet once for a certain stage since we only remove
// that stage when an active TaskSetManager succeed.
stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
// TaskSet got submitted by DAGScheduler may have some already completed
// tasks since DAGScheduler does not always know all the tasks that have
// been completed by other tasksets when completing a stage, so we mark
// those tasks as finished here to avoid launching duplicate tasks, while
// holding the TaskSchedulerImpl lock.
// See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`
stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
}
tsm
new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
}
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@ -886,31 +870,19 @@ private[spark] class TaskSchedulerImpl(
}
/**
* Marks the task has completed in all TaskSetManagers(active / zombie) for the given stage.
* Marks the task has completed in all TaskSetManagers for the given stage.
*
* After stage failure and retry, there may be multiple TaskSetManagers for the stage.
* If an earlier attempt of a stage completes a task, we should ensure that the later attempts
* do not also submit those same tasks. That also means that a task completion from an earlier
* attempt can lead to the entire stage getting marked as successful.
* And there is also the possibility that the DAGScheduler submits another taskset at the same
* time as we're marking a task completed here -- that taskset would have a task for a partition
* that was already completed. We maintain the set of finished partitions in
* stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset
* is submitted. See SPARK-25250 for more details.
*
* note: this method must be called with a lock on this.
*/
private[scheduler] def markPartitionCompletedInAllTaskSets(
stageId: Int,
partitionId: Int,
taskInfo: TaskInfo) = {
// if we do not find a BitSet for this stage, which means an active TaskSetManager
// has already succeeded and removed the stage.
stageIdToFinishedPartitions.get(stageId).foreach{
finishedPartitions => finishedPartitions += partitionId
}
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
tsm.markPartitionCompleted(partitionId, Some(taskInfo))
tsm.markPartitionCompleted(partitionId, taskInfo)
}
}

View file

@ -21,7 +21,7 @@ import java.io.NotSerializableException
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.math.max
import scala.util.control.NonFatal
@ -800,11 +800,7 @@ private[spark] class TaskSetManager(
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
// clean up finished partitions for the stage when the active TaskSetManager succeed
if (!isZombie) {
sched.stageIdToFinishedPartitions -= stageId
isZombie = true
}
isZombie = true
}
} else {
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
@ -823,21 +819,16 @@ private[spark] class TaskSetManager(
maybeFinishTaskSet()
}
private[scheduler] def markPartitionCompleted(
partitionId: Int,
taskInfo: Option[TaskInfo]): Unit = {
private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
partitionToIndex.get(partitionId).foreach { index =>
if (!successful(index)) {
if (speculationEnabled && !isZombie) {
taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
successfulTaskDurations.insert(taskInfo.duration)
}
tasksSuccessful += 1
successful(index) = true
if (tasksSuccessful == numTasks) {
if (!isZombie) {
sched.stageIdToFinishedPartitions -= stageId
isZombie = true
}
isZombie = true
}
maybeFinishTaskSet()
}

View file

@ -1121,7 +1121,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
}
test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
test("Completions in zombie tasksets update status of non-zombie taskset") {
val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
val valueSer = SparkEnv.get.serializer.newInstance()
@ -1133,9 +1133,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
// two times, so we have three TaskSetManagers(2 zombie, 1 active) for one stage. (For this
// to really happen, you'd need the previous stage to also get restarted, and then succeed,
// in between each attempt, but that happens outside what we're mocking here.)
// two times, so we have three active task sets for one stage. (For this to really happen,
// you'd need the previous stage to also get restarted, and then succeed, in between each
// attempt, but that happens outside what we're mocking here.)
val zombieAttempts = (0 until 2).map { stageAttempt =>
val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
taskScheduler.submitTasks(attempt)
@ -1152,33 +1152,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(tsm.runningTasks === 9)
tsm
}
// we've now got 2 zombie attempts, each with 9 tasks still running. And there's no active
// attempt exists in taskScheduler by now.
// finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
// This is possible since the behaviour of submitting new attempt and handling successful task
// is from two different threads, which are "task-result-getter" and "dag-scheduler-event-loop"
// separately.
(0 until 2).foreach { i =>
completeTaskSuccessfully(zombieAttempts(i), i + 1)
assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
}
// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
// the stage, but this time with insufficient resources so not all tasks are active.
// Submit the 3rd attempt still with 10 tasks, this happens due to the race between thread
// "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with
// already completed tasks. And this time with insufficient resources so not all tasks are
// active.
val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
taskScheduler.submitTasks(finalAttempt)
val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
// Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should
// realize that 2 tasks have already completed, and mark them appropriately, so it won't launch
// any duplicate tasks later (SPARK-25250).
(0 until 2).map(_ + 1).foreach { partitionId =>
val index = finalTsm.partitionToIndex(partitionId)
assert(finalTsm.successful(index))
}
val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
finalAttempt.tasks(task.index).partitionId
@ -1186,17 +1166,16 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(finalTsm.runningTasks === 5)
assert(!finalTsm.isZombie)
// We continually simulate late completions from our zombie tasksets(but this time, there's one
// active attempt exists in taskScheduler), corresponding to all the pending partitions in our
// final attempt. This means we're only waiting on the tasks we've already launched.
// We simulate late completions from our zombie tasksets, corresponding to all the pending
// partitions in our final attempt. This means we're only waiting on the tasks we've already
// launched.
val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
finalAttemptPendingPartitions.foreach { partition =>
completeTaskSuccessfully(zombieAttempts(0), partition)
assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
}
// If there is another resource offer, we shouldn't run anything. Though our final attempt
// used to have pending tasks, now those tasks have been completed by zombie attempts. The
// used to have pending tasks, now those tasks have been completed by zombie attempts. The
// remaining tasks to compute are already active in the non-zombie attempt.
assert(
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@ -1244,7 +1223,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// perspective, as the failures weren't from a problem w/ the tasks themselves.
verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), any())
}
assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
}
test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {