[SPARK-23189][CORE][WEB UI] Reflect stage level blacklisting on executor tab

## What changes were proposed in this pull request?

The purpose of this PR is to reflect the stage level blacklisting on the executor tab for the currently active stages.

After this change, the Status column of the executor tab will show one of the following labels:

- "Blacklisted" when the executor is blacklisted at the application level (old flag)
- "Dead" when the executor is not Blacklisted and not Active
- "Blacklisted in Stages: [...]" when the executor is Active but there are active blacklisted stages for the executor. Within the [] the comma-separated active stage IDs are listed.
- "Active" when the executor is Active and there are no active blacklisted stages for the executor

## How was this patch tested?

Both with unit tests and manually.

#### Manual test

Spark was started as:

```bash
 bin/spark-shell --master "local-cluster[2,1,1024]" --conf "spark.blacklist.enabled=true" --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" --conf "spark.blacklist.application.maxFailedTasksPerExecutor=10"
```

And the job was:
```scala
import org.apache.spark.SparkEnv

val pairs = sc.parallelize(1 to 10000, 10).map { x =>
  if (SparkEnv.get.executorId.toInt == 0) throw new RuntimeException("Bad executor")
  else  {
    Thread.sleep(10)
    (x % 10, x)
  }
}

val all = pairs.cogroup(pairs)

all.collect()
```

UI screenshots about the running:

- One executor is blacklisted in the two stages:

![One executor is blacklisted in two stages](https://issues.apache.org/jira/secure/attachment/12908314/multiple_stages_1.png)

- One stage completes the other one is still running:

![One stage completes the other is still running](https://issues.apache.org/jira/secure/attachment/12908315/multiple_stages_2.png)

- Both stages are completed:

![Both stages are completed](https://issues.apache.org/jira/secure/attachment/12908316/multiple_stages_3.png)

### Unit tests

In AppStatusListenerSuite.scala both the node blacklisting for a stage and the executor blacklisting for a stage are tested.

Author: “attilapiros” <piros.attila.zsolt@gmail.com>

Closes #20408 from attilapiros/SPARK-23189.
This commit is contained in:
“attilapiros” 2018-02-13 09:54:52 -06:00 committed by Imran Rashid
parent 116c581d26
commit d6e1958a24
9 changed files with 114 additions and 37 deletions

View file

@ -25,12 +25,18 @@ function getThreadDumpEnabled() {
return threadDumpEnabled;
}
function formatStatus(status, type) {
if (status) {
return "Active"
} else {
return "Dead"
// Renders the executor Status cell. Precedence: an application-level
// blacklist flag wins outright; otherwise an alive executor reports any
// stage-level blacklisting; otherwise the executor is dead.
// status: isActive flag for the executor row; row: the full executor record.
function formatStatus(status, type, row) {
// Application-level blacklisting (the old flag) takes precedence.
if (row.isBlacklisted) {
return "Blacklisted";
}
if (status) {
if (row.blacklistedInStages.length == 0) {
return "Active"
}
// Alive, but blacklisted for one or more currently active stages.
return "Active (Blacklisted in Stages: [" + row.blacklistedInStages.join(", ") + "])";
}
return "Dead"
}
jQuery.extend(jQuery.fn.dataTableExt.oSort, {
@ -415,9 +421,10 @@ $(document).ready(function () {
}
},
{data: 'hostPort'},
{data: 'isActive', render: function (data, type, row) {
if (row.isBlacklisted) return "Blacklisted";
else return formatStatus (data, type);
{
data: 'isActive',
render: function (data, type, row) {
return formatStatus (data, type, row);
}
},
{data: 'rddBlocks'},

View file

@ -213,11 +213,13 @@ private[spark] class AppStatusListener(
override def onExecutorBlacklistedForStage(
event: SparkListenerExecutorBlacklistedForStage): Unit = {
val now = System.nanoTime()
Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
val now = System.nanoTime()
val esummary = stage.executorSummary(event.executorId)
esummary.isBlacklisted = true
maybeUpdate(esummary, now)
setStageBlackListStatus(stage, now, event.executorId)
}
liveExecutors.get(event.executorId).foreach { exec =>
addBlackListedStageTo(exec, event.stageId, now)
}
}
@ -226,14 +228,27 @@ private[spark] class AppStatusListener(
// Implicitly blacklist every available executor for the stage associated with this node
Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
liveExecutors.values.foreach { exec =>
if (exec.hostname == event.hostId) {
val esummary = stage.executorSummary(exec.executorId)
esummary.isBlacklisted = true
maybeUpdate(esummary, now)
}
}
val executorIds = liveExecutors.values.filter(_.host == event.hostId).map(_.executorId).toSeq
setStageBlackListStatus(stage, now, executorIds: _*)
}
liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec =>
addBlackListedStageTo(exec, event.stageId, now)
}
}
/**
 * Records that `exec` is blacklisted for the stage `stageId` and pushes the
 * updated executor entity to the store (subject to `liveUpdate`'s write policy).
 */
private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = {
exec.blacklistedInStages += stageId
liveUpdate(exec, now)
}
/**
 * Marks each given executor as blacklisted in this stage's per-executor
 * summaries, records them in the stage's blacklisted-executor set, and
 * (maybe) flushes the touched entities to the store.
 */
private def setStageBlackListStatus(stage: LiveStage, now: Long, executorIds: String*): Unit = {
executorIds.foreach { executorId =>
// Per-stage, per-executor summary shown in the stage page.
val executorStageSummary = stage.executorSummary(executorId)
executorStageSummary.isBlacklisted = true
maybeUpdate(executorStageSummary, now)
}
stage.blackListedExecutors ++= executorIds
maybeUpdate(stage, now)
}
override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
@ -594,12 +609,24 @@ private[spark] class AppStatusListener(
stage.executorSummaries.values.foreach(update(_, now))
update(stage, now, last = true)
val executorIdsForStage = stage.blackListedExecutors
executorIdsForStage.foreach { executorId =>
liveExecutors.get(executorId).foreach { exec =>
removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
}
}
}
appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
kvstore.write(appSummary)
}
/**
 * Clears the stage-level blacklist entry for `stageId` from `exec` (called
 * when the stage completes) and pushes the updated executor to the store.
 */
private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = {
exec.blacklistedInStages -= stageId
liveUpdate(exec, now)
}
override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
// This needs to set fields that are already set by onExecutorAdded because the driver is
// considered an "executor" in the UI, but does not have a SparkListenerExecutorAdded event.

View file

@ -20,6 +20,7 @@ package org.apache.spark.status
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.immutable.{HashSet, TreeSet}
import scala.collection.mutable.HashMap
import com.google.common.collect.Interners
@ -254,6 +255,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
var totalShuffleRead = 0L
var totalShuffleWrite = 0L
var isBlacklisted = false
var blacklistedInStages: Set[Int] = TreeSet()
var executorLogs = Map[String, String]()
@ -299,7 +301,8 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
Option(removeTime),
Option(removeReason),
executorLogs,
memoryMetrics)
memoryMetrics,
blacklistedInStages)
new ExecutorSummaryWrapper(info)
}
@ -371,6 +374,8 @@ private class LiveStage extends LiveEntity {
val executorSummaries = new HashMap[String, LiveExecutorStageSummary]()
var blackListedExecutors = new HashSet[String]()
// Used for cleanup of tasks after they reach the configured limit. Not written to the store.
@volatile var cleaning = false
var savedTasks = new AtomicInteger(0)

View file

@ -95,7 +95,8 @@ class ExecutorSummary private[spark](
val removeTime: Option[Date],
val removeReason: Option[String],
val executorLogs: Map[String, String],
val memoryMetrics: Option[MemoryMetrics])
val memoryMetrics: Option[MemoryMetrics],
val blacklistedInStages: Set[Int])
class MemoryMetrics private[spark](
val usedOnHeapStorageMemory: Long,

View file

@ -19,5 +19,6 @@
"isBlacklisted" : false,
"maxMemory" : 278302556,
"addTime" : "2015-02-03T16:43:00.906GMT",
"executorLogs" : { }
"executorLogs" : { },
"blacklistedInStages" : [ ]
} ]

View file

@ -25,7 +25,8 @@
"usedOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"totalOffHeapStorageMemory" : 524288000
}
},
"blacklistedInStages" : [ ]
}, {
"id" : "3",
"hostPort" : "172.22.0.167:51485",
@ -56,7 +57,8 @@
"usedOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"totalOffHeapStorageMemory" : 524288000
}
},
"blacklistedInStages" : [ ]
} ,{
"id" : "2",
"hostPort" : "172.22.0.167:51487",
@ -87,7 +89,8 @@
"usedOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"totalOffHeapStorageMemory" : 524288000
}
},
"blacklistedInStages" : [ ]
}, {
"id" : "1",
"hostPort" : "172.22.0.167:51490",
@ -118,7 +121,8 @@
"usedOffHeapStorageMemory": 0,
"totalOnHeapStorageMemory": 384093388,
"totalOffHeapStorageMemory": 524288000
}
},
"blacklistedInStages" : [ ]
}, {
"id" : "0",
"hostPort" : "172.22.0.167:51491",
@ -149,5 +153,6 @@
"usedOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"totalOffHeapStorageMemory" : 524288000
}
},
"blacklistedInStages" : [ ]
} ]

View file

@ -25,7 +25,8 @@
"usedOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"totalOffHeapStorageMemory" : 524288000
}
},
"blacklistedInStages" : [ ]
}, {
"id" : "3",
"hostPort" : "172.22.0.167:51485",
@ -56,7 +57,8 @@
"usedOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"totalOffHeapStorageMemory" : 524288000
}
},
"blacklistedInStages" : [ ]
}, {
"id" : "2",
"hostPort" : "172.22.0.167:51487",
@ -87,7 +89,8 @@
"usedOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"totalOffHeapStorageMemory" : 524288000
}
},
"blacklistedInStages" : [ ]
}, {
"id" : "1",
"hostPort" : "172.22.0.167:51490",
@ -118,7 +121,8 @@
"usedOffHeapStorageMemory": 0,
"totalOnHeapStorageMemory": 384093388,
"totalOffHeapStorageMemory": 524288000
}
},
"blacklistedInStages" : [ ]
}, {
"id" : "0",
"hostPort" : "172.22.0.167:51491",
@ -149,5 +153,6 @@
"usedOffHeapStorageMemory": 0,
"totalOnHeapStorageMemory": 384093388,
"totalOffHeapStorageMemory": 524288000
}
},
"blacklistedInStages" : [ ]
} ]

View file

@ -19,7 +19,8 @@
"isBlacklisted" : false,
"maxMemory" : 384093388,
"addTime" : "2016-11-15T23:20:38.836GMT",
"executorLogs" : { }
"executorLogs" : { },
"blacklistedInStages" : [ ]
}, {
"id" : "3",
"hostPort" : "172.22.0.111:64543",
@ -44,7 +45,8 @@
"executorLogs" : {
"stdout" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stdout",
"stderr" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stderr"
}
},
"blacklistedInStages" : [ ]
}, {
"id" : "2",
"hostPort" : "172.22.0.111:64539",
@ -69,7 +71,8 @@
"executorLogs" : {
"stdout" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stdout",
"stderr" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stderr"
}
},
"blacklistedInStages" : [ ]
}, {
"id" : "1",
"hostPort" : "172.22.0.111:64541",
@ -94,7 +97,8 @@
"executorLogs" : {
"stdout" : "http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stdout",
"stderr" : "http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stderr"
}
},
"blacklistedInStages" : [ ]
}, {
"id" : "0",
"hostPort" : "172.22.0.111:64540",
@ -119,5 +123,6 @@
"executorLogs" : {
"stdout" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stdout",
"stderr" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stderr"
}
},
"blacklistedInStages" : [ ]
} ]

View file

@ -273,6 +273,10 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(exec.info.isBlacklistedForStage === expectedBlacklistedFlag)
}
check[ExecutorSummaryWrapper](execIds.head) { exec =>
assert(exec.info.blacklistedInStages === Set(stages.head.stageId))
}
// Blacklisting node for stage
time += 1
listener.onNodeBlacklistedForStage(SparkListenerNodeBlacklistedForStage(
@ -439,6 +443,10 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(stage.info.numCompleteTasks === pending.size)
}
check[ExecutorSummaryWrapper](execIds.head) { exec =>
assert(exec.info.blacklistedInStages === Set())
}
// Submit stage 2.
time += 1
stages.last.submissionTime = Some(time)
@ -453,6 +461,19 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(stage.info.submissionTime === Some(new Date(stages.last.submissionTime.get)))
}
// Blacklisting node for stage
time += 1
listener.onNodeBlacklistedForStage(SparkListenerNodeBlacklistedForStage(
time = time,
hostId = "1.example.com",
executorFailures = 1,
stageId = stages.last.stageId,
stageAttemptId = stages.last.attemptId))
check[ExecutorSummaryWrapper](execIds.head) { exec =>
assert(exec.info.blacklistedInStages === Set(stages.last.stageId))
}
// Start and fail all tasks of stage 2.
time += 1
val s2Tasks = createTasks(4, execIds)