[SPARK-14884][SQL][STREAMING][WEBUI] Fix call site for continuous queries
## What changes were proposed in this pull request? Since we've been processing continuous queries in separate threads, the call sites are then `run at <unknown>:0`. It's not wrong but provides very little information; in addition, we can not distinguish two queries only from their call sites. This patch fixes this. ### Before [Jobs Tab] ![s1a](https://cloud.githubusercontent.com/assets/15843379/14766101/a47246b2-0a30-11e6-8d81-06a9a600113b.png) [SQL Tab] ![s1b](https://cloud.githubusercontent.com/assets/15843379/14766102/a4750226-0a30-11e6-9ada-773d977d902b.png) ### After [Jobs Tab] ![s2a](https://cloud.githubusercontent.com/assets/15843379/14766104/a89705b6-0a30-11e6-9830-0d40ec68527b.png) [SQL Tab] ![s2b](https://cloud.githubusercontent.com/assets/15843379/14766103/a8966728-0a30-11e6-8e4d-c2e326400478.png) ## How was this patch tested? Manually checks - see screenshots above. Author: Liwei Lin <lwlin7@gmail.com> Closes #12650 from lw-lin/fix-call-site.
This commit is contained in:
parent
5503e453ba
commit
5bd9a2f697
|
@ -23,7 +23,6 @@ import org.apache.spark.SparkContext
|
||||||
import org.apache.spark.sql.SparkSession
|
import org.apache.spark.sql.SparkSession
|
||||||
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
|
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
|
||||||
SparkListenerSQLExecutionStart}
|
SparkListenerSQLExecutionStart}
|
||||||
import org.apache.spark.util.Utils
|
|
||||||
|
|
||||||
private[sql] object SQLExecution {
|
private[sql] object SQLExecution {
|
||||||
|
|
||||||
|
@ -46,7 +45,11 @@ private[sql] object SQLExecution {
|
||||||
val executionId = SQLExecution.nextExecutionId
|
val executionId = SQLExecution.nextExecutionId
|
||||||
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
|
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
|
||||||
val r = try {
|
val r = try {
|
||||||
val callSite = Utils.getCallSite()
|
// sparkContext.getCallSite() would first try to pick up any call site that was previously
|
||||||
|
// set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
|
||||||
|
// continuous queries would give us call site like "run at <unknown>:0"
|
||||||
|
val callSite = sparkSession.sparkContext.getCallSite()
|
||||||
|
|
||||||
sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
|
sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
|
||||||
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
|
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
|
||||||
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
|
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
|
||||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.util._
|
||||||
import org.apache.spark.sql.execution.QueryExecution
|
import org.apache.spark.sql.execution.QueryExecution
|
||||||
import org.apache.spark.sql.util.ContinuousQueryListener
|
import org.apache.spark.sql.util.ContinuousQueryListener
|
||||||
import org.apache.spark.sql.util.ContinuousQueryListener._
|
import org.apache.spark.sql.util.ContinuousQueryListener._
|
||||||
import org.apache.spark.util.UninterruptibleThread
|
import org.apache.spark.util.{UninterruptibleThread, Utils}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
|
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
|
||||||
|
@ -101,10 +101,18 @@ class StreamExecution(
|
||||||
@volatile
|
@volatile
|
||||||
private[sql] var streamDeathCause: ContinuousQueryException = null
|
private[sql] var streamDeathCause: ContinuousQueryException = null
|
||||||
|
|
||||||
|
/* Get the call site in the caller thread; will pass this into the micro batch thread */
|
||||||
|
private val callSite = Utils.getCallSite()
|
||||||
|
|
||||||
/** The thread that runs the micro-batches of this stream. */
|
/** The thread that runs the micro-batches of this stream. */
|
||||||
private[sql] val microBatchThread =
|
private[sql] val microBatchThread =
|
||||||
new UninterruptibleThread(s"stream execution thread for $name") {
|
new UninterruptibleThread(s"stream execution thread for $name") {
|
||||||
override def run(): Unit = { runBatches() }
|
override def run(): Unit = {
|
||||||
|
// To fix call site like "run at <unknown>:0", we bridge the call site from the caller
|
||||||
|
// thread to this micro batch thread
|
||||||
|
sparkSession.sparkContext.setCallSite(callSite)
|
||||||
|
runBatches()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue