diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index ef71db8979..f631a047a7 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -58,14 +58,13 @@ private[spark] class SparkUI private ( val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false) - - val stagesTab = new StagesTab(this) - var appId: String = _ /** Initialize all components of the server. */ def initialize() { - attachTab(new JobsTab(this)) + val jobsTab = new JobsTab(this) + attachTab(jobsTab) + val stagesTab = new StagesTab(this) attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) @@ -73,7 +72,9 @@ private[spark] class SparkUI private ( attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath)) attachHandler(ApiRootResource.getServletHandler(this)) - // This should be POST only, but, the YARN AM proxy won't proxy POSTs + // These should be POST only, but, the YARN AM proxy won't proxy POSTs + attachHandler(createRedirectHandler( + "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST"))) attachHandler(createRedirectHandler( "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST"))) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index f6713097b9..173fc3cf31 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -218,7 +218,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { request: HttpServletRequest, tableHeaderId: String, jobTag: String, - jobs: Seq[JobUIData]): Seq[Node] = { + jobs: Seq[JobUIData], + killEnabled: Boolean): Seq[Node] = { val allParameters = request.getParameterMap.asScala.toMap val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag)) .map(para => para._1 + "=" + para._2(0)) @@ -264,6 +265,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { parameterOtherTable, parent.jobProgresslistener.stageIdToInfo, parent.jobProgresslistener.stageIdToData, + killEnabled, currentTime, jobIdTitle, pageSize = jobPageSize, @@ -290,9 +292,12 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { val completedJobs = listener.completedJobs.reverse.toSeq val failedJobs = listener.failedJobs.reverse.toSeq - val activeJobsTable = jobsTable(request, "active", "activeJob", activeJobs) - val completedJobsTable = jobsTable(request, "completed", "completedJob", completedJobs) - val failedJobsTable = jobsTable(request, "failed", "failedJob", failedJobs) + val activeJobsTable = + jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled) + val completedJobsTable = + jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false) + val failedJobsTable = + jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false) val shouldShowActiveJobs = activeJobs.nonEmpty val shouldShowCompletedJobs = completedJobs.nonEmpty @@ -483,6 +488,7 @@ private[ui] class JobPagedTable( parameterOtherTable: Iterable[String], stageIdToInfo: HashMap[Int, StageInfo], stageIdToData: HashMap[(Int, Int), StageUIData], + killEnabled: Boolean, currentTime: Long, jobIdTitle: String, pageSize: Int, @@ -586,12 +592,30 @@ private[ui] class JobPagedTable( override def row(jobTableRow: JobTableRowData): Seq[Node] = { val job = jobTableRow.jobData + val killLink = if (killEnabled) { + val confirm = + s"if (window.confirm('Are you sure you want to kill job ${job.jobId} ?')) " + + "{ this.parentNode.submit(); return true; } else { return false; }" + // SPARK-6846 this should be POST-only but YARN AM won't proxy POST + /* + val killLinkUri = s"$basePathUri/jobs/job/kill/" +
+ + (kill) +
+ */ + val killLinkUri = s"$basePath/jobs/job/kill/?id=${job.jobId}" + (kill) + } else { + Seq.empty + } + {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")} - {jobTableRow.jobDescription} + {jobTableRow.jobDescription} {killLink} {jobTableRow.lastStageName} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index 7b00b558d5..620c54c2dc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -17,6 +17,8 @@ package org.apache.spark.ui.jobs +import javax.servlet.http.HttpServletRequest + import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.ui.{SparkUI, SparkUITab} @@ -35,4 +37,19 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) + + def handleKillRequest(request: HttpServletRequest): Unit = { + if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) { + val jobId = Option(request.getParameter("id")).map(_.toInt) + jobId.foreach { id => + if (jobProgresslistener.activeJobs.contains(id)) { + sc.foreach(_.cancelJob(id)) + // Do a quick pause here to give Spark time to kill the job so it shows up as + // killed after the refresh. Note that this will block the serving thread so the + // time should be limited in duration. + Thread.sleep(100) + } + } + } + } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 9b9b4681ba..c9d0431e2d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -353,12 +353,13 @@ private[ui] class StagePagedTable( val killLinkUri = s"$basePathUri/stages/stage/kill/"
- (kill)
*/ - val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}&terminate=true" + val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}" (kill) + } else { + Seq.empty } val nameLinkUri = s"$basePathUri/stages/stage?id=${s.stageId}&attempt=${s.attemptId}" diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 573192ac17..c1f2511437 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -39,15 +39,16 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" def handleKillRequest(request: HttpServletRequest): Unit = { if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) { - val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean - val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt - if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) { - sc.get.cancelStage(stageId) + val stageId = Option(request.getParameter("id")).map(_.toInt) + stageId.foreach { id => + if (progressListener.activeStages.contains(id)) { + sc.foreach(_.cancelStage(id)) + // Do a quick pause here to give Spark time to kill the stage so it shows up as + // killed after the refresh. Note that this will block the serving thread so the + // time should be limited in duration. + Thread.sleep(100) + } } - // Do a quick pause here to give Spark time to kill the stage so it shows up as - // killed after the refresh. Note that this will block the serving thread so the - // time should be limited in duration. - Thread.sleep(100) } } diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index fd12a21b79..e5d408a167 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -194,6 +194,22 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync() } + withSpark(newSparkContext(killEnabled = true)) { sc => + runSlowJob(sc) + eventually(timeout(5 seconds), interval(50 milliseconds)) { + goToUi(sc, "/jobs") + assert(hasKillLink) + } + } + + withSpark(newSparkContext(killEnabled = false)) { sc => + runSlowJob(sc) + eventually(timeout(5 seconds), interval(50 milliseconds)) { + goToUi(sc, "/jobs") + assert(!hasKillLink) + } + } + withSpark(newSparkContext(killEnabled = true)) { sc => runSlowJob(sc) eventually(timeout(5 seconds), interval(50 milliseconds)) { @@ -453,20 +469,24 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } test("kill stage POST/GET response is correct") { - def getResponseCode(url: URL, method: String): Int = { - val connection = url.openConnection().asInstanceOf[HttpURLConnection] - connection.setRequestMethod(method) - connection.connect() - val code = connection.getResponseCode() - connection.disconnect() - code - } - withSpark(newSparkContext(killEnabled = true)) { sc => sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync() eventually(timeout(5 seconds), interval(50 milliseconds)) { val url = new URL( - sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0&terminate=true") + sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0") + // SPARK-6846: should be POST only but YARN AM doesn't proxy POST + getResponseCode(url, "GET") should be (200) + getResponseCode(url, "POST") should be (200) + } + } + } + + test("kill job POST/GET response is correct") { + withSpark(newSparkContext(killEnabled = true)) { sc => + sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync() + eventually(timeout(5 seconds), interval(50 milliseconds)) { + val url = new URL( + sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/kill/?id=0") // SPARK-6846: should be POST only but YARN AM doesn't proxy POST getResponseCode(url, "GET") should be (200) getResponseCode(url, "POST") should be (200) @@ -651,6 +671,17 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } } + def getResponseCode(url: URL, method: String): Int = { + val connection = url.openConnection().asInstanceOf[HttpURLConnection] + connection.setRequestMethod(method) + try { + connection.connect() + connection.getResponseCode() + } finally { + connection.disconnect() + } + } + def goToUi(sc: SparkContext, path: String): Unit = { goToUi(sc.ui.get, path) } diff --git a/docs/configuration.md b/docs/configuration.md index b07867d99a..6600cb6c0a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -632,7 +632,7 @@ Apart from these, the following properties are also available, and may be useful spark.ui.killEnabled true - Allows stages and corresponding jobs to be killed from the web ui. + Allows jobs and stages to be killed from the web UI.