[SPARK-29766][SQL] Do metrics aggregation asynchronously in SQL listener

This unblocks the event handling thread, which should help avoid dropped
events when large queries are running.
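
For illustration, the hand-off boils down to the pattern below; this is a minimal sketch with made-up names (AsyncHandoffSketch, aggregateAndStore), not the actual ElementTrackingStore internals: the expensive work is queued on a background thread so the listener callback returns immediately and the shared event queue keeps draining.

import java.util.concurrent.Executors

// Hypothetical stand-in for the single-thread executor behind kvstore.doAsync.
object AsyncHandoffSketch {
  private val worker = Executors.newSingleThreadExecutor()

  // Called on the listener thread; returns right away instead of blocking on
  // the aggregation, so later events keep being drained from the bus.
  def onExecutionEnd(aggregateAndStore: () => Unit): Unit = {
    worker.submit(new Runnable {
      override def run(): Unit = aggregateAndStore()
    })
  }
}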

Existing unit tests should already cover this code.

Closes #26405 from vanzin/SPARK-29766.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.ui
 
 import java.util.{Arrays, Date, NoSuchElementException}
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -94,7 +95,7 @@ class SQLAppStatusListener(
           executionData.jobs = sqlStoreData.jobs
           executionData.stages = sqlStoreData.stages
           executionData.metricsValues = sqlStoreData.metricValues
-          executionData.endEvents = sqlStoreData.jobs.size + 1
+          executionData.endEvents.set(sqlStoreData.jobs.size + 1)
           liveExecutions.put(executionId, executionData)
           Some(executionData)
         } catch {
@@ -138,7 +139,7 @@ class SQLAppStatusListener(
           case _ => JobExecutionStatus.FAILED
         }
         exec.jobs = exec.jobs + (event.jobId -> result)
-        exec.endEvents += 1
+        exec.endEvents.incrementAndGet()
         update(exec)
       }
     }
@@ -320,12 +321,18 @@ class SQLAppStatusListener(
   private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
     val SparkListenerSQLExecutionEnd(executionId, time) = event
     Option(liveExecutions.get(executionId)).foreach { exec =>
-      exec.metricsValues = aggregateMetrics(exec)
       exec.completionTime = Some(new Date(time))
-      exec.endEvents += 1
-      update(exec)
-      removeStaleMetricsData(exec)
+
+      // Aggregating metrics can be expensive for large queries, so do it asynchronously. The end
+      // event count is updated after the metrics have been aggregated, to prevent a job end event
+      // arriving during aggregation from cleaning up the metrics data.
+      kvstore.doAsync {
+        exec.metricsValues = aggregateMetrics(exec)
+        removeStaleMetricsData(exec)
+        exec.endEvents.incrementAndGet()
+        update(exec, force = true)
+      }
     }
   }
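
The comment in this hunk encodes an ordering invariant. A minimal standalone sketch of that ordering, with illustrative names (ExecutionSketch, finishAggregation) rather than the real listener code: the aggregated values are published before the end-event counter is bumped, so any thread that observes the final count also observes the finished metrics and never cleans up data that is still being aggregated.

import java.util.concurrent.atomic.AtomicInteger

// Illustrative sketch of the ordering in the doAsync block above.
final class ExecutionSketch(expectedEndEvents: Int) {
  val endEvents = new AtomicInteger()
  @volatile var metricsValues: Map[Long, Long] = Map.empty

  // Runs on the background thread once aggregation finishes.
  def finishAggregation(aggregated: Map[Long, Long]): Unit = {
    metricsValues = aggregated      // publish the result first
    endEvents.incrementAndGet()     // only then advertise completion
  }

  // Checked before any cleanup of live data.
  def isFinished: Boolean = endEvents.get() >= expectedEndEvents
}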
@@ -362,7 +369,7 @@ class SQLAppStatusListener(
   private def update(exec: LiveExecutionData, force: Boolean = false): Unit = {
     val now = System.nanoTime()
-    if (exec.endEvents >= exec.jobs.size + 1) {
+    if (exec.endEvents.get() >= exec.jobs.size + 1) {
       exec.write(kvstore, now)
       removeStaleMetricsData(exec)
       liveExecutions.remove(exec.executionId)
@@ -414,7 +421,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   // Just in case job end and execution end arrive out of order, keep track of how many
   // end events arrived so that the listener can stop tracking the execution.
-  var endEvents = 0
+  val endEvents = new AtomicInteger()
 
   override protected def doUpdate(): Any = {
     new SQLExecutionUIData(
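
With this change the counter is bumped from two threads, the listener thread on job end and the async thread on execution end, so it has to be an AtomicInteger rather than a plain var. A small self-contained sketch (illustrative, not Spark code) of the lost-update problem the atomic avoids:

import java.util.concurrent.atomic.AtomicInteger

object CounterSketch {
  def main(args: Array[String]): Unit = {
    val atomic = new AtomicInteger()
    var plain = 0
    val threads = (1 to 2).map { _ =>
      new Thread(() => (1 to 100000).foreach { _ =>
        atomic.incrementAndGet() // atomic read-modify-write, no lost updates
        plain += 1               // unsynchronized read-modify-write, updates can be lost
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    // atomic is always 200000; plain usually ends up smaller.
    println(s"atomic = ${atomic.get()}, plain = $plain")
  }
}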