[SPARK-27019][SQL][WEBUI] onJobStart happens after onExecutionEnd shouldn't overwrite kvstore

## What changes were proposed in this pull request?
Currently, when events get reordered, specifically when an onJobStart event arrives after the onExecutionEnd event, the SQL page in the UI displays incorrectly (e.g. the test mentioned in the JIRA; the issue also occurs randomly when a TPCDS query fails due to a broadcast timeout, etc.).

The reason is that, in the SQLAppStatusListener, we remove the liveExecutions entry once the execution ends. So, if a jobStart event arrives after that, we create a fresh liveExecutions entry for that execId, which eventually overwrites the kvstore entry and makes the UI display confusing data. The fix is to restore the live entry from the SQLExecutionUIData already written to the kvstore, instead of starting from an empty one.
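To make the diff easier to follow, here is a minimal, self-contained sketch of the lookup order the patch introduces on `onJobStart`: check the live map first, then the durable store, and only then create a fresh entry. `ExecData`, `Store`, and `Listener` are hypothetical stand-ins for `LiveExecutionData`, the kvstore, and `SQLAppStatusListener`; they are not Spark's actual classes.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Hypothetical stand-in for LiveExecutionData / SQLExecutionUIData.
final case class ExecData(id: Long, var completionTime: Option[Long] = None)

// Hypothetical stand-in for the kvstore: read() throws NoSuchElementException
// when no entry exists, mirroring kvstore.read in the patch below.
final class Store {
  private val persisted = mutable.Map.empty[Long, ExecData]
  def read(id: Long): ExecData =
    persisted.getOrElse(id, throw new NoSuchElementException(id.toString))
  def write(data: ExecData): Unit = persisted(data.id) = data
}

final class Listener(store: Store) {
  private val live = new ConcurrentHashMap[Long, ExecData]()

  def onJobStart(execId: Long): ExecData =
    Option(live.get(execId))
      .orElse {
        // The execution may have already ended and been flushed to the store;
        // rehydrate it rather than overwriting it with an empty entry.
        try {
          val restored = store.read(execId)
          live.put(execId, restored)
          Some(restored)
        } catch {
          case _: NoSuchElementException => None
        }
      }
      .getOrElse {
        // Neither live nor persisted: this really is a new execution.
        val fresh = ExecData(execId)
        live.put(execId, fresh)
        fresh
      }
}
```

The actual patch below applies the same pattern with `kvstore.read(classOf[SQLExecutionUIData], executionId)` and a fallback to `getOrCreateExecution(executionId)`.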

## How was this patch tested?

Added a unit test. Also manually tested with the event log of the failed query provided in the JIRA.

Before fix:
![screenshot from 2019-03-03 03-05-52](https://user-images.githubusercontent.com/23054875/53687929-53e2b800-3d61-11e9-9dca-620fa41e605c.png)

After fix:
![screenshot from 2019-03-03 02-40-18](https://user-images.githubusercontent.com/23054875/53687928-4f1e0400-3d61-11e9-86aa-584646ac68f9.png)

Closes #23939 from shahidki31/SPARK-27019.

Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
commit 62fd133f74 (parent b6375097bc)
2 changed files with 66 additions and 9 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.ui

-import java.util.Date
+import java.util.{Date, NoSuchElementException}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Function
@@ -77,7 +77,29 @@ class SQLAppStatusListener(
     val executionId = executionIdString.toLong
     val jobId = event.jobId
-    val exec = getOrCreateExecution(executionId)
+    val exec = Option(liveExecutions.get(executionId))
+      .orElse {
+        try {
+          // Should not overwrite the kvstore with new entry, if it already has the SQLExecution
+          // data corresponding to the execId.
+          val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId)
+          val executionData = new LiveExecutionData(executionId)
+          executionData.description = sqlStoreData.description
+          executionData.details = sqlStoreData.details
+          executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
+          executionData.metrics = sqlStoreData.metrics
+          executionData.submissionTime = sqlStoreData.submissionTime
+          executionData.completionTime = sqlStoreData.completionTime
+          executionData.jobs = sqlStoreData.jobs
+          executionData.stages = sqlStoreData.stages
+          executionData.metricsValues = sqlStoreData.metricValues
+          executionData.endEvents = sqlStoreData.jobs.size + 1
+          liveExecutions.put(executionId, executionData)
+          Some(executionData)
+        } catch {
+          case _: NoSuchElementException => None
+        }
+      }.getOrElse(getOrCreateExecution(executionId))

     // Record the accumulator IDs for the stages of this job, so that the code that keeps
     // track of the metrics knows which accumulators to look at.
@@ -275,16 +297,20 @@ class SQLAppStatusListener(
       exec.endEvents += 1
       update(exec)

-      // Remove stale LiveStageMetrics objects for stages that are not active anymore.
-      val activeStages = liveExecutions.values().asScala.flatMap { other =>
-        if (other != exec) other.stages else Nil
-      }.toSet
-      stageMetrics.keySet().asScala
-        .filter(!activeStages.contains(_))
-        .foreach(stageMetrics.remove)
+      removeStaleMetricsData(exec)
     }
   }

+  private def removeStaleMetricsData(exec: LiveExecutionData): Unit = {
+    // Remove stale LiveStageMetrics objects for stages that are not active anymore.
+    val activeStages = liveExecutions.values().asScala.flatMap { other =>
+      if (other != exec) other.stages else Nil
+    }.toSet
+    stageMetrics.keySet().asScala
+      .filter(!activeStages.contains(_))
+      .foreach(stageMetrics.remove)
+  }
+
   private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
     val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
     Option(liveExecutions.get(executionId)).foreach { exec =>
@@ -311,6 +337,7 @@
     val now = System.nanoTime()
     if (exec.endEvents >= exec.jobs.size + 1) {
       exec.write(kvstore, now)
+      removeStaleMetricsData(exec)
       liveExecutions.remove(exec.executionId)
     } else if (force) {
       exec.write(kvstore, now)

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala

@@ -384,6 +384,36 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     assertJobs(statusStore.execution(executionId), failed = Seq(0))
   }

+  test("onJobStart happens after onExecutionEnd shouldn't overwrite kvstore") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+
+    listener.onJobStart(SparkListenerJobStart(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      stageInfos = Seq(createStageInfo(0, 0)),
+      createProperties(executionId)))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+    listener.onJobEnd(SparkListenerJobEnd(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      JobFailed(new RuntimeException("Oops"))))
+
+    assert(listener.noLiveData())
+    assert(statusStore.execution(executionId).get.completionTime.nonEmpty)
+  }
+
   test("handle one execution with multiple jobs") {
     val statusStore = createStatusStore()
     val listener = statusStore.listener.get