[SPARK-4411][WEB UI] Add "kill" link for jobs in the UI

## What changes were proposed in this pull request?

Currently users can kill stages via the web ui but not jobs directly (jobs are killed if one of their stages is). I've added the ability to kill jobs via the web ui. This code change is based on #4823 by lianhuiwang and updated to work with the latest code matching how stages are currently killed. In general I've copied the kill stage code warning and note comments and all. I also updated applicable tests and documentation.

## How was this patch tested?

Manually tested and dev/run-tests

![screen shot 2016-10-11 at 4 49 43 pm](https://cloud.githubusercontent.com/assets/13952758/19292857/12f1b7c0-8fd4-11e6-8982-210249f7b697.png)

Author: Alex Bozarth <ajbozart@us.ibm.com>
Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #15441 from ajbozarth/spark4411.
This commit is contained in:
Alex Bozarth 2016-10-26 14:26:54 +02:00 committed by Sean Owen
parent 2978136475
commit 5d0f81da49
No known key found for this signature in database
GPG key ID: BEB3956D6717BDDC
7 changed files with 106 additions and 31 deletions

View file

@ -58,14 +58,13 @@ private[spark] class SparkUI private (
val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
val stagesTab = new StagesTab(this)
var appId: String = _
/** Initialize all components of the server. */
def initialize() {
attachTab(new JobsTab(this))
val jobsTab = new JobsTab(this)
attachTab(jobsTab)
val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
@ -73,7 +72,9 @@ private[spark] class SparkUI private (
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
// This should be POST only, but, the YARN AM proxy won't proxy POSTs
// These should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler(
"/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
httpMethods = Set("GET", "POST")))

View file

@ -218,7 +218,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
request: HttpServletRequest,
tableHeaderId: String,
jobTag: String,
jobs: Seq[JobUIData]): Seq[Node] = {
jobs: Seq[JobUIData],
killEnabled: Boolean): Seq[Node] = {
val allParameters = request.getParameterMap.asScala.toMap
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
.map(para => para._1 + "=" + para._2(0))
@ -264,6 +265,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
parameterOtherTable,
parent.jobProgresslistener.stageIdToInfo,
parent.jobProgresslistener.stageIdToData,
killEnabled,
currentTime,
jobIdTitle,
pageSize = jobPageSize,
@ -290,9 +292,12 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val completedJobs = listener.completedJobs.reverse.toSeq
val failedJobs = listener.failedJobs.reverse.toSeq
val activeJobsTable = jobsTable(request, "active", "activeJob", activeJobs)
val completedJobsTable = jobsTable(request, "completed", "completedJob", completedJobs)
val failedJobsTable = jobsTable(request, "failed", "failedJob", failedJobs)
val activeJobsTable =
jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled)
val completedJobsTable =
jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false)
val failedJobsTable =
jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false)
val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
@ -483,6 +488,7 @@ private[ui] class JobPagedTable(
parameterOtherTable: Iterable[String],
stageIdToInfo: HashMap[Int, StageInfo],
stageIdToData: HashMap[(Int, Int), StageUIData],
killEnabled: Boolean,
currentTime: Long,
jobIdTitle: String,
pageSize: Int,
@ -586,12 +592,30 @@ private[ui] class JobPagedTable(
override def row(jobTableRow: JobTableRowData): Seq[Node] = {
val job = jobTableRow.jobData
val killLink = if (killEnabled) {
val confirm =
s"if (window.confirm('Are you sure you want to kill job ${job.jobId} ?')) " +
"{ this.parentNode.submit(); return true; } else { return false; }"
// SPARK-6846 this should be POST-only but YARN AM won't proxy POST
/*
val killLinkUri = s"$basePathUri/jobs/job/kill/"
<form action={killLinkUri} method="POST" style="display:inline">
<input type="hidden" name="id" value={job.jobId.toString}/>
<a href="#" onclick={confirm} class="kill-link">(kill)</a>
</form>
*/
val killLinkUri = s"$basePath/jobs/job/kill/?id=${job.jobId}"
<a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a>
} else {
Seq.empty
}
<tr id={"job-" + job.jobId}>
<td>
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
</td>
<td>
{jobTableRow.jobDescription}
{jobTableRow.jobDescription} {killLink}
<a href={jobTableRow.detailUrl} class="name-link">{jobTableRow.lastStageName}</a>
</td>
<td>

View file

@ -17,6 +17,8 @@
package org.apache.spark.ui.jobs
import javax.servlet.http.HttpServletRequest
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.ui.{SparkUI, SparkUITab}
@ -35,4 +37,19 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
attachPage(new AllJobsPage(this))
attachPage(new JobPage(this))
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
val jobId = Option(request.getParameter("id")).map(_.toInt)
jobId.foreach { id =>
if (jobProgresslistener.activeJobs.contains(id)) {
sc.foreach(_.cancelJob(id))
// Do a quick pause here to give Spark time to kill the job so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
// time should be limited in duration.
Thread.sleep(100)
}
}
}
}
}

View file

@ -353,12 +353,13 @@ private[ui] class StagePagedTable(
val killLinkUri = s"$basePathUri/stages/stage/kill/"
<form action={killLinkUri} method="POST" style="display:inline">
<input type="hidden" name="id" value={s.stageId.toString}/>
<input type="hidden" name="terminate" value="true"/>
<a href="#" onclick={confirm} class="kill-link">(kill)</a>
</form>
*/
val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}&terminate=true"
val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}"
<a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a>
} else {
Seq.empty
}
val nameLinkUri = s"$basePathUri/stages/stage?id=${s.stageId}&attempt=${s.attemptId}"

View file

@ -39,15 +39,16 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) {
sc.get.cancelStage(stageId)
val stageId = Option(request.getParameter("id")).map(_.toInt)
stageId.foreach { id =>
if (progressListener.activeStages.contains(id)) {
sc.foreach(_.cancelStage(id))
// Do a quick pause here to give Spark time to kill the stage so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
// time should be limited in duration.
Thread.sleep(100)
}
}
// Do a quick pause here to give Spark time to kill the stage so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
// time should be limited in duration.
Thread.sleep(100)
}
}

View file

@ -194,6 +194,22 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
}
withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
assert(hasKillLink)
}
}
withSpark(newSparkContext(killEnabled = false)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
assert(!hasKillLink)
}
}
withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
@ -453,20 +469,24 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
test("kill stage POST/GET response is correct") {
def getResponseCode(url: URL, method: String): Int = {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestMethod(method)
connection.connect()
val code = connection.getResponseCode()
connection.disconnect()
code
}
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
val url = new URL(
sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0&terminate=true")
sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
getResponseCode(url, "GET") should be (200)
getResponseCode(url, "POST") should be (200)
}
}
}
test("kill job POST/GET response is correct") {
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
val url = new URL(
sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
getResponseCode(url, "GET") should be (200)
getResponseCode(url, "POST") should be (200)
@ -651,6 +671,17 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}
def getResponseCode(url: URL, method: String): Int = {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestMethod(method)
try {
connection.connect()
connection.getResponseCode()
} finally {
connection.disconnect()
}
}
def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}

View file

@ -632,7 +632,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.ui.killEnabled</code></td>
<td>true</td>
<td>
Allows stages and corresponding jobs to be killed from the web ui.
Allows jobs and stages to be killed from the web UI.
</td>
</tr>
<tr>