From bd5039fc3542dc4ac96bea28639f1896d7919388 Mon Sep 17 00:00:00 2001
From: schintap
Date: Wed, 13 Jan 2021 08:57:56 -0600
Subject: [PATCH] [SPARK-33741][CORE] Add min threshold time speculation config

### What changes were proposed in this pull request?
Add a config, `spark.speculation.min.threshold`, that sets the minimum amount of time a task must run before it can be considered for speculation.

### Why are the changes needed?
When speculation is turned on with the default configs, the last 10% of a stage's tasks are subject to speculation. Many stages run for only a few seconds to a few minutes, and in general we do not want to speculate tasks whose runtime is below some minimum. A configurable minimum threshold gives us better control over which tasks get speculative copies.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #30710 from redsanket/SPARK-33741.

Lead-authored-by: schintap
Co-authored-by: Sanket Chintapalli
Signed-off-by: Thomas Graves
---
 .../spark/internal/config/package.scala       |  8 ++++
 .../org/apache/spark/scheduler/Pool.scala     |  2 +-
 .../apache/spark/scheduler/Schedulable.scala  |  2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala   |  2 +-
 .../spark/scheduler/TaskSetManager.scala      |  2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala | 37 +++++++++++++++++--
 docs/configuration.md                         |  9 +++++
 7 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 84c6647028..f962bc6e76 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1889,6 +1889,14 @@ package object config {
       .doubleConf
       .createWithDefault(0.75)
 
+  private[spark] val SPECULATION_MIN_THRESHOLD =
+    ConfigBuilder("spark.speculation.min.threshold")
+      .doc("Minimum amount of time a task runs before being considered for speculation. " +
+        "This can be used to avoid launching speculative copies of tasks that are very short.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(100)
+
   private[spark] val SPECULATION_TASK_DURATION_THRESHOLD =
     ConfigBuilder("spark.speculation.task.duration.threshold")
      .doc("Task duration after which scheduler would try to speculative run the task. If " +
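For orientation, here is a minimal sketch of how a user could enable the new config alongside the existing speculation settings; the values below are illustrative and not part of this patch:

```scala
import org.apache.spark.SparkConf

// Illustrative values only: "spark.speculation.min.threshold" is the config
// introduced by this patch; the other keys are pre-existing Spark configs.
val conf = new SparkConf()
  .set("spark.speculation", "true")              // enable speculative execution
  .set("spark.speculation.quantile", "0.9")      // fraction of tasks that must finish first
  .set("spark.speculation.min.threshold", "5s")  // do not speculate tasks younger than 5s
```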
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 7333b31524..de4c9d39dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -94,7 +94,7 @@ private[spark] class Pool(
     schedulableQueue.asScala.foreach(_.executorDecommission(executorId))
   }
 
-  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
+  override def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean = {
     var shouldRevive = false
     for (schedulable <- schedulableQueue.asScala) {
       shouldRevive |= schedulable.checkSpeculatableTasks(minTimeToSpeculation)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index 0626f8fb81..e549ce62e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -45,6 +45,6 @@ private[spark] trait Schedulable {
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
   def executorDecommission(executorId: String): Unit
-  def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
+  def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean
   def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index b939e40f3b..71b8bc2341 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -100,7 +100,7 @@ private[spark] class TaskSchedulerImpl(
   // Duplicate copies of a task will only be launched if the original copy has been running for
   // at least this amount of time. This is to avoid the overhead of launching speculative copies
   // of tasks that are very short.
-  val MIN_TIME_TO_SPECULATION = 100
+  val MIN_TIME_TO_SPECULATION = conf.get(SPECULATION_MIN_THRESHOLD)
 
   private val speculationScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index ad0791fa42..a3819fee98 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1035,7 +1035,7 @@ private[spark] class TaskSetManager(
    * by the TaskScheduler.
    *
    */
-  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
+  override def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean = {
     // No need to speculate if the task set is zombie or is from a barrier stage. If there is only
     // one task we don't speculate since we don't have metrics to decide whether it's taking too
     // long or not, unless a task duration threshold is explicitly provided.
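To make the scheduler changes above concrete: a standalone sketch (not Spark's actual implementation) of the kind of gating `minTimeToSpeculation` performs inside `checkSpeculatableTasks`, where a running task becomes a speculation candidate only once it outlives both the quantile-derived threshold and the configured floor:

```scala
// Standalone sketch, not Spark's code: shows how a minimum-runtime floor
// interacts with a median-based speculation threshold.
def speculationCandidates(
    runningTaskRuntimesMs: Map[Int, Long], // task index -> elapsed runtime so far
    medianSuccessfulMs: Long,              // median duration of finished tasks
    multiplier: Double,                    // cf. spark.speculation.multiplier
    minTimeToSpeculationMs: Long): Set[Int] = {
  // A task must outlive BOTH the scaled median and the configured floor.
  val threshold = math.max((medianSuccessfulMs * multiplier).toLong, minTimeToSpeculationMs)
  runningTaskRuntimesMs.collect {
    case (index, runtimeMs) if runtimeMs > threshold => index
  }.toSet
}

// With a 3s floor (as in the unit test below), a task that has run for 2s is
// not a candidate even though the scaled median (1.5s) has been exceeded:
// speculationCandidates(Map(1 -> 2000L), 1000L, 1.5, 3000L) == Set()
```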
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 3bf6cc226c..da281b1f2b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1901,7 +1901,8 @@ class TaskSetManagerSuite
       speculationQuantile: Double,
       numTasks: Int,
       numExecutorCores: Int,
-      numCoresPerTask: Int): (TaskSetManager, ManualClock) = {
+      numCoresPerTask: Int,
+      speculationMinimumThreshold: Option[String]): (TaskSetManager, ManualClock) = {
     val conf = new SparkConf()
     conf.set(config.SPECULATION_ENABLED, true)
     conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString)
@@ -1911,6 +1912,9 @@ class TaskSetManagerSuite
     if (speculationThresholdOpt.isDefined) {
       conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, speculationThresholdOpt.get)
     }
+    if (speculationMinimumThreshold.isDefined) {
+      conf.set(config.SPECULATION_MIN_THRESHOLD.key, speculationMinimumThreshold.get)
+    }
     sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     // Create a task set with the given number of tasks
@@ -1937,7 +1941,8 @@ class TaskSetManagerSuite
       speculationQuantile = 1.0,
       numTasks,
       numSlots,
-      numCoresPerTask = 1
+      numCoresPerTask = 1,
+      None
     )
 
     // if the time threshold has not been exceeded, no speculative run should be triggered
@@ -2091,7 +2096,8 @@ class TaskSetManagerSuite
       speculationQuantile = 0.5,
       numTasks = 2,
       numExecutorCores = 2,
-      numCoresPerTask = 1
+      numCoresPerTask = 1,
+      None
     )
 
     // Task duration can't be 0, advance 1 sec
@@ -2211,6 +2217,31 @@ class TaskSetManagerSuite
       assert(manager.invokePrivate(numFailures())(index1) === 1)
     }
   }
+
+  test("SPARK-33741 Test minimum amount of time a task runs " +
+    "before being considered for speculation") {
+    val (manager, clock) = testSpeculationDurationSetup(
+      None,
+      speculationQuantile = 0.5,
+      numTasks = 2,
+      numExecutorCores = 2,
+      numCoresPerTask = 1,
+      Some("3000") // spark.speculation.min.threshold
+    )
+    // Task duration can't be 0, so advance 1 sec
+    clock.advance(1000)
+    // Mark one of the tasks as successful, which satisfies the quantile
+    manager.handleSuccessfulTask(0, createTaskResult(0))
+    // Advance 1 more second so the remaining task takes longer
+    clock.advance(1000)
+    manager.checkSpeculatableTasks(sched.MIN_TIME_TO_SPECULATION)
+    // The task is not considered speculatable yet: the 3s minimum threshold has not elapsed
+    assert(sched.speculativeTasks.size == 0)
+    clock.advance(2000)
+    manager.checkSpeculatableTasks(sched.MIN_TIME_TO_SPECULATION)
+    // Now that the task has run longer than the 3s minimum, it is marked as speculatable
+    assert(sched.speculativeTasks.size == 1)
+  }
 }
 
 class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) {
diff --git a/docs/configuration.md b/docs/configuration.md
index fe1fc3e473..612d62a96f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2308,6 +2308,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>0.6.0</td>
 </tr>
+<tr>
+  <td><code>spark.speculation.min.threshold</code></td>
+  <td>100ms</td>
+  <td>
+    Minimum amount of time a task runs before being considered for speculation.
+    This can be used to avoid launching speculative copies of tasks that are very short.
+  </td>
+  <td>3.2.0</td>
+</tr>
 <tr>
   <td><code>spark.speculation.task.duration.threshold</code></td>
   <td>None</td>
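One note on units for the values used above: the config is declared with `timeConf(TimeUnit.MILLISECONDS)`, which is why the test can pass the bare string "3000" while the docs entry shows "100ms". A small illustration, assuming Spark's standard time-string parsing (`JavaUtils.timeStringAsMs`, which the config machinery delegates to):

```scala
import org.apache.spark.network.util.JavaUtils

// Bare numbers fall back to the conf's declared unit (milliseconds here);
// explicit suffixes such as "s" and "ms" are also accepted.
assert(JavaUtils.timeStringAsMs("3000") == 3000L)   // the test's Some("3000") => 3000 ms
assert(JavaUtils.timeStringAsMs("3s") == 3000L)
assert(JavaUtils.timeStringAsMs("100ms") == 100L)   // the documented default
```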