0ec95bb7df
## What changes were proposed in this pull request?

In this PR stage blacklisting is propagated to the UI by introducing a new Spark listener event (`SparkListenerExecutorBlacklistedForStage`) which indicates that an executor is blacklisted for a stage, either because the number of its failures exceeded the limit given for an executor (`spark.blacklist.stage.maxFailedTasksPerExecutor`) or because its whole node is blacklisted for the stage (`spark.blacklist.stage.maxFailedExecutorsPerNode`). When a node is blacklisted, all of its executors are listed as blacklisted for the stage.

The blacklisting state for a selected stage can be seen in the "Aggregated Metrics by Executor" table's blacklisting column, where after this change three possible labels can be found:

- "for application": the executor is blacklisted for the whole application (see the configuration `spark.blacklist.application.maxFailedTasksPerExecutor` for details)
- "for stage": the executor is **only** blacklisted for the stage
- "false": the executor is not blacklisted at all

## How was this patch tested?

It was tested both manually and with unit tests.
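The precedence between the three labels can be sketched as below. This is a minimal illustration, not Spark's actual implementation; the function name `blacklistLabel` and its parameters are hypothetical, chosen only to mirror the two flags described above.

```scala
// Hypothetical sketch of how the "blacklisting" column value could be
// derived from the two per-executor flags; not Spark's actual code.
def blacklistLabel(forApplication: Boolean, forStage: Boolean): String =
  if (forApplication) "for application" // application-level blacklisting wins
  else if (forStage) "for stage"        // blacklisted only for this stage
  else "false"                          // not blacklisted at all

println(blacklistLabel(forApplication = true, forStage = true))   // for application
println(blacklistLabel(forApplication = false, forStage = true))  // for stage
println(blacklistLabel(forApplication = false, forStage = false)) // false
```

Note that an executor blacklisted for the application always shows "for application", even when it is also blacklisted for the current stage.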
#### Unit tests

- HistoryServerSuite
- TaskSetBlacklistSuite
- AppStatusListenerSuite

#### Manual test for executor blacklisting

Running Spark as a local cluster:

```bash
$ bin/spark-shell --master "local-cluster[2,1,1024]" --conf "spark.blacklist.enabled=true" --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" --conf "spark.blacklist.application.maxFailedTasksPerExecutor=10" --conf "spark.eventLog.enabled=true"
```

Executing:

```scala
import org.apache.spark.SparkEnv

sc.parallelize(1 to 10, 10).map { x =>
  if (SparkEnv.get.executorId == "0") throw new RuntimeException("Bad executor")
  else (x % 3, x)
}.reduceByKey((a, b) => a + b).collect()
```

To see the result, check the "Aggregated Metrics by Executor" section at the bottom of the picture:

![UI screenshot for stage level blacklisting executor](https://issues.apache.org/jira/secure/attachment/12905283/stage_blacklisting.png)

#### Manual test for node blacklisting

Running Spark on a cluster:

```bash
./bin/spark-shell --master yarn --deploy-mode client --executor-memory=2G --num-executors=8 --conf "spark.blacklist.enabled=true" --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" --conf "spark.blacklist.stage.maxFailedExecutorsPerNode=1" --conf "spark.blacklist.application.maxFailedTasksPerExecutor=10" --conf "spark.eventLog.enabled=true"
```

And the job was:

```scala
import org.apache.spark.SparkEnv

sc.parallelize(1 to 10000, 10).map { x =>
  if (SparkEnv.get.executorId.toInt >= 4) throw new RuntimeException("Bad executor")
  else (x % 3, x)
}.reduceByKey((a, b) => a + b).collect()
```

The result is:

![UI screenshot for stage level node blacklisting](https://issues.apache.org/jira/secure/attachment/12906833/node_blacklisting_for_stage.png)

Here you can see that apiros3.gce.test.com was node blacklisted for the stage because of failures on executors 4 and 5. As expected, executor 3 is also blacklisted even though it has no failures itself, since it shares the node with executors 4 and 5.
Author: attilapiros <piros.attila.zsolt@gmail.com>
Author: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>

Closes #20203 from attilapiros/SPARK-22577.
108 lines
1.6 KiB
Plaintext
target
cache
.gitignore
.gitattributes
.project
.classpath
.mima-excludes
.generated-mima-excludes
.generated-mima-class-excludes
.generated-mima-member-excludes
.rat-excludes
.*md
derby.log
TAGS
RELEASE
control
docs
slaves
spark-env.cmd
bootstrap-tooltip.js
jquery-1.11.1.min.js
d3.min.js
dagre-d3.min.js
graphlib-dot.min.js
sorttable.js
vis.min.js
vis.min.css
dataTables.bootstrap.css
dataTables.bootstrap.min.js
dataTables.rowsGroup.js
jquery.blockUI.min.js
jquery.cookies.2.2.0.min.js
jquery.dataTables.1.10.4.min.css
jquery.dataTables.1.10.4.min.js
jquery.mustache.js
jsonFormatter.min.css
jsonFormatter.min.js
.*avsc
.*txt
.*json
.*data
.*log
cloudpickle.py
heapq3.py
join.py
SparkExprTyper.scala
SparkILoop.scala
SparkILoopInit.scala
SparkIMain.scala
SparkImports.scala
SparkJLineCompletion.scala
SparkJLineReader.scala
SparkMemberHandlers.scala
SparkReplReporter.scala
sbt
sbt-launch-lib.bash
plugins.sbt
work
.*\.q
.*\.qv
golden
test.out/*
.*iml
service.properties
db.lck
build/*
dist/*
.*out
.*ipr
.*iws
logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
known_translations
json_expectation
app-20180109111548-0000
app-20161115172038-0000
app-20161116163331-0000
application_1516285256255_0012
local-1422981759269
local-1422981780767
local-1425081759269
local-1426533911241
local-1426633911242
local-1430917381534
local-1430917381535_1
local-1430917381535_2
DESCRIPTION
NAMESPACE
test_support/*
.*Rd
help/*
html/*
INDEX
.lintr
gen-java.*
.*avpr
.*parquet
spark-deps-.*
.*csv
.*tsv
.*\.sql
.Rbuildignore
META-INF/*
spark-warehouse
structured-streaming/*
kafka-source-initial-offset-version-2.1.0.bin