diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a7ffb354c0..e5b1e0ecd1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1602,6 +1602,15 @@ class SparkContext(config: SparkConf) extends Logging {
     }
   }
 
+  /**
+   * Get the max number of tasks that can be launched concurrently at the moment. Note that the
+   * returned value should not be cached, because the number can change when executors are added
+   * or removed.
+   *
+   * @return The max number of tasks that can currently be launched concurrently.
+   */
+  private[spark] def maxNumConcurrentTasks(): Int = schedulerBackend.maxNumConcurrentTasks()
+
   /**
    * Update the cluster manager on our scheduling needs. Three bits of information are included
    * to help it make decisions.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index eb08628ce1..a8aa6914ff 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -577,4 +577,31 @@ package object config {
     .timeConf(TimeUnit.SECONDS)
     .checkValue(v => v > 0, "The value should be a positive time value.")
     .createWithDefaultString("365d")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
+    ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
+      .doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
+        "check. A max concurrent tasks check ensures the cluster can launch more concurrent " +
+        "tasks than required by a barrier stage on job submission. The check can fail if the " +
+        "cluster has just started and not enough executors have registered, so we wait for a " +
+        "little while and try to perform the check again. If the check fails more than the " +
+        "configured max number of times for a job, the job submission fails. Note this config " +
+        "only applies to jobs that contain one or more barrier stages; the check is not " +
+        "performed on non-barrier jobs.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("15s")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES =
+    ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures")
+      .doc("Number of max concurrent tasks check failures allowed before failing a job " +
+        "submission. A max concurrent tasks check ensures the cluster can launch more " +
+        "concurrent tasks than required by a barrier stage on job submission. The check can " +
+        "fail if the cluster has just started and not enough executors have registered, so we " +
+        "wait for a little while and try to perform the check again. If the check fails more " +
+        "than the configured max number of times for a job, the job submission fails. Note " +
+        "this config only applies to jobs that contain one or more barrier stages; the check " +
+        "is not performed on non-barrier jobs.")
+      .intConf
+      .checkValue(v => v > 0, "The max failures should be a positive value.")
+      .createWithDefault(40)
 }
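The two settings added above work as a pair: the interval controls how long the scheduler waits between slot checks for a barrier job, and maxFailures bounds how many times the check may fail before the submission is rejected (with the defaults, 40 retries at 15s each, i.e. up to about 10 minutes of waiting for executors to register). A minimal sketch of tuning them from an application; the master URL, app name, and values here are illustrative, not part of this patch:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[4]")              // assumed master: 4 slots available
      .setAppName("barrier-config-demo")  // hypothetical app name
      // Retry the slot check every 5 seconds, at most 10 times, before failing the submission.
      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "5s")
      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "10")
    val sc = new SparkContext(conf)
    // A 4-partition barrier stage fits into local[4]; 5 or more partitions would trigger the check.
    sc.parallelize(1 to 100, 4).barrier().mapPartitions(iter => iter).collect()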
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
new file mode 100644
index 0000000000..803a0a1226
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkException
+
+/**
+ * Exception thrown when a job is submitted with barrier stage(s) that fail a required check.
+ */
+private[spark] class BarrierJobAllocationFailed(message: String) extends SparkException(message)
+
+private[spark] class BarrierJobUnsupportedRDDChainException
+  extends BarrierJobAllocationFailed(
+    BarrierJobAllocationFailed.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+
+private[spark] class BarrierJobRunWithDynamicAllocationException
+  extends BarrierJobAllocationFailed(
+    BarrierJobAllocationFailed.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+
+private[spark] class BarrierJobSlotsNumberCheckFailed
+  extends BarrierJobAllocationFailed(
+    BarrierJobAllocationFailed.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
+
+private[spark] object BarrierJobAllocationFailed {
+
+  // Error message when running a barrier stage that has an unsupported RDD chain pattern.
+  val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
+    "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
+      "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
+      "partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" +
+      "PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
+      "(scala) or barrierRdd.collect()[0] (python).\n" +
+      "2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."
+
+  // Error message when running a barrier stage with dynamic resource allocation enabled.
+  val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
+    "[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
+      "now. You can disable dynamic resource allocation by setting Spark conf " +
+      "\"spark.dynamicAllocation.enabled\" to \"false\"."
+
+  // Error message when running a barrier stage that requires more slots than the current total
+  // number.
+  val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
+    "[SPARK-24819]: Barrier execution mode does not allow running a barrier stage that requires " +
+      "more slots than the total number of slots in the cluster currently. Please start a new " +
+      "cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " +
+      "slots required to run this barrier stage."
+}
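To make the first error message above concrete: actions like first() or take() on a barrier RDD launch a stage with fewer tasks than the RDD has partitions, which the chain-pattern check rejects; the workaround named in the message collects the whole RDD and takes the head on the driver. A small sketch, assuming a local master (illustrative only, not part of this patch):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(
      new SparkConf().setMaster("local[4]").setAppName("barrier-chain-demo"))
    val barrierRdd = sc.parallelize(1 to 8, 4).barrier().mapPartitions(iter => iter)
    // barrierRdd.first() would fail job submission with
    // ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN.
    val head = barrierRdd.collect().head  // workaround suggested by the error message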
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index cf1fcbce78..2b0ca13485 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.util.Properties
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.BiFunction
 
 import scala.annotation.tailrec
 import scala.collection.Map
@@ -39,7 +40,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData}
+import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -111,8 +112,7 @@ import org.apache.spark.util._
  *  - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
  *    include the new structure. This will help to catch memory leaks.
  */
-private[spark]
-class DAGScheduler(
+private[spark] class DAGScheduler(
     private[scheduler] val sc: SparkContext,
     private[scheduler] val taskScheduler: TaskScheduler,
     listenerBus: LiveListenerBus,
@@ -203,6 +203,24 @@ class DAGScheduler(
     sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
       DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
 
+  /**
+   * Number of max concurrent tasks check failures for each barrier job.
+   */
+  private[scheduler] val barrierJobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]
+
+  /**
+   * Time in seconds to wait between a max concurrent tasks check failure and the next check.
+   */
+  private val timeIntervalNumTasksCheck = sc.getConf
+    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL)
+
+  /**
+   * Max number of max concurrent tasks check failures allowed for a job before failing the job
+   * submission.
+   */
+  private val maxFailureNumTasksCheck = sc.getConf
+    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)
+
   private val messageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
 
@@ -351,8 +369,7 @@ class DAGScheduler(
     val predicate: RDD[_] => Boolean = (r =>
       r.getNumPartitions == numTasksInStage && r.dependencies.filter(_.rdd.isBarrier()).size <= 1)
     if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) {
-      throw new SparkException(
-        DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      throw new BarrierJobUnsupportedRDDChainException
     }
   }
 
@@ -365,6 +382,7 @@ class DAGScheduler(
   def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
     val rdd = shuffleDep.rdd
     checkBarrierStageWithDynamicAllocation(rdd)
+    checkBarrierStageWithNumSlots(rdd)
     checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
     val numTasks = rdd.partitions.length
     val parents = getOrCreateParentStages(rdd, jobId)
@@ -398,7 +416,20 @@ class DAGScheduler(
    */
   private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = {
     if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) {
-      throw new SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+      throw new BarrierJobRunWithDynamicAllocationException
+    }
+  }
+
+  /**
+   * Check whether the barrier stage requires more slots (to be able to launch all tasks in the
+   * barrier stage together) than the total number of active slots currently. The check fails
+   * when trying to submit a barrier stage that requires more slots than the current total
+   * number. If the check fails consecutively more than a configured number of times for a job,
+   * then fail the current job submission.
+   */
+  private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
+    if (rdd.isBarrier() && rdd.getNumPartitions > sc.maxNumConcurrentTasks) {
+      throw new BarrierJobSlotsNumberCheckFailed
     }
   }
 
@@ -412,6 +443,7 @@ class DAGScheduler(
       jobId: Int,
       callSite: CallSite): ResultStage = {
     checkBarrierStageWithDynamicAllocation(rdd)
+    checkBarrierStageWithNumSlots(rdd)
     checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
     val parents = getOrCreateParentStages(rdd, jobId)
     val id = nextStageId.getAndIncrement()
@@ -929,11 +961,38 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: BarrierJobSlotsNumberCheckFailed =>
+        logWarning(s"Job $jobId requires running a barrier stage that needs more slots than " +
+          "the total number of slots in the cluster currently.")
+        // If jobId doesn't exist in the map, Scala converts its value null to 0: Int automatically.
+        val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
+          new BiFunction[Int, Int, Int] {
+            override def apply(key: Int, value: Int): Int = value + 1
+          })
+        if (numCheckFailures <= maxFailureNumTasksCheck) {
+          messageScheduler.schedule(
+            new Runnable {
+              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
+                partitions, callSite, listener, properties))
+            },
+            timeIntervalNumTasksCheck,
+            TimeUnit.SECONDS
+          )
+          return
+        } else {
+          // Job failed, clear internal data.
+          barrierJobIdToNumTasksCheckFailures.remove(jobId)
+          listener.jobFailed(e)
+          return
+        }
+
       case e: Exception =>
         logWarning("Creating new stage failed due to exception - job: " + jobId, e)
         listener.jobFailed(e)
         return
     }
+    // Job submitted, clear internal data.
+    barrierJobIdToNumTasksCheckFailures.remove(jobId)
 
     val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
     clearCacheLocs()
@@ -2011,19 +2070,4 @@ private[spark] object DAGScheduler {
 
   // Number of consecutive stage attempts allowed before a stage is aborted
   val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
-
-  // Error message when running a barrier stage that have unsupported RDD chain pattern.
-  val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
-    "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
-      "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
-      "partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" +
-      "PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
-      "(scala) or barrierRdd.collect()[0] (python).\n" +
-      "2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."
-
-  // Error message when running a barrier stage with dynamic resource allocation enabled.
-  val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
-    "[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
-      "now. You can disable dynamic resource allocation by setting Spark conf " +
-      "\"spark.dynamicAllocation.enabled\" to \"false\"."
 }
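The compute() call in the BarrierJobSlotsNumberCheckFailed handler above leans on a boxing detail worth spelling out: when jobId is not yet in the map, Java passes null as the current value, and Scala's unboxing turns that null into 0 for the Int parameter, so the first failure produces a count of 1. A standalone sketch of the same pattern (the key 7 is an arbitrary example job id, not taken from this patch):

    import java.util.concurrent.ConcurrentHashMap
    import java.util.function.BiFunction

    val failures = new ConcurrentHashMap[Int, Int]
    val increment = new BiFunction[Int, Int, Int] {
      // `value` is 0 on the first call for a key: the absent value (null) unboxes to 0.
      override def apply(key: Int, value: Int): Int = value + 1
    }
    assert(failures.compute(7, increment) == 1)  // first check failure for job 7
    assert(failures.compute(7, increment) == 2)  // second failure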
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 22db3350ab..c187ee1463 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -69,4 +69,13 @@ private[spark] trait SchedulerBackend {
    */
   def getDriverLogUrls: Option[Map[String, String]] = None
 
+  /**
+   * Get the max number of tasks that can be launched concurrently at the moment. Note that the
+   * returned value should not be cached, because the number can change when executors are added
+   * or removed.
+   *
+   * @return The max number of tasks that can currently be launched concurrently.
+   */
+  def maxNumConcurrentTasks(): Int
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 375aeb0c34..747e8c7dc0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.keySet.toSeq
   }
 
+  override def maxNumConcurrentTasks(): Int = {
+    executorDataMap.values.map { executor =>
+      executor.totalCores / scheduler.CPUS_PER_TASK
+    }.sum
+  }
+
   /**
    * Request an additional number of executors from the cluster manager.
    * @return whether the request is acknowledged.
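To make the slot arithmetic in the override above concrete: with 4 executors of 3 cores each (the local-cluster[4, 3, 1024] layout used in the tests below) and spark.task.cpus = 2, each executor contributes 3 / 2 = 1 slot, so maxNumConcurrentTasks() returns 4; with the default of one CPU per task it returns 12. The same sum written outside the backend, with illustrative executor names:

    // Executor id -> total cores, mirroring what executorDataMap tracks per registered executor.
    val executorCores = Map("exec-1" -> 3, "exec-2" -> 3, "exec-3" -> 3, "exec-4" -> 3)
    val cpusPerTask = 2  // spark.task.cpus
    val maxConcurrentTasks = executorCores.values.map(_ / cpusPerTask).sum
    assert(maxConcurrentTasks == 4)  // would be 12 with cpusPerTask == 1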
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index cf8b0ff4f7..0de57fbd56 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -156,6 +156,8 @@ private[spark] class LocalSchedulerBackend(
 
   override def applicationId(): String = appId
 
+  override def maxNumConcurrentTasks(): Int = totalCores / scheduler.CPUS_PER_TASK
+
   private def stop(finalState: SparkAppHandle.State): Unit = {
     localEndpoint.ask(StopExecutor)
     try {
diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index 2f21e61ce9..d49ab4aa7d 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -21,6 +21,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.scheduler.BarrierJobAllocationFailed._
 import org.apache.spark.scheduler.DAGScheduler
 import org.apache.spark.util.ThreadUtils
 
@@ -63,7 +64,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
       .barrier()
       .mapPartitions(iter => iter)
     testSubmitJob(sc, rdd,
-      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") {
@@ -75,7 +76,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
       .repartition(2)
       .map(x => x + 1)
     testSubmitJob(sc, rdd,
-      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier stage that doesn't contain PartitionPruningRDD") {
@@ -96,7 +97,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
       .barrier()
       .mapPartitions(iter => iter)
     testSubmitJob(sc, rdd, Some(Seq(1, 3)),
-      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier stage with union()") {
@@ -110,7 +111,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
       .map(x => x * 2)
     // Fail the job on submit because the barrier RDD (rdd1) may be not assigned Task 0.
     testSubmitJob(sc, rdd3,
-      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier stage with coalesce()") {
@@ -122,7 +123,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
     // Fail the job on submit because the barrier RDD requires to run on 4 tasks, but the stage
     // only launches 1 task.
     testSubmitJob(sc, rdd,
-      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier stage that contains an RDD that depends on multiple barrier RDDs") {
@@ -137,7 +138,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
       .zip(rdd2)
       .map(x => x._1 + x._2)
     testSubmitJob(sc, rdd3,
-      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier stage with zip()") {
@@ -166,7 +167,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
       .barrier()
       .mapPartitions(iter => iter)
     testSubmitJob(sc, rdd,
-      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
   }
 
   test("submit a barrier ShuffleMapStage with dynamic resource allocation enabled") {
@@ -183,6 +184,80 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
       .repartition(2)
       .map(x => x + 1)
     testSubmitJob(sc, rdd,
-      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+  }
+
+  test("submit a barrier ResultStage that requires more slots than current total under local " +
+    "mode") {
+    val conf = new SparkConf()
+      // Shorten the time interval between two failed checks to make the test fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      // Reduce max check failures allowed to make the test fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
+      .setMaster("local[4]")
+      .setAppName("test")
+    sc = createSparkContext(Some(conf))
+    val rdd = sc.parallelize(1 to 10, 5)
+      .barrier()
+      .mapPartitions(iter => iter)
+    testSubmitJob(sc, rdd,
+      message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
+  }
+
+  test("submit a barrier ShuffleMapStage that requires more slots than current total under " +
+    "local mode") {
+    val conf = new SparkConf()
+      // Shorten the time interval between two failed checks to make the test fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      // Reduce max check failures allowed to make the test fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
+      .setMaster("local[4]")
+      .setAppName("test")
+    sc = createSparkContext(Some(conf))
+    val rdd = sc.parallelize(1 to 10, 5)
+      .barrier()
+      .mapPartitions(iter => iter)
+      .repartition(2)
+      .map(x => x + 1)
+    testSubmitJob(sc, rdd,
+      message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
+  }
+
+  test("submit a barrier ResultStage that requires more slots than current total under " +
+    "local-cluster mode") {
+    val conf = new SparkConf()
+      .set("spark.task.cpus", "2")
+      // Shorten the time interval between two failed checks to make the test fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      // Reduce max check failures allowed to make the test fail faster.
+ .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3") + .setMaster("local-cluster[4, 3, 1024]") + .setAppName("test") + sc = createSparkContext(Some(conf)) + val rdd = sc.parallelize(1 to 10, 5) + .barrier() + .mapPartitions(iter => iter) + testSubmitJob(sc, rdd, + message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) + } + + test("submit a barrier ShuffleMapStage that requires more slots than current total under " + + "local-cluster mode") { + val conf = new SparkConf() + .set("spark.task.cpus", "2") + // Shorten the time interval between two failed checks to make the test fail faster. + .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s") + // Reduce max check failures allowed to make the test fail faster. + .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3") + .setMaster("local-cluster[4, 3, 1024]") + .setAppName("test") + sc = createSparkContext(Some(conf)) + val rdd = sc.parallelize(1 to 10, 5) + .barrier() + .mapPartitions(iter => iter) + .repartition(2) + .map(x => x + 1) + testSubmitJob(sc, rdd, + message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) } } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 3cfb0a9feb..659ebb60fe 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1376,6 +1376,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def defaultParallelism(): Int = sb.defaultParallelism() + override def maxNumConcurrentTasks(): Int = sb.maxNumConcurrentTasks() + override def killExecutorsOnHost(host: String): Boolean = { false } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index cb44110e30..e1666a3527 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -654,6 +654,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu .setMaster("local-cluster[3, 1, 1024]") .setAppName("test-cluster") sc = new SparkContext(conf) + val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 04cccc67e3..80c9c6f042 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -17,10 +17,18 @@ package org.apache.spark.scheduler +import java.util.concurrent.atomic.AtomicBoolean + +import scala.concurrent.duration._ + +import org.scalatest.concurrent.Eventually + import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.rdd.RDD import org.apache.spark.util.{RpcUtils, SerializableBuffer} -class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext { +class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext + with Eventually { test("serialized task larger than max RPC message size") { val conf = 
     val conf = new SparkConf
@@ -38,4 +46,83 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     assert(smaller.size === 4)
   }
 
+  test("compute max number of concurrent tasks can be launched") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    eventually(timeout(10.seconds)) {
+      // Ensure all executors have been launched.
+      assert(sc.getExecutorIds().length == 4)
+    }
+    assert(sc.maxNumConcurrentTasks() == 12)
+  }
+
+  test("compute max number of concurrent tasks can be launched when spark.task.cpus > 1") {
+    val conf = new SparkConf()
+      .set("spark.task.cpus", "2")
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    eventually(timeout(10.seconds)) {
+      // Ensure all executors have been launched.
+      assert(sc.getExecutorIds().length == 4)
+    }
+    // Each executor can only launch one task since `spark.task.cpus` is 2.
+    assert(sc.maxNumConcurrentTasks() == 4)
+  }
+
+  test("compute max number of concurrent tasks can be launched when some executors are busy") {
+    val conf = new SparkConf()
+      .set("spark.task.cpus", "2")
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    val rdd = sc.parallelize(1 to 10, 4).mapPartitions { iter =>
+      Thread.sleep(5000)
+      iter
+    }
+    var taskStarted = new AtomicBoolean(false)
+    var taskEnded = new AtomicBoolean(false)
+    val listener = new SparkListener() {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        taskStarted.set(true)
+      }
+
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        taskEnded.set(true)
+      }
+    }
+
+    try {
+      sc.addSparkListener(listener)
+      eventually(timeout(10.seconds)) {
+        // Ensure all executors have been launched.
+        assert(sc.getExecutorIds().length == 4)
+      }
+
+      // Submit a job to trigger some tasks on active executors.
+      testSubmitJob(sc, rdd)
+
+      eventually(timeout(10.seconds)) {
+        // Ensure some tasks have started and no task finished, so some executors must be busy.
+        assert(taskStarted.get() == true)
+        assert(taskEnded.get() == false)
+        // Assert we count in slots on both busy and free executors.
+        assert(sc.maxNumConcurrentTasks() == 4)
+      }
+    } finally {
+      sc.removeSparkListener(listener)
+    }
+  }
+
+  private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
+    sc.submitJob(
+      rdd,
+      (iter: Iterator[Int]) => iter.toArray,
+      0 until rdd.partitions.length,
+      { case (_, _) => return }: (Int, Array[Int]) => Unit,
+      { return }
+    )
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3fbe636607..6eeddbb763 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -215,7 +215,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }
 
   private def init(testConf: SparkConf): Unit = {
-    sc = new SparkContext("local", "DAGSchedulerSuite", testConf)
+    sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf)
     sparkListener.submittedStageInfos.clear()
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 02b19e01ce..b4705914b9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -69,6 +69,7 @@ private class DummySchedulerBackend extends SchedulerBackend {
   def stop() {}
   def reviveOffers() {}
   def defaultParallelism(): Int = 1
+  def maxNumConcurrentTasks(): Int = 0
 }
 
 private class DummyTaskScheduler extends TaskScheduler {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 75ea409e16..cea7f173c8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -385,6 +385,8 @@ private[spark] abstract class MockBackend(
     }.toIndexedSeq
   }
 
+  override def maxNumConcurrentTasks(): Int = 0
+
   /**
    * This is called by the scheduler whenever it has tasks it would like to schedule, when a tasks
    * completes (which will be in a result-getter thread), and by the reviveOffers thread for delay
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 38e26a82e7..ca9bf08cee 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -36,6 +36,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
   def stop() {}
   def reviveOffers() {}
   def defaultParallelism(): Int = 1
+  def maxNumConcurrentTasks(): Int = 0
 }
 
 class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 71a70ff048..0bb6fe0fa4 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     super.applicationId
   }
 
+  override def maxNumConcurrentTasks(): Int = {
+    // TODO SPARK-25074 support this method for MesosFineGrainedSchedulerBackend
+    0
+  }
 }
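One consequence of returning 0 here (and in the test doubles above) is worth noting: checkBarrierStageWithNumSlots() compares a stage's partition count against this value, so a backend that reports 0 slots rejects every barrier stage at submission. A stub in the same style as the suites' fake backends, spelling that assumption out (illustrative only, not part of this patch):

    package org.apache.spark.scheduler  // SchedulerBackend is private[spark], as the fakes above are

    private class NoBarrierSupportBackend extends SchedulerBackend {
      def start() {}
      def stop() {}
      def reviveOffers() {}
      def defaultParallelism(): Int = 1
      // Reporting 0 slots means any barrier stage (>= 1 partition) fails the max concurrent
      // tasks check, so its job submission is eventually failed.
      def maxNumConcurrentTasks(): Int = 0
    }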