[SPARK-23838][WEBUI] Running SQL query is displayed as "completed" in SQL tab

## What changes were proposed in this pull request?

A running SQL query would appear as completed in the Spark UI:
![image1](https://user-images.githubusercontent.com/1097932/38170733-3d7cb00c-35bf-11e8-994c-43f2d4fa285d.png)

We can see the query in "Completed queries", while in in the job page we see it's still running Job 132.
![image2](https://user-images.githubusercontent.com/1097932/38170735-48f2c714-35bf-11e8-8a41-6fae23543c46.png)

After some time in the query still appears in "Completed queries" (while it's still running), but the "Duration" gets increased.
![image3](https://user-images.githubusercontent.com/1097932/38170737-50f87ea4-35bf-11e8-8b60-000f6f918964.png)

To reproduce, we can run a query with multiple jobs. E.g. Run TPCDS q6.

The reason is that updates from executions are written into kvstore periodically, and the job start event may be missed.

## How was this patch tested?
Manually run the job again and check the SQL Tab. The fix is pretty simple.

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #20955 from gengliangwang/jobCompleted.
This commit is contained in:
Gengliang Wang 2018-04-04 15:43:58 -07:00 committed by Marcelo Vanzin
parent cccaaa14ad
commit d8379e5bc3
2 changed files with 6 additions and 3 deletions

View file

@ -39,7 +39,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()
sqlStore.executionsList().foreach { e =>
val isRunning = e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING }
val isRunning = e.completionTime.isEmpty ||
e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING }
val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
if (isRunning) {
running += e

View file

@ -88,7 +88,7 @@ class SQLAppStatusListener(
exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
exec.stages ++= event.stageIds.toSet
update(exec)
update(exec, force = true)
}
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
@ -308,11 +308,13 @@ class SQLAppStatusListener(
})
}
private def update(exec: LiveExecutionData): Unit = {
private def update(exec: LiveExecutionData, force: Boolean = false): Unit = {
val now = System.nanoTime()
if (exec.endEvents >= exec.jobs.size + 1) {
exec.write(kvstore, now)
liveExecutions.remove(exec.executionId)
} else if (force) {
exec.write(kvstore, now)
} else if (liveUpdatePeriodNs >= 0) {
if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
exec.write(kvstore, now)