[SPARK-27347][MESOS] Fix supervised driver retry logic for outdated tasks
## What changes were proposed in this pull request?

This patch fixes a bug where `--supervised` Spark jobs would retry multiple times whenever an agent would crash, come back, and re-register, even when those jobs had already been relaunched on a different agent. That is:

```
- supervised driver is running on agent1
- agent1 crashes
- driver is relaunched on another agent as `<task-id>-retry-1`
- agent1 comes back online and re-registers with scheduler
- spark relaunches the same job as `<task-id>-retry-2`
- now there are two jobs running simultaneously
```

This happened because when an agent came back and re-registered, it would send a `TASK_FAILED` status update for its old driver task. The previous logic would indiscriminately remove the `submissionId` from ZooKeeper's `launchedDrivers` node and add it to the `retryList` node. Then, when a new offer came in, it would launch another `-retry-` task even though one was already running. Example logs are included at the bottom.

## How was this patch tested?

- Added a unit test to simulate the behavior described above
- Tested manually on a DC/OS cluster by:

```
- launching a --supervised spark job
- dcos node ssh <to the agent with the running spark-driver>
- systemctl stop dcos-mesos-slave
- docker kill <driver-container-id>
- [ wait until spark job is relaunched ]
- systemctl start dcos-mesos-slave
- [ observe spark driver is not relaunched as `-retry-2` ]
```

Log snippets are included below. Notice the `-retry-1` task is running when the status update for the old task arrives afterward:

```
19/01/15 19:21:38 TRACE MesosClusterScheduler: Received offers from Mesos: ... [offers] ...
19/01/15 19:21:39 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2532 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001"
...
19/01/15 19:21:42 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_STARTING message=''
19/01/15 19:21:43 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_RUNNING message=''
...
19/01/15 19:29:12 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_LOST message='health check timed out' reason=REASON_SLAVE_REMOVED
...
19/01/15 19:31:12 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2681 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-1"
...
19/01/15 19:31:15 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_STARTING message=''
19/01/15 19:31:16 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_RUNNING message=''
...
19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Unreachable agent re-reregistered'
...
19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Abnormal executor termination: unknown container' reason=REASON_EXECUTOR_TERMINATED
19/01/15 19:33:45 ERROR MesosClusterScheduler: Unable to find driver with driver-20190115192138-0001 in status update
...
19/01/15 19:33:47 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2729 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-2"
...
19/01/15 19:33:50 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_STARTING message=''
19/01/15 19:33:51 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_RUNNING message=''
```

Closes #24276 from samvantran/SPARK-27347-duplicate-retries.

Authored-by: Sam Tran <stran@mesosphere.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
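As context for the diffs that follow: the `-retry-N` suffixes seen in the logs come from the scheduler appending a retry serial to the original submission ID each time a supervised driver is relaunched. A minimal sketch of that naming scheme, using a hypothetical helper (`retryTaskId` is not part of Spark's code; it only illustrates the ID shape seen above):

```scala
// Hypothetical helper illustrating the task-ID shape from the logs above.
// Spark's MesosClusterScheduler produces IDs of this form when retrying a
// supervised driver; this exact function does not exist in its source.
def retryTaskId(submissionId: String, retrySerial: Int): String =
  if (retrySerial <= 0) submissionId
  else s"$submissionId-retry-$retrySerial"

// retryTaskId("driver-20190115192138-0001", 1) == "driver-20190115192138-0001-retry-1"
// retryTaskId("driver-20190115192138-0001", 2) == "driver-20190115192138-0001-retry-2"
```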
In `MesosClusterScheduler.scala`, the status-update handler now ignores updates for outdated tasks:

```diff
@@ -766,6 +766,10 @@ private[spark] class MesosClusterScheduler(
         val state = launchedDrivers(subId)
         // Check if the driver is supervise enabled and can be relaunched.
         if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
+          if (isTaskOutdated(taskId, state)) {
+            // Prevent outdated task from overwriting a more recent status
+            return
+          }
           removeFromLaunchedDrivers(subId)
           state.finishDate = Some(new Date())
           val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
```
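The placement of the guard matters: it runs before `removeFromLaunchedDrivers(subId)` and before the submission can be queued for retry, so a stale `TASK_FAILED` from a re-registering agent no longer evicts the live `-retry-` task's state from `launchedDrivers` or enqueues a duplicate retry.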
Still in `MesosClusterScheduler.scala`, the new `isTaskOutdated` helper:

```diff
@@ -786,6 +790,16 @@ private[spark] class MesosClusterScheduler(
       }
     }
 
+  /**
+   * Check if the task is outdated i.e. has already been launched or is pending
+   * If neither, the taskId is outdated and should be ignored
+   * This is to avoid scenarios where an outdated status update arrives
+   * after a supervised driver has already been relaunched
+   */
+  private def isTaskOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean =
+    taskId != state.taskId.getValue &&
+      !pendingRetryDrivers.exists(_.submissionId == state.driverDescription.submissionId)
+
   private def retireDriver(
       submissionId: String,
       state: MesosClusterSubmissionState) = {
```
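To make the predicate's behavior concrete, here is a self-contained sketch using simplified stand-in types (not Spark's actual `MesosClusterSubmissionState`); the logic mirrors `isTaskOutdated` above:

```scala
// Simplified model of the outdated-task check. `SubmissionState` stands in
// for Spark's MesosClusterSubmissionState and carries only the fields the
// predicate needs.
object OutdatedTaskCheckSketch {
  final case class SubmissionState(submissionId: String, currentTaskId: String)

  // A status update is outdated when it names a task ID other than the one
  // currently tracked for the submission, and the submission is not already
  // queued for retry. Outdated updates should be ignored.
  def isTaskOutdated(
      incomingTaskId: String,
      state: SubmissionState,
      pendingRetrySubmissionIds: Set[String]): Boolean =
    incomingTaskId != state.currentTaskId &&
      !pendingRetrySubmissionIds.contains(state.submissionId)

  def main(args: Array[String]): Unit = {
    val state = SubmissionState("sub1", "driver-20190115192138-0001-retry-1")
    // Old agent re-registers and reports the original task: outdated, ignore.
    println(isTaskOutdated("driver-20190115192138-0001", state, Set.empty)) // true
    // Update for the task currently tracked: not outdated, process normally.
    println(isTaskOutdated("driver-20190115192138-0001-retry-1", state, Set.empty)) // false
  }
}
```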
And the new regression test in `MesosClusterSchedulerSuite.scala`:

```diff
@@ -450,6 +450,86 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(state.finishedDrivers.size == 1)
   }
 
+  test("SPARK-27347: do not restart outdated supervised drivers") {
+    // Covers scenario where:
+    // - agent goes down
+    // - supervised job is relaunched on another agent
+    // - first agent re-registers and sends status update: TASK_FAILED
+    // - job should NOT be relaunched again
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("SparkMesosDriverRetries")
+    setScheduler(conf.getAll.toMap)
+
+    val mem = 1000
+    val cpu = 1
+    val offers = List(
+      Utils.createOffer("o1", "s1", mem, cpu, None),
+      Utils.createOffer("o2", "s2", mem, cpu, None),
+      Utils.createOffer("o3", "s1", mem, cpu, None))
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date()))
+    assert(response.success)
+
+    // Offer a resource to launch the submitted driver
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.head))
+    var state = scheduler.getSchedulerState()
+    assert(state.launchedDrivers.size == 1)
+
+    // Signal agent lost with a TASK_LOST status update
+    val agent1 = SlaveID.newBuilder().setValue("s1").build()
+    var taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
+      .setState(MesosTaskState.TASK_LOST)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.size == 1)
+    assert(state.pendingRetryDrivers.head.submissionId == taskStatus.getTaskId.getValue)
+    assert(state.launchedDrivers.isEmpty)
+
+    // Offer new resource to retry driver on a new agent
+    Thread.sleep(1500) // sleep to cover nextRetry's default wait time of 1s
+    scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))
+
+    val agent2 = SlaveID.newBuilder().setValue("s2").build()
+    taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent2)
+      .setState(MesosTaskState.TASK_RUNNING)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.size == 1)
+    assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+    assert(state.launchedDrivers.head.taskId.getValue != taskStatus.getTaskId.getValue)
+
+    // Agent1 comes back online and sends status update: TASK_FAILED
+    taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setState(MesosTaskState.TASK_FAILED)
+      .setMessage("Abnormal executor termination")
+      .setReason(TaskStatus.Reason.REASON_EXECUTOR_TERMINATED)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.last))
+
+    // Assert driver does not restart a 2nd time
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.size == 1)
+    assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+  }
+
   test("Declines offer with refuse seconds = 120.") {
     setScheduler()
 
```
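The final `resourceOffers` call is the crux of the regression test: without the fix, the stale `TASK_FAILED` from agent1 would have moved `sub1` back into `pendingRetryDrivers`, and offer `o3` would then have launched a `-retry-2` task. The closing assertions pin the scheduler to the single `-retry-1` task instead.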