[SPARK-33741][CORE] Add min threshold time speculation config

### What changes were proposed in this pull request?
Add a config for the minimum amount of time a task must run before being considered for speculation (`spark.speculation.min.threshold`).

### Why are the changes needed?
When we turn on speculation with the default configs, the last 10% of tasks in a stage become subject to speculation. Many stages only run for a few seconds to a few minutes, and in general we don't want to speculate tasks that have been running for less than some minimum amount of time. Adding a minimum-threshold config for speculation gives us better control over when speculative tasks are launched.
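
As an illustration (not part of the patch itself), the new knob composes with the existing speculation settings along these lines; the key names are real, but the values are made-up examples:

```scala
import org.apache.spark.SparkConf

// Hypothetical tuning: speculate the slowest 25% of tasks, but never
// re-launch a task that has been running for less than 5 seconds.
val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.quantile", "0.75")
  .set("spark.speculation.min.threshold", "5s") // the config added by this patch
```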

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test (see the new case added to TaskSetManagerSuite below).

Closes #30710 from redsanket/SPARK-33741.

Lead-authored-by: schintap <schintap@verizonmedia.com>
Co-authored-by: Sanket Chintapalli <chintapalli.sanketreddy@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
schintap 2021-01-13 08:57:56 -06:00 committed by Thomas Graves
parent 62d82b5b27
commit bd5039fc35
7 changed files with 55 additions and 7 deletions

core/src/main/scala/org/apache/spark/internal/config/package.scala

@@ -1889,6 +1889,14 @@ package object config {
       .doubleConf
       .createWithDefault(0.75)
+
+  private[spark] val SPECULATION_MIN_THRESHOLD =
+    ConfigBuilder("spark.speculation.min.threshold")
+      .doc("Minimum amount of time a task runs before being considered for speculation. " +
+        "This can be used to avoid launching speculative copies of tasks that are very short.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(100)

   private[spark] val SPECULATION_TASK_DURATION_THRESHOLD =
     ConfigBuilder("spark.speculation.task.duration.threshold")
       .doc("Task duration after which scheduler would try to speculative run the task. If " +

core/src/main/scala/org/apache/spark/scheduler/Pool.scala

@@ -94,7 +94,7 @@ private[spark] class Pool(
     schedulableQueue.asScala.foreach(_.executorDecommission(executorId))
   }

-  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
+  override def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean = {
     var shouldRevive = false
     for (schedulable <- schedulableQueue.asScala) {
       shouldRevive |= schedulable.checkSpeculatableTasks(minTimeToSpeculation)

core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala

@@ -45,6 +45,6 @@ private[spark] trait Schedulable {
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
   def executorDecommission(executorId: String): Unit
-  def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
+  def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean
   def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

@@ -100,7 +100,7 @@ private[spark] class TaskSchedulerImpl(
   // Duplicate copies of a task will only be launched if the original copy has been running for
   // at least this amount of time. This is to avoid the overhead of launching speculative copies
   // of tasks that are very short.
-  val MIN_TIME_TO_SPECULATION = 100
+  val MIN_TIME_TO_SPECULATION = conf.get(SPECULATION_MIN_THRESHOLD)

   private val speculationScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
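
For context, this constant feeds the scheduler's periodic speculation pass; the sketch below paraphrases the surrounding TaskSchedulerImpl flow (simplified, not the verbatim source):

```scala
// Paraphrased: a daemon thread periodically asks the root pool whether any
// task set contains speculatable tasks, threading the minimum runtime down
// through Pool and TaskSetManager (hence the Int -> Long signature change).
def checkSpeculatableTasks(): Unit = {
  var shouldRevive = false
  synchronized {
    shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
  }
  if (shouldRevive) {
    backend.reviveOffers() // request offers so speculative copies can launch
  }
}
```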

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

@@ -1035,7 +1035,7 @@ private[spark] class TaskSetManager(
    * by the TaskScheduler.
    *
    */
-  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
+  override def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean = {
     // No need to speculate if the task set is zombie or is from a barrier stage. If there is only
     // one task we don't speculate since we don't have metrics to decide whether it's taking too
     // long or not, unless a task duration threshold is explicitly provided.
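
Within TaskSetManager this parameter acts as a floor under the quantile-based threshold. A paraphrased sketch (names simplified from the surrounding source, not the verbatim code):

```scala
// A running task becomes a speculation candidate only once it has outlived
// BOTH the multiplier-scaled median of successful task durations and the
// configured minimum runtime (minTimeToSpeculation).
val medianDuration = successfulTaskDurations.median
val threshold = math.max(
  speculationMultiplier * medianDuration, // spark.speculation.multiplier, 1.5 by default
  minTimeToSpeculation)                   // now fed from spark.speculation.min.threshold
// tasks whose elapsed run time exceeds `threshold` may be resubmitted
// as speculative copies
```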

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

@@ -1901,7 +1901,8 @@ class TaskSetManagerSuite
       speculationQuantile: Double,
       numTasks: Int,
       numExecutorCores: Int,
-      numCoresPerTask: Int): (TaskSetManager, ManualClock) = {
+      numCoresPerTask: Int,
+      speculationMinimumThreshold: Option[String]): (TaskSetManager, ManualClock) = {
     val conf = new SparkConf()
     conf.set(config.SPECULATION_ENABLED, true)
     conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString)
@@ -1911,6 +1912,9 @@ class TaskSetManagerSuite
     if (speculationThresholdOpt.isDefined) {
       conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, speculationThresholdOpt.get)
     }
+    if (speculationMinimumThreshold.isDefined) {
+      conf.set(config.SPECULATION_MIN_THRESHOLD.key, speculationMinimumThreshold.get)
+    }
     sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     // Create a task set with the given number of tasks
@@ -1937,7 +1941,8 @@ class TaskSetManagerSuite
       speculationQuantile = 1.0,
       numTasks,
       numSlots,
-      numCoresPerTask = 1
+      numCoresPerTask = 1,
+      None
     )

     // if the time threshold has not been exceeded, no speculative run should be triggered
@@ -2091,7 +2096,8 @@ class TaskSetManagerSuite
       speculationQuantile = 0.5,
       numTasks = 2,
       numExecutorCores = 2,
-      numCoresPerTask = 1
+      numCoresPerTask = 1,
+      None
     )

     // Task duration can't be 0, advance 1 sec
@@ -2211,6 +2217,31 @@ class TaskSetManagerSuite
       assert(manager.invokePrivate(numFailures())(index1) === 1)
     }
   }
+
+  test("SPARK-33741 Test minimum amount of time a task runs " +
+    "before being considered for speculation") {
+    val (manager, clock) = testSpeculationDurationSetup(
+      None,
+      speculationQuantile = 0.5,
+      numTasks = 2,
+      numExecutorCores = 2,
+      numCoresPerTask = 1,
+      Some("3000") // spark.speculation.min.threshold
+    )
+    // Task duration can't be 0, advance 1 sec
+    clock.advance(1000)
+    // Mark one of the tasks succeeded, which should satisfy the quantile
+    manager.handleSuccessfulTask(0, createTaskResult(0))
+    // Advance 1 more second so the remaining task takes longer
+    clock.advance(1000)
+    manager.checkSpeculatableTasks(sched.MIN_TIME_TO_SPECULATION)
+    // The task is not considered a speculative task due to the minimum threshold interval of 3s
+    assert(sched.speculativeTasks.size == 0)
+    clock.advance(2000)
+    manager.checkSpeculatableTasks(sched.MIN_TIME_TO_SPECULATION)
+    // After 3s have elapsed, the task is now marked as a speculative task
+    assert(sched.speculativeTasks.size == 1)
+  }
 }

 class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) {

docs/configuration.md

@@ -2308,6 +2308,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>0.6.0</td>
 </tr>
+<tr>
+  <td><code>spark.speculation.min.threshold</code></td>
+  <td>100ms</td>
+  <td>
+    Minimum amount of time a task runs before being considered for speculation.
+    This can be used to avoid launching speculative copies of tasks that are very short.
+  </td>
+  <td>3.2.0</td>
+</tr>
 <tr>
   <td><code>spark.speculation.task.duration.threshold</code></td>
   <td>None</td>