[SPARK-24819][CORE] Fail fast when there are not enough slots to launch the barrier stage at job submission

## What changes were proposed in this pull request?

On job submission we check whether the barrier stage requires more slots (to be able to launch all tasks in the barrier stage together) than the total number of currently active slots, and fail fast if a submitted barrier stage requires more slots than are currently available.

This PR adds a new method `maxNumConcurrentTasks()` to `SchedulerBackend` that returns the total number of currently active slots. Support for this new method has been added to all the first-class scheduler backends except `MesosFineGrainedSchedulerBackend`.
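
The core of the check, condensed from the `DAGScheduler` change in this diff (a sketch of the condition only, not the full retry logic):

```scala
// Condensed sketch of the check added to DAGScheduler in this PR. A "slot" is
// one concurrently runnable task: executor cores / spark.task.cpus, summed
// over all registered executors.
private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
  if (rdd.isBarrier() && rdd.getNumPartitions > sc.maxNumConcurrentTasks) {
    // The failed check is retried a configurable number of times before the
    // job submission itself is failed.
    throw new BarrierJobSlotsNumberCheckFailed
  }
}
```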

## How was this patch tested?

Added new test cases in `BarrierStageOnSubmittedSuite`.
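
For illustration, one of the new tests boils down to the following minimal repro: under `local[4]` there are only 4 slots, so a 5-partition barrier stage fails fast. The check interval and failure budget are shortened the same way the tests do; the app name and the final action are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Adapted from BarrierStageOnSubmittedSuite: local[4] provides 4 slots, but the
// barrier stage below needs 5 tasks launched together, so the job submission
// fails with the SPARK-24819 error message.
val conf = new SparkConf()
  .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
  .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
  .setMaster("local[4]")
  .setAppName("barrier-slots-check")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 10, 5)
  .barrier()
  .mapPartitions(iter => iter)
rdd.collect() // throws a SparkException referencing SPARK-24819
```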

Closes #22001 from jiangxb1987/SPARK-24819.

Lead-authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Co-authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
Xingbo Jiang 2018-08-15 13:31:28 -07:00 committed by Xiangrui Meng
parent 4d8ae0d1c8
commit bfb74394a5
16 changed files with 364 additions and 32 deletions


@ -1602,6 +1602,15 @@ class SparkContext(config: SparkConf) extends Logging {
}
}
/**
* Get the max number of tasks that can be launched concurrently at the moment.
* Note that you should not cache the value returned by this method, because the number can
* change when executors are added or removed.
*
* @return The max number of tasks that can be launched concurrently at the moment.
*/
private[spark] def maxNumConcurrentTasks(): Int = schedulerBackend.maxNumConcurrentTasks()
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.


@ -577,4 +577,31 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.checkValue(v => v > 0, "The value should be a positive time value.")
.createWithDefaultString("365d")
private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
.doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
"check. A max concurrent tasks check ensures the cluster can launch more concurrent " +
"tasks than required by a barrier stage on job submitted. The check can fail in case " +
"a cluster has just started and not enough executors have registered, so we wait for a " +
"little while and try to perform the check again. If the check fails more than a " +
"configured max failure times for a job then fail current job submission. Note this " +
"config only applies to jobs that contain one or more barrier stages, we won't perform " +
"the check on non-barrier jobs.")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("15s")
private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES =
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures")
.doc("Number of max concurrent tasks check failures allowed before fail a job submission. " +
"A max concurrent tasks check ensures the cluster can launch more concurrent tasks than " +
"required by a barrier stage on job submitted. The check can fail in case a cluster " +
"has just started and not enough executors have registered, so we wait for a little " +
"while and try to perform the check again. If the check fails more than a configured " +
"max failure times for a job then fail current job submission. Note this config only " +
"applies to jobs that contain one or more barrier stages, we won't perform the check on " +
"non-barrier jobs.")
.intConf
.checkValue(v => v > 0, "The max failures should be a positive value.")
.createWithDefault(40)
}
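
With the defaults above (a 15-second interval and 40 allowed failures), a barrier job submission keeps retrying for roughly ten minutes before it is failed. Clusters that take longer to register executors can raise either knob; a hedged sketch with illustrative values:

```scala
import org.apache.spark.SparkConf

// Illustrative values only: wait 30s between checks and allow up to 60
// consecutive failures, i.e. tolerate roughly 30 minutes of executor ramp-up
// before a barrier job submission is failed.
val conf = new SparkConf()
  .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "30s")
  .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "60")
```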


@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler
import org.apache.spark.SparkException
/**
* Exception thrown when submitting a job with barrier stage(s) that fails a required check.
*/
private[spark] class BarrierJobAllocationFailed(message: String) extends SparkException(message)
private[spark] class BarrierJobUnsupportedRDDChainException
extends BarrierJobAllocationFailed(
BarrierJobAllocationFailed.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
private[spark] class BarrierJobRunWithDynamicAllocationException
extends BarrierJobAllocationFailed(
BarrierJobAllocationFailed.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
private[spark] class BarrierJobSlotsNumberCheckFailed
extends BarrierJobAllocationFailed(
BarrierJobAllocationFailed.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
private[spark] object BarrierJobAllocationFailed {
// Error message when running a barrier stage that has an unsupported RDD chain pattern.
val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
"[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
"RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
"partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" +
"PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
"(scala) or barrierRdd.collect()[0] (python).\n" +
"2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."
// Error message when running a barrier stage with dynamic resource allocation enabled.
val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
"now. You can disable dynamic resource allocation by setting Spark conf " +
"\"spark.dynamicAllocation.enabled\" to \"false\"."
// Error message when running a barrier stage that requires more slots than the current total.
val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
"[SPARK-24819]: Barrier execution mode does not allow running a barrier stage that requires " +
"more slots than the total number of slots currently in the cluster. Please start a new " +
"cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " +
"slots required to run this barrier stage."
}


@ -19,8 +19,9 @@ package org.apache.spark.scheduler
import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.BiFunction
import scala.annotation.tailrec
import scala.collection.Map
@ -39,7 +40,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData}
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@ -111,8 +112,7 @@ import org.apache.spark.util._
* - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
* include the new structure. This will help to catch memory leaks.
*/
private[spark]
class DAGScheduler(
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
@ -203,6 +203,24 @@ class DAGScheduler(
sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
/**
* Number of max concurrent tasks check failures for each barrier job.
*/
private[scheduler] val barrierJobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]
/**
* Time in seconds to wait between a max concurrent tasks check failure and the next check.
*/
private val timeIntervalNumTasksCheck = sc.getConf
.get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL)
/**
* Max number of max concurrent tasks check failures allowed for a job before failing the job
* submission.
*/
private val maxFailureNumTasksCheck = sc.getConf
.get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)
private val messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
@ -351,8 +369,7 @@ class DAGScheduler(
val predicate: RDD[_] => Boolean = (r =>
r.getNumPartitions == numTasksInStage && r.dependencies.filter(_.rdd.isBarrier()).size <= 1)
if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) {
throw new SparkException(
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
throw new BarrierJobUnsupportedRDDChainException
}
}
@ -365,6 +382,7 @@ class DAGScheduler(
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
val numTasks = rdd.partitions.length
val parents = getOrCreateParentStages(rdd, jobId)
@ -398,7 +416,20 @@ class DAGScheduler(
*/
private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = {
if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) {
throw new SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
throw new BarrierJobRunWithDynamicAllocationException
}
}
/**
* Check whether the barrier stage requires more slots (to be able to launch all tasks in the
* barrier stage together) than the total number of currently active slots. Fail the current
* check if the barrier stage requires more slots than are currently available. If the check
* fails consecutively more than a configured number of times for a job, fail the current job
* submission.
*/
private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
if (rdd.isBarrier() && rdd.getNumPartitions > sc.maxNumConcurrentTasks) {
throw new BarrierJobSlotsNumberCheckFailed
}
}
@ -412,6 +443,7 @@ class DAGScheduler(
jobId: Int,
callSite: CallSite): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
@ -929,11 +961,38 @@ class DAGScheduler(
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: BarrierJobSlotsNumberCheckFailed =>
logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
"than the total number of slots in the cluster currently.")
// If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
new BiFunction[Int, Int, Int] {
override def apply(key: Int, value: Int): Int = value + 1
})
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
// Job failed, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
return
}
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// Job submitted, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
@ -2011,19 +2070,4 @@ private[spark] object DAGScheduler {
// Number of consecutive stage attempts allowed before a stage is aborted
val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
// Error message when running a barrier stage that have unsupported RDD chain pattern.
val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
"[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
"RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
"partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" +
"PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
"(scala) or barrierRdd.collect()[0] (python).\n" +
"2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."
// Error message when running a barrier stage with dynamic resource allocation enabled.
val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
"now. You can disable dynamic resource allocation by setting Spark conf " +
"\"spark.dynamicAllocation.enabled\" to \"false\"."
}


@ -69,4 +69,13 @@ private[spark] trait SchedulerBackend {
*/
def getDriverLogUrls: Option[Map[String, String]] = None
/**
* Get the max number of tasks that can be launched concurrently at the moment.
* Note that you should not cache the value returned by this method, because the number can
* change when executors are added or removed.
*
* @return The max number of tasks that can be launched concurrently at the moment.
*/
def maxNumConcurrentTasks(): Int
}


@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.keySet.toSeq
}
override def maxNumConcurrentTasks(): Int = {
executorDataMap.values.map { executor =>
executor.totalCores / scheduler.CPUS_PER_TASK
}.sum
}
/**
* Request an additional number of executors from the cluster manager.
* @return whether the request is acknowledged.


@ -156,6 +156,8 @@ private[spark] class LocalSchedulerBackend(
override def applicationId(): String = appId
override def maxNumConcurrentTasks(): Int = totalCores / scheduler.CPUS_PER_TASK
private def stop(finalState: SparkAppHandle.State): Unit = {
localEndpoint.ask(StopExecutor)
try {


@ -21,6 +21,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.scheduler.BarrierJobAllocationFailed._
import org.apache.spark.scheduler.DAGScheduler
import org.apache.spark.util.ThreadUtils
@ -63,7 +64,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
.barrier()
.mapPartitions(iter => iter)
testSubmitJob(sc, rdd,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
}
test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") {
@ -75,7 +76,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
.repartition(2)
.map(x => x + 1)
testSubmitJob(sc, rdd,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
}
test("submit a barrier stage that doesn't contain PartitionPruningRDD") {
@ -96,7 +97,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
.barrier()
.mapPartitions(iter => iter)
testSubmitJob(sc, rdd, Some(Seq(1, 3)),
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
}
test("submit a barrier stage with union()") {
@ -110,7 +111,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
.map(x => x * 2)
// Fail the job on submit because the barrier RDD (rdd1) may not be assigned Task 0.
testSubmitJob(sc, rdd3,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
}
test("submit a barrier stage with coalesce()") {
@ -122,7 +123,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
// Fail the job on submit because the barrier RDD requires 4 tasks to run together, but the stage
// only launches 1 task.
testSubmitJob(sc, rdd,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
}
test("submit a barrier stage that contains an RDD that depends on multiple barrier RDDs") {
@ -137,7 +138,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
.zip(rdd2)
.map(x => x._1 + x._2)
testSubmitJob(sc, rdd3,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
}
test("submit a barrier stage with zip()") {
@ -166,7 +167,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
.barrier()
.mapPartitions(iter => iter)
testSubmitJob(sc, rdd,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
message = ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}
test("submit a barrier ShuffleMapStage with dynamic resource allocation enabled") {
@ -183,6 +184,80 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
.repartition(2)
.map(x => x + 1)
testSubmitJob(sc, rdd,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
message = ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}
test("submit a barrier ResultStage that requires more slots than current total under local " +
"mode") {
val conf = new SparkConf()
// Shorten the time interval between two failed checks to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
// Reduce max check failures allowed to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
.setMaster("local[4]")
.setAppName("test")
sc = createSparkContext(Some(conf))
val rdd = sc.parallelize(1 to 10, 5)
.barrier()
.mapPartitions(iter => iter)
testSubmitJob(sc, rdd,
message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}
test("submit a barrier ShuffleMapStage that requires more slots than current total under " +
"local mode") {
val conf = new SparkConf()
// Shorten the time interval between two failed checks to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
// Reduce max check failures allowed to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
.setMaster("local[4]")
.setAppName("test")
sc = createSparkContext(Some(conf))
val rdd = sc.parallelize(1 to 10, 5)
.barrier()
.mapPartitions(iter => iter)
.repartition(2)
.map(x => x + 1)
testSubmitJob(sc, rdd,
message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}
test("submit a barrier ResultStage that requires more slots than current total under " +
"local-cluster mode") {
val conf = new SparkConf()
.set("spark.task.cpus", "2")
// Shorten the time interval between two failed checks to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
// Reduce max check failures allowed to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
.setMaster("local-cluster[4, 3, 1024]")
.setAppName("test")
sc = createSparkContext(Some(conf))
val rdd = sc.parallelize(1 to 10, 5)
.barrier()
.mapPartitions(iter => iter)
testSubmitJob(sc, rdd,
message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}
test("submit a barrier ShuffleMapStage that requires more slots than current total under " +
"local-cluster mode") {
val conf = new SparkConf()
.set("spark.task.cpus", "2")
// Shorten the time interval between two failed checks to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
// Reduce max check failures allowed to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
.setMaster("local-cluster[4, 3, 1024]")
.setAppName("test")
sc = createSparkContext(Some(conf))
val rdd = sc.parallelize(1 to 10, 5)
.barrier()
.mapPartitions(iter => iter)
.repartition(2)
.map(x => x + 1)
testSubmitJob(sc, rdd,
message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}
}


@ -1376,6 +1376,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
override def defaultParallelism(): Int = sb.defaultParallelism()
override def maxNumConcurrentTasks(): Int = sb.maxNumConcurrentTasks()
override def killExecutorsOnHost(host: String): Boolean = {
false
}


@ -654,6 +654,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
.setMaster("local-cluster[3, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()


@ -17,10 +17,18 @@
package org.apache.spark.scheduler
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{RpcUtils, SerializableBuffer}
class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext {
class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext
with Eventually {
test("serialized task larger than max RPC message size") {
val conf = new SparkConf
@ -38,4 +46,83 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
assert(smaller.size === 4)
}
test("compute max number of concurrent tasks can be launched") {
val conf = new SparkConf()
.setMaster("local-cluster[4, 3, 1024]")
.setAppName("test")
sc = new SparkContext(conf)
eventually(timeout(10.seconds)) {
// Ensure all executors have been launched.
assert(sc.getExecutorIds().length == 4)
}
assert(sc.maxNumConcurrentTasks() == 12)
}
test("compute max number of concurrent tasks can be launched when spark.task.cpus > 1") {
val conf = new SparkConf()
.set("spark.task.cpus", "2")
.setMaster("local-cluster[4, 3, 1024]")
.setAppName("test")
sc = new SparkContext(conf)
eventually(timeout(10.seconds)) {
// Ensure all executors have been launched.
assert(sc.getExecutorIds().length == 4)
}
// Each executor can only launch one task since `spark.task.cpus` is 2.
assert(sc.maxNumConcurrentTasks() == 4)
}
test("compute max number of concurrent tasks can be launched when some executors are busy") {
val conf = new SparkConf()
.set("spark.task.cpus", "2")
.setMaster("local-cluster[4, 3, 1024]")
.setAppName("test")
sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 10, 4).mapPartitions { iter =>
Thread.sleep(5000)
iter
}
val taskStarted = new AtomicBoolean(false)
val taskEnded = new AtomicBoolean(false)
val listener = new SparkListener() {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
taskStarted.set(true)
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEnded.set(true)
}
}
try {
sc.addSparkListener(listener)
eventually(timeout(10.seconds)) {
// Ensure all executors have been launched.
assert(sc.getExecutorIds().length == 4)
}
// Submit a job to trigger some tasks on active executors.
testSubmitJob(sc, rdd)
eventually(timeout(10.seconds)) {
// Ensure some tasks have started and no task finished, so some executors must be busy.
assert(taskStarted.get() == true)
assert(taskEnded.get() == false)
// Assert we count in slots on both busy and free executors.
assert(sc.maxNumConcurrentTasks() == 4)
}
} finally {
sc.removeSparkListener(listener)
}
}
private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
sc.submitJob(
rdd,
(iter: Iterator[Int]) => iter.toArray,
0 until rdd.partitions.length,
{ case (_, _) => return }: (Int, Array[Int]) => Unit,
{ return }
)
}
}


@ -215,7 +215,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}
private def init(testConf: SparkConf): Unit = {
sc = new SparkContext("local", "DAGSchedulerSuite", testConf)
sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf)
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()


@ -69,6 +69,7 @@ private class DummySchedulerBackend extends SchedulerBackend {
def stop() {}
def reviveOffers() {}
def defaultParallelism(): Int = 1
def maxNumConcurrentTasks(): Int = 0
}
private class DummyTaskScheduler extends TaskScheduler {


@ -385,6 +385,8 @@ private[spark] abstract class MockBackend(
}.toIndexedSeq
}
override def maxNumConcurrentTasks(): Int = 0
/**
* This is called by the scheduler whenever it has tasks it would like to schedule, when a tasks
* completes (which will be in a result-getter thread), and by the reviveOffers thread for delay


@ -36,6 +36,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
def stop() {}
def reviveOffers() {}
def defaultParallelism(): Int = 1
def maxNumConcurrentTasks(): Int = 0
}
class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach


@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
super.applicationId
}
override def maxNumConcurrentTasks(): Int = {
// TODO SPARK-25074 support this method for MesosFineGrainedSchedulerBackend
0
}
}