diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index f384290a6f..c5bbcb9bef 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -647,7 +647,6 @@ private[spark] class MesosClusterScheduler( */ private def shouldRelaunch(state: MesosTaskState): Boolean = { state == MesosTaskState.TASK_FAILED || - state == MesosTaskState.TASK_KILLED || state == MesosTaskState.TASK_LOST } diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala index 74e5ce227d..b9d098486b 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala @@ -21,7 +21,7 @@ import java.util.{Collection, Collections, Date} import scala.collection.JavaConverters._ -import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.{TaskState => MesosTaskState, _} import org.apache.mesos.Protos.Value.{Scalar, Type} import org.apache.mesos.SchedulerDriver import org.mockito.{ArgumentCaptor, Matchers} @@ -236,4 +236,59 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi assert(networkInfos.size == 1) assert(networkInfos.get(0).getName == "test-network-name") } + + test("can kill supervised drivers") { + val driver = mock[SchedulerDriver] + val conf = new SparkConf() + conf.setMaster("mesos://localhost:5050") + conf.setAppName("spark mesos") + scheduler = new 
MesosClusterScheduler( + new BlackHoleMesosClusterPersistenceEngineFactory, conf) { + override def start(): Unit = { + ready = true + mesosDriver = driver + } + } + scheduler.start() + + val response = scheduler.submitDriver( + new MesosDriverDescription("d1", "jar", 100, 1, true, command, + Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "s1", new Date())) + assert(response.success) + val slaveId = SlaveID.newBuilder().setValue("s1").build() + val offer = Offer.newBuilder() + .addResources( + Resource.newBuilder().setRole("*") + .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR)) + .addResources( + Resource.newBuilder().setRole("*") + .setScalar(Scalar.newBuilder().setValue(1000).build()) + .setName("mem") + .setType(Type.SCALAR)) + .setId(OfferID.newBuilder().setValue("o1").build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build()) + .setSlaveId(slaveId) + .setHostname("host1") + .build() + // Hand the scheduler a single offer (1 cpu, 1000 mem) so the submitted driver is launched + scheduler.resourceOffers(driver, Collections.singletonList(offer)) + var state = scheduler.getSchedulerState() + assert(state.launchedDrivers.size == 1) + // Ask the scheduler to kill the driver that was just launched + val killResponse = scheduler.killDriver(response.submissionId) + assert(killResponse.success) + + val taskStatus = TaskStatus.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build()) + .setSlaveId(slaveId) + .setState(MesosTaskState.TASK_KILLED) + .build() + // Deliver a TASK_KILLED status update whose task id is the driver's submission id + scheduler.statusUpdate(driver, taskStatus) + // The killed driver must end up in finishedDrivers, neither relaunched nor queued for retry + state = scheduler.getSchedulerState() + assert(state.pendingRetryDrivers.isEmpty) + assert(state.launchedDrivers.isEmpty) + assert(state.finishedDrivers.size == 1) + } }