Revert "[SPARK-4525] Mesos should decline unused offers"

This reverts commit b043c27424.

I accidentally committed this using my own authorship credential. However,
I should have given authoriship to the original author: Jongyoul Lee.
This commit is contained in:
Patrick Wendell 2014-11-24 19:16:53 -08:00
parent b043c27424
commit a68d442270
2 changed files with 21 additions and 65 deletions

View file

@ -208,12 +208,10 @@ private[spark] class MesosSchedulerBackend(
*/ */
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() { inClassLoader() {
// Fail-fast on offers we know will be rejected val (acceptedOffers, declinedOffers) = offers.partition { o =>
val (usableOffers, unUsableOffers) = offers.partition { o =>
val mem = getResource(o.getResourcesList, "mem") val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus") val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue val slaveId = o.getSlaveId.getValue
// TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
(mem >= MemoryUtils.calculateTotalMemory(sc) && (mem >= MemoryUtils.calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task // need at least 1 for executor, 1 for task
cpus >= 2 * scheduler.CPUS_PER_TASK) || cpus >= 2 * scheduler.CPUS_PER_TASK) ||
@ -221,12 +219,11 @@ private[spark] class MesosSchedulerBackend(
cpus >= scheduler.CPUS_PER_TASK) cpus >= scheduler.CPUS_PER_TASK)
} }
val workerOffers = usableOffers.map { o => val offerableWorkers = acceptedOffers.map { o =>
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt getResource(o.getResourcesList, "cpus").toInt
} else { } else {
// If the executor doesn't exist yet, subtract CPU for executor // If the executor doesn't exist yet, subtract CPU for executor
// TODO(pwendell): Should below just subtract "1"?
getResource(o.getResourcesList, "cpus").toInt - getResource(o.getResourcesList, "cpus").toInt -
scheduler.CPUS_PER_TASK scheduler.CPUS_PER_TASK
} }
@ -236,20 +233,17 @@ private[spark] class MesosSchedulerBackend(
cpus) cpus)
} }
val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap
val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
val slavesIdsOfAcceptedOffers = HashSet[String]()
// Call into the TaskSchedulerImpl // Call into the TaskSchedulerImpl
val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty) scheduler.resourceOffers(offerableWorkers)
acceptedOffers .filter(!_.isEmpty)
.foreach { offer => .foreach { offer =>
offer.foreach { taskDesc => offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId slaveIdsWithExecutors += slaveId
slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId)) .add(createMesosTask(taskDesc, slaveId))
@ -263,14 +257,7 @@ private[spark] class MesosSchedulerBackend(
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
} }
// Decline offers that weren't used declinedOffers.foreach(o => d.declineOffer(o.getId))
// NOTE: This logic assumes that we only get a single offer for each host in a given batch
for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
d.declineOffer(o.getId)
}
// Decline offers we ruled out immediately
unUsableOffers.foreach(o => d.declineOffer(o.getId))
} }
} }

View file

@ -30,11 +30,9 @@ import java.nio.ByteBuffer
import java.util.Collections import java.util.Collections
import java.util import java.util
import scala.collection.mutable import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
test("mesos resource offer is launching tasks") {
test("mesos resource offers result in launching tasks") {
def createOffer(id: Int, mem: Int, cpu: Int) = { def createOffer(id: Int, mem: Int, cpu: Int) = {
val builder = Offer.newBuilder() val builder = Offer.newBuilder()
builder.addResourcesBuilder() builder.addResourcesBuilder()
@ -45,61 +43,46 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
.setName("cpus") .setName("cpus")
.setType(Value.Type.SCALAR) .setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(cpu)) .setScalar(Scalar.newBuilder().setValue(cpu))
builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) builder.setId(OfferID.newBuilder().setValue(id.toString).build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() .setSlaveId(SlaveID.newBuilder().setValue("s1")).setHostname("localhost").build()
} }
val driver = EasyMock.createMock(classOf[SchedulerDriver]) val driver = EasyMock.createMock(classOf[SchedulerDriver])
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
val sc = EasyMock.createMock(classOf[SparkContext]) val sc = EasyMock.createMock(classOf[SparkContext])
EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes() EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
EasyMock.replay(sc) EasyMock.replay(sc)
val minMem = MemoryUtils.calculateTotalMemory(sc).toInt val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
val minCpu = 4 val minCpu = 4
val offers = new java.util.ArrayList[Offer]
val mesosOffers = new java.util.ArrayList[Offer] offers.add(createOffer(1, minMem, minCpu))
mesosOffers.add(createOffer(1, minMem, minCpu)) offers.add(createOffer(1, minMem - 1, minCpu))
mesosOffers.add(createOffer(2, minMem - 1, minCpu))
mesosOffers.add(createOffer(3, minMem, minCpu))
val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
val workerOffers = Seq(offers.get(0)).map(o => new WorkerOffer(
val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) o.getSlaveId.getValue,
expectedWorkerOffers.append(new WorkerOffer( o.getHostname,
mesosOffers.get(0).getSlaveId.getValue,
mesosOffers.get(0).getHostname,
2
))
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(2).getSlaveId.getValue,
mesosOffers.get(2).getHostname,
2 2
)) ))
val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc))) EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffers))).andReturn(Seq(Seq(taskDesc)))
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
EasyMock.replay(taskScheduler) EasyMock.replay(taskScheduler)
val capture = new Capture[util.Collection[TaskInfo]] val capture = new Capture[util.Collection[TaskInfo]]
EasyMock.expect( EasyMock.expect(
driver.launchTasks( driver.launchTasks(
EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)), EasyMock.eq(Collections.singleton(offers.get(0).getId)),
EasyMock.capture(capture), EasyMock.capture(capture),
EasyMock.anyObject(classOf[Filters]) EasyMock.anyObject(classOf[Filters])
) )
).andReturn(Status.valueOf(1)).once ).andReturn(Status.valueOf(1))
EasyMock.expect(driver.declineOffer(mesosOffers.get(1).getId)).andReturn(Status.valueOf(1)).times(1) EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1))
EasyMock.expect(driver.declineOffer(mesosOffers.get(2).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.replay(driver) EasyMock.replay(driver)
backend.resourceOffers(driver, offers)
backend.resourceOffers(driver, mesosOffers)
EasyMock.verify(driver)
assert(capture.getValue.size() == 1) assert(capture.getValue.size() == 1)
val taskInfo = capture.getValue.iterator().next() val taskInfo = capture.getValue.iterator().next()
assert(taskInfo.getName.equals("n1")) assert(taskInfo.getName.equals("n1"))
@ -107,19 +90,5 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
assert(cpus.getName.equals("cpus")) assert(cpus.getName.equals("cpus"))
assert(cpus.getScalar.getValue.equals(2.0)) assert(cpus.getScalar.getValue.equals(2.0))
assert(taskInfo.getSlaveId.getValue.equals("s1")) assert(taskInfo.getSlaveId.getValue.equals("s1"))
// Unwanted resources offered on an existing node. Make sure they are declined
val mesosOffers2 = new java.util.ArrayList[Offer]
mesosOffers2.add(createOffer(1, minMem, minCpu))
EasyMock.reset(taskScheduler)
EasyMock.reset(driver)
EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq())))
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
EasyMock.replay(taskScheduler)
EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.replay(driver)
backend.resourceOffers(driver, mesosOffers2)
EasyMock.verify(driver)
} }
} }