[SPARK-24954][CORE] Fail fast on job submit when running a barrier stage with dynamic resource allocation enabled

## What changes were proposed in this pull request?

We don't support running a barrier stage with dynamic resource allocation enabled, as it can lead to confusing behavior (e.g., with dynamic resource allocation enabled, we may acquire some executors (but not enough to launch all the tasks in a barrier stage), later release them when the executor idle timeout expires, and then acquire them again).

The check is performed on job submit, and the job fails fast if it contains a barrier stage while dynamic resource allocation is enabled.
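For illustration, a minimal sketch of how the fail-fast behavior surfaces to a user, assuming a local test setup (the conf keys and the barrier RDD construction mirror the test suite below; the app name is made up):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Enable dynamic allocation; the "testing" flag makes it count as
// enabled even under a local master, as in the tests below.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.testing", "true")
  .setMaster("local[4]")
  .setAppName("barrier-dyn-alloc-demo")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 10, 4)
  .barrier()
  .mapPartitions((iter, context) => iter)

// The job now fails on submit, before any task is launched, carrying
// DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION as the message.
try {
  rdd.collect()
} catch {
  case e: SparkException => println(e.getMessage)
}
```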

## How was this patch tested?

Added a new test suite, `BarrierStageOnSubmittedSuite`, that covers all the fail-fast cases for submitted jobs containing one or more barrier stages.
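The suite drives every case through a shared `testSubmitJob` helper whose body is elided in the diff below. A plausible sketch, inferred from the call sites (`testSubmitJob(sc, rdd, message = ...)`) and the `ThreadUtils`/`scala.concurrent.duration` imports, not the actual implementation:

```scala
// Sketch only: submit the job asynchronously and assert that the
// submit-time failure carries the expected fail-fast message.
private def testSubmitJob(
    sc: SparkContext,
    rdd: RDD[Int],
    message: String): Unit = {
  val futureAction = sc.submitJob(
    rdd,
    (iter: Iterator[Int]) => iter.toArray,
    0 until rdd.partitions.length,
    (_: Int, _: Array[Int]) => (),
    ())

  val error = intercept[SparkException] {
    ThreadUtils.awaitResult(futureAction, 5.seconds)
  }.getCause.getMessage
  assert(error.contains(message))
}
```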

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21915 from jiangxb1987/SPARK-24954.
Xingbo Jiang 2018-08-03 09:36:56 -07:00 committed by Xiangrui Meng
parent c32dbd6bd5
commit 92b48842b9
2 changed files with 71 additions and 11 deletions

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -364,6 +364,7 @@ class DAGScheduler(
*/
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
val numTasks = rdd.partitions.length
val parents = getOrCreateParentStages(rdd, jobId)
@@ -384,6 +385,23 @@ class DAGScheduler(
stage
}
/**
* We don't support running a barrier stage with dynamic resource allocation enabled, as it can
* lead to confusing behavior (e.g., with dynamic resource allocation enabled, we may acquire
* some executors (but not enough to launch all the tasks in a barrier stage) and later release
* them due to executor idle timeout, and then acquire them again).
*
* We perform the check on job submit and fail fast if running a barrier stage with dynamic
* resource allocation enabled.
*
* TODO SPARK-24942 Improve cluster resource management with jobs containing barrier stage
*/
private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = {
if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) {
throw new SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}
}
/**
* Create a ResultStage associated with the provided jobId.
*/
@@ -393,6 +411,7 @@ class DAGScheduler(
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
@@ -2001,4 +2020,10 @@ private[spark] object DAGScheduler {
"PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
"(scala) or barrierRdd.collect()[0] (python).\n" +
"2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."
// Error message when running a barrier stage with dynamic resource allocation enabled.
val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
"now. You can disable dynamic resource allocation by setting Spark conf " +
"\"spark.dynamicAllocation.enabled\" to \"false\"."
}

core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala

@@ -20,8 +20,6 @@ package org.apache.spark
import scala.concurrent.duration._
import scala.language.postfixOps
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.scheduler.DAGScheduler
import org.apache.spark.util.ThreadUtils
@@ -30,16 +28,13 @@ import org.apache.spark.util.ThreadUtils
* This test suite covers all the cases that shall fail fast on job submission if the job
* contains one or more barrier stages.
*/
class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
with LocalSparkContext {
class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext {
override def beforeEach(): Unit = {
super.beforeEach()
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("test")
sc = new SparkContext(conf)
private def createSparkContext(conf: Option[SparkConf] = None): SparkContext = {
new SparkContext(conf.getOrElse(
new SparkConf()
.setMaster("local[4]")
.setAppName("test")))
}
private def testSubmitJob(
@@ -62,6 +57,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}
test("submit a barrier ResultStage that contains PartitionPruningRDD") {
sc = createSparkContext()
val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
val rdd = prunedRdd
.barrier()
@@ -71,6 +67,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}
test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") {
sc = createSparkContext()
val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
val rdd = prunedRdd
.barrier()
@@ -82,6 +79,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}
test("submit a barrier stage that doesn't contain PartitionPruningRDD") {
sc = createSparkContext()
val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
val rdd = prunedRdd
.repartition(2)
@@ -93,6 +91,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}
test("submit a barrier stage with partial partitions") {
sc = createSparkContext()
val rdd = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
@@ -101,6 +100,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}
test("submit a barrier stage with union()") {
sc = createSparkContext()
val rdd1 = sc.parallelize(1 to 10, 2)
.barrier()
.mapPartitions((iter, context) => iter)
@@ -114,6 +114,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}
test("submit a barrier stage with coalesce()") {
sc = createSparkContext()
val rdd = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
@@ -125,6 +126,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}
test("submit a barrier stage that contains an RDD that depends on multiple barrier RDDs") {
sc = createSparkContext()
val rdd1 = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
@@ -139,6 +141,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
}
test("submit a barrier stage with zip()") {
sc = createSparkContext()
val rdd1 = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
@@ -150,4 +153,36 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
val result = rdd3.collect().sorted
assert(result === Seq(12, 14, 16, 18, 20, 22, 24, 26, 28, 30))
}
test("submit a barrier ResultStage with dynamic resource allocation enabled") {
val conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
.setMaster("local[4]")
.setAppName("test")
sc = createSparkContext(Some(conf))
val rdd = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
testSubmitJob(sc, rdd,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}
test("submit a barrier ShuffleMapStage with dynamic resource allocation enabled") {
val conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
.setMaster("local[4]")
.setAppName("test")
sc = createSparkContext(Some(conf))
val rdd = sc.parallelize(1 to 10, 4)
.barrier()
.mapPartitions((iter, context) => iter)
.repartition(2)
.map(x => x + 1)
testSubmitJob(sc, rdd,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}
}