diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
deleted file mode 100644
index 62df9657a6..0000000000
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.PriorityQueue
-import scala.util.Random
-
-import org.apache.spark.SparkConf
-
-case class OfferState(workOffer: WorkerOffer, var cores: Int) {
-  // Build a list of tasks to assign to each worker.
-  val tasks = new ArrayBuffer[TaskDescription](cores)
-}
-
-abstract class TaskAssigner(conf: SparkConf) {
-  var offer: Seq[OfferState] = _
-  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
-
-  // The final assigned offer returned to TaskScheduler.
-  def tasks(): Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
-
-  // construct the assigner by the workoffer.
-  def construct(workOffer: Seq[WorkerOffer]): Unit = {
-    offer = workOffer.map(o => OfferState(o, o.cores))
-  }
-
-  // Invoked in each round of Taskset assignment to initialize the internal structure.
-  def init(): Unit
-
-  // Indicating whether there is offer available to be used by one round of Taskset assignment.
-  def hasNext(): Boolean
-
-  // Next available offer returned to one round of Taskset assignment.
-  def getNext(): OfferState
-
-  // Called by the TaskScheduler to indicate whether the current offer is accepted
-  // In order to decide whether the current is valid for the next offering.
-  def taskAssigned(assigned: Boolean): Unit
-
-  // Release internally maintained resources. Subclass is responsible to
-  // release its own private resources.
-  def reset: Unit = {
-    offer = null
-  }
-}
-
-class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) {
-  var i = 0
-  override def construct(workOffer: Seq[WorkerOffer]): Unit = {
-    offer = Random.shuffle(workOffer.map(o => OfferState(o, o.cores)))
-  }
-  override def init(): Unit = {
-    i = 0
-  }
-  override def hasNext: Boolean = {
-    i < offer.size
-  }
-  override def getNext(): OfferState = {
-    offer(i)
-  }
-  override def taskAssigned(assigned: Boolean): Unit = {
-    i += 1
-  }
-  override def reset: Unit = {
-    super.reset
-    i = 0
-  }
-}
-
-class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
-  var maxHeap: PriorityQueue[OfferState] = _
-  var current: OfferState = _
-
-  override def construct(workOffer: Seq[WorkerOffer]): Unit = {
-    offer = Random.shuffle(workOffer.map(o => OfferState(o, o.cores)))
-  }
-  implicit val ord: Ordering[OfferState] = new Ordering[OfferState] {
-    def compare(x: OfferState, y: OfferState): Int = {
-      return Ordering[Int].compare(x.cores, y.cores)
-    }
-  }
-  def init(): Unit = {
-    maxHeap = new PriorityQueue[OfferState]()
-    offer.filter(_.cores >= CPUS_PER_TASK).foreach(maxHeap.enqueue(_))
-  }
-  override def hasNext: Boolean = {
-    maxHeap.size > 0
-  }
-  override def getNext(): OfferState = {
-    current = maxHeap.dequeue()
-    current
-  }
-
-  override def taskAssigned(assigned: Boolean): Unit = {
-    if (current.cores >= CPUS_PER_TASK && assigned) {
-      maxHeap.enqueue(current)
-    }
-  }
-  override def reset: Unit = {
-    super.reset
-    maxHeap = null
-    current = null
-  }
-}
-
-class PackedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
-
-  var sorted: Seq[OfferState] = _
-  var i = 0
-  var current: OfferState = _
-
-  override def init(): Unit = {
-    i = 0
-    sorted = offer.filter(_.cores >= CPUS_PER_TASK).sortBy(_.cores)
-  }
-
-  override def hasNext: Boolean = {
-    i < sorted.size
-  }
-
-  override def getNext(): OfferState = {
-    current = sorted(i)
-    current
-  }
-
-  def taskAssigned(assigned: Boolean): Unit = {
-    if (current.cores < CPUS_PER_TASK || !assigned) {
-      i += 1
-    }
-  }
-
-  override def reset: Unit = {
-    super.reset
-    sorted = null
-    current = null
-    i = 0
-  }
-}
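Note for reviewers: the file deleted above defines a pull-style protocol (construct/init/hasNext/getNext/taskAssigned/reset) whose driving loop lived in TaskSchedulerImpl and is removed in the next file. A minimal sketch of that loop, reconstructed from the abstract signatures; `offerRound` and `tryLaunch` are hypothetical names for illustration, not Spark API:

```scala
// Sketch of the call protocol the deleted TaskAssigner was written against.
// `tryLaunch` stands in for the TaskSetManager.resourceOffer(...) call that
// places at most one task on the offered worker and reports success.
def offerRound(assigner: TaskAssigner, tryLaunch: OfferState => Boolean): Unit = {
  assigner.init()                          // rebuild per-round state (index, heap, sorted list)
  while (assigner.hasNext()) {
    val current = assigner.getNext()       // strategy picks the next candidate worker
    val assigned = current.cores >= assigner.CPUS_PER_TASK && tryLaunch(current)
    assigner.taskAssigned(assigned)        // feedback decides whether this worker is offered again
  }
}
```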
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index fb732ea8e5..3e3f1ad031 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -22,7 +22,9 @@ import java.util.{Timer, TimerTask}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.collection.Set
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.util.Random
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -59,21 +61,6 @@ private[spark] class TaskSchedulerImpl(
 
   val conf = sc.conf
 
-  val DEFAULT_TASK_ASSIGNER = classOf[RoundRobinAssigner].getName
-  lazy val taskAssigner: TaskAssigner = {
-    val className = conf.get("spark.task.assigner", DEFAULT_TASK_ASSIGNER)
-    try {
-      logInfo(s"""constructing assigner as $className""")
-      val ctor = Utils.classForName(className).getConstructor(classOf[SparkConf])
-      ctor.newInstance(conf).asInstanceOf[TaskAssigner]
-    } catch {
-      case _: Throwable =>
-        logWarning(
-          s"""$className cannot be constructed fallback to default
-             | $DEFAULT_TASK_ASSIGNER""".stripMargin)
-        new RoundRobinAssigner(conf)
-    }
-  }
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
 
@@ -263,26 +250,24 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
       taskSet: TaskSetManager,
       maxLocality: TaskLocality,
-      taskAssigner: TaskAssigner) : Boolean = {
+      shuffledOffers: Seq[WorkerOffer],
+      availableCpus: Array[Int],
+      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
     var launchedTask = false
-    taskAssigner.init()
-    while(taskAssigner.hasNext()) {
-      var assigned = false
-      val current = taskAssigner.getNext()
-      val execId = current.workOffer.executorId
-      val host = current.workOffer.host
-      if (current.cores >= CPUS_PER_TASK) {
+    for (i <- 0 until shuffledOffers.size) {
+      val execId = shuffledOffers(i).executorId
+      val host = shuffledOffers(i).host
+      if (availableCpus(i) >= CPUS_PER_TASK) {
         try {
           for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-            current.tasks += task
+            tasks(i) += task
             val tid = task.taskId
             taskIdToTaskSetManager(tid) = taskSet
             taskIdToExecutorId(tid) = execId
             executorIdToTaskCount(execId) += 1
-            current.cores = current.cores - CPUS_PER_TASK
-            assert(current.cores >= 0)
+            availableCpus(i) -= CPUS_PER_TASK
+            assert(availableCpus(i) >= 0)
             launchedTask = true
-            assigned = true
           }
         } catch {
           case e: TaskNotSerializableException =>
@@ -292,10 +277,8 @@ private[spark] class TaskSchedulerImpl(
             return launchedTask
         }
       }
-      taskAssigner.taskAssigned(assigned)
     }
     return launchedTask
-
   }
 
   /**
@@ -322,8 +305,12 @@ private[spark] class TaskSchedulerImpl(
         hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
       }
     }
 
-    taskAssigner.construct(offers)
+    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
+    val shuffledOffers = Random.shuffle(offers)
+    // Build a list of tasks to assign to each worker.
+    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -342,7 +329,7 @@ private[spark] class TaskSchedulerImpl(
     for (currentMaxLocality <- taskSet.myLocalityLevels) {
       do {
         launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-          taskSet, currentMaxLocality, taskAssigner)
+          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
         launchedAnyTask |= launchedTaskAtCurrentMaxLocality
       } while (launchedTaskAtCurrentMaxLocality)
     }
@@ -350,12 +337,10 @@ private[spark] class TaskSchedulerImpl(
         taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
       }
     }
-    val tasks = taskAssigner.tasks
-    taskAssigner.reset
+
    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
-
    return tasks
  }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 2584f85bc5..f5f1947661 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -109,72 +109,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
-  test("Scheduler balance the assignment to the worker with more free cores") {
-    val taskScheduler = setupScheduler(("spark.task.assigner", classOf[BalancedAssigner].getName))
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
-      new WorkerOffer("executor1", "host1", 4))
-    val selectedExecutorIds = {
-      val taskSet = FakeTask.createTaskSet(2)
-      taskScheduler.submitTasks(taskSet)
-      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-      assert(2 === taskDescriptions.length)
-      taskDescriptions.map(_.executorId)
-    }
-    val count = selectedExecutorIds.count(_ == workerOffers(1).executorId)
-    assert(count == 2)
-    assert(!failedTaskSet)
-  }
-
-  test("Scheduler balance the assignment across workers with same free cores") {
-    val taskScheduler = setupScheduler(("spark.task.assigner", classOf[BalancedAssigner].getName))
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
-      new WorkerOffer("executor1", "host1", 2))
-    val selectedExecutorIds = {
-      val taskSet = FakeTask.createTaskSet(2)
-      taskScheduler.submitTasks(taskSet)
-      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-      assert(2 === taskDescriptions.length)
-      taskDescriptions.map(_.executorId)
-    }
-    val count = selectedExecutorIds.count(_ == workerOffers(1).executorId)
-    assert(count == 1)
-    assert(!failedTaskSet)
-  }
-
-  test("Scheduler packs the assignment to workers with less free cores") {
-    val taskScheduler = setupScheduler(("spark.task.assigner", classOf[PackedAssigner].getName))
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
-      new WorkerOffer("executor1", "host1", 4))
-    val selectedExecutorIds = {
-      val taskSet = FakeTask.createTaskSet(2)
-      taskScheduler.submitTasks(taskSet)
-      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-      assert(2 === taskDescriptions.length)
-      taskDescriptions.map(_.executorId)
-    }
-    val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
-    assert(count == 2)
-    assert(!failedTaskSet)
-  }
-
-  test("Scheduler keeps packing the assignment to the same worker") {
-    val taskScheduler = setupScheduler(("spark.task.assigner", classOf[PackedAssigner].getName))
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 4),
-      new WorkerOffer("executor1", "host1", 4))
-    val selectedExecutorIds = {
-      val taskSet = FakeTask.createTaskSet(4)
-      taskScheduler.submitTasks(taskSet)
-      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-      assert(4 === taskDescriptions.length)
-      taskDescriptions.map(_.executorId)
-    }
-
-    val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
-    assert(count == 4)
-    assert(!failedTaskSet)
-  }
-
-
   test("Scheduler correctly accounts for multiple CPUs per task") {
     val taskCpus = 2
     val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
@@ -474,5 +408,4 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(thirdTaskDescs.size === 0)
     assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
   }
-
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 6f3fbeb76c..373e22d71a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1334,17 +1334,6 @@ Apart from these, the following properties are also available, and may be useful
     Should be greater than or equal to 1. Number of allowed retries = this value - 1.
   </td>
 </tr>
-<tr>
-  <td><code>spark.task.assigner</code></td>
-  <td>org.apache.spark.scheduler.RoundRobinAssigner</td>
-  <td>
-    The strategy of how to allocate tasks among workers with free cores.
-    By default, round robin with randomness is used.
-    org.apache.spark.scheduler.BalancedAssigner tries to balance the task across all workers (allocating tasks to
-    workers with most free cores). org.apache.spark.scheduler.PackedAssigner tries to allocate tasks to workers
-    with the least free cores, which may help releasing the resources when dynamic allocation is enabled.
-  </td>
-</tr>
 </table>
 
 #### Dynamic Allocation
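Before this revert, the strategy was opted into through configuration; the removed docs entry corresponded to a setting like the sketch below. The key is deleted by this patch and no longer has any effect, and the app name is illustrative:

```scala
import org.apache.spark.SparkConf

// Sketch only: "spark.task.assigner" is removed by the patch above and is now ignored.
val conf = new SparkConf()
  .setAppName("packed-scheduling-demo")
  .set("spark.task.assigner", "org.apache.spark.scheduler.PackedAssigner")
```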