[SPARK-26399][WEBUI][CORE] Add new stage-level REST APIs and parameters

### What changes were proposed in this pull request?
Add more flexible parameters for the stage endpoint
/applications/{app-id}/stages.  It can be:

/applications/{app-id}/stages?details=[true|false]&status=[ACTIVE|COMPLETE|FAILED|PENDING|SKIPPED]&withSummaries=[true|false]&quantiles=[comma separated quantiles string]&taskStatus=[RUNNING|SUCCESS|FAILED|PENDING]

where
```
query parameter details=true is to show the detailed task information within each stage.  The default value is details=false;
query parameter status selects only those stages with the specified status.  When the status parameter is not specified, a list of all stages is generated.
query parameter withSummaries=true is to show both task summary information in percentile distribution and executor summary information in percentile distribution.  The default value is withSummaries=false.
query parameter quantiles supports user-defined quantiles; the default value is `0.0,0.25,0.5,0.75,1.0`
query parameter taskStatus is to show only those tasks with the specified status within their corresponding stages.  This parameter takes effect only when details=true (i.e. it is ignored when details=false).
```

### Why are the changes needed?
More flexible RESTful API

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
UT

Closes #31204 from AngersZhuuuu/SPARK-26399-NEW.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
This commit is contained in:
Angerszhuuuu 2021-04-01 12:48:26 -05:00 committed by Sean Owen
parent 1b553da2a1
commit 2796812cea
3 changed files with 47 additions and 6 deletions

View file

@ -104,13 +104,23 @@ private[spark] class AppStatusStore(
listener.map(_.activeStages()).getOrElse(Nil)
}
def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = {
/**
 * Returns the stages held in the store, optionally restricted to the given stage statuses,
 * with every selected entry passed through `newStageData`.
 *
 * @param statuses stage statuses to keep; when null or empty no status filtering is applied
 * @param details forwarded to `newStageData` as `withDetail`
 * @param withSummaries forwarded to `newStageData` as `withSummaries`
 * @param unsortedQuantiles quantiles in any order; they are sorted here before being forwarded
 * @param taskStatus forwarded unchanged to `newStageData` as `taskStatus`
 * @return one `v1.StageData` per selected stage, in the reversed order of the store view
 */
def stageList(
statuses: JList[v1.StageStatus],
details: Boolean = false,
withSummaries: Boolean = false,
unsortedQuantiles: Array[Double] = Array.empty,
taskStatus: JList[v1.TaskStatus] = List().asJava): Seq[v1.StageData] = {
// Sort once up front so the same ordered array is reused for every stage below.
val quantiles = unsortedQuantiles.sorted
// Walk the StageDataWrapper view in reverse and unwrap each entry to its `info`.
val it = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
if (statuses != null && !statuses.isEmpty()) {
val ret = if (statuses != null && !statuses.isEmpty()) {
it.filter { s => statuses.contains(s.status) }.toSeq
} else {
it.toSeq
}
// The already-sorted quantiles are passed under newStageData's `unsortedQuantiles` parameter name.
ret.map { s =>
newStageData(s, withDetail = details, taskStatus = taskStatus,
withSummaries = withSummaries, unsortedQuantiles = quantiles)
}
}
def stageData(
@ -472,7 +482,7 @@ private[spark] class AppStatusStore(
def newStageData(
stage: v1.StageData,
withDetail: Boolean = false,
taskStatus: JList[v1.TaskStatus],
taskStatus: JList[v1.TaskStatus] = List().asJava,
withSummaries: Boolean = false,
unsortedQuantiles: Array[Double] = Array.empty[Double]): v1.StageData = {
if (!withDetail && !withSummaries) {

View file

@ -20,6 +20,9 @@ import java.util.{HashMap, List => JList, Locale}
import javax.ws.rs.{NotFoundException => _, _}
import javax.ws.rs.core.{Context, MediaType, MultivaluedMap, UriInfo}
import scala.collection.JavaConverters._
import org.apache.spark.status.api.v1.TaskStatus._
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.jobs.ApiHelper._
import org.apache.spark.util.Utils
@ -28,8 +31,32 @@ import org.apache.spark.util.Utils
private[v1] class StagesResource extends BaseAppResource {
@GET
def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = {
withUI(_.store.stageList(statuses))
/**
 * Lists the stages known to the UI's store, applying the query parameters of the request.
 *
 * @param statuses values of the `status` query parameter, forwarded to the store's `stageList`
 * @param details value of the `details` query parameter (declared default "false")
 * @param withSummaries value of the `withSummaries` query parameter (declared default "false")
 * @param quantileString value of the `quantiles` query parameter (declared default
 *                       "0.0,0.25,0.5,0.75,1.0"); converted with `parseQuantileString`
 * @param taskStatus values of the `taskStatus` query parameter; forwarded to the store and,
 *                   when `details` is true and the list is non-empty, also used to drop stages
 *                   that have no task counted under any of the requested statuses
 * @return the stage data returned by the store after the task-status filter below
 */
def stageList(
@QueryParam("status") statuses: JList[StageStatus],
@QueryParam("details") @DefaultValue("false") details: Boolean,
@QueryParam("withSummaries") @DefaultValue("false") withSummaries: Boolean,
@QueryParam("quantiles") @DefaultValue("0.0,0.25,0.5,0.75,1.0") quantileString: String,
@QueryParam("taskStatus") taskStatus: JList[TaskStatus]): Seq[StageData] = {
withUI {
// Parsed outside the `ui => ...` function literal that this block evaluates to.
val quantiles = parseQuantileString(quantileString)
ui => {
ui.store.stageList(statuses, details, withSummaries, quantiles, taskStatus)
.filter { stage =>
// NOTE(review): `taskStatus.asScala` assumes the list is non-null when the query
// parameter is absent -- confirm against the JAX-RS binding behaviour.
if (details && taskStatus.asScala.nonEmpty) {
// Keep the stage if at least one requested status has a positive task counter.
taskStatus.asScala.exists {
case FAILED => stage.numFailedTasks > 0
case KILLED => stage.numKilledTasks > 0
case RUNNING => stage.numActiveTasks > 0
case SUCCESS => stage.numCompleteTasks > 0
// UNKNOWN: tasks left over after subtracting the failed, killed, active and
// complete counters from the stage's total task count.
case UNKNOWN => stage.numTasks - stage.numFailedTasks - stage.numKilledTasks -
stage.numActiveTasks - stage.numCompleteTasks > 0
}
} else {
// details is false or no task status was requested: keep every stage.
true
}
}
}
}
}
@GET

View file

@ -472,7 +472,11 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
<td><code>/applications/[app-id]/stages</code></td>
<td>
A list of all stages for a given application.
<br><code>?status=[active|complete|pending|failed]</code> list only stages in the state.
<br><code>?status=[active|complete|pending|failed]</code> list only stages in the given state.
<br><code>?details=true</code> lists all stages with the task data.
<br><code>?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING]</code> lists only those tasks with the specified task status. Query parameter taskStatus takes effect only when <code>details=true</code>.
<br><code>?withSummaries=true</code> lists stages with task metrics distribution and executor metrics distribution.
<br><code>?quantiles=0.0,0.25,0.5,0.75,1.0</code> summarize the metrics with the given quantiles. Query parameter quantiles takes effect only when <code>withSummaries=true</code>. Default value is <code>0.0,0.25,0.5,0.75,1.0</code>.
</td>
</tr>
<tr>