[SPARK-26226][SQL] Update query tracker to report timeline for phases

## What changes were proposed in this pull request?
This patch changes the query plan tracker added earlier so that it reports a timeline for the phases, rather than just a single duration per phase. With start and end times recorded, we can easily find time that is unaccounted for.
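With start and end times per phase, the "unaccounted" time is simply the gap between one phase's end and the next phase's start. A minimal sketch of that computation (illustrative only, not part of this patch; the helper name `unaccountedMs` is hypothetical, and it assumes the `QueryPlanningTracker`/`PhaseSummary` API shown in the diff below, with the class at its usual location in the catalyst package):

```scala
import org.apache.spark.sql.catalyst.QueryPlanningTracker

// Hypothetical helper: sum the gaps between consecutive phases, i.e. time
// the tracker does not attribute to any phase.
def unaccountedMs(tracker: QueryPlanningTracker): Long = {
  val ordered = tracker.phases.values.toSeq.sortBy(_.startTimeMs)
  ordered.sliding(2).collect {
    case Seq(prev, next) => math.max(0L, next.startTimeMs - prev.endTimeMs)
  }.sum
}
```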

## How was this patch tested?
Updated test cases to reflect that.

Closes #23183 from rxin/SPARK-26226.

Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
commit 36edbac1c8 (parent 9b23be2e95)
5 changed files with 55 additions and 33 deletions

@@ -41,6 +41,13 @@ object QueryPlanningTracker {
   val OPTIMIZATION = "optimization"
   val PLANNING = "planning"
 
+  /**
+   * Summary for a rule.
+   * @param totalTimeNs total amount of time, in nanosecs, spent in this rule.
+   * @param numInvocations number of times the rule has been invoked.
+   * @param numEffectiveInvocations number of times the rule has been invoked and
+   *                                resulted in a plan change.
+   */
   class RuleSummary(
     var totalTimeNs: Long, var numInvocations: Long, var numEffectiveInvocations: Long) {
 
@@ -51,6 +58,18 @@ object QueryPlanningTracker {
     }
   }
 
+  /**
+   * Summary of a phase, with start time and end time so we can construct a timeline.
+   */
+  class PhaseSummary(val startTimeMs: Long, val endTimeMs: Long) {
+
+    def durationMs: Long = endTimeMs - startTimeMs
+
+    override def toString: String = {
+      s"PhaseSummary($startTimeMs, $endTimeMs)"
+    }
+  }
+
   /**
    * A thread local variable to implicitly pass the tracker around. This assumes the query planner
    * is single-threaded, and avoids passing the same tracker context in every function call.
@@ -79,15 +98,25 @@ class QueryPlanningTracker {
   // Use a Java HashMap for less overhead.
   private val rulesMap = new java.util.HashMap[String, RuleSummary]
 
-  // From a phase to time in ns.
-  private val phaseToTimeNs = new java.util.HashMap[String, Long]
+  // From a phase to its start time and end time, in ms.
+  private val phasesMap = new java.util.HashMap[String, PhaseSummary]
 
-  /** Measure the runtime of function f, and add it to the time for the specified phase. */
-  def measureTime[T](phase: String)(f: => T): T = {
-    val startTime = System.nanoTime()
+  /**
+   * Measure the start and end time of a phase. Note that if this function is called multiple
+   * times for the same phase, the recorded start time will be the start time of the first call,
+   * and the recorded end time will be the end time of the last call.
+   */
+  def measurePhase[T](phase: String)(f: => T): T = {
+    val startTime = System.currentTimeMillis()
     val ret = f
-    val timeTaken = System.nanoTime() - startTime
-    phaseToTimeNs.put(phase, phaseToTimeNs.getOrDefault(phase, 0) + timeTaken)
+    val endTime = System.currentTimeMillis()
+
+    if (phasesMap.containsKey(phase)) {
+      val oldSummary = phasesMap.get(phase)
+      phasesMap.put(phase, new PhaseSummary(oldSummary.startTimeMs, endTime))
+    } else {
+      phasesMap.put(phase, new PhaseSummary(startTime, endTime))
+    }
     ret
   }
@ -114,7 +143,7 @@ class QueryPlanningTracker {
def rules: Map[String, RuleSummary] = rulesMap.asScala.toMap
def phases: Map[String, Long] = phaseToTimeNs.asScala.toMap
def phases: Map[String, PhaseSummary] = phasesMap.asScala.toMap
/**
* Returns the top k most expensive rules (as measured by time). If k is larger than the rules
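Note the semantics documented above: repeated `measurePhase` calls for the same phase collapse into a single `PhaseSummary` that keeps the first start time and the last end time. This matters because, as the `QueryExecution` changes further down show, the planning phase is measured twice (once for `sparkPlan`, once for `executedPlan`). A small sketch of the resulting behavior (illustrative, not taken from the diff):

```scala
val t = new QueryPlanningTracker
t.measurePhase(QueryPlanningTracker.PLANNING) { /* pick a physical plan */ }
t.measurePhase(QueryPlanningTracker.PLANNING) { /* prepare it for execution */ }

// One summary spans both calls, so its duration also includes any time that
// elapsed between them.
val s = t.phases(QueryPlanningTracker.PLANNING)
println(s"planning: ${s.startTimeMs} -> ${s.endTimeMs} (${s.durationMs} ms)")
```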

@@ -23,19 +23,23 @@ class QueryPlanningTrackerSuite extends SparkFunSuite {
   test("phases") {
     val t = new QueryPlanningTracker
-    t.measureTime("p1") {
+    t.measurePhase("p1") {
       Thread.sleep(1)
     }
-    assert(t.phases("p1") > 0)
+    assert(t.phases("p1").durationMs > 0)
     assert(!t.phases.contains("p2"))
+  }
 
-    val old = t.phases("p1")
+  test("multiple measurePhase call") {
+    val t = new QueryPlanningTracker
+    t.measurePhase("p1") { Thread.sleep(1) }
+    val s1 = t.phases("p1")
+    assert(s1.durationMs > 0)
 
-    t.measureTime("p1") {
-      Thread.sleep(1)
-    }
-    assert(t.phases("p1") > old)
+    t.measurePhase("p1") { Thread.sleep(1) }
+    val s2 = t.phases("p1")
+    assert(s2.durationMs > s1.durationMs)
   }
 
   test("rules") {

@@ -649,7 +649,7 @@ class SparkSession private(
    */
   def sql(sqlText: String): DataFrame = {
     val tracker = new QueryPlanningTracker
-    val plan = tracker.measureTime(QueryPlanningTracker.PARSING) {
+    val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
       sessionState.sqlParser.parsePlan(sqlText)
     }
     Dataset.ofRows(self, plan, tracker)

@@ -60,7 +60,7 @@ class QueryExecution(
     }
   }
 
-  lazy val analyzed: LogicalPlan = tracker.measureTime(QueryPlanningTracker.ANALYSIS) {
+  lazy val analyzed: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.ANALYSIS) {
     SparkSession.setActiveSession(sparkSession)
     sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
   }
 
@@ -71,11 +71,11 @@ class QueryExecution(
     sparkSession.sharedState.cacheManager.useCachedData(analyzed)
   }
 
-  lazy val optimizedPlan: LogicalPlan = tracker.measureTime(QueryPlanningTracker.OPTIMIZATION) {
+  lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
     sparkSession.sessionState.optimizer.executeAndTrack(withCachedData, tracker)
   }
 
-  lazy val sparkPlan: SparkPlan = tracker.measureTime(QueryPlanningTracker.PLANNING) {
+  lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
     SparkSession.setActiveSession(sparkSession)
     // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
     //       but we will implement to choose the best plan.
 
@@ -84,7 +84,7 @@ class QueryExecution(
   // executedPlan should not be used to initialize any SparkPlan. It should be
   // only used for execution.
-  lazy val executedPlan: SparkPlan = tracker.measureTime(QueryPlanningTracker.PLANNING) {
+  lazy val executedPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
     prepareForExecution(sparkPlan)
   }

@@ -25,12 +25,7 @@ class QueryPlanningTrackerEndToEndSuite extends SharedSQLContext {
     val df = spark.range(1000).selectExpr("count(*)")
     df.collect()
     val tracker = df.queryExecution.tracker
-    assert(tracker.phases.size == 3)
-    assert(tracker.phases("analysis") > 0)
-    assert(tracker.phases("optimization") > 0)
-    assert(tracker.phases("planning") > 0)
+    assert(tracker.phases.keySet == Set("analysis", "optimization", "planning"))
     assert(tracker.rules.nonEmpty)
   }
 
@@ -39,13 +34,7 @@ class QueryPlanningTrackerEndToEndSuite extends SharedSQLContext {
     df.collect()
     val tracker = df.queryExecution.tracker
-    assert(tracker.phases.size == 4)
-    assert(tracker.phases("parsing") > 0)
-    assert(tracker.phases("analysis") > 0)
-    assert(tracker.phases("optimization") > 0)
-    assert(tracker.phases("planning") > 0)
+    assert(tracker.phases.keySet == Set("parsing", "analysis", "optimization", "planning"))
     assert(tracker.rules.nonEmpty)
   }
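Putting it together, a hedged usage sketch mirroring the suite above (assumes an active `SparkSession` named `spark`, as `SharedSQLContext` provides): after a query runs, `phases` yields a timeline that can be printed in start-time order.

```scala
val df = spark.range(1000).selectExpr("count(*)")
df.collect()

val tracker = df.queryExecution.tracker
tracker.phases.toSeq.sortBy(_._2.startTimeMs).foreach { case (phase, s) =>
  println(f"$phase%-12s ${s.startTimeMs} -> ${s.endTimeMs} (${s.durationMs} ms)")
}
```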