[SPARK-15555][MESOS] Driver with --supervise option cannot be killed in Mesos mode

## What changes were proposed in this pull request?

Killed applications are no longer added for retry: `shouldRelaunch` now returns `false` for `TASK_KILLED`, so a driver that the user explicitly kills is not resubmitted, even when it was launched with `--supervise`.

## How was this patch tested?

Verified manually on a Mesos cluster: with this change, killed applications move to the Finished Drivers section and are not retried.
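For context, a manual check along these lines (the exact commands are illustrative, not taken from the PR) would be: submit a driver in cluster mode with supervision, e.g. `spark-submit --master mesos://<dispatcher-host>:7077 --deploy-mode cluster --supervise <app-jar>`, then kill it with `spark-submit --kill <submissionId> --master mesos://<dispatcher-host>:7077`. Before this patch the dispatcher would relaunch the killed driver; with it, the driver stays in Finished Drivers.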

Author: Devaraj K <devaraj@apache.org>

Closes #13323 from devaraj-kavali/SPARK-15555.
Authored by Devaraj K on 2017-01-03 11:02:42 -08:00, committed by Marcelo Vanzin
parent 7a2b5f93bc
commit 89bf370e4f
2 changed files with 56 additions and 2 deletions

#### MesosClusterScheduler.scala

```diff
@@ -647,7 +647,6 @@ private[spark] class MesosClusterScheduler(
    */
   private def shouldRelaunch(state: MesosTaskState): Boolean = {
     state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
       state == MesosTaskState.TASK_LOST
   }
```
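To make the behavior change concrete, here is a minimal, self-contained sketch (the object name and the asserts are illustrative, not part of the patch) that mirrors the patched predicate against Mesos's `TaskState` enum:

```scala
import org.apache.mesos.Protos.TaskState

object ShouldRelaunchSketch extends App {
  // Mirrors the patched predicate: only failed or lost tasks are relaunched.
  def shouldRelaunch(state: TaskState): Boolean =
    state == TaskState.TASK_FAILED ||
      state == TaskState.TASK_LOST

  assert(shouldRelaunch(TaskState.TASK_FAILED))    // crashed drivers are retried
  assert(shouldRelaunch(TaskState.TASK_LOST))      // lost drivers are retried
  assert(!shouldRelaunch(TaskState.TASK_KILLED))   // a user-initiated kill is now final
  assert(!shouldRelaunch(TaskState.TASK_FINISHED)) // normal completion, no retry
}
```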

#### MesosClusterSchedulerSuite.scala

```diff
@@ -21,7 +21,7 @@ import java.util.{Collection, Collections, Date}

 import scala.collection.JavaConverters._

-import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Value.{Scalar, Type}
 import org.apache.mesos.SchedulerDriver
 import org.mockito.{ArgumentCaptor, Matchers}
@@ -236,4 +236,59 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(networkInfos.size == 1)
     assert(networkInfos.get(0).getName == "test-network-name")
   }
+
+  test("can kill supervised drivers") {
+    val driver = mock[SchedulerDriver]
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("spark mesos")
+    scheduler = new MesosClusterScheduler(
+      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+      override def start(): Unit = {
+        ready = true
+        mesosDriver = driver
+      }
+    }
+    scheduler.start()
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "s1", new Date()))
+    assert(response.success)
+    val slaveId = SlaveID.newBuilder().setValue("s1").build()
+    val offer = Offer.newBuilder()
+      .addResources(
+        Resource.newBuilder().setRole("*")
+          .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+      .addResources(
+        Resource.newBuilder().setRole("*")
+          .setScalar(Scalar.newBuilder().setValue(1000).build())
+          .setName("mem")
+          .setType(Type.SCALAR))
+      .setId(OfferID.newBuilder().setValue("o1").build())
+      .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
+      .setSlaveId(slaveId)
+      .setHostname("host1")
+      .build()
+    // Offer the resources needed to launch the submitted driver
+    scheduler.resourceOffers(driver, Collections.singletonList(offer))
+    var state = scheduler.getSchedulerState()
+    assert(state.launchedDrivers.size == 1)
+    // Issue the request to kill the launched driver
+    val killResponse = scheduler.killDriver(response.submissionId)
+    assert(killResponse.success)
+    val taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(slaveId)
+      .setState(MesosTaskState.TASK_KILLED)
+      .build()
+    // Report the KILLED status for the task
+    scheduler.statusUpdate(driver, taskStatus)
+    // The killed driver should move to finishedDrivers and must not be queued for retry
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.isEmpty)
+    assert(state.finishedDrivers.size == 1)
+  }
 }
```
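Assuming the Spark build layout of this era (the Mesos support split into its own module behind the `mesos` profile), the new test can be run on its own with something like `build/sbt -Pmesos "mesos/testOnly *MesosClusterSchedulerSuite"`.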