[SPARK-30359][CORE] Don't clear executorsPendingToRemove at the beginning of CoarseGrainedSchedulerBackend.reset

### What changes were proposed in this pull request?

Remove `executorsPendingToRemove.clear()` from `CoarseGrainedSchedulerBackend.reset()`.

### Why are the changes needed?

Clearing `executorsPendingToRemove` before the executors are actually removed makes every task still running on those "pending to remove" executors count towards task failures. That is wrong for executors marked `executorsPendingToRemove(execId) = true`, i.e. executors explicitly killed by the driver, whose task failures should not be counted against the application.

Besides, `executorsPendingToRemove` is already cleaned up at the end of `removeExecutor()`, just like `executorsPendingLossReason`, so clearing it up front in `reset()` is unnecessary.
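
For context, here is a minimal sketch of the driver-side removal path inside `CoarseGrainedSchedulerBackend` (a paraphrase for illustration, not the verbatim Spark code): the boolean stored in `executorsPendingToRemove` is what decides whether the lost executor's tasks count towards failures, and the entry is consumed right there.

```scala
// Sketch, paraphrased from the removeExecutor() path in CoarseGrainedSchedulerBackend.
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
  val killedByDriver = synchronized {
    // The entry is removed here, which is why reset() does not need to clear the map.
    executorsPendingToRemove.remove(executorId).getOrElse(false)
  }
  // ExecutorKilled does not count towards task failures; other loss reasons may.
  scheduler.executorLost(executorId, if (killedByDriver) ExecutorKilled else reason)
}
```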

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added a new test in `TaskSetManagerSuite`.

Closes #27017 from Ngone51/dont-clear-eptr-in-reset.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 4a093176ea (parent 568ad4e77a), authored by yi.wu on 2020-01-03 22:54:05 +08:00 and committed by Wenchen Fan.
3 changed files with 69 additions and 6 deletions

@@ -95,9 +95,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not died yet; maps
   // the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
-  // be considered an app-related failure).
+  // be considered an app-related failure). Visible for testing only.
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
-  private val executorsPendingToRemove = new HashMap[String, Boolean]
+  private[scheduler] val executorsPendingToRemove = new HashMap[String, Boolean]
 
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   private val executorsPendingLossReason = new HashSet[String]
@@ -487,12 +487,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
    * be called in the yarn-client mode when AM re-registers after a failure.
+   * Visible for testing only.
    * */
-  protected def reset(): Unit = {
+  protected[scheduler] def reset(): Unit = {
     val executors: Set[String] = synchronized {
       requestedTotalExecutors = 0
       numPendingExecutors = 0
-      executorsPendingToRemove.clear()
       executorDataMap.keys.toSet
     }

@@ -21,18 +21,22 @@ import java.util.{Properties, Random}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
 
 import org.apache.hadoop.fs.FileAlreadyExistsException
 import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyString}
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.scalatest.Assertions._
+import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AccumulatorV2, ManualClock}
@@ -179,7 +183,12 @@ class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) {
   override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
 }
 
-class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
+class TaskSetManagerSuite
+  extends SparkFunSuite
+  with LocalSparkContext
+  with PrivateMethodTester
+  with Eventually
+  with Logging {
   import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}
 
   private val conf = new SparkConf
@@ -1894,4 +1903,58 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     manager.handleFailedTask(offerResult.get.taskId, TaskState.FAILED, reason)
     assert(sched.taskSetsFailed.contains(taskSet.id))
   }
+
+  test("SPARK-30359: don't clean executorsPendingToRemove " +
+    "at the beginning of CoarseGrainedSchedulerBackend.reset") {
+    val conf = new SparkConf()
+      // use local-cluster mode in order to get CoarseGrainedSchedulerBackend
+      .setMaster("local-cluster[2, 1, 2048]")
+      // allow to set up at most two executors
+      .set("spark.cores.max", "2")
+      .setAppName("CoarseGrainedSchedulerBackend.reset")
+    sc = new SparkContext(conf)
+    val sched = sc.taskScheduler
+    val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+    val Seq(exec0, exec1) = backend.getExecutorIds()
+
+    val taskSet = FakeTask.createTaskSet(2)
+    val stageId = taskSet.stageId
+    val stageAttemptId = taskSet.stageAttemptId
+    sched.submitTasks(taskSet)
+    val taskSetManagers = PrivateMethod[mutable.HashMap[Int, mutable.HashMap[Int, TaskSetManager]]](
+      Symbol("taskSetsByStageIdAndAttempt"))
+    // get the TaskSetManager
+    val manager = sched.invokePrivate(taskSetManagers()).get(stageId).get(stageAttemptId)
+
+    val task0 = manager.resourceOffer(exec0, "localhost", TaskLocality.NO_PREF)
+    val task1 = manager.resourceOffer(exec1, "localhost", TaskLocality.NO_PREF)
+    assert(task0.isDefined && task1.isDefined)
+    val (taskId0, index0) = (task0.get.taskId, task0.get.index)
+    val (taskId1, index1) = (task1.get.taskId, task1.get.index)
+
+    // set up two running tasks
+    assert(manager.taskInfos(taskId0).running)
+    assert(manager.taskInfos(taskId0).executorId === exec0)
+    assert(manager.taskInfos(taskId1).running)
+    assert(manager.taskInfos(taskId1).executorId === exec1)
+
+    val numFailures = PrivateMethod[Array[Int]](Symbol("numFailures"))
+    // no task failures yet
+    assert(manager.invokePrivate(numFailures())(index0) === 0)
+    assert(manager.invokePrivate(numFailures())(index1) === 0)
+
+    // let exec1 count task failures but exec0 doesn't
+    backend.executorsPendingToRemove(exec0) = true
+    backend.executorsPendingToRemove(exec1) = false
+    backend.reset()
+
+    eventually(timeout(10.seconds), interval(100.milliseconds)) {
+      // executorsPendingToRemove should eventually be empty after reset()
+      assert(backend.executorsPendingToRemove.isEmpty)
+      assert(manager.invokePrivate(numFailures())(index0) === 0)
+      assert(manager.invokePrivate(numFailures())(index1) === 1)
+    }
+  }
 }
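
The test reaches into private state (`taskSetsByStageIdAndAttempt`, `numFailures`) through ScalaTest's `PrivateMethodTester`. For reference, here is a minimal, self-contained sketch of that pattern; the `Counter` class and its private method are made up for illustration:

```scala
import org.scalatest.PrivateMethodTester

class Counter {
  private var count = 0
  private def bump(by: Int): Int = { count += by; count }
}

object PrivateMethodDemo extends PrivateMethodTester {
  def main(args: Array[String]): Unit = {
    // Describe the private method by name and result type...
    val bump = PrivateMethod[Int](Symbol("bump"))
    val counter = new Counter
    // ...then invoke it reflectively on an instance with invokePrivate.
    assert((counter invokePrivate bump(3)) == 3)
  }
}
```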

@@ -198,7 +198,7 @@ private[spark] abstract class YarnSchedulerBackend(
    * and re-registered itself to driver after a failure. The stale state in driver should be
    * cleaned.
    */
-  override protected def reset(): Unit = {
+  override protected[scheduler] def reset(): Unit = {
     super.reset()
     sc.executorAllocationManager.foreach(_.reset())
   }
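
Because the base `CoarseGrainedSchedulerBackend.reset()` is now `protected[scheduler]`, this override has to be widened as well: Scala does not allow an overriding member to have weaker access than the member it overrides, and the wider visibility is what lets `TaskSetManagerSuite` (in the `org.apache.spark.scheduler` package) call `reset()` directly. A tiny standalone sketch of the rule, with all names made up for illustration:

```scala
package example.scheduler {

  abstract class Backend {
    // Visible to everything in the `scheduler` package (and to subclasses).
    protected[scheduler] def reset(): Unit
  }

  class YarnBackend extends Backend {
    // `override protected def reset()` would not compile here: an override
    // may not have weaker access privileges than the member it overrides.
    override protected[scheduler] def reset(): Unit = println("reset")
  }

  // Any code in the `scheduler` package, such as a test suite, can call reset().
  object Caller {
    def main(args: Array[String]): Unit = new YarnBackend().reset()
  }
}
```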