[SPARK-19702][MESOS] Increase default refuse_seconds timeout in the Mesos Spark Dispatcher

## What changes were proposed in this pull request?

Increase the default refuse_seconds timeout and make it configurable. See SPARK-19702 for details on how this reduces the risk of starvation.
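The configuration surface is small. A minimal sketch of setting it (the three keys and the `120s` default come from this patch; the master URL, app name, and the `240s` override values below are illustrative only):

```scala
import org.apache.spark.SparkConf

// Each case-specific duration falls back to spark.mesos.rejectOfferDuration
// when unset, which itself defaults to "120s".
val conf = new SparkConf()
  .setMaster("mesos://master:5050")      // placeholder master URL
  .setAppName("refuse-seconds-example")  // hypothetical app name
  // Base duration for which a declined offer will not be re-offered.
  .set("spark.mesos.rejectOfferDuration", "120s")
  // Optional per-case overrides; the 240s values are made up for this example.
  .set("spark.mesos.rejectOfferDurationForUnmetConstraints", "240s")
  .set("spark.mesos.rejectOfferDurationForReachedMaxCores", "240s")
```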

## How was this patch tested?

Unit tests, manual testing, and the Mesos/Spark integration test suite.

cc susanxhuynh skonto jmlvanre

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #17031 from mgummelt/SPARK-19702-suppress-revive.
Authored by Michael Gummelt on 2017-03-07 21:29:08 +00:00; committed by Sean Owen
Parent: 6f4684622a
Commit: 2e30c0b9bc
7 changed files with 187 additions and 105 deletions

File: MesosClusterScheduler.scala

@@ -152,6 +152,7 @@ private[spark] class MesosClusterScheduler(
  // is registered with Mesos master.
  @volatile protected var ready = false
  private var masterInfo: Option[MasterInfo] = None
+ private var schedulerDriver: SchedulerDriver = _
  def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
  val c = new CreateSubmissionResponse
@@ -168,9 +169,8 @@ private[spark] class MesosClusterScheduler(
  return c
  }
  c.submissionId = desc.submissionId
- queuedDriversState.persist(desc.submissionId, desc)
- queuedDrivers += desc
  c.success = true
+ addDriverToQueue(desc)
  }
  c
  }
@@ -191,7 +191,7 @@ private[spark] class MesosClusterScheduler(
  // 4. Check if it has already completed.
  if (launchedDrivers.contains(submissionId)) {
  val task = launchedDrivers(submissionId)
- mesosDriver.killTask(task.taskId)
+ schedulerDriver.killTask(task.taskId)
  k.success = true
  k.message = "Killing running driver"
  } else if (removeFromQueuedDrivers(submissionId)) {
@@ -324,7 +324,7 @@ private[spark] class MesosClusterScheduler(
  ready = false
  metricsSystem.report()
  metricsSystem.stop()
- mesosDriver.stop(true)
+ schedulerDriver.stop(true)
  }
  override def registered(
@@ -340,6 +340,8 @@ private[spark] class MesosClusterScheduler(
  stateLock.synchronized {
  this.masterInfo = Some(masterInfo)
+ this.schedulerDriver = driver
  if (!pendingRecover.isEmpty) {
  // Start task reconciliation if we need to recover.
  val statuses = pendingRecover.collect {
@@ -506,11 +508,10 @@ private[spark] class MesosClusterScheduler(
  }
  private class ResourceOffer(
- val offerId: OfferID,
- val slaveId: SlaveID,
- var resources: JList[Resource]) {
+ val offer: Offer,
+ var remainingResources: JList[Resource]) {
  override def toString(): String = {
- s"Offer id: ${offerId}, resources: ${resources}"
+ s"Offer id: ${offer.getId}, resources: ${remainingResources}"
  }
  }
@@ -518,16 +519,16 @@ private[spark] class MesosClusterScheduler(
  val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
  val (remainingResources, cpuResourcesToUse) =
- partitionResources(offer.resources, "cpus", desc.cores)
+ partitionResources(offer.remainingResources, "cpus", desc.cores)
  val (finalResources, memResourcesToUse) =
  partitionResources(remainingResources.asJava, "mem", desc.mem)
- offer.resources = finalResources.asJava
+ offer.remainingResources = finalResources.asJava
  val appName = desc.conf.get("spark.app.name")
  val taskInfo = TaskInfo.newBuilder()
  .setTaskId(taskId)
  .setName(s"Driver for ${appName}")
- .setSlaveId(offer.slaveId)
+ .setSlaveId(offer.offer.getSlaveId)
  .setCommand(buildDriverCommand(desc))
  .addAllResources(cpuResourcesToUse.asJava)
  .addAllResources(memResourcesToUse.asJava)
@@ -549,23 +550,29 @@ private[spark] class MesosClusterScheduler(
  val driverCpu = submission.cores
  val driverMem = submission.mem
  logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
- val offerOption = currentOffers.find { o =>
- getResource(o.resources, "cpus") >= driverCpu &&
- getResource(o.resources, "mem") >= driverMem
+ val offerOption = currentOffers.find { offer =>
+ getResource(offer.remainingResources, "cpus") >= driverCpu &&
+ getResource(offer.remainingResources, "mem") >= driverMem
  }
  if (offerOption.isEmpty) {
  logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
  s"cpu: $driverCpu, mem: $driverMem")
  } else {
  val offer = offerOption.get
- val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
+ val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
  try {
  val task = createTaskInfo(submission, offer)
  queuedTasks += task
- logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
+ logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
  submission.submissionId)
- val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
- None, new Date(), None, getDriverFrameworkID(submission))
+ val newState = new MesosClusterSubmissionState(
+ submission,
+ task.getTaskId,
+ offer.offer.getSlaveId,
+ None,
+ new Date(),
+ None,
+ getDriverFrameworkID(submission))
  launchedDrivers(submission.submissionId) = newState
  launchedDriversState.persist(submission.submissionId, newState)
  afterLaunchCallback(submission.submissionId)
@@ -588,7 +595,7 @@ private[spark] class MesosClusterScheduler(
  val currentTime = new Date()
  val currentOffers = offers.asScala.map {
- o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
+ offer => new ResourceOffer(offer, offer.getResourcesList)
  }.toList
  stateLock.synchronized {
@@ -615,8 +622,8 @@ private[spark] class MesosClusterScheduler(
  driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
  }
- for (o <- currentOffers if !tasks.contains(o.offerId)) {
- driver.declineOffer(o.offerId)
+ for (offer <- currentOffers if !tasks.contains(offer.offer.getId)) {
+ declineOffer(driver, offer.offer, None, Some(getRejectOfferDuration(conf)))
  }
  }
@@ -662,6 +669,12 @@ private[spark] class MesosClusterScheduler(
  override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
  val taskId = status.getTaskId.getValue
+ logInfo(s"Received status update: taskId=${taskId}" +
+ s" state=${status.getState}" +
+ s" message=${status.getMessage}" +
+ s" reason=${status.getReason}");
  stateLock.synchronized {
  if (launchedDrivers.contains(taskId)) {
  if (status.getReason == Reason.REASON_RECONCILIATION &&
@@ -682,8 +695,7 @@ private[spark] class MesosClusterScheduler(
  val newDriverDescription = state.driverDescription.copy(
  retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
- pendingRetryDrivers += newDriverDescription
- pendingRetryDriversState.persist(taskId, newDriverDescription)
+ addDriverToPending(newDriverDescription, taskId);
  } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
  removeFromLaunchedDrivers(taskId)
  state.finishDate = Some(new Date())
@@ -746,4 +758,21 @@ private[spark] class MesosClusterScheduler(
  def getQueuedDriversSize: Int = queuedDrivers.size
  def getLaunchedDriversSize: Int = launchedDrivers.size
  def getPendingRetryDriversSize: Int = pendingRetryDrivers.size
+ private def addDriverToQueue(desc: MesosDriverDescription): Unit = {
+ queuedDriversState.persist(desc.submissionId, desc)
+ queuedDrivers += desc
+ revive()
+ }
+ private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
+ pendingRetryDriversState.persist(taskId, desc)
+ pendingRetryDrivers += desc
+ revive()
+ }
+ private def revive(): Unit = {
+ logInfo("Reviving Offers.")
+ schedulerDriver.reviveOffers()
+ }
  }
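For context, the dispatcher-side pattern these new helpers implement is decline-with-filter plus revive: while no work is queued, every unused offer is declined with a long refuse_seconds filter, and when a driver is queued or retried, `reviveOffers()` clears those filters so the master sends offers again. A minimal standalone sketch of that pattern, using only the Mesos APIs visible in this diff (the function names are illustrative, not dispatcher code):

```scala
import org.apache.mesos.Protos.{Filters, Offer}
import org.apache.mesos.SchedulerDriver

// Decline an offer and ask the master not to re-offer these resources
// for refuseSeconds seconds.
def declineIdle(driver: SchedulerDriver, offer: Offer, refuseSeconds: Long): Unit = {
  val filters = Filters.newBuilder().setRefuseSeconds(refuseSeconds).build()
  driver.declineOffer(offer.getId, filters)
}

// When new work arrives, clear all previously set offer filters so
// resource offers start flowing to this framework again.
def onNewWork(driver: SchedulerDriver): Unit = {
  driver.reviveOffers()
}
```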

File: MesosCoarseGrainedSchedulerBackend.scala

@@ -26,6 +26,7 @@ import scala.collection.mutable
  import scala.concurrent.Future
  import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+ import org.apache.mesos.SchedulerDriver
  import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
  import org.apache.spark.network.netty.SparkTransportConf
@@ -119,11 +120,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
  // Reject offers with mismatched constraints in seconds
  private val rejectOfferDurationForUnmetConstraints =
- getRejectOfferDurationForUnmetConstraints(sc)
+ getRejectOfferDurationForUnmetConstraints(sc.conf)
  // Reject offers when we reached the maximum number of cores for this framework
  private val rejectOfferDurationForReachedMaxCores =
- getRejectOfferDurationForReachedMaxCores(sc)
+ getRejectOfferDurationForReachedMaxCores(sc.conf)
  // A client for talking to the external shuffle service
  private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
@@ -146,6 +147,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
  @volatile var appId: String = _
+ private var schedulerDriver: SchedulerDriver = _
  def newMesosTaskId(): String = {
  val id = nextMesosTaskId
  nextMesosTaskId += 1
@@ -252,9 +255,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
  override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
  override def registered(
- d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- appId = frameworkId.getValue
- mesosExternalShuffleClient.foreach(_.init(appId))
+ driver: org.apache.mesos.SchedulerDriver,
+ frameworkId: FrameworkID,
+ masterInfo: MasterInfo) {
+ this.appId = frameworkId.getValue
+ this.mesosExternalShuffleClient.foreach(_.init(appId))
+ this.schedulerDriver = driver
  markRegistered()
  }
@@ -293,46 +299,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
  }
  private def declineUnmatchedOffers(
- d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
+ driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
  offers.foreach { offer =>
- declineOffer(d, offer, Some("unmet constraints"),
+ declineOffer(
+ driver,
+ offer,
+ Some("unmet constraints"),
  Some(rejectOfferDurationForUnmetConstraints))
  }
  }
- private def declineOffer(
- d: org.apache.mesos.SchedulerDriver,
- offer: Offer,
- reason: Option[String] = None,
- refuseSeconds: Option[Long] = None): Unit = {
- val id = offer.getId.getValue
- val offerAttributes = toAttributeMap(offer.getAttributesList)
- val mem = getResource(offer.getResourcesList, "mem")
- val cpus = getResource(offer.getResourcesList, "cpus")
- val ports = getRangeResource(offer.getResourcesList, "ports")
- logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
- s" cpu: $cpus port: $ports for $refuseSeconds seconds" +
- reason.map(r => s" (reason: $r)").getOrElse(""))
- refuseSeconds match {
- case Some(seconds) =>
- val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
- d.declineOffer(offer.getId, filters)
- case _ => d.declineOffer(offer.getId)
- }
- }
  /**
  * Launches executors on accepted offers, and declines unused offers. Executors are launched
  * round-robin on offers.
  *
- * @param d SchedulerDriver
+ * @param driver SchedulerDriver
  * @param offers Mesos offers that match attribute constraints
  */
  private def handleMatchedOffers(
- d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
+ driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
  val tasks = buildMesosTasks(offers)
  for (offer <- offers) {
  val offerAttributes = toAttributeMap(offer.getAttributesList)
@@ -358,15 +343,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
  s" ports: $ports")
  }
- d.launchTasks(
+ driver.launchTasks(
  Collections.singleton(offer.getId),
  offerTasks.asJava)
  } else if (totalCoresAcquired >= maxCores) {
  // Reject an offer for a configurable amount of time to avoid starving other frameworks
- declineOffer(d, offer, Some("reached spark.cores.max"),
+ declineOffer(driver,
+ offer,
+ Some("reached spark.cores.max"),
  Some(rejectOfferDurationForReachedMaxCores))
  } else {
- declineOffer(d, offer)
+ declineOffer(
+ driver,
+ offer)
  }
  }
  }
@@ -582,8 +571,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
  // Close the mesos external shuffle client if used
  mesosExternalShuffleClient.foreach(_.close())
- if (mesosDriver != null) {
- mesosDriver.stop()
+ if (schedulerDriver != null) {
+ schedulerDriver.stop()
  }
  }
@@ -634,13 +623,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
  }
  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
- if (mesosDriver == null) {
+ if (schedulerDriver == null) {
  logWarning("Asked to kill executors before the Mesos driver was started.")
  false
  } else {
  for (executorId <- executorIds) {
  val taskId = TaskID.newBuilder().setValue(executorId).build()
- mesosDriver.killTask(taskId)
+ schedulerDriver.killTask(taskId)
  }
  // no need to adjust `executorLimitOption` since the AllocationManager already communicated
  // the desired limit through a call to `doRequestTotalExecutors`.

File: MesosFineGrainedSchedulerBackend.scala

@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
  import scala.collection.mutable.{HashMap, HashSet}
  import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
+ import org.apache.mesos.SchedulerDriver
  import org.apache.mesos.protobuf.ByteString
  import org.apache.spark.{SparkContext, SparkException, TaskState}
@@ -65,7 +66,9 @@ private[spark] class MesosFineGrainedSchedulerBackend(
  // reject offers with mismatched constraints in seconds
  private val rejectOfferDurationForUnmetConstraints =
- getRejectOfferDurationForUnmetConstraints(sc)
+ getRejectOfferDurationForUnmetConstraints(sc.conf)
+ private var schedulerDriver: SchedulerDriver = _
  @volatile var appId: String = _
@@ -89,6 +92,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
  /**
  * Creates a MesosExecutorInfo that is used to launch a Mesos executor.
+ *
  * @param availableResources Available resources that is offered by Mesos
  * @param execId The executor id to assign to this new executor.
  * @return A tuple of the new mesos executor info and the remaining available resources.
@@ -178,10 +182,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
  override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
  override def registered(
- d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ driver: org.apache.mesos.SchedulerDriver,
+ frameworkId: FrameworkID,
+ masterInfo: MasterInfo) {
  inClassLoader() {
  appId = frameworkId.getValue
  logInfo("Registered as framework ID " + appId)
+ this.schedulerDriver = driver
  markRegistered()
  }
  }
@@ -383,13 +390,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
  }
  override def stop() {
- if (mesosDriver != null) {
- mesosDriver.stop()
+ if (schedulerDriver != null) {
+ schedulerDriver.stop()
  }
  }
  override def reviveOffers() {
- mesosDriver.reviveOffers()
+ schedulerDriver.reviveOffers()
  }
  override def frameworkMessage(
@@ -426,7 +433,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
  }
  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
- mesosDriver.killTask(
+ schedulerDriver.killTask(
  TaskID.newBuilder()
  .setValue(taskId.toString).build()
  )

File: MesosSchedulerUtils.scala

@@ -46,9 +46,6 @@ trait MesosSchedulerUtils extends Logging {
  // Lock used to wait for scheduler to be registered
  private final val registerLatch = new CountDownLatch(1)
- // Driver for talking to Mesos
- protected var mesosDriver: SchedulerDriver = null
  /**
  * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
  *
@@ -115,10 +112,6 @@ trait MesosSchedulerUtils extends Logging {
  */
  def startScheduler(newDriver: SchedulerDriver): Unit = {
  synchronized {
- if (mesosDriver != null) {
- registerLatch.await()
- return
- }
  @volatile
  var error: Option[Exception] = None
@@ -128,8 +121,7 @@ trait MesosSchedulerUtils extends Logging {
  setDaemon(true)
  override def run() {
  try {
- mesosDriver = newDriver
- val ret = mesosDriver.run()
+ val ret = newDriver.run()
  logInfo("driver.run() returned with code " + ret)
  if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
  error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
@@ -379,12 +371,24 @@ trait MesosSchedulerUtils extends Logging {
  }
  }
- protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
- sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
+ private def getRejectOfferDurationStr(conf: SparkConf): String = {
+ conf.get("spark.mesos.rejectOfferDuration", "120s")
  }
- protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
- sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+ protected def getRejectOfferDuration(conf: SparkConf): Long = {
+ Utils.timeStringAsSeconds(getRejectOfferDurationStr(conf))
  }
+ protected def getRejectOfferDurationForUnmetConstraints(conf: SparkConf): Long = {
+ conf.getTimeAsSeconds(
+ "spark.mesos.rejectOfferDurationForUnmetConstraints",
+ getRejectOfferDurationStr(conf))
+ }
+ protected def getRejectOfferDurationForReachedMaxCores(conf: SparkConf): Long = {
+ conf.getTimeAsSeconds(
+ "spark.mesos.rejectOfferDurationForReachedMaxCores",
+ getRejectOfferDurationStr(conf))
+ }
  /**
@@ -438,6 +442,7 @@ trait MesosSchedulerUtils extends Logging {
  /**
  * The values of the non-zero ports to be used by the executor process.
+ *
  * @param conf the spark config to use
  * @return the ono-zero values of the ports
  */
@@ -521,4 +526,33 @@ trait MesosSchedulerUtils extends Logging {
  case TaskState.KILLED => MesosTaskState.TASK_KILLED
  case TaskState.LOST => MesosTaskState.TASK_LOST
  }
+ protected def declineOffer(
+ driver: org.apache.mesos.SchedulerDriver,
+ offer: Offer,
+ reason: Option[String] = None,
+ refuseSeconds: Option[Long] = None): Unit = {
+ val id = offer.getId.getValue
+ val offerAttributes = toAttributeMap(offer.getAttributesList)
+ val mem = getResource(offer.getResourcesList, "mem")
+ val cpus = getResource(offer.getResourcesList, "cpus")
+ val ports = getRangeResource(offer.getResourcesList, "ports")
+ logDebug(s"Declining offer: $id with " +
+ s"attributes: $offerAttributes " +
+ s"mem: $mem " +
+ s"cpu: $cpus " +
+ s"port: $ports " +
+ refuseSeconds.map(s => s"for ${s} seconds ").getOrElse("") +
+ reason.map(r => s" (reason: $r)").getOrElse(""))
+ refuseSeconds match {
+ case Some(seconds) =>
+ val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+ driver.declineOffer(offer.getId, filters)
+ case _ =>
+ driver.declineOffer(offer.getId)
+ }
+ }
  }
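The resolution order of the new duration helpers, restated: a case-specific key wins if set; otherwise the shared `spark.mesos.rejectOfferDuration` applies; otherwise the default is `120s`. A one-function equivalent of that fallback chain (an illustrative restatement, not the patch code; `resolveRejectSeconds` is a made-up name):

```scala
import org.apache.spark.SparkConf

// Equivalent fallback chain to the getRejectOfferDurationFor* helpers above:
// specific key -> spark.mesos.rejectOfferDuration -> "120s".
def resolveRejectSeconds(conf: SparkConf, specificKey: String): Long = {
  val base = conf.get("spark.mesos.rejectOfferDuration", "120s")
  conf.getTimeAsSeconds(specificKey, base)
}
```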

File: MesosClusterSchedulerSuite.scala

@@ -53,19 +53,32 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
  override def start(): Unit = { ready = true }
  }
  scheduler.start()
+ scheduler.registered(driver, Utils.TEST_FRAMEWORK_ID, Utils.TEST_MASTER_INFO)
  }
+ private def testDriverDescription(submissionId: String): MesosDriverDescription = {
+ new MesosDriverDescription(
+ "d1",
+ "jar",
+ 1000,
+ 1,
+ true,
+ command,
+ Map[String, String](),
+ submissionId,
+ new Date())
+ }
  test("can queue drivers") {
  setScheduler()
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
+ val response = scheduler.submitDriver(testDriverDescription("s1"))
  assert(response.success)
- val response2 =
- scheduler.submitDriver(new MesosDriverDescription(
- "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
+ verify(driver, times(1)).reviveOffers()
+ val response2 = scheduler.submitDriver(testDriverDescription("s2"))
  assert(response2.success)
  val state = scheduler.getSchedulerState()
  val queuedDrivers = state.queuedDrivers.toList
  assert(queuedDrivers(0).submissionId == response.submissionId)
@@ -75,9 +88,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
  test("can kill queued drivers") {
  setScheduler()
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
+ val response = scheduler.submitDriver(testDriverDescription("s1"))
  assert(response.success)
  val killResponse = scheduler.killDriver(response.submissionId)
  assert(killResponse.success)
@@ -238,18 +249,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
  }
  test("can kill supervised drivers") {
- val driver = mock[SchedulerDriver]
  val conf = new SparkConf()
  conf.setMaster("mesos://localhost:5050")
  conf.setAppName("spark mesos")
- scheduler = new MesosClusterScheduler(
- new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
- override def start(): Unit = {
- ready = true
- mesosDriver = driver
- }
- }
- scheduler.start()
+ setScheduler(conf.getAll.toMap)
  val response = scheduler.submitDriver(
  new MesosDriverDescription("d1", "jar", 100, 1, true, command,
@@ -291,4 +294,16 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
  assert(state.launchedDrivers.isEmpty)
  assert(state.finishedDrivers.size == 1)
  }
+ test("Declines offer with refuse seconds = 120.") {
+ setScheduler()
+ val filter = Filters.newBuilder().setRefuseSeconds(120).build()
+ val offerId = OfferID.newBuilder().setValue("o1").build()
+ val offer = Utils.createOffer(offerId.getValue, "s1", 1000, 1)
+ scheduler.resourceOffers(driver, Collections.singletonList(offer))
+ verify(driver, times(1)).declineOffer(offerId, filter)
+ }
  }

File: MesosCoarseGrainedSchedulerBackendSuite.scala

@@ -552,17 +552,14 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
  override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
- // override to avoid race condition with the driver thread on `mesosDriver`
- override def startScheduler(newDriver: SchedulerDriver): Unit = {
- mesosDriver = newDriver
- }
+ override def startScheduler(newDriver: SchedulerDriver): Unit = {}
  override def stopExecutors(): Unit = {
  stopCalled = true
  }
- markRegistered()
  }
  backend.start()
+ backend.registered(driver, Utils.TEST_FRAMEWORK_ID, Utils.TEST_MASTER_INFO)
  backend
  }

File: Utils.scala (Mesos test utilities)

@@ -28,6 +28,17 @@ import org.mockito.{ArgumentCaptor, Matchers}
  import org.mockito.Mockito._
  object Utils {
+ val TEST_FRAMEWORK_ID = FrameworkID.newBuilder()
+ .setValue("test-framework-id")
+ .build()
+ val TEST_MASTER_INFO = MasterInfo.newBuilder()
+ .setId("test-master")
+ .setIp(0)
+ .setPort(0)
+ .build()
  def createOffer(
  offerId: String,
  slaveId: String,