[SPARK-11714][MESOS] Make Spark on Mesos honor port restrictions on coarse grain mode
- Make mesos coarse grained scheduler accept port offers and pre-assign ports Previous attempt was for fine grained: https://github.com/apache/spark/pull/10808 Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com> Author: Stavros Kontopoulos <stavros.kontopoulos@typesafe.com> Closes #11157 from skonto/honour_ports_coarse.
This commit is contained in:
parent
2a3d286f34
commit
1a028bdefa
|
@ -231,6 +231,7 @@ object SparkEnv extends Logging {
|
|||
conf.set("spark.driver.port", rpcEnv.address.port.toString)
|
||||
} else if (rpcEnv.address != null) {
|
||||
conf.set("spark.executor.port", rpcEnv.address.port.toString)
|
||||
logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
|
||||
}
|
||||
|
||||
// Create an instance of the class with the given name, possibly initializing it with our conf
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantLock
|
|||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable
|
||||
import scala.collection.mutable.{Buffer, HashMap, HashSet}
|
||||
|
||||
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
|
||||
|
||||
|
@ -71,13 +70,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
|
|||
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
|
||||
|
||||
// Cores we have acquired with each Mesos task ID
|
||||
val coresByTaskId = new HashMap[String, Int]
|
||||
val coresByTaskId = new mutable.HashMap[String, Int]
|
||||
var totalCoresAcquired = 0
|
||||
|
||||
// SlaveID -> Slave
|
||||
// This map accumulates entries for the duration of the job. Slaves are never deleted, because
|
||||
// we need to maintain e.g. failure state and connection state.
|
||||
private val slaves = new HashMap[String, Slave]
|
||||
private val slaves = new mutable.HashMap[String, Slave]
|
||||
|
||||
/**
|
||||
* The total number of executors we aim to have. Undefined when not using dynamic allocation.
|
||||
|
@ -285,7 +284,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
|
|||
}
|
||||
|
||||
private def declineUnmatchedOffers(
|
||||
d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = {
|
||||
d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
|
||||
offers.foreach { offer =>
|
||||
declineOffer(d, offer, Some("unmet constraints"),
|
||||
Some(rejectOfferDurationForUnmetConstraints))
|
||||
|
@ -302,9 +301,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
|
|||
val offerAttributes = toAttributeMap(offer.getAttributesList)
|
||||
val mem = getResource(offer.getResourcesList, "mem")
|
||||
val cpus = getResource(offer.getResourcesList, "cpus")
|
||||
val ports = getRangeResource(offer.getResourcesList, "ports")
|
||||
|
||||
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
|
||||
s" cpu: $cpus for $refuseSeconds seconds" +
|
||||
s" cpu: $cpus port: $ports for $refuseSeconds seconds" +
|
||||
reason.map(r => s" (reason: $r)").getOrElse(""))
|
||||
|
||||
refuseSeconds match {
|
||||
|
@ -323,26 +323,30 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
|
|||
* @param offers Mesos offers that match attribute constraints
|
||||
*/
|
||||
private def handleMatchedOffers(
|
||||
d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = {
|
||||
d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
|
||||
val tasks = buildMesosTasks(offers)
|
||||
for (offer <- offers) {
|
||||
val offerAttributes = toAttributeMap(offer.getAttributesList)
|
||||
val offerMem = getResource(offer.getResourcesList, "mem")
|
||||
val offerCpus = getResource(offer.getResourcesList, "cpus")
|
||||
val offerPorts = getRangeResource(offer.getResourcesList, "ports")
|
||||
val id = offer.getId.getValue
|
||||
|
||||
if (tasks.contains(offer.getId)) { // accept
|
||||
val offerTasks = tasks(offer.getId)
|
||||
|
||||
logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
|
||||
s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.")
|
||||
s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." +
|
||||
s" Launching ${offerTasks.size} Mesos tasks.")
|
||||
|
||||
for (task <- offerTasks) {
|
||||
val taskId = task.getTaskId
|
||||
val mem = getResource(task.getResourcesList, "mem")
|
||||
val cpus = getResource(task.getResourcesList, "cpus")
|
||||
val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")
|
||||
|
||||
logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
|
||||
logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
|
||||
s" ports: $ports")
|
||||
}
|
||||
|
||||
d.launchTasks(
|
||||
|
@ -365,9 +369,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
|
|||
* @param offers Mesos offers that match attribute constraints
|
||||
* @return A map from OfferID to a list of Mesos tasks to launch on that offer
|
||||
*/
|
||||
private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
|
||||
private def buildMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
|
||||
// offerID -> tasks
|
||||
val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
|
||||
val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
|
||||
|
||||
// offerID -> resources
|
||||
val remainingResources = mutable.Map(offers.map(offer =>
|
||||
|
@ -397,18 +401,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
|
|||
|
||||
slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
|
||||
|
||||
val (afterCPUResources, cpuResourcesToUse) =
|
||||
partitionResources(resources, "cpus", taskCPUs)
|
||||
val (resourcesLeft, memResourcesToUse) =
|
||||
partitionResources(afterCPUResources.asJava, "mem", taskMemory)
|
||||
val (resourcesLeft, resourcesToUse) =
|
||||
partitionTaskResources(resources, taskCPUs, taskMemory)
|
||||
|
||||
val taskBuilder = MesosTaskInfo.newBuilder()
|
||||
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
|
||||
.setSlaveId(offer.getSlaveId)
|
||||
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
|
||||
.setName("Task " + taskId)
|
||||
.addAllResources(cpuResourcesToUse.asJava)
|
||||
.addAllResources(memResourcesToUse.asJava)
|
||||
|
||||
taskBuilder.addAllResources(resourcesToUse.asJava)
|
||||
|
||||
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
|
||||
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
|
||||
|
@ -428,18 +430,39 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
|
|||
tasks.toMap
|
||||
}
|
||||
|
||||
/** Extracts task needed resources from a list of available resources. */
|
||||
private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int)
|
||||
: (List[Resource], List[Resource]) = {
|
||||
|
||||
// partition cpus & mem
|
||||
val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs)
|
||||
val (afterMemResources, memResourcesToUse) =
|
||||
partitionResources(afterCPUResources.asJava, "mem", taskMemory)
|
||||
|
||||
// If user specifies port numbers in SparkConfig then consecutive tasks will not be launched
|
||||
// on the same host. This essentially means one executor per host.
|
||||
// TODO: handle network isolator case
|
||||
val (nonPortResources, portResourcesToUse) =
|
||||
partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources)
|
||||
|
||||
(nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse)
|
||||
}
|
||||
|
||||
private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
|
||||
val offerMem = getResource(resources, "mem")
|
||||
val offerCPUs = getResource(resources, "cpus").toInt
|
||||
val cpus = executorCores(offerCPUs)
|
||||
val mem = executorMemory(sc)
|
||||
val ports = getRangeResource(resources, "ports")
|
||||
val meetsPortRequirements = checkPorts(sc.conf, ports)
|
||||
|
||||
cpus > 0 &&
|
||||
cpus <= offerCPUs &&
|
||||
cpus + totalCoresAcquired <= maxCores &&
|
||||
mem <= offerMem &&
|
||||
numExecutors() < executorLimit &&
|
||||
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES
|
||||
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
|
||||
meetsPortRequirements
|
||||
}
|
||||
|
||||
private def executorCores(offerCPUs: Int): Int = {
|
||||
|
@ -613,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
|
|||
}
|
||||
|
||||
private class Slave(val hostname: String) {
|
||||
val taskIDs = new HashSet[String]()
|
||||
val taskIDs = new mutable.HashSet[String]()
|
||||
var taskFailures = 0
|
||||
var shuffleRegistered = false
|
||||
}
|
||||
|
|
|
@ -47,6 +47,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
|
|||
|
||||
/**
|
||||
* Creates a new MesosSchedulerDriver that communicates to the Mesos master.
|
||||
*
|
||||
* @param masterUrl The url to connect to Mesos master
|
||||
* @param scheduler the scheduler class to receive scheduler callbacks
|
||||
* @param sparkUser User to impersonate with when running tasks
|
||||
|
@ -147,6 +148,20 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
|
|||
res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms a range resource to a list of ranges
|
||||
*
|
||||
* @param res the mesos resource list
|
||||
* @param name the name of the resource
|
||||
* @return the list of ranges returned
|
||||
*/
|
||||
protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = {
|
||||
// A resource can have multiple values in the offer since it can either be from
|
||||
// a specific role or wildcard.
|
||||
res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala
|
||||
.map(r => (r.getBegin, r.getEnd)).toList).toList
|
||||
}
|
||||
|
||||
/**
|
||||
* Signal that the scheduler has registered with Mesos.
|
||||
*/
|
||||
|
@ -172,6 +187,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
|
|||
/**
|
||||
* Partition the existing set of resources into two groups, those remaining to be
|
||||
* scheduled and those requested to be used for a new task.
|
||||
*
|
||||
* @param resources The full list of available resources
|
||||
* @param resourceName The name of the resource to take from the available resources
|
||||
* @param amountToUse The amount of resources to take from the available resources
|
||||
|
@ -223,7 +239,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
|
|||
/**
|
||||
* Converts the attributes from the resource offer into a Map of name -> Attribute Value
|
||||
* The attribute values are the mesos attribute types and they are
|
||||
* @param offerAttributes
|
||||
*
|
||||
* @param offerAttributes the attributes offered
|
||||
* @return
|
||||
*/
|
||||
protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
|
||||
|
@ -333,6 +350,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
|
|||
/**
|
||||
* Return the amount of memory to allocate to each executor, taking into account
|
||||
* container overheads.
|
||||
*
|
||||
* @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
|
||||
* @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
|
||||
* (whichever is larger)
|
||||
|
@ -357,6 +375,111 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
|
|||
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks executor ports if they are within some range of the offered list of ports ranges,
|
||||
*
|
||||
* @param conf the Spark Config
|
||||
* @param ports the list of ports to check
|
||||
* @return true if ports are within range false otherwise
|
||||
*/
|
||||
protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean = {
|
||||
|
||||
def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
|
||||
ps.exists{case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port }
|
||||
}
|
||||
|
||||
val portsToCheck = nonZeroPortValuesFromConfig(conf)
|
||||
val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
|
||||
// make sure we have enough ports to allocate per offer
|
||||
val enoughPorts =
|
||||
ports.map{case (rangeStart, rangeEnd) => rangeEnd - rangeStart + 1}.sum >= portsToCheck.size
|
||||
enoughPorts && withinRange
|
||||
}
|
||||
|
||||
/**
|
||||
* Partitions port resources.
|
||||
*
|
||||
* @param requestedPorts non-zero ports to assign
|
||||
* @param offeredResources the resources offered
|
||||
* @return resources left, port resources to be used.
|
||||
*/
|
||||
def partitionPortResources(requestedPorts: List[Long], offeredResources: List[Resource])
|
||||
: (List[Resource], List[Resource]) = {
|
||||
if (requestedPorts.isEmpty) {
|
||||
(offeredResources, List[Resource]())
|
||||
} else {
|
||||
// partition port offers
|
||||
val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources)
|
||||
|
||||
val portsAndRoles = requestedPorts.
|
||||
map(x => (x, findPortAndGetAssignedRangeRole(x, portResources)))
|
||||
|
||||
val assignedPortResources = createResourcesFromPorts(portsAndRoles)
|
||||
|
||||
// ignore non-assigned port resources, they will be declined implicitly by mesos
|
||||
// no need for splitting port resources.
|
||||
(resourcesWithoutPorts, assignedPortResources)
|
||||
}
|
||||
}
|
||||
|
||||
val managedPortNames = List("spark.executor.port", "spark.blockManager.port")
|
||||
|
||||
/**
|
||||
* The values of the non-zero ports to be used by the executor process.
|
||||
* @param conf the spark config to use
|
||||
* @return the ono-zero values of the ports
|
||||
*/
|
||||
def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = {
|
||||
managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0)
|
||||
}
|
||||
|
||||
/** Creates a mesos resource for a specific port number. */
|
||||
private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = {
|
||||
portsAndRoles.flatMap{ case (port, role) =>
|
||||
createMesosPortResource(List((port, port)), Some(role))}
|
||||
}
|
||||
|
||||
/** Helper to create mesos resources for specific port ranges. */
|
||||
private def createMesosPortResource(
|
||||
ranges: List[(Long, Long)],
|
||||
role: Option[String] = None): List[Resource] = {
|
||||
ranges.map { case (rangeStart, rangeEnd) =>
|
||||
val rangeValue = Value.Range.newBuilder()
|
||||
.setBegin(rangeStart)
|
||||
.setEnd(rangeEnd)
|
||||
val builder = Resource.newBuilder()
|
||||
.setName("ports")
|
||||
.setType(Value.Type.RANGES)
|
||||
.setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
|
||||
role.foreach(r => builder.setRole(r))
|
||||
builder.build()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to assign a port to an offered range and get the latter's role
|
||||
* info to use it later on.
|
||||
*/
|
||||
private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource])
|
||||
: String = {
|
||||
|
||||
val ranges = portResources.
|
||||
map(resource =>
|
||||
(resource.getRole, resource.getRanges.getRangeList.asScala
|
||||
.map(r => (r.getBegin, r.getEnd)).toList))
|
||||
|
||||
val rangePortRole = ranges
|
||||
.find { case (role, rangeList) => rangeList
|
||||
.exists{ case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port}}
|
||||
// this is safe since we have previously checked about the ranges (see checkPorts method)
|
||||
rangePortRole.map{ case (role, rangeList) => role}.get
|
||||
}
|
||||
|
||||
/** Retrieves the port resources from a list of mesos offered resources */
|
||||
private def filterPortResources(resources: List[Resource]): (List[Resource], List[Resource]) = {
|
||||
resources.partition { r => !(r.getType == Value.Type.RANGES && r.getName == "ports") }
|
||||
}
|
||||
|
||||
/**
|
||||
* spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
|
||||
* submissions with frameworkIDs. However, this causes issues when a driver process launches
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
package org.apache.spark.scheduler.cluster.mesos
|
||||
|
||||
import java.util.Collections
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.reflect.ClassTag
|
||||
|
@ -212,6 +210,46 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
|
|||
.registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong)
|
||||
}
|
||||
|
||||
test("Port offer decline when there is no appropriate range") {
|
||||
setBackend(Map("spark.blockManager.port" -> "30100"))
|
||||
val offeredPorts = (31100L, 31200L)
|
||||
val (mem, cpu) = (backend.executorMemory(sc), 4)
|
||||
|
||||
val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
|
||||
backend.resourceOffers(driver, List(offer1).asJava)
|
||||
verify(driver, times(1)).declineOffer(offer1.getId)
|
||||
}
|
||||
|
||||
test("Port offer accepted when ephemeral ports are used") {
|
||||
setBackend()
|
||||
val offeredPorts = (31100L, 31200L)
|
||||
val (mem, cpu) = (backend.executorMemory(sc), 4)
|
||||
|
||||
val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
|
||||
backend.resourceOffers(driver, List(offer1).asJava)
|
||||
verifyTaskLaunched(driver, "o1")
|
||||
}
|
||||
|
||||
test("Port offer accepted with user defined port numbers") {
|
||||
val port = 30100
|
||||
setBackend(Map("spark.blockManager.port" -> s"$port"))
|
||||
val offeredPorts = (30000L, 31000L)
|
||||
val (mem, cpu) = (backend.executorMemory(sc), 4)
|
||||
|
||||
val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
|
||||
backend.resourceOffers(driver, List(offer1).asJava)
|
||||
val taskInfo = verifyTaskLaunched(driver, "o1")
|
||||
|
||||
val taskPortResources = taskInfo.head.getResourcesList.asScala.
|
||||
find(r => r.getType == Value.Type.RANGES && r.getName == "ports")
|
||||
|
||||
val isPortInOffer = (r: Resource) => {
|
||||
r.getRanges().getRangeList
|
||||
.asScala.exists(range => range.getBegin == port && range.getEnd == port)
|
||||
}
|
||||
assert(taskPortResources.exists(isPortInOffer))
|
||||
}
|
||||
|
||||
test("mesos kills an executor when told") {
|
||||
setBackend()
|
||||
|
||||
|
|
|
@ -17,9 +17,10 @@
|
|||
|
||||
package org.apache.spark.scheduler.cluster.mesos
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.language.reflectiveCalls
|
||||
|
||||
import org.apache.mesos.Protos.Value
|
||||
import org.apache.mesos.Protos.{Resource, Value}
|
||||
import org.mockito.Mockito._
|
||||
import org.scalatest._
|
||||
import org.scalatest.mock.MockitoSugar
|
||||
|
@ -35,6 +36,41 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
|
|||
val sc = mock[SparkContext]
|
||||
when(sc.conf).thenReturn(sparkConf)
|
||||
}
|
||||
|
||||
private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = {
|
||||
val rangeValue = Value.Range.newBuilder()
|
||||
rangeValue.setBegin(range._1)
|
||||
rangeValue.setEnd(range._2)
|
||||
val builder = Resource.newBuilder()
|
||||
.setName("ports")
|
||||
.setType(Value.Type.RANGES)
|
||||
.setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
|
||||
|
||||
role.foreach { r => builder.setRole(r) }
|
||||
builder.build()
|
||||
}
|
||||
|
||||
private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = {
|
||||
resources.flatMap{resource => resource.getRanges.getRangeList
|
||||
.asScala.map(range => (range.getBegin, range.getEnd))}
|
||||
}
|
||||
|
||||
def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)])
|
||||
: Boolean = {
|
||||
array1.sortBy(identity).deep == array2.sortBy(identity).deep
|
||||
}
|
||||
|
||||
def arePortsEqual(array1: Array[Long], array2: Array[Long])
|
||||
: Boolean = {
|
||||
array1.sortBy(identity).deep == array2.sortBy(identity).deep
|
||||
}
|
||||
|
||||
def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {
|
||||
resources.flatMap{ resource =>
|
||||
resource.getRanges.getRangeList.asScala.toList.map{
|
||||
range => (range.getBegin, range.getEnd)}}
|
||||
}
|
||||
|
||||
val utils = new MesosSchedulerUtils { }
|
||||
// scalastyle:on structural.type
|
||||
|
||||
|
@ -140,4 +176,80 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
|
|||
utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
|
||||
}
|
||||
|
||||
test("Port reservation is done correctly with user specified ports only") {
|
||||
val conf = new SparkConf()
|
||||
conf.set("spark.executor.port", "3000" )
|
||||
conf.set("spark.blockManager.port", "4000")
|
||||
val portResource = createTestPortResource((3000, 5000), Some("my_role"))
|
||||
|
||||
val (resourcesLeft, resourcesToBeUsed) = utils
|
||||
.partitionPortResources(List(3000, 4000), List(portResource))
|
||||
resourcesToBeUsed.length shouldBe 2
|
||||
|
||||
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
|
||||
|
||||
portsToUse.length shouldBe 2
|
||||
arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
|
||||
|
||||
val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
|
||||
|
||||
val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
|
||||
|
||||
arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
|
||||
}
|
||||
|
||||
test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
|
||||
val conf = new SparkConf()
|
||||
conf.set("spark.executor.port", "3100" )
|
||||
val portResource = createTestPortResource((3000, 5000), Some("my_role"))
|
||||
|
||||
val (resourcesLeft, resourcesToBeUsed) = utils
|
||||
.partitionPortResources(List(3100), List(portResource))
|
||||
|
||||
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
|
||||
|
||||
portsToUse.length shouldBe 1
|
||||
portsToUse.contains(3100) shouldBe true
|
||||
}
|
||||
|
||||
test("Port reservation is done correctly with all random ports") {
|
||||
val conf = new SparkConf()
|
||||
val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
|
||||
|
||||
val (resourcesLeft, resourcesToBeUsed) = utils
|
||||
.partitionPortResources(List(), List(portResource))
|
||||
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
|
||||
|
||||
portsToUse.isEmpty shouldBe true
|
||||
}
|
||||
|
||||
test("Port reservation is done correctly with user specified ports only - multiple ranges") {
|
||||
val conf = new SparkConf()
|
||||
conf.set("spark.executor.port", "2100" )
|
||||
conf.set("spark.blockManager.port", "4000")
|
||||
val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
|
||||
createTestPortResource((2000, 2500), Some("other_role")))
|
||||
val (resourcesLeft, resourcesToBeUsed) = utils
|
||||
.partitionPortResources(List(2100, 4000), portResourceList)
|
||||
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
|
||||
|
||||
portsToUse.length shouldBe 2
|
||||
val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
|
||||
val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
|
||||
|
||||
val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
|
||||
|
||||
arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
|
||||
arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
|
||||
}
|
||||
|
||||
test("Port reservation is done correctly with all random ports - multiple ranges") {
|
||||
val conf = new SparkConf()
|
||||
val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
|
||||
createTestPortResource((2000, 2500), Some("other_role")))
|
||||
val (resourcesLeft, resourcesToBeUsed) = utils
|
||||
.partitionPortResources(List(), portResourceList)
|
||||
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
|
||||
portsToUse.isEmpty shouldBe true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,15 +19,21 @@ package org.apache.spark.scheduler.cluster.mesos
|
|||
|
||||
import java.util.Collections
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.mesos.Protos._
|
||||
import org.apache.mesos.Protos.Value.Scalar
|
||||
import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
|
||||
import org.apache.mesos.SchedulerDriver
|
||||
import org.mockito.{ArgumentCaptor, Matchers}
|
||||
import org.mockito.Mockito._
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
object Utils {
|
||||
def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
|
||||
def createOffer(
|
||||
offerId: String,
|
||||
slaveId: String,
|
||||
mem: Int,
|
||||
cpu: Int,
|
||||
ports: Option[(Long, Long)] = None): Offer = {
|
||||
val builder = Offer.newBuilder()
|
||||
builder.addResourcesBuilder()
|
||||
.setName("mem")
|
||||
|
@ -37,6 +43,13 @@ object Utils {
|
|||
.setName("cpus")
|
||||
.setType(Value.Type.SCALAR)
|
||||
.setScalar(Scalar.newBuilder().setValue(cpu))
|
||||
ports.foreach { resourcePorts =>
|
||||
builder.addResourcesBuilder()
|
||||
.setName("ports")
|
||||
.setType(Value.Type.RANGES)
|
||||
.setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
|
||||
.setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
|
||||
}
|
||||
builder.setId(createOfferId(offerId))
|
||||
.setFrameworkId(FrameworkID.newBuilder()
|
||||
.setValue("f1"))
|
||||
|
@ -69,3 +82,4 @@ object Utils {
|
|||
TaskID.newBuilder().setValue(taskId).build()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue