From 7633933e54ffb08ab9d959be5f76c26fae29d1d9 Mon Sep 17 00:00:00 2001 From: Davis Shepherd Date: Thu, 27 Apr 2017 18:06:12 +0000 Subject: [PATCH] [SPARK-20483] Mesos Coarse mode may starve other Mesos frameworks ## What changes were proposed in this pull request? Set maxCores to be a multiple of the smallest executor that can be launched. This ensures that we correctly detect the condition where no more executors will be launched when spark.cores.max is not a multiple of spark.executor.cores ## How was this patch tested? This was manually tested with other sample frameworks measuring their incoming offers to determine if starvation would occur. dbtsai mgummelt Author: Davis Shepherd Closes #17786 from dgshep/fix_mesos_max_cores. --- .../MesosCoarseGrainedSchedulerBackend.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 2a36ec4fa8..8f5b97ccb1 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -60,8 +60,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt) + private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt) + + private val minCoresPerExecutor = executorCoresOption.getOrElse(1) + // Maximum number of cores to acquire - private val maxCores = maxCoresOption.getOrElse(Int.MaxValue) + private val maxCores = { + val cores = maxCoresOption.getOrElse(Int.MaxValue) + // Set maxCores to a multiple of smallest executor we can launch + cores - (cores % minCoresPerExecutor) + } private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) @@ -489,8 +497,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } private def executorCores(offerCPUs: Int): Int = { - sc.conf.getInt("spark.executor.cores", - math.min(offerCPUs, maxCores - totalCoresAcquired)) + executorCoresOption.getOrElse( + math.min(offerCPUs, maxCores - totalCoresAcquired) + ) } override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {